use serde_json::Value;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::mpsc;
use std::time::Duration;
static COUNTER: AtomicU32 = AtomicU32::new(0);
fn fresh_dir(prefix: &str) -> PathBuf {
let n = COUNTER.fetch_add(1, Ordering::SeqCst);
let pid = std::process::id();
let path = std::env::temp_dir().join(format!("wire-mesh-{prefix}-{pid}-{n}"));
let _ = std::fs::remove_dir_all(&path);
std::fs::create_dir_all(&path).unwrap();
path
}
fn wire_bin() -> PathBuf {
PathBuf::from(env!("CARGO_BIN_EXE_wire"))
}
fn wire(home: &PathBuf, args: &[&str]) -> std::process::Output {
let out = Command::new(wire_bin())
.args(args)
.env("WIRE_HOME", home)
.output()
.expect("spawn wire");
if !out.status.success() {
eprintln!(
"wire {args:?} failed under {home:?}:\nstdout: {}\nstderr: {}",
String::from_utf8_lossy(&out.stdout),
String::from_utf8_lossy(&out.stderr)
);
}
out
}
fn drive_pairing(host_home: &PathBuf, guest_home: &PathBuf, relay_url: &str) {
let mut host_proc = Command::new(wire_bin())
.args([
"pair-host",
"--relay",
relay_url,
"--yes",
"--timeout",
"30",
])
.env("WIRE_HOME", host_home)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn pair-host");
let stderr_pipe = host_proc.stderr.take().unwrap();
let (tx, rx) = mpsc::channel::<String>();
let stderr_capture = std::sync::Arc::new(std::sync::Mutex::new(String::new()));
let stderr_clone = stderr_capture.clone();
std::thread::spawn(move || {
let reader = BufReader::new(stderr_pipe);
let mut found = false;
for line in reader.lines().map_while(Result::ok) {
stderr_clone.lock().unwrap().push_str(&line);
stderr_clone.lock().unwrap().push('\n');
let trimmed = line.trim();
if !found && trimmed.len() == 9 && trimmed.chars().nth(2) == Some('-') {
tx.send(trimmed.to_string()).ok();
found = true;
}
}
});
let code = rx
.recv_timeout(Duration::from_secs(10))
.expect("never received code from pair-host");
let guest_handle = std::thread::spawn({
let guest_home = guest_home.clone();
let relay_url = relay_url.to_string();
let code = code.clone();
move || {
wire(
&guest_home,
&[
"pair-join",
&code,
"--relay",
&relay_url,
"--yes",
"--timeout",
"30",
],
)
}
});
let guest_out = guest_handle.join().expect("guest panicked");
assert!(
guest_out.status.success(),
"pair-join failed: stderr={}",
String::from_utf8_lossy(&guest_out.stderr)
);
let host_out = host_proc.wait_with_output().expect("host wait failed");
assert!(
host_out.status.success(),
"pair-host failed; captured stderr=\n{}",
stderr_capture.lock().unwrap()
);
let host_stdout = String::from_utf8(host_out.stdout).unwrap();
let guest_stdout = String::from_utf8(guest_out.stdout).unwrap();
let host_final: Value =
serde_json::from_str(host_stdout.trim().lines().last().unwrap()).unwrap();
let guest_final: Value =
serde_json::from_str(guest_stdout.trim().lines().last().unwrap()).unwrap();
assert_eq!(
host_final["sas"], guest_final["sas"],
"SAS mismatch between {host_home:?} and {guest_home:?}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn three_party_mesh_of_bilateral_round_trips() {
let relay_dir = fresh_dir("relay");
let relay = wire::relay_server::Relay::new(relay_dir).await.unwrap();
let app = relay.router();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move { axum::serve(listener, app).await.ok() });
tokio::time::sleep(Duration::from_millis(50)).await;
let relay_url = format!("http://{addr}");
let paul = fresh_dir("paul");
let willard = fresh_dir("willard");
let carol = fresh_dir("carol");
assert!(wire(&paul, &["init", "paul"]).status.success());
assert!(wire(&willard, &["init", "willard"]).status.success());
assert!(wire(&carol, &["init", "carol"]).status.success());
drive_pairing(&paul, &willard, &relay_url);
drive_pairing(&paul, &carol, &relay_url);
drive_pairing(&willard, &carol, &relay_url);
for (home, expected_peers) in [
(&paul, vec!["willard", "carol"]),
(&willard, vec!["paul", "carol"]),
(&carol, vec!["paul", "willard"]),
] {
let trust: Value = serde_json::from_str(
&std::fs::read_to_string(home.join("config/wire/trust.json")).unwrap(),
)
.unwrap();
for peer in &expected_peers {
assert_eq!(
trust["agents"][peer]["tier"], "VERIFIED",
"{home:?} missing or wrong tier for peer {peer}"
);
}
let relay_state: Value = serde_json::from_str(
&std::fs::read_to_string(home.join("config/wire/relay.json")).unwrap(),
)
.unwrap();
for peer in &expected_peers {
assert!(
relay_state["peers"][peer].is_object(),
"{home:?} missing relay coords for {peer}"
);
}
}
assert!(
wire(&paul, &["send", "willard", "decision", "P->W"])
.status
.success()
);
assert!(
wire(&paul, &["send", "carol", "decision", "P->C"])
.status
.success()
);
assert!(
wire(&willard, &["send", "paul", "decision", "W->P"])
.status
.success()
);
assert!(
wire(&willard, &["send", "carol", "decision", "W->C"])
.status
.success()
);
assert!(
wire(&carol, &["send", "paul", "decision", "C->P"])
.status
.success()
);
assert!(
wire(&carol, &["send", "willard", "decision", "C->W"])
.status
.success()
);
for home in [&paul, &willard, &carol] {
let push_out = wire(home, &["push", "--json"]);
assert!(push_out.status.success());
let pj: Value = serde_json::from_slice(&push_out.stdout).unwrap();
assert_eq!(
pj["pushed"].as_array().unwrap().len(),
2,
"expected exactly 2 pushed for {home:?}, got {pj}"
);
}
for home in [&paul, &willard, &carol] {
let pull_out = wire(home, &["pull", "--json"]);
assert!(pull_out.status.success());
let pj: Value = serde_json::from_slice(&pull_out.stdout).unwrap();
assert_eq!(
pj["written"].as_array().unwrap().len(),
2,
"expected exactly 2 verified pulls for {home:?}, got {pj}"
);
assert_eq!(pj["rejected"].as_array().unwrap().len(), 0);
}
fn tail_bodies(home: &PathBuf, peer: &str) -> Vec<String> {
let out = wire(home, &["tail", peer, "--json"]);
assert!(out.status.success());
String::from_utf8(out.stdout)
.unwrap()
.lines()
.filter_map(|line| serde_json::from_str::<Value>(line).ok())
.map(|v| v["body"].as_str().unwrap_or("").to_string())
.collect()
}
assert_eq!(tail_bodies(&paul, "willard"), vec!["W->P".to_string()]);
assert_eq!(tail_bodies(&paul, "carol"), vec!["C->P".to_string()]);
assert_eq!(tail_bodies(&willard, "paul"), vec!["P->W".to_string()]);
assert_eq!(tail_bodies(&willard, "carol"), vec!["C->W".to_string()]);
assert_eq!(tail_bodies(&carol, "paul"), vec!["P->C".to_string()]);
assert_eq!(tail_bodies(&carol, "willard"), vec!["W->C".to_string()]);
for home in [&paul, &willard, &carol] {
let out = wire(home, &["tail", "--json"]);
let lines = String::from_utf8(out.stdout)
.unwrap()
.lines()
.filter(|l| !l.is_empty())
.count();
assert_eq!(
lines, 2,
"expected exactly 2 events in {home:?} inbox total, got {lines}"
);
}
}