victauri-cli 0.7.5

CLI for Victauri — scaffold tests, check running apps, record sessions
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
//! Stdio-to-HTTP bridge for MCP clients like Claude Code.
//!
//! Reads JSON-RPC messages from stdin, forwards them to Victauri's Streamable HTTP
//! endpoint, parses SSE responses, and writes them back to stdout. This bridges
//! the gap between MCP hosts that expect stdio transport and Victauri's HTTP server.
//!
//! Why this exists (and why agents should connect through it, not a fixed `url:`):
//!
//! * **Always reaches the RIGHT app.** A static `.mcp.json` URL hardcodes a port; when
//!   several Victauri apps run (or one falls back off a busy 7373), that port can point at
//!   the WRONG process. The bridge resolves the live backend **by app identity** at connect
//!   time and re-resolves on failure — so the agent can never get stuck talking to the
//!   wrong app. Select with `--app <identifier>` (or `VICTAURI_APP`); with no selector it
//!   uses the single running app, or errors clearly if several are running.
//! * **Survives server restarts.** Every dev rebuild/relaunch invalidates the MCP session.
//!   The bridge caches the `initialize` handshake and transparently re-establishes a fresh
//!   session (re-discovering the port) on a stale session (404/409/422) or connection drop,
//!   so the agent's tool calls keep working without a reconnect.

use std::io::{BufRead, Write};
use std::sync::{Arc, Mutex};

use anyhow::{Result, bail};

const MAX_RETRIES: usize = 4;
const RETRY_DELAY_MS: u64 = 400;

/// A discovered, live Victauri backend.
#[derive(Clone, Debug)]
struct ServerInfo {
    port: u16,
    token: Option<String>,
    identifier: Option<String>,
    product_name: Option<String>,
}

impl ServerInfo {
    fn label(&self) -> String {
        let name = self
            .identifier
            .as_deref()
            .or(self.product_name.as_deref())
            .unwrap_or("<unknown app>");
        format!("{name} (port {})", self.port)
    }
}

/// Run the stdio bridge against a discovered Victauri server.
///
/// `app` selects which app to bind when several are running (matches the Tauri bundle
/// identifier or product name; falls back to the `VICTAURI_APP` env var).
///
/// # Errors
///
/// Returns an error if no matching server can be reached.
pub async fn run(wait: bool, app: Option<String>) -> Result<()> {
    let app = app.or_else(|| std::env::var("VICTAURI_APP").ok());
    let connection = Arc::new(Mutex::new(discover_and_select(wait, app.as_deref()).await?));
    let session_id: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
    // Cache the MCP handshake so a session can be re-established transparently after the
    // backend restarts — the host (e.g. Claude Code) only sends `initialize` once.
    let cached_init: Arc<Mutex<Option<serde_json::Value>>> = Arc::new(Mutex::new(None));

    let http = build_client()?;

    let stdin = std::io::stdin();
    let stdout = std::io::stdout();

    for line in stdin.lock().lines() {
        let Ok(line) = line else { break };
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }

        let msg: serde_json::Value = match serde_json::from_str(trimmed) {
            Ok(v) => v,
            Err(e) => {
                eprintln!("victauri-bridge: invalid JSON on stdin: {e}");
                continue;
            }
        };

        let method = msg.get("method").and_then(|m| m.as_str()).unwrap_or("");
        let is_initialize = method == "initialize";
        if is_initialize {
            *cached_init.lock().expect("cached_init lock") = Some(msg.clone());
        }
        let is_notification = msg.get("id").is_none();

        let mut last_err = None;

        for attempt in 0..MAX_RETRIES {
            // If the session was invalidated and we have a cached handshake, re-establish a
            // fresh session BEFORE replaying the real request (skip when the message itself
            // is the initialize). This is what makes restart-recovery actually work — the
            // old code cleared the session then re-sent the tool call with no session, which
            // the server rejects with 422 "expect initialize".
            if !is_initialize {
                let need_reinit = session_id.lock().expect("session lock").is_none();
                if need_reinit {
                    let init = cached_init.lock().expect("cached_init lock").clone();
                    if let Some(init) = init {
                        let (port, token) = conn_parts(&connection);
                        // We don't relay the re-init response to the host; it already believes
                        // it is initialized. On failure we fall through to the request attempt,
                        // which triggers re-discovery below.
                        if let Ok(out) =
                            post_message(&http, port, token.as_deref(), None, &init).await
                            && let Some(sid) = out.session_id
                        {
                            *session_id.lock().expect("session lock") = Some(sid);
                        }
                    }
                }
            }

            let (port, token) = conn_parts(&connection);
            let sid = session_id.lock().expect("session lock").clone();

            match post_message(&http, port, token.as_deref(), sid.as_deref(), &msg).await {
                Ok(out) => {
                    if let Some(new_sid) = out.session_id {
                        *session_id.lock().expect("session lock") = Some(new_sid);
                    }

                    if out.stale_session {
                        eprintln!(
                            "victauri-bridge: stale session (HTTP {}), re-establishing (attempt {}/{})",
                            out.status,
                            attempt + 1,
                            MAX_RETRIES
                        );
                        *session_id.lock().expect("session lock") = None;
                        if attempt + 1 < MAX_RETRIES {
                            tokio::time::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS))
                                .await;
                            if let Ok(new_conn) = discover_and_select(false, app.as_deref()).await {
                                *connection.lock().expect("conn lock") = new_conn;
                            }
                        }
                        last_err = Some(format!("Victauri returned {}", out.status));
                        continue;
                    }

                    if is_notification && out.accepted {
                        last_err = None;
                        break;
                    }

                    for payload in out.payloads {
                        let mut o = stdout.lock();
                        let _ = writeln!(o, "{payload}");
                        let _ = o.flush();
                    }
                    last_err = None;
                    break;
                }
                Err(e) => {
                    eprintln!(
                        "victauri-bridge: connection failed (attempt {}/{}): {e}",
                        attempt + 1,
                        MAX_RETRIES
                    );
                    *session_id.lock().expect("session lock") = None;
                    if attempt + 1 < MAX_RETRIES {
                        tokio::time::sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        if let Ok(new_conn) = discover_and_select(true, app.as_deref()).await {
                            *connection.lock().expect("conn lock") = new_conn;
                            eprintln!("victauri-bridge: reconnected to {}", {
                                let g = connection.lock().expect("conn lock");
                                g.label()
                            });
                        }
                    }
                    last_err = Some(format!("Victauri server unreachable: {e}"));
                    continue;
                }
            }
        }

        if let Some(err_msg) = last_err
            && !is_notification
        {
            let err_resp = serde_json::json!({
                "jsonrpc": "2.0",
                "id": msg.get("id"),
                "error": { "code": -32000, "message": err_msg }
            });
            let mut o = stdout.lock();
            let _ = writeln!(o, "{err_resp}");
            let _ = o.flush();
        }
    }

    Ok(())
}

fn build_client() -> Result<reqwest::Client> {
    reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(120))
        .connect_timeout(std::time::Duration::from_secs(10))
        .build()
        .map_err(Into::into)
}

fn conn_parts(connection: &Arc<Mutex<ServerInfo>>) -> (u16, Option<String>) {
    let g = connection.lock().expect("conn lock");
    (g.port, g.token.clone())
}

/// Outcome of forwarding one JSON-RPC message to the backend.
struct PostOutcome {
    status: u16,
    session_id: Option<String>,
    stale_session: bool,
    accepted: bool,
    payloads: Vec<String>,
}

/// Forward a single JSON-RPC message to `127.0.0.1:<port>/mcp` and parse the response.
async fn post_message(
    http: &reqwest::Client,
    port: u16,
    token: Option<&str>,
    session_id: Option<&str>,
    msg: &serde_json::Value,
) -> Result<PostOutcome> {
    let url = format!("http://127.0.0.1:{port}/mcp");
    let mut req = http
        .post(&url)
        .header("Content-Type", "application/json")
        .header("Accept", "application/json, text/event-stream");
    if let Some(t) = token {
        req = req.header("Authorization", format!("Bearer {t}"));
    }
    if let Some(sid) = session_id {
        req = req.header("Mcp-Session-Id", sid);
    }

    let resp = req.json(msg).send().await?;
    let status = resp.status().as_u16();
    let new_sid = resp
        .headers()
        .get("mcp-session-id")
        .and_then(|v| v.to_str().ok())
        .map(String::from);

    // 404/409 = unknown/terminated session; 422 = "expect initialize" (no/!init session).
    // All three mean "the session is gone — re-establish it".
    let stale_session = matches!(status, 404 | 409 | 422);
    let accepted = status == 202;

    let mut payloads = Vec::new();
    if !stale_session && status != 202 {
        let content_type = resp
            .headers()
            .get("content-type")
            .and_then(|v| v.to_str().ok())
            .unwrap_or("")
            .to_string();
        let body = resp.text().await.unwrap_or_default();

        if !(200..300).contains(&status) {
            // Surface a JSON-RPC error for the original request id.
            payloads.push(
                serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": msg.get("id"),
                    "error": { "code": -32000, "message": format!("Victauri returned {status}: {body}") }
                })
                .to_string(),
            );
        } else if content_type.contains("text/event-stream") {
            for sse_line in body.lines() {
                if let Some(data) = sse_line.strip_prefix("data: ") {
                    let data = data.trim();
                    if !data.is_empty() && serde_json::from_str::<serde_json::Value>(data).is_ok() {
                        payloads.push(data.to_string());
                    }
                }
            }
        } else {
            let body = body.trim();
            if !body.is_empty() {
                payloads.push(body.to_string());
            }
        }
    }

    Ok(PostOutcome {
        status,
        session_id: new_sid,
        stale_session,
        accepted,
        payloads,
    })
}

/// Discover live Victauri backends and select the one matching `app` (or the only one).
async fn discover_and_select(wait: bool, app: Option<&str>) -> Result<ServerInfo> {
    let max_attempts = if wait { 30 } else { 3 };
    let delay = std::time::Duration::from_secs(1);

    for attempt in 0..max_attempts {
        // Explicit env override wins (a developer pinning a specific port).
        if let Ok(p) = std::env::var("VICTAURI_PORT")
            && let Ok(port) = p.parse::<u16>()
            && health_ok(port).await
        {
            return Ok(ServerInfo {
                port,
                token: std::env::var("VICTAURI_AUTH_TOKEN")
                    .ok()
                    .or_else(discover_any_token),
                identifier: None,
                product_name: None,
            });
        }

        let mut servers = discover_servers();
        // Keep only live ones (health-checked).
        let mut live = Vec::new();
        for s in servers.drain(..) {
            if health_ok(s.port).await {
                live.push(s);
            }
        }

        match select(&live, app) {
            Selection::One(s) => {
                eprintln!("victauri-bridge: connected to {}", s.label());
                return Ok(s);
            }
            Selection::None if attempt + 1 < max_attempts => {
                if attempt == 0 {
                    eprintln!("victauri-bridge: waiting for Victauri server...");
                }
                tokio::time::sleep(delay).await;
            }
            Selection::None => {
                bail!(
                    "Could not connect to Victauri server.\n\
                     Is your Tauri app running (debug build)? Start it with: pnpm run tauri dev"
                );
            }
            Selection::Ambiguous(labels) => {
                bail!(
                    "Multiple Victauri apps are running:\n  {}\n\
                     Specify which one with `victauri bridge --app <identifier>` (or set \
                     VICTAURI_APP). The identifier is your Tauri bundle identifier.",
                    labels.join("\n  ")
                );
            }
        }
    }

    bail!("Could not connect to a matching Victauri server")
}

enum Selection {
    One(ServerInfo),
    None,
    Ambiguous(Vec<String>),
}

/// Pick the server matching `app`, or the sole running server.
fn select(live: &[ServerInfo], app: Option<&str>) -> Selection {
    if live.is_empty() {
        return Selection::None;
    }
    if let Some(app) = app {
        let needle = app.to_ascii_lowercase();
        // Prefer an exact identifier/product_name match, then a substring match.
        let exact = live.iter().find(|s| {
            s.identifier
                .as_deref()
                .map(str::to_ascii_lowercase)
                .as_deref()
                == Some(&needle)
                || s.product_name
                    .as_deref()
                    .map(str::to_ascii_lowercase)
                    .as_deref()
                    == Some(&needle)
        });
        if let Some(s) = exact {
            return Selection::One(s.clone());
        }
        let partial = live.iter().find(|s| {
            s.identifier
                .as_deref()
                .is_some_and(|i| i.to_ascii_lowercase().contains(&needle))
                || s.product_name
                    .as_deref()
                    .is_some_and(|p| p.to_ascii_lowercase().contains(&needle))
        });
        return match partial {
            Some(s) => Selection::One(s.clone()),
            None => Selection::None,
        };
    }
    // No app specified: fine if exactly one is running; ambiguous otherwise.
    if live.len() == 1 {
        Selection::One(live[0].clone())
    } else {
        Selection::Ambiguous(live.iter().map(ServerInfo::label).collect())
    }
}

/// Scan `<temp>/victauri/<pid>/` for live-process discovery entries (port + token + identity).
fn discover_servers() -> Vec<ServerInfo> {
    let root = std::env::temp_dir().join("victauri");
    let mut out = Vec::new();
    let Ok(entries) = std::fs::read_dir(&root) else {
        return out;
    };
    for entry in entries.filter_map(Result::ok) {
        let pid_str = entry.file_name().to_string_lossy().to_string();
        let Ok(pid) = pid_str.parse::<u32>() else {
            continue;
        };
        if !is_process_alive(pid) {
            continue;
        }
        let dir = entry.path();
        let Ok(port_s) = std::fs::read_to_string(dir.join("port")) else {
            continue;
        };
        let Ok(port) = port_s.trim().parse::<u16>() else {
            continue;
        };
        let token = std::fs::read_to_string(dir.join("token"))
            .ok()
            .map(|t| t.trim().to_string())
            .filter(|t| !t.is_empty());
        let (identifier, product_name) = std::fs::read_to_string(dir.join("metadata.json"))
            .ok()
            .and_then(|m| serde_json::from_str::<serde_json::Value>(&m).ok())
            .map_or((None, None), |m| {
                (
                    m.get("identifier")
                        .and_then(|v| v.as_str())
                        .map(String::from),
                    m.get("product_name")
                        .and_then(|v| v.as_str())
                        .map(String::from),
                )
            });
        out.push(ServerInfo {
            port,
            token,
            identifier,
            product_name,
        });
    }
    out
}

/// First token found among live discovery entries (used only for the `VICTAURI_PORT` override).
fn discover_any_token() -> Option<String> {
    discover_servers().into_iter().find_map(|s| s.token)
}

async fn health_ok(port: u16) -> bool {
    let url = format!("http://127.0.0.1:{port}/health");
    reqwest::Client::new()
        .get(&url)
        .timeout(std::time::Duration::from_secs(3))
        .send()
        .await
        .is_ok_and(|r| r.status().is_success())
}

#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use std::process::Command;
    Command::new("tasklist")
        .args(["/FI", &format!("PID eq {pid}"), "/NH"])
        .output()
        .is_ok_and(|o| {
            let out = String::from_utf8_lossy(&o.stdout);
            out.contains(&pid.to_string())
        })
}

#[cfg(not(windows))]
fn is_process_alive(pid: u32) -> bool {
    // Portable POSIX liveness check. `/proc` is Linux-only — on macOS it does not exist,
    // so the old `/proc/{pid}` test always returned false and the bridge filtered out every
    // discovery entry (it could find NO server on macOS). `kill -0` sends no signal but
    // succeeds iff the process exists and is signalable by us — and discovery entries are
    // our own user's processes. Works identically on macOS and Linux.
    std::process::Command::new("kill")
        .args(["-0", &pid.to_string()])
        .stderr(std::process::Stdio::null())
        .status()
        .is_ok_and(|s| s.success())
}

#[cfg(test)]
mod tests {
    use super::*;

    fn srv(id: &str, name: &str, port: u16) -> ServerInfo {
        ServerInfo {
            port,
            token: None,
            identifier: Some(id.to_string()),
            product_name: Some(name.to_string()),
        }
    }

    #[test]
    fn selects_sole_server_without_app() {
        let live = vec![srv("com.a.app", "A", 7373)];
        assert!(matches!(select(&live, None), Selection::One(s) if s.port == 7373));
    }

    #[test]
    fn ambiguous_when_multiple_and_no_app() {
        let live = vec![srv("com.a.app", "A", 7373), srv("com.b.app", "B", 7374)];
        assert!(matches!(select(&live, None), Selection::Ambiguous(v) if v.len() == 2));
    }

    #[test]
    fn selects_by_identifier_among_many() {
        let live = vec![srv("com.a.app", "A", 7373), srv("com.4da.app", "4DA", 7374)];
        match select(&live, Some("com.4da.app")) {
            Selection::One(s) => assert_eq!(s.port, 7374),
            _ => panic!("should pick 4DA by identifier"),
        }
    }

    #[test]
    fn selects_by_product_name_case_insensitive() {
        let live = vec![
            srv("com.a.app", "Demo", 7373),
            srv("com.4da.app", "4DA", 7374),
        ];
        match select(&live, Some("4da")) {
            Selection::One(s) => assert_eq!(s.port, 7374),
            _ => panic!("should pick by product name"),
        }
    }

    #[test]
    fn no_match_returns_none() {
        let live = vec![srv("com.a.app", "A", 7373)];
        assert!(matches!(
            select(&live, Some("com.nope.app")),
            Selection::None
        ));
    }

    #[test]
    fn substring_identifier_match() {
        let live = vec![srv("com.victauri.demo", "Demo", 7373)];
        match select(&live, Some("demo")) {
            Selection::One(s) => assert_eq!(s.port, 7373),
            _ => panic!("substring of product/identifier should match"),
        }
    }

    // End-to-end against REAL discovery files: the plugin writes port/token/metadata.json
    // under `<temp>/victauri/<pid>/`; this proves the bridge parses those real files and can
    // select the right app by identity — even amid the many stale dirs left by dead processes.
    #[test]
    fn discover_servers_reads_real_metadata_and_selects() {
        let pid = std::process::id(); // alive → passes is_process_alive
        let dir = std::env::temp_dir().join("victauri").join(pid.to_string());
        std::fs::create_dir_all(&dir).unwrap();
        std::fs::write(dir.join("port"), "61999").unwrap();
        std::fs::write(dir.join("token"), "tok-xyz").unwrap();
        std::fs::write(
            dir.join("metadata.json"),
            r#"{"pid":1,"port":61999,"identifier":"com.test.discover","product_name":"DiscoverTest"}"#,
        )
        .unwrap();

        let servers = discover_servers();
        let mine = servers
            .iter()
            .find(|s| s.identifier.as_deref() == Some("com.test.discover"))
            .expect("bridge should discover the entry written for the live current pid");
        assert_eq!(mine.port, 61999);
        assert_eq!(mine.token.as_deref(), Some("tok-xyz"));
        assert_eq!(mine.product_name.as_deref(), Some("DiscoverTest"));

        // And selection by identity picks it out.
        assert!(matches!(
            select(std::slice::from_ref(mine), Some("com.test.discover")),
            Selection::One(_)
        ));

        let _ = std::fs::remove_dir_all(&dir);
    }
}