use std::io::{BufRead, Write};
use std::sync::{Arc, Mutex};
use anyhow::{Result, bail};
/// Maximum number of delivery attempts `run` makes for a single incoming message
/// (this counts every attempt, including the first one).
const MAX_RETRIES: usize = 4;
/// Base delay, in milliseconds, between delivery attempts. Stale-session retries
/// sleep exactly this long; connection-failure retries sleep this value multiplied
/// by the 1-based attempt number.
const RETRY_DELAY_MS: u64 = 400;
/// Connection details for one discovered Victauri server.
#[derive(Clone, Debug)]
struct ServerInfo {
    /// Local TCP port the server was discovered on.
    port: u16,
    /// Bearer token to send with requests, when one is known.
    token: Option<String>,
    /// Value of the `identifier` metadata field, when available.
    identifier: Option<String>,
    /// Value of the `product_name` metadata field, when available.
    product_name: Option<String>,
}

impl ServerInfo {
    /// Builds a human-readable label of the form `<name> (port <port>)`.
    ///
    /// The name is the identifier when present, otherwise the product name,
    /// otherwise the placeholder `<unknown app>`.
    fn label(&self) -> String {
        let name = match (self.identifier.as_deref(), self.product_name.as_deref()) {
            (Some(identifier), _) => identifier,
            (None, Some(product)) => product,
            (None, None) => "<unknown app>",
        };
        format!("{name} (port {})", self.port)
    }
}
/// Runs the stdio-to-HTTP bridge loop.
///
/// Reads one JSON message per line from stdin, forwards each message to the
/// selected Victauri server via `post_message`, and writes every returned payload
/// to stdout as a single line. Diagnostics go to stderr.
///
/// * `wait` - forwarded to `discover_and_select` for the initial connection.
/// * `app` - optional application selector; when `None`, the `VICTAURI_APP`
///   environment variable is used instead (if set).
///
/// # Errors
///
/// Returns an error when the initial server discovery fails or the HTTP client
/// cannot be built. Failures while forwarding an individual message do not abort
/// the loop: after `MAX_RETRIES` attempts a JSON-RPC error object (code `-32000`)
/// is written to stdout for requests, and nothing is written for notifications.
/// The function returns `Ok(())` once stdin ends or a line cannot be read.
///
/// # Panics
///
/// Panics if one of the internal mutexes is poisoned.
pub async fn run(wait: bool, app: Option<String>) -> Result<()> {
// An explicit `app` argument wins; otherwise fall back to the environment.
let app = app.or_else(|| std::env::var("VICTAURI_APP").ok());
let connection = Arc::new(Mutex::new(discover_and_select(wait, app.as_deref()).await?));
// Session id last handed out by the server; `None` means a session must be (re-)established.
let session_id: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
// The most recent `initialize` request, kept so it can be replayed after the session is lost.
let cached_init: Arc<Mutex<Option<serde_json::Value>>> = Arc::new(Mutex::new(None));
let http = build_client()?;
let stdin = std::io::stdin();
let stdout = std::io::stdout();
// NOTE(review): this is a blocking std read performed inside an async fn.
for line in stdin.lock().lines() {
// A read error ends the bridge loop the same way end-of-input does.
let Ok(line) = line else { break };
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
// Lines that are not valid JSON are reported on stderr and skipped.
let msg: serde_json::Value = match serde_json::from_str(trimmed) {
Ok(v) => v,
Err(e) => {
eprintln!("victauri-bridge: invalid JSON on stdin: {e}");
continue;
}
};
let method = msg.get("method").and_then(|m| m.as_str()).unwrap_or("");
let is_initialize = method == "initialize";
if is_initialize {
*cached_init.lock().expect("cached_init lock") = Some(msg.clone());
}
// A message without an `id` member is treated as a notification: no reply is owed to it.
let is_notification = msg.get("id").is_none();
// Holds the most recent failure; cleared on success. Reported after the retry loop.
let mut last_err = None;
for attempt in 0..MAX_RETRIES {
if !is_initialize {
let need_reinit = session_id.lock().expect("session lock").is_none();
if need_reinit {
// No session yet (or it was dropped): replay the cached `initialize` request first.
// The replay is best-effort; its payloads are discarded and a failure is ignored.
let init = cached_init.lock().expect("cached_init lock").clone();
if let Some(init) = init {
let (port, token) = conn_parts(&connection);
if let Ok(out) =
post_message(&http, port, token.as_deref(), None, &init).await
&& let Some(sid) = out.session_id
{
*session_id.lock().expect("session lock") = Some(sid);
}
}
}
}
// Connection details and session id are copied out so no mutex guard is held across the await.
let (port, token) = conn_parts(&connection);
let sid = session_id.lock().expect("session lock").clone();
match post_message(&http, port, token.as_deref(), sid.as_deref(), &msg).await {
Ok(out) => {
// Adopt any session id the server returned with this response.
if let Some(new_sid) = out.session_id {
*session_id.lock().expect("session lock") = Some(new_sid);
}
if out.stale_session {
// The server no longer recognises the session: drop it, pause, rediscover
// the server (without the long wait) and try again.
eprintln!(
"victauri-bridge: stale session (HTTP {}), re-establishing (attempt {}/{})",
out.status,
attempt + 1,
MAX_RETRIES
);
*session_id.lock().expect("session lock") = None;
if attempt + 1 < MAX_RETRIES {
tokio::time::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS))
.await;
if let Ok(new_conn) = discover_and_select(false, app.as_deref()).await {
*connection.lock().expect("conn lock") = new_conn;
}
}
last_err = Some(format!("Victauri returned {}", out.status));
continue;
}
// An accepted notification has nothing to forward back to the client.
if is_notification && out.accepted {
last_err = None;
break;
}
// Forward every payload as one line on stdout; write errors are deliberately ignored.
for payload in out.payloads {
let mut o = stdout.lock();
let _ = writeln!(o, "{payload}");
let _ = o.flush();
}
last_err = None;
break;
}
Err(e) => {
// Transport-level failure: drop the session, back off (delay grows linearly with
// the attempt number) and rediscover the server, this time with the long wait.
eprintln!(
"victauri-bridge: connection failed (attempt {}/{}): {e}",
attempt + 1,
MAX_RETRIES
);
*session_id.lock().expect("session lock") = None;
if attempt + 1 < MAX_RETRIES {
tokio::time::sleep(std::time::Duration::from_millis(
RETRY_DELAY_MS * (attempt as u64 + 1),
))
.await;
if let Ok(new_conn) = discover_and_select(true, app.as_deref()).await {
*connection.lock().expect("conn lock") = new_conn;
eprintln!("victauri-bridge: reconnected to {}", {
let g = connection.lock().expect("conn lock");
g.label()
});
}
}
last_err = Some(format!("Victauri server unreachable: {e}"));
continue;
}
}
}
// Every attempt failed: requests get a JSON-RPC error reply, notifications get nothing.
if let Some(err_msg) = last_err
&& !is_notification
{
let err_resp = serde_json::json!({
"jsonrpc": "2.0",
"id": msg.get("id"),
"error": { "code": -32000, "message": err_msg }
});
let mut o = stdout.lock();
let _ = writeln!(o, "{err_resp}");
let _ = o.flush();
}
}
Ok(())
}
/// Creates the HTTP client used for all `/mcp` requests.
///
/// The client is configured with a 120-second overall request timeout and a
/// 10-second connect timeout.
///
/// # Errors
///
/// Returns the builder's error if the client cannot be constructed.
fn build_client() -> Result<reqwest::Client> {
    let request_timeout = std::time::Duration::from_secs(120);
    let connect_timeout = std::time::Duration::from_secs(10);
    let client = reqwest::Client::builder()
        .timeout(request_timeout)
        .connect_timeout(connect_timeout)
        .build()?;
    Ok(client)
}
fn conn_parts(connection: &Arc<Mutex<ServerInfo>>) -> (u16, Option<String>) {
let g = connection.lock().expect("conn lock");
(g.port, g.token.clone())
}
/// Result of forwarding one message to the server with `post_message`.
struct PostOutcome {
/// HTTP status code of the response.
status: u16,
/// Value of the `mcp-session-id` response header, when present and readable as text.
session_id: Option<String>,
/// `true` when the status was 404, 409 or 422, which the bridge treats as a lost session.
stale_session: bool,
/// `true` when the status was 202.
accepted: bool,
/// Messages to write to stdout, one per line; empty for stale-session and 202 responses.
payloads: Vec<String>,
}
/// Collects the JSON documents carried by the `data` fields of an SSE body.
///
/// The single space after `data:` is optional in the event-stream format, so both
/// `data: {...}` and `data:{...}` are accepted. Values that are empty or that do
/// not parse as JSON are skipped.
fn sse_data_payloads(body: &str) -> Vec<String> {
    body.lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .map(str::trim)
        .filter(|data| {
            !data.is_empty() && serde_json::from_str::<serde_json::Value>(data).is_ok()
        })
        .map(String::from)
        .collect()
}

/// Sends one JSON-RPC message to the server's `/mcp` endpoint on localhost and
/// classifies the response.
///
/// * `http` - client used to send the request.
/// * `port` - local port of the server.
/// * `token` - optional bearer token, sent as the `Authorization` header.
/// * `session_id` - optional session id, sent as the `Mcp-Session-Id` header.
/// * `msg` - the message to forward as the JSON request body.
///
/// The returned [`PostOutcome`] carries the status, any session id from the
/// response headers, and the payloads to forward:
///
/// * 404, 409 and 422 mark the session as stale, and 202 marks the message as
///   accepted; neither produces payloads.
/// * Any other non-2xx status becomes a JSON-RPC error object (code `-32000`) when
///   `msg` is a request. For a notification (no `id`) the failure is only logged
///   to stderr, because a notification must never be answered.
/// * A 2xx `text/event-stream` body yields one payload per valid JSON `data` line.
/// * Any other 2xx body is forwarded trimmed, if it is non-empty.
///
/// # Errors
///
/// Returns an error only when the request itself cannot be sent. A failure while
/// reading the response body is logged to stderr and treated as an empty body.
async fn post_message(
    http: &reqwest::Client,
    port: u16,
    token: Option<&str>,
    session_id: Option<&str>,
    msg: &serde_json::Value,
) -> Result<PostOutcome> {
    let url = format!("http://127.0.0.1:{port}/mcp");
    let mut req = http
        .post(&url)
        .header("Content-Type", "application/json")
        .header("Accept", "application/json, text/event-stream");
    if let Some(t) = token {
        req = req.header("Authorization", format!("Bearer {t}"));
    }
    if let Some(sid) = session_id {
        req = req.header("Mcp-Session-Id", sid);
    }

    let resp = req.json(msg).send().await?;
    let status = resp.status().as_u16();
    let new_sid = resp
        .headers()
        .get("mcp-session-id")
        .and_then(|v| v.to_str().ok())
        .map(String::from);
    let stale_session = matches!(status, 404 | 409 | 422);
    let accepted = status == 202;

    let mut payloads = Vec::new();
    if !stale_session && !accepted {
        // Media types are case-insensitive, so compare in lowercase.
        let is_event_stream = resp
            .headers()
            .get("content-type")
            .and_then(|v| v.to_str().ok())
            .is_some_and(|ct| ct.to_ascii_lowercase().contains("text/event-stream"));

        // Keep the best-effort fallback to an empty body, but leave a trace on
        // stderr so a dropped response is diagnosable.
        let body = match resp.text().await {
            Ok(text) => text,
            Err(e) => {
                eprintln!("victauri-bridge: failed to read response body (HTTP {status}): {e}");
                String::new()
            }
        };

        if !(200..300).contains(&status) {
            if msg.get("id").is_some() {
                payloads.push(
                    serde_json::json!({
                        "jsonrpc": "2.0",
                        "id": msg.get("id"),
                        "error": { "code": -32000, "message": format!("Victauri returned {status}: {body}") }
                    })
                    .to_string(),
                );
            } else {
                // Notifications are never answered; report the failure out of band.
                eprintln!("victauri-bridge: notification rejected with HTTP {status}: {body}");
            }
        } else if is_event_stream {
            payloads = sse_data_payloads(&body);
        } else {
            let body = body.trim();
            if !body.is_empty() {
                payloads.push(body.to_string());
            }
        }
    }

    Ok(PostOutcome {
        status,
        session_id: new_sid,
        stale_session,
        accepted,
        payloads,
    })
}
/// Finds a live Victauri server and picks the one to connect to.
///
/// Each attempt first honours `VICTAURI_PORT`: when it parses as a port and that
/// port answers the health check, it is used directly, with the token taken from
/// `VICTAURI_AUTH_TOKEN` or else from any discovered server. Otherwise the
/// discovered servers are health-checked and passed to `select` together with `app`.
///
/// * `wait` - when `true` up to 30 attempts are made, otherwise 3; attempts are
///   one second apart.
/// * `app` - optional application selector forwarded to `select`.
///
/// # Errors
///
/// Fails when no matching server is found within the allowed attempts, or
/// immediately when several servers are live and `app` is not given.
async fn discover_and_select(wait: bool, app: Option<&str>) -> Result<ServerInfo> {
    let max_attempts = if wait { 30 } else { 3 };
    let pause = std::time::Duration::from_secs(1);

    for attempt in 0..max_attempts {
        // An explicitly pinned port bypasses discovery as long as it is healthy.
        let pinned_port = std::env::var("VICTAURI_PORT")
            .ok()
            .and_then(|raw| raw.parse::<u16>().ok());
        if let Some(port) = pinned_port
            && health_ok(port).await
        {
            let token = std::env::var("VICTAURI_AUTH_TOKEN")
                .ok()
                .or_else(discover_any_token);
            return Ok(ServerInfo {
                port,
                token,
                identifier: None,
                product_name: None,
            });
        }

        // Keep only the discovered servers that currently answer the health check.
        let mut live = Vec::new();
        for candidate in discover_servers() {
            if health_ok(candidate.port).await {
                live.push(candidate);
            }
        }

        let is_last_attempt = attempt + 1 >= max_attempts;
        match select(&live, app) {
            Selection::One(server) => {
                eprintln!("victauri-bridge: connected to {}", server.label());
                return Ok(server);
            }
            Selection::Ambiguous(labels) => {
                bail!(
                    "Multiple Victauri apps are running:\n {}\n\
                     Specify which one with `victauri bridge --app <identifier>` (or set \
                     VICTAURI_APP). The identifier is your Tauri bundle identifier.",
                    labels.join("\n ")
                );
            }
            Selection::None if is_last_attempt => {
                bail!(
                    "Could not connect to Victauri server.\n\
                     Is your Tauri app running (debug build)? Start it with: pnpm run tauri dev"
                );
            }
            Selection::None => {
                // Announce the wait once, then poll again after the pause.
                if attempt == 0 {
                    eprintln!("victauri-bridge: waiting for Victauri server...");
                }
                tokio::time::sleep(pause).await;
            }
        }
    }

    bail!("Could not connect to a matching Victauri server")
}
/// Outcome of choosing a server from the live candidates with `select`.
enum Selection {
/// A single server was chosen.
One(ServerInfo),
/// There were no live servers, or none matched the requested app.
None,
/// Several servers are live and no app was requested; holds their labels.
Ambiguous(Vec<String>),
}
/// Chooses one server from `live`, optionally guided by an `app` selector.
///
/// Without `app`: a sole live server is selected, and several live servers are
/// reported as ambiguous. With `app`: the first server whose identifier or product
/// name equals `app` (ASCII case-insensitively) wins; failing that, the first
/// server whose identifier or product name contains `app` wins; otherwise nothing
/// is selected. An empty `live` slice never selects anything.
fn select(live: &[ServerInfo], app: Option<&str>) -> Selection {
    if live.is_empty() {
        return Selection::None;
    }

    let Some(app) = app else {
        return match live {
            [only] => Selection::One(only.clone()),
            many => Selection::Ambiguous(many.iter().map(ServerInfo::label).collect()),
        };
    };

    let needle = app.to_ascii_lowercase();
    let is_exact = |name: Option<&str>| name.is_some_and(|n| n.eq_ignore_ascii_case(app));
    let is_partial =
        |name: Option<&str>| name.is_some_and(|n| n.to_ascii_lowercase().contains(needle.as_str()));

    // Exact matches take priority over substring matches.
    // NOTE(review): when several servers match, the first one in `live` order wins.
    let chosen = live
        .iter()
        .find(|s| is_exact(s.identifier.as_deref()) || is_exact(s.product_name.as_deref()))
        .or_else(|| {
            live.iter().find(|s| {
                is_partial(s.identifier.as_deref()) || is_partial(s.product_name.as_deref())
            })
        });

    match chosen {
        Some(server) => Selection::One(server.clone()),
        None => Selection::None,
    }
}
fn discover_servers() -> Vec<ServerInfo> {
let root = std::env::temp_dir().join("victauri");
let mut out = Vec::new();
let Ok(entries) = std::fs::read_dir(&root) else {
return out;
};
for entry in entries.filter_map(Result::ok) {
let pid_str = entry.file_name().to_string_lossy().to_string();
let Ok(pid) = pid_str.parse::<u32>() else {
continue;
};
if !is_process_alive(pid) {
continue;
}
let dir = entry.path();
let Ok(port_s) = std::fs::read_to_string(dir.join("port")) else {
continue;
};
let Ok(port) = port_s.trim().parse::<u16>() else {
continue;
};
let token = std::fs::read_to_string(dir.join("token"))
.ok()
.map(|t| t.trim().to_string())
.filter(|t| !t.is_empty());
let (identifier, product_name) = std::fs::read_to_string(dir.join("metadata.json"))
.ok()
.and_then(|m| serde_json::from_str::<serde_json::Value>(&m).ok())
.map_or((None, None), |m| {
(
m.get("identifier")
.and_then(|v| v.as_str())
.map(String::from),
m.get("product_name")
.and_then(|v| v.as_str())
.map(String::from),
)
});
out.push(ServerInfo {
port,
token,
identifier,
product_name,
});
}
out
}
/// Returns the token of the first discovered server that has one, if any.
fn discover_any_token() -> Option<String> {
    let servers = discover_servers();
    servers.into_iter().find_map(|server| server.token)
}
/// Reports whether the server on `port` answers `GET /health` on localhost with
/// a success status within three seconds.
///
/// Any request error, including a timeout, counts as unhealthy.
async fn health_ok(port: u16) -> bool {
    let url = format!("http://127.0.0.1:{port}/health");
    let probe = reqwest::Client::new()
        .get(&url)
        .timeout(std::time::Duration::from_secs(3));
    match probe.send().await {
        Ok(response) => response.status().is_success(),
        Err(_) => false,
    }
}
/// Reports whether a process with the given id is running (Windows).
///
/// Runs `tasklist` filtered to `pid` and checks that its output mentions the id.
/// If `tasklist` cannot be run, the process is reported as not alive.
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    let filter = format!("PID eq {pid}");
    let listing = std::process::Command::new("tasklist")
        .args(["/FI", filter.as_str(), "/NH"])
        .output();
    match listing {
        Ok(output) => String::from_utf8_lossy(&output.stdout).contains(&pid.to_string()),
        Err(_) => false,
    }
}
/// Reports whether a process with the given id is running (non-Windows).
///
/// Runs `kill -0 <pid>` with stderr silenced and treats a zero exit status as
/// alive. If `kill` cannot be run, the process is reported as not alive.
#[cfg(not(windows))]
fn is_process_alive(pid: u32) -> bool {
    let probe = std::process::Command::new("kill")
        .arg("-0")
        .arg(pid.to_string())
        .stderr(std::process::Stdio::null())
        .status();
    matches!(probe, Ok(status) if status.success())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a token-less server entry with both names set.
    fn srv(identifier: &str, product_name: &str, port: u16) -> ServerInfo {
        ServerInfo {
            port,
            token: None,
            identifier: Some(identifier.to_owned()),
            product_name: Some(product_name.to_owned()),
        }
    }

    /// A single live server is selected even when no app is requested.
    #[test]
    fn selects_sole_server_without_app() {
        let candidates = [srv("com.a.app", "A", 7373)];
        let outcome = select(&candidates, None);
        assert!(matches!(outcome, Selection::One(s) if s.port == 7373));
    }

    /// Several live servers without an app selector are reported as ambiguous.
    #[test]
    fn ambiguous_when_multiple_and_no_app() {
        let candidates = [srv("com.a.app", "A", 7373), srv("com.b.app", "B", 7374)];
        let outcome = select(&candidates, None);
        assert!(matches!(outcome, Selection::Ambiguous(v) if v.len() == 2));
    }

    /// An exact identifier picks the matching server among several.
    #[test]
    fn selects_by_identifier_among_many() {
        let candidates = [srv("com.a.app", "A", 7373), srv("com.4da.app", "4DA", 7374)];
        let Selection::One(picked) = select(&candidates, Some("com.4da.app")) else {
            panic!("should pick 4DA by identifier");
        };
        assert_eq!(picked.port, 7374);
    }

    /// Product names are matched without regard to ASCII case.
    #[test]
    fn selects_by_product_name_case_insensitive() {
        let candidates = [
            srv("com.a.app", "Demo", 7373),
            srv("com.4da.app", "4DA", 7374),
        ];
        let Selection::One(picked) = select(&candidates, Some("4da")) else {
            panic!("should pick by product name");
        };
        assert_eq!(picked.port, 7374);
    }

    /// A selector that matches nothing yields no selection.
    #[test]
    fn no_match_returns_none() {
        let candidates = [srv("com.a.app", "A", 7373)];
        let outcome = select(&candidates, Some("com.nope.app"));
        assert!(matches!(outcome, Selection::None));
    }

    /// A substring of the identifier or product name is enough to match.
    #[test]
    fn substring_identifier_match() {
        let candidates = [srv("com.victauri.demo", "Demo", 7373)];
        let Selection::One(picked) = select(&candidates, Some("demo")) else {
            panic!("substring of product/identifier should match");
        };
        assert_eq!(picked.port, 7373);
    }

    /// Writes a discovery entry for the current (live) process and checks that
    /// `discover_servers` reads it back and `select` can pick it.
    #[test]
    fn discover_servers_reads_real_metadata_and_selects() {
        let own_pid = std::process::id();
        let entry_dir = std::env::temp_dir()
            .join("victauri")
            .join(own_pid.to_string());
        std::fs::create_dir_all(&entry_dir).unwrap();

        let fixtures = [
            ("port", "61999"),
            ("token", "tok-xyz"),
            (
                "metadata.json",
                r#"{"pid":1,"port":61999,"identifier":"com.test.discover","product_name":"DiscoverTest"}"#,
            ),
        ];
        for (file_name, contents) in fixtures {
            std::fs::write(entry_dir.join(file_name), contents).unwrap();
        }

        let discovered = discover_servers();
        let mine = discovered
            .iter()
            .find(|s| s.identifier.as_deref() == Some("com.test.discover"))
            .expect("bridge should discover the entry written for the live current pid");
        assert_eq!(mine.port, 61999);
        assert_eq!(mine.token.as_deref(), Some("tok-xyz"));
        assert_eq!(mine.product_name.as_deref(), Some("DiscoverTest"));

        let outcome = select(std::slice::from_ref(mine), Some("com.test.discover"));
        assert!(matches!(outcome, Selection::One(_)));

        // Best-effort cleanup of the fixture directory.
        let _ = std::fs::remove_dir_all(&entry_dir);
    }
}