use serde_json::Value;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};
static COUNTER: AtomicU32 = AtomicU32::new(0);
const FLOOD_COUNT: usize = 50;
fn fresh_dir(prefix: &str) -> PathBuf {
let n = COUNTER.fetch_add(1, Ordering::SeqCst);
let pid = std::process::id();
let path = std::env::temp_dir().join(format!("wire-stress-within-{prefix}-{pid}-{n}"));
let _ = std::fs::remove_dir_all(&path);
std::fs::create_dir_all(&path).unwrap();
path
}
fn wire_bin() -> PathBuf {
PathBuf::from(env!("CARGO_BIN_EXE_wire"))
}
fn wire(home: &PathBuf, args: &[&str]) -> std::process::Output {
let out = Command::new(wire_bin())
.args(args)
.env("WIRE_HOME", home)
.output()
.expect("spawn wire");
if !out.status.success() {
eprintln!(
"wire {args:?} failed:\nstdout: {}\nstderr: {}",
String::from_utf8_lossy(&out.stdout),
String::from_utf8_lossy(&out.stderr)
);
}
out
}
fn wait_until<F: FnMut() -> bool>(deadline: Instant, mut f: F) -> bool {
while Instant::now() < deadline {
if f() {
return true;
}
std::thread::sleep(Duration::from_millis(100));
}
false
}
async fn spawn_federation_relay() -> String {
let dir = fresh_dir("relay-fed");
let relay = wire::relay_server::Relay::new(dir).await.unwrap();
let app = relay.router();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move { axum::serve(listener, app).await.ok() });
tokio::time::sleep(Duration::from_millis(50)).await;
format!("http://{addr}")
}
async fn spawn_local_only_relay() -> String {
let dir = fresh_dir("relay-local");
let relay = wire::relay_server::Relay::new(dir).await.unwrap();
let app = relay.router_with_mode(wire::relay_server::ServerMode { local_only: true });
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move { axum::serve(listener, app).await.ok() });
tokio::time::sleep(Duration::from_millis(50)).await;
format!("http://{addr}")
}
async fn attach_local_endpoint(home: &Path, handle: &str, local_relay_url: &str) {
let client = reqwest::Client::new();
let resp = client
.post(format!("{local_relay_url}/v1/slot/allocate"))
.json(&serde_json::json!({"handle": handle}))
.send()
.await
.expect("local slot allocate request");
assert!(
resp.status().is_success(),
"local slot allocate failed: {}",
resp.status()
);
let alloc: Value = resp.json().await.expect("local alloc JSON");
let local_slot_id = alloc["slot_id"].as_str().unwrap().to_string();
let local_slot_token = alloc["slot_token"].as_str().unwrap().to_string();
let state_path = home.join("config").join("wire").join("relay.json");
let bytes = std::fs::read(&state_path).expect("relay.json missing");
let mut state: Value = serde_json::from_slice(&bytes).expect("relay.json malformed");
let fed_url = state["self"]["relay_url"]
.as_str()
.expect("self.relay_url missing")
.to_string();
let fed_slot_id = state["self"]["slot_id"]
.as_str()
.expect("self.slot_id missing")
.to_string();
let fed_slot_token = state["self"]["slot_token"]
.as_str()
.expect("self.slot_token missing")
.to_string();
state["self"]["endpoints"] = serde_json::json!([
{
"relay_url": fed_url,
"slot_id": fed_slot_id,
"slot_token": fed_slot_token,
"scope": "federation"
},
{
"relay_url": local_relay_url,
"slot_id": local_slot_id,
"slot_token": local_slot_token,
"scope": "local"
}
]);
std::fs::write(&state_path, serde_json::to_vec_pretty(&state).unwrap()).unwrap();
}
async fn pair_two_homes_with_local_endpoints(
fed_url: &str,
local_url: &str,
alice_name: &str,
bob_name: &str,
) -> (PathBuf, PathBuf) {
let host_only = fed_url
.trim_start_matches("http://")
.split(':')
.next()
.unwrap_or("127.0.0.1");
let alice = fresh_dir(alice_name);
assert!(
wire(&alice, &["init", alice_name, "--relay", fed_url])
.status
.success()
);
assert!(
wire(
&alice,
&["claim", alice_name, "--public-url", fed_url, "--json"]
)
.status
.success()
);
attach_local_endpoint(&alice, alice_name, local_url).await;
let bob = fresh_dir(bob_name);
assert!(
wire(&bob, &["init", bob_name, "--relay", fed_url])
.status
.success()
);
assert!(
wire(
&bob,
&["claim", bob_name, "--public-url", fed_url, "--json"]
)
.status
.success()
);
attach_local_endpoint(&bob, bob_name, local_url).await;
let bob_handle = format!("{alice_name}@{host_only}");
let add_out = wire(&bob, &["add", &bob_handle, "--relay", fed_url, "--json"]);
assert!(
add_out.status.success(),
"bob `wire add` failed: {}",
String::from_utf8_lossy(&add_out.stderr)
);
let alice_has_pending = wait_until(Instant::now() + Duration::from_secs(15), || {
let _ = wire(&alice, &["pull", "--json"]);
let p = wire(&alice, &["pair-list-inbound", "--json"]);
String::from_utf8_lossy(&p.stdout).contains(bob_name)
});
assert!(alice_has_pending, "alice never received pending-inbound");
assert!(
wire(&alice, &["pair-accept", bob_name, "--json"])
.status
.success()
);
let bob_pinned = wait_until(Instant::now() + Duration::from_secs(15), || {
let _ = wire(&bob, &["pull", "--json"]);
let p = wire(&bob, &["peers", "--json"]);
String::from_utf8_lossy(&p.stdout).contains(alice_name)
});
assert!(bob_pinned, "bob never pinned alice via pair_drop_ack");
(alice, bob)
}
fn count_inbox_lines(home: &Path, peer: &str) -> usize {
let inbox = home
.join("state")
.join("wire")
.join("inbox")
.join(format!("{peer}.jsonl"));
std::fs::read_to_string(&inbox)
.unwrap_or_default()
.lines()
.filter(|l| !l.trim().is_empty())
.count()
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn pair_all_local_mesh_pairs_every_sister_session_v0_6_0() {
let fed_url = spawn_federation_relay().await;
let local_url = spawn_local_only_relay().await;
let home = fresh_dir("pair-all-local-mesh");
for name in &["alpha", "beth", "charlie"] {
let out = wire(
&home,
&[
"session",
"new",
name,
"--relay",
&fed_url,
"--with-local",
"--local-relay",
&local_url,
"--no-daemon",
"--json",
],
);
assert!(
out.status.success(),
"session new {name} failed: {}",
String::from_utf8_lossy(&out.stderr)
);
}
let out = wire(
&home,
&[
"session",
"pair-all-local",
"--federation-relay",
&fed_url,
"--settle-secs",
"1",
"--json",
],
);
assert!(
out.status.success(),
"pair-all-local failed: stdout={} stderr={}",
String::from_utf8_lossy(&out.stdout),
String::from_utf8_lossy(&out.stderr)
);
let summary: Value = serde_json::from_slice(&out.stdout).unwrap();
assert_eq!(
summary["pairs_attempted"].as_u64().unwrap_or(0),
3,
"expected 3 pairs (3 choose 2): {summary}"
);
assert_eq!(
summary["pairs_succeeded"].as_u64().unwrap_or(0),
3,
"expected all 3 succeeded: {summary}"
);
assert_eq!(summary["pairs_failed"].as_u64().unwrap_or(0), 0);
let sessions_root = home.join("sessions");
for name in &["alpha", "beth", "charlie"] {
let relay_path = sessions_root
.join(name)
.join("config")
.join("wire")
.join("relay.json");
let state: Value =
serde_json::from_slice(&std::fs::read(&relay_path).expect("relay.json missing"))
.expect("relay.json parse");
let peers = state["peers"]
.as_object()
.expect("session must have peers map");
for other in &["alpha", "beth", "charlie"] {
if other == name {
continue;
}
assert!(
peers.contains_key(*other),
"session {name} missing peer {other}: peers={:?}",
peers.keys().collect::<Vec<_>>()
);
}
}
let out2 = wire(
&home,
&[
"session",
"pair-all-local",
"--federation-relay",
&fed_url,
"--settle-secs",
"1",
"--json",
],
);
assert!(out2.status.success(), "pair-all-local re-run failed");
let summary2: Value = serde_json::from_slice(&out2.stdout).unwrap();
assert_eq!(
summary2["pairs_skipped_already_paired"]
.as_u64()
.unwrap_or(0),
3,
"re-run should skip 3 already-paired: {summary2}"
);
assert_eq!(summary2["pairs_succeeded"].as_u64().unwrap_or(0), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn local_only_sessions_pair_without_federation_v0_6_6() {
let _fed_url = spawn_federation_relay().await;
let local_url = spawn_local_only_relay().await;
let home = fresh_dir("local-only-mesh");
let session_names = ["wire", "slancha", "playground"];
for name in &session_names {
let out = wire(
&home,
&[
"session",
"new",
name,
"--local-only",
"--local-relay",
&local_url,
"--no-daemon",
"--json",
],
);
assert!(
out.status.success(),
"local-only session new {name} failed: stderr={}",
String::from_utf8_lossy(&out.stderr)
);
}
let sessions_dir = home.join("sessions");
for name in &session_names {
let relay_path = sessions_dir
.join(name)
.join("config")
.join("wire")
.join("relay.json");
let state: Value =
serde_json::from_slice(&std::fs::read(&relay_path).expect("relay.json present"))
.expect("relay.json parse");
let endpoints = state["self"]["endpoints"]
.as_array()
.expect("endpoints array");
assert!(
endpoints
.iter()
.all(|e| e["scope"].as_str() == Some("local")),
"local-only session {name} has non-local endpoints: {endpoints:?}"
);
assert!(
!endpoints.is_empty(),
"local-only session {name} has no endpoints — local relay probe must have succeeded"
);
}
let pair_out = wire(
&home,
&["session", "pair-all-local", "--settle-secs", "1", "--json"],
);
assert!(
pair_out.status.success(),
"pair-all-local on local-only sessions failed: stderr={}",
String::from_utf8_lossy(&pair_out.stderr)
);
let summary: Value = serde_json::from_slice(&pair_out.stdout).expect("JSON");
assert_eq!(
summary["pairs_attempted"].as_u64().unwrap_or(0),
3,
"3 choose 2 = 3 pairs: {summary}"
);
assert_eq!(
summary["pairs_succeeded"].as_u64().unwrap_or(0),
3,
"every pair should succeed even with reserved-nick sessions: {summary}"
);
assert_eq!(summary["pairs_failed"].as_u64().unwrap_or(99), 0);
for name in &session_names {
let relay_path = sessions_dir
.join(name)
.join("config")
.join("wire")
.join("relay.json");
let state: Value = serde_json::from_slice(&std::fs::read(&relay_path).expect("relay.json"))
.expect("parse");
let peers = state["peers"].as_object().expect("peers map");
for other in &session_names {
if other == name {
continue;
}
assert!(
peers.contains_key(*other),
"session {name} missing local-paired peer {other}: peers={:?}",
peers.keys().collect::<Vec<_>>()
);
}
}
let wire_home = sessions_dir.join("wire");
let bcast = wire(
&wire_home,
&[
"mesh",
"broadcast",
"--scope",
"local",
"--json",
"hello from reserved-nick",
],
);
assert!(
bcast.status.success(),
"broadcast from local-only `wire` failed: stderr={}",
String::from_utf8_lossy(&bcast.stderr)
);
let bs: Value = serde_json::from_slice(&bcast.stdout).expect("JSON");
assert_eq!(
bs["delivered"].as_u64().unwrap_or(0),
2,
"broadcast should reach the other 2 sisters: {bs}"
);
assert_eq!(bs["failed"].as_u64().unwrap_or(99), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn mesh_route_picks_one_sister_per_strategy_v0_6_5() {
let fed_url = spawn_federation_relay().await;
let local_url = spawn_local_only_relay().await;
let home = fresh_dir("mesh-route");
let session_roles = [
("alpha", "planner"),
("beth", "reviewer"),
("charlie", "reviewer"),
];
for (name, _) in &session_roles {
let out = wire(
&home,
&[
"session",
"new",
name,
"--relay",
&fed_url,
"--with-local",
"--local-relay",
&local_url,
"--no-daemon",
"--json",
],
);
assert!(
out.status.success(),
"session new {name} failed: {}",
String::from_utf8_lossy(&out.stderr)
);
}
let pair_out = wire(
&home,
&[
"session",
"pair-all-local",
"--federation-relay",
&fed_url,
"--settle-secs",
"1",
"--json",
],
);
assert!(
pair_out.status.success(),
"pair-all-local failed: stderr={}",
String::from_utf8_lossy(&pair_out.stderr)
);
for (name, role) in &session_roles {
let session_home = home.join("sessions").join(name);
let out = wire(&session_home, &["mesh", "role", "set", role]);
assert!(out.status.success(), "set role {role} for {name}");
}
let alpha_home = home.join("sessions").join("alpha");
let out = wire(
&alpha_home,
&[
"mesh",
"route",
"reviewer",
"--strategy",
"first",
"--json",
"deterministic",
],
);
assert!(
out.status.success(),
"first-strategy route failed: {}",
String::from_utf8_lossy(&out.stderr)
);
let summary: Value = serde_json::from_slice(&out.stdout).expect("JSON");
assert_eq!(summary["routed_to"].as_str(), Some("beth"));
assert_eq!(summary["delivered"].as_bool(), Some(true));
let mut rr_routes: Vec<String> = Vec::new();
for _ in 0..4 {
let out = wire(
&alpha_home,
&[
"mesh",
"route",
"reviewer",
"--strategy",
"round-robin",
"--json",
"rr",
],
);
assert!(
out.status.success(),
"rr route failed: {}",
String::from_utf8_lossy(&out.stderr)
);
let v: Value = serde_json::from_slice(&out.stdout).expect("JSON");
rr_routes.push(v["routed_to"].as_str().unwrap_or("?").to_string());
}
let beth_hits = rr_routes.iter().filter(|h| *h == "beth").count();
let charlie_hits = rr_routes.iter().filter(|h| *h == "charlie").count();
assert_eq!(
beth_hits, 2,
"round-robin should hit beth exactly 2× in 4 calls: routes={rr_routes:?}"
);
assert_eq!(
charlie_hits, 2,
"round-robin should hit charlie exactly 2× in 4 calls: routes={rr_routes:?}"
);
let mut beth_random = 0usize;
let mut charlie_random = 0usize;
for _ in 0..20 {
let out = wire(
&alpha_home,
&[
"mesh",
"route",
"reviewer",
"--strategy",
"random",
"--json",
"rand",
],
);
assert!(out.status.success(), "random route failed");
let v: Value = serde_json::from_slice(&out.stdout).expect("JSON");
match v["routed_to"].as_str().unwrap_or("?") {
"beth" => beth_random += 1,
"charlie" => charlie_random += 1,
other => panic!("unexpected route target `{other}`"),
}
}
assert!(
beth_random > 0 && charlie_random > 0,
"random should hit both reviewers in 20 calls: beth={beth_random} charlie={charlie_random}"
);
let none = wire(
&alpha_home,
&["mesh", "route", "nonexistent-role", "--json", "nope"],
);
assert!(!none.status.success(), "nonexistent role should error");
let stderr = String::from_utf8_lossy(&none.stderr);
assert!(
stderr.contains("wire mesh role list"),
"error message should hint at `wire mesh role list`: {stderr}"
);
let exc = wire(
&alpha_home,
&[
"mesh",
"route",
"reviewer",
"--exclude",
"beth",
"--strategy",
"first",
"--json",
"exclude",
],
);
assert!(exc.status.success());
let v: Value = serde_json::from_slice(&exc.stdout).expect("JSON");
assert_eq!(v["routed_to"].as_str(), Some("charlie"));
let cands = v["candidates"].as_array().unwrap();
assert_eq!(
cands.len(),
1,
"exclude should leave 1 candidate: {cands:?}"
);
assert_eq!(cands[0].as_str(), Some("charlie"));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn mesh_role_set_list_round_trips_v0_6_4() {
let fed_url = spawn_federation_relay().await;
let local_url = spawn_local_only_relay().await;
let home = fresh_dir("mesh-role");
let session_roles = [
("alpha", "planner"),
("beth", "reviewer"),
("charlie", "coder"),
];
for (name, _) in &session_roles {
let out = wire(
&home,
&[
"session",
"new",
name,
"--relay",
&fed_url,
"--with-local",
"--local-relay",
&local_url,
"--no-daemon",
"--json",
],
);
assert!(
out.status.success(),
"session new {name} failed: {}",
String::from_utf8_lossy(&out.stderr)
);
}
for (name, role) in &session_roles {
let session_home = home.join("sessions").join(name);
let out = wire(&session_home, &["mesh", "role", "set", role]);
assert!(
out.status.success(),
"mesh role set {role} for {name} failed: {}",
String::from_utf8_lossy(&out.stderr)
);
}
for (name, _) in &session_roles {
let session_home = home.join("sessions").join(name);
let out = wire(&session_home, &["mesh", "role", "list", "--json"]);
assert!(
out.status.success(),
"mesh role list failed for {name}: {}",
String::from_utf8_lossy(&out.stderr)
);
let view: Value = serde_json::from_slice(&out.stdout).expect("JSON");
let rows = view["sessions"].as_array().expect("sessions array");
assert_eq!(rows.len(), 3, "list should see 3 sister sessions: {view}");
for (expected_name, expected_role) in &session_roles {
let found = rows
.iter()
.find(|r| r["name"].as_str() == Some(expected_name));
let row = found
.unwrap_or_else(|| panic!("session {expected_name} missing from list: {view}"));
assert_eq!(
row["role"].as_str(),
Some(*expected_role),
"session {expected_name} role mismatch: {row}"
);
}
}
let alpha_home = home.join("sessions").join("alpha");
let illegal = wire(&alpha_home, &["mesh", "role", "set", "bad role"]);
assert!(
!illegal.status.success(),
"space-in-role should be rejected"
);
assert!(
String::from_utf8_lossy(&illegal.stderr).contains("illegal char"),
"illegal char rejection message expected, got: {}",
String::from_utf8_lossy(&illegal.stderr)
);
let oversize: String = "a".repeat(33);
let too_long = wire(&alpha_home, &["mesh", "role", "set", &oversize]);
assert!(
!too_long.status.success(),
"33-char role should be rejected"
);
let cleared = wire(&alpha_home, &["mesh", "role", "clear", "--json"]);
assert!(cleared.status.success(), "mesh role clear failed");
let after: Value =
serde_json::from_slice(&wire(&alpha_home, &["mesh", "role", "list", "--json"]).stdout)
.expect("JSON");
let alpha_row = after["sessions"]
.as_array()
.and_then(|a| a.iter().find(|r| r["name"].as_str() == Some("alpha")))
.expect("alpha row");
assert!(
alpha_row["role"].is_null(),
"alpha role should be null after clear: {alpha_row}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn mesh_broadcast_fans_to_every_paired_sister_v0_6_3() {
let fed_url = spawn_federation_relay().await;
let local_url = spawn_local_only_relay().await;
let home = fresh_dir("mesh-broadcast");
for name in &["alpha", "beth", "charlie"] {
let out = wire(
&home,
&[
"session",
"new",
name,
"--relay",
&fed_url,
"--with-local",
"--local-relay",
&local_url,
"--no-daemon",
"--json",
],
);
assert!(
out.status.success(),
"session new {name} failed: {}",
String::from_utf8_lossy(&out.stderr)
);
}
let pair_out = wire(
&home,
&[
"session",
"pair-all-local",
"--federation-relay",
&fed_url,
"--settle-secs",
"1",
"--json",
],
);
assert!(
pair_out.status.success(),
"pair-all-local failed: stderr={}",
String::from_utf8_lossy(&pair_out.stderr)
);
let alpha_home = home.join("sessions").join("alpha");
let out = wire(
&alpha_home,
&[
"mesh",
"broadcast",
"--scope",
"local",
"--kind",
"claim",
"--json",
"hello mesh from alpha",
],
);
assert!(
out.status.success(),
"mesh broadcast failed: stderr={}",
String::from_utf8_lossy(&out.stderr)
);
let summary: Value =
serde_json::from_slice(&out.stdout).expect("broadcast --json must be valid JSON");
let broadcast_id = summary["broadcast_id"]
.as_str()
.expect("broadcast_id present")
.to_string();
assert_eq!(
summary["delivered"].as_u64().unwrap_or(0),
2,
"expected delivery to both beth and charlie: {summary}"
);
assert_eq!(
summary["failed"].as_u64().unwrap_or(99),
0,
"no failures expected: {summary}"
);
assert_eq!(
summary["target_count"].as_u64().unwrap_or(0),
2,
"alpha should see 2 local-scope peers: {summary}"
);
let beth_home = home.join("sessions").join("beth");
let charlie_home = home.join("sessions").join("charlie");
let pull_deadline = Instant::now() + Duration::from_secs(5);
let mut beth_lines = 0;
let mut charlie_lines = 0;
while Instant::now() < pull_deadline {
let _ = wire(&beth_home, &["pull", "--json"]);
let _ = wire(&charlie_home, &["pull", "--json"]);
beth_lines = count_inbox_lines(&beth_home, "alpha");
charlie_lines = count_inbox_lines(&charlie_home, "alpha");
if beth_lines >= 1 && charlie_lines >= 1 {
break;
}
std::thread::sleep(Duration::from_millis(200));
}
assert!(
beth_lines >= 1,
"beth's inbox from alpha is empty after pull"
);
assert!(
charlie_lines >= 1,
"charlie's inbox from alpha is empty after pull"
);
fn find_broadcast_event(session_home: &Path, broadcast_id: &str) -> Option<Value> {
let inbox = session_home
.join("state")
.join("wire")
.join("inbox")
.join("alpha.jsonl");
let body = std::fs::read_to_string(&inbox).ok()?;
for line in body.lines() {
let v: Value = serde_json::from_str(line).ok()?;
let bid = v["body"]["broadcast_id"].as_str();
if bid == Some(broadcast_id) {
return Some(v);
}
}
None
}
let beth_event =
find_broadcast_event(&beth_home, &broadcast_id).expect("beth must have the broadcast");
let charlie_event = find_broadcast_event(&charlie_home, &broadcast_id)
.expect("charlie must have the broadcast");
let beth_eid = beth_event["event_id"].as_str().unwrap_or("");
let charlie_eid = charlie_event["event_id"].as_str().unwrap_or("");
assert!(!beth_eid.is_empty() && !charlie_eid.is_empty());
assert_ne!(
beth_eid, charlie_eid,
"per-recipient broadcast events must have distinct event_ids"
);
assert_eq!(
beth_event["body"]["content"].as_str(),
Some("hello mesh from alpha")
);
assert_eq!(
beth_event["body"]["broadcast_target_count"].as_u64(),
Some(2)
);
assert_eq!(
charlie_event["body"]["content"].as_str(),
Some("hello mesh from alpha")
);
let out2 = wire(
&alpha_home,
&[
"mesh",
"broadcast",
"--scope",
"local",
"--exclude",
"charlie",
"--json",
"second wave",
],
);
assert!(out2.status.success(), "exclude broadcast failed");
let summary2: Value = serde_json::from_slice(&out2.stdout).expect("JSON");
assert_eq!(
summary2["delivered"].as_u64().unwrap_or(99),
1,
"exclude should deliver to 1: {summary2}"
);
let excluded = summary2["skipped_excluded"]
.as_array()
.map(|a| a.iter().any(|v| v.as_str() == Some("charlie")))
.unwrap_or(false);
assert!(excluded, "charlie should appear in skipped_excluded");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn mesh_status_reports_paired_mesh_v0_6_2() {
let fed_url = spawn_federation_relay().await;
let local_url = spawn_local_only_relay().await;
let home = fresh_dir("mesh-status-paired");
for name in &["alpha", "beth", "charlie"] {
let out = wire(
&home,
&[
"session",
"new",
name,
"--relay",
&fed_url,
"--with-local",
"--local-relay",
&local_url,
"--no-daemon",
"--json",
],
);
assert!(
out.status.success(),
"session new {name} failed: {}",
String::from_utf8_lossy(&out.stderr)
);
}
let pair_out = wire(
&home,
&[
"session",
"pair-all-local",
"--federation-relay",
&fed_url,
"--settle-secs",
"1",
"--json",
],
);
assert!(
pair_out.status.success(),
"pair-all-local failed: stderr={}",
String::from_utf8_lossy(&pair_out.stderr)
);
let out = wire(&home, &["session", "mesh-status", "--json"]);
assert!(
out.status.success(),
"mesh-status failed: stderr={}",
String::from_utf8_lossy(&out.stderr)
);
let view: Value = serde_json::from_slice(&out.stdout).expect("mesh-status JSON parse");
let summary = &view["summary"];
assert_eq!(
summary["session_count"].as_u64().unwrap_or(0),
3,
"expected 3 sessions: {view}"
);
assert_eq!(
summary["edge_count"].as_u64().unwrap_or(0),
3,
"expected 3 edges (3 choose 2): {view}"
);
assert_eq!(
summary["asymmetric"].as_u64().unwrap_or(99),
0,
"no edge should be asymmetric after pair-all-local: {view}"
);
let edges = view["edges"].as_array().expect("edges array");
assert_eq!(edges.len(), 3, "expected 3 mesh edges: {view}");
let mut any_fresh = false;
for e in edges {
assert_eq!(
e["bilateral"].as_bool(),
Some(true),
"edge {} ↔ {} not bilateral: {e}",
e["from"],
e["to"]
);
assert_eq!(
e["scope"].as_str(),
Some("local"),
"edge {} ↔ {} routed off-local: {e}",
e["from"],
e["to"]
);
if let Some(dirs) = e["directions"].as_object() {
for d in dirs.values() {
if d.get("last_pull_at_unix")
.map(|v| !v.is_null())
.unwrap_or(false)
{
any_fresh = true;
break;
}
}
}
}
assert!(
any_fresh,
"expected at least one direction with a recorded last_pull_at_unix: {view}"
);
let local_relays = view["local_relays"].as_array().expect("local_relays array");
assert!(
local_relays
.iter()
.any(|r| r["url"].as_str() == Some(local_url.as_str())
&& r["healthy"].as_bool() == Some(true)),
"expected our local relay healthy: {view}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn regression_session_new_with_local_writes_dual_endpoints_v0_5_20() {
let fed_url = spawn_federation_relay().await;
let local_url = spawn_local_only_relay().await;
let home = fresh_dir("session-new-with-local");
let out = wire(
&home,
&[
"session",
"new",
"test-alpha",
"--relay",
&fed_url,
"--with-local",
"--local-relay",
&local_url,
"--no-daemon",
"--json",
],
);
assert!(
out.status.success(),
"wire session new --with-local failed: stdout={} stderr={}",
String::from_utf8_lossy(&out.stdout),
String::from_utf8_lossy(&out.stderr)
);
let relay_path = home
.join("sessions")
.join("test-alpha")
.join("config")
.join("wire")
.join("relay.json");
assert!(
relay_path.exists(),
"session's relay.json missing at {relay_path:?} — bootstrap didn't complete"
);
let bytes = std::fs::read(&relay_path).expect("read relay.json");
let state: Value = serde_json::from_slice(&bytes).expect("relay.json must be valid JSON");
let endpoints = state["self"]["endpoints"]
.as_array()
.expect("self.endpoints[] must be present — v0.5.20 fix landed this field");
let scopes: Vec<&str> = endpoints
.iter()
.filter_map(|e| e["scope"].as_str())
.collect();
assert!(
scopes.contains(&"federation"),
"expected scope=federation, got: {scopes:?}"
);
assert!(
scopes.contains(&"local"),
"expected scope=local (v0.5.20 fix for the silent --with-local regression): {scopes:?}"
);
let local = endpoints
.iter()
.find(|e| e["scope"].as_str() == Some("local"))
.expect("local endpoint not present despite --with-local");
assert_eq!(
local["relay_url"].as_str().unwrap_or(""),
local_url,
"local endpoint URL must match --local-relay arg"
);
let list_out = wire(&home, &["session", "list-local", "--json"]);
assert!(list_out.status.success(), "list-local failed");
let listing: Value =
serde_json::from_slice(&list_out.stdout).expect("list-local JSON must parse");
let group = &listing["local"][&local_url];
let nicks: Vec<&str> = group
.as_array()
.expect("list-local group must exist")
.iter()
.filter_map(|s| s["name"].as_str())
.collect();
assert!(
nicks.contains(&"test-alpha"),
"list-local must surface the session we just created: {nicks:?} in group {local_url}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn stress_within_system_local_first_routing_v0_5_19() {
let fed_url = spawn_federation_relay().await;
let local_url = spawn_local_only_relay().await;
let (alice, bob) =
pair_two_homes_with_local_endpoints(&fed_url, &local_url, "stress-w-alice", "stress-w-bob")
.await;
let queue_start = Instant::now();
for i in 0..FLOOD_COUNT {
let body = format!("within-system flood msg {i}");
let out = wire(&alice, &["send", "stress-w-bob", "claim", &body]);
assert!(
out.status.success(),
"send {i} failed: {}",
String::from_utf8_lossy(&out.stderr)
);
}
eprintln!(
"within-system stress: queued {FLOOD_COUNT} events in {:?}",
queue_start.elapsed()
);
let push_start = Instant::now();
let push_out = wire(&alice, &["push", "--json"]);
assert!(push_out.status.success(), "push failed");
let push: Value = serde_json::from_slice(&push_out.stdout).unwrap();
let pushed = push["pushed"].as_array().expect("pushed array");
eprintln!(
"within-system stress: push delivered {} events in {:?}",
pushed.len(),
push_start.elapsed()
);
assert_eq!(
pushed.len(),
FLOOD_COUNT,
"expected all {FLOOD_COUNT} events to land via push.pushed[]"
);
let mut federation_leaked = 0usize;
for ev in pushed {
let scope = ev["scope"].as_str().unwrap_or("?");
if scope != "local" {
federation_leaked += 1;
eprintln!(
"scope leak: event {} delivered via `{scope}` not `local`: {ev}",
ev["event_id"].as_str().unwrap_or("?")
);
}
}
assert_eq!(
federation_leaked, 0,
"{federation_leaked}/{FLOOD_COUNT} events leaked to federation when local should have won"
);
let pull_start = Instant::now();
assert!(
wait_until(Instant::now() + Duration::from_secs(30), || {
let _ = wire(&bob, &["pull", "--json"]);
count_inbox_lines(&bob, "stress-w-alice") >= FLOOD_COUNT
}),
"bob never received all {FLOOD_COUNT} events (got {})",
count_inbox_lines(&bob, "stress-w-alice")
);
eprintln!(
"within-system stress: bob received {FLOOD_COUNT} events in {:?}",
pull_start.elapsed()
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn stress_within_system_failover_to_federation_on_local_death_v0_5_19() {
let fed_url = spawn_federation_relay().await;
let local_url = spawn_local_only_relay().await;
let (alice, _bob) = pair_two_homes_with_local_endpoints(
&fed_url,
&local_url,
"stress-w-alice-fb",
"stress-w-bob-fb",
)
.await;
let half = FLOOD_COUNT / 2;
for i in 0..half {
let body = format!("pre-failover msg {i}");
assert!(
wire(&alice, &["send", "stress-w-bob-fb", "claim", &body])
.status
.success()
);
}
let push_out = wire(&alice, &["push", "--json"]);
assert!(push_out.status.success());
let push: Value = serde_json::from_slice(&push_out.stdout).unwrap();
let pushed = push["pushed"].as_array().unwrap();
assert_eq!(pushed.len(), half, "pre-failover push count");
for ev in pushed {
assert_eq!(
ev["scope"].as_str().unwrap_or("?"),
"local",
"pre-failover should be local"
);
}
let alice_relay_state = alice.join("config").join("wire").join("relay.json");
let bytes = std::fs::read(&alice_relay_state).unwrap();
let mut state: Value = serde_json::from_slice(&bytes).unwrap();
if let Some(eps) = state["peers"]["stress-w-bob-fb"]["endpoints"].as_array_mut() {
for ep in eps.iter_mut() {
if ep["scope"].as_str() == Some("local") {
ep["relay_url"] = serde_json::json!("http://127.0.0.1:1"); }
}
}
std::fs::write(
&alice_relay_state,
serde_json::to_vec_pretty(&state).unwrap(),
)
.unwrap();
for i in half..FLOOD_COUNT {
let body = format!("post-failover msg {i}");
assert!(
wire(&alice, &["send", "stress-w-bob-fb", "claim", &body])
.status
.success()
);
}
let push_out = wire(&alice, &["push", "--json"]);
assert!(push_out.status.success());
let push: Value = serde_json::from_slice(&push_out.stdout).unwrap();
let pushed = push["pushed"].as_array().unwrap();
let skipped = push["skipped"].as_array().unwrap();
let new_post_failover = FLOOD_COUNT - half;
let mut fed_count = 0usize;
let mut new_delivered = 0usize;
for ev in pushed {
if ev["scope"].as_str() == Some("federation") {
fed_count += 1;
new_delivered += 1;
}
}
let new_transport_skips = skipped
.iter()
.filter(|s| s["reason"].as_str() != Some("duplicate") && s.get("event_id").is_some())
.count();
assert!(
new_transport_skips == 0,
"{new_transport_skips} new events skipped with transport errors during failover — \
expected graceful federation fallback. push: {push}"
);
assert!(
new_delivered >= new_post_failover || fed_count >= new_post_failover,
"fewer than {new_post_failover} post-failover events landed on federation \
(federation deliveries: {fed_count}). push: {push}"
);
}