use serde_json::Value;
use std::path::PathBuf;
use std::process::Command;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};
/// Process-wide sequence number mixed into every scratch-directory name by
/// `fresh_dir`, so successive calls within one test process get distinct paths.
static COUNTER: AtomicU32 = AtomicU32::new(0);
/// Number of messages queued by the single-peer outbox flood test.
// NOTE(review): that test's name mentions 500 messages, but this value is 100 —
// confirm which one is intended.
const FLOOD_COUNT: usize = 100;
/// Number of sender threads spawned by the concurrent-send test.
const CONCURRENT_THREADS: usize = 5;
/// Number of messages each sender thread sends in the concurrent-send test.
const CONCURRENT_PER_THREAD: usize = 20;
/// Creates and returns an empty scratch directory under the system temp dir.
///
/// The directory name combines `prefix`, the current process id and the next
/// value of the process-wide `COUNTER`. Anything already present at that path
/// is removed first (best effort; the result is ignored).
///
/// # Panics
/// Panics if the directory cannot be created.
fn fresh_dir(prefix: &str) -> PathBuf {
    let (n, pid) = (COUNTER.fetch_add(1, Ordering::SeqCst), std::process::id());
    let mut path = std::env::temp_dir();
    path.push(format!("wire-stress-{prefix}-{pid}-{n}"));
    // The path usually does not exist yet, so a failure here is expected and ignored.
    let _ = std::fs::remove_dir_all(&path);
    std::fs::create_dir_all(&path).unwrap();
    path
}
/// Returns the path of the `wire` executable, taken from the compile-time
/// `CARGO_BIN_EXE_wire` environment variable.
fn wire_bin() -> PathBuf {
    let exe: &str = env!("CARGO_BIN_EXE_wire");
    PathBuf::from(exe)
}
/// Runs the `wire` binary with `args`, with `WIRE_HOME` set to `home`, and
/// returns the captured process output.
///
/// A non-zero exit status is not an error here: the captured stdout and stderr
/// are echoed to this process's stderr and the output is still returned, so
/// the caller decides what to assert.
///
/// # Panics
/// Panics if the process cannot be spawned.
fn wire(home: &PathBuf, args: &[&str]) -> std::process::Output {
    let mut cmd = Command::new(wire_bin());
    cmd.env("WIRE_HOME", home).args(args);
    let output = cmd.output().expect("spawn wire");
    if output.status.success() {
        return output;
    }
    // Failure path: dump both streams so the test log shows what went wrong.
    let captured_stdout = String::from_utf8_lossy(&output.stdout);
    let captured_stderr = String::from_utf8_lossy(&output.stderr);
    eprintln!(
        "wire {args:?} failed:\nstdout: {}\nstderr: {}",
        captured_stdout, captured_stderr
    );
    output
}
/// Polls `f` until it returns `true` or `deadline` passes.
///
/// Returns `true` as soon as `f` reports success, `false` if the deadline is
/// reached first. `f` is always evaluated at least once, even when `deadline`
/// is already in the past, and is evaluated one final time once the deadline
/// has been reached, so a condition that becomes true during the last sleep
/// is not misreported as a timeout.
///
/// Between attempts the current thread sleeps for up to 100 ms, never longer
/// than the time remaining until `deadline`.
fn wait_until<F: FnMut() -> bool>(deadline: Instant, mut f: F) -> bool {
    loop {
        if f() {
            return true;
        }
        // Saturates to zero once the deadline is behind us.
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return false;
        }
        // Clamp the pause so the final check happens at the deadline, not after it.
        std::thread::sleep(remaining.min(Duration::from_millis(100)));
    }
}
/// Starts an in-process relay on an ephemeral localhost port and returns its
/// base URL (`http://<addr>`).
///
/// The relay is built over a fresh scratch directory and served from a
/// detached tokio task; the function then pauses 50 ms before returning.
///
/// # Panics
/// Panics if the relay cannot be created or the listener cannot be bound.
async fn spawn_federation_relay() -> String {
    let relay_state = wire::relay_server::Relay::new(fresh_dir("relay"))
        .await
        .unwrap();
    let router = relay_state.router();
    // Port 0 lets the OS pick a free port; the real address is read back below.
    let socket = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = socket.local_addr().unwrap();
    let url = format!("http://{addr}");
    tokio::spawn(async move { axum::serve(socket, router).await.ok() });
    tokio::time::sleep(Duration::from_millis(50)).await;
    url
}
/// Creates two fresh homes, pairs them through the relay at `relay_url`, and
/// returns them as `(alice_home, bob_home)`.
///
/// Every step is driven through the `wire` CLI:
/// 1. alice runs `init` and `claim` against the relay;
/// 2. bob runs `init`, then `add` with the handle `<alice_name>@<relay host>`;
/// 3. alice polls `pull` + `pair-list-inbound` (up to 15 s) until the output
///    mentions `bob_name`, then runs `pair-accept`;
/// 4. bob polls `pull` + `peers` (up to 15 s) until the output mentions
///    `alice_name`;
/// 5. alice's `peers` output is checked for `bob_name`.
///
/// # Panics
/// Panics (through assertions) if any command fails or a poll times out.
async fn pair_two_homes(
    relay_url: &str,
    alice_name: &str,
    bob_name: &str,
) -> (PathBuf, PathBuf) {
    // Host part of the relay URL: drop the scheme, keep what precedes the first ':'.
    let without_scheme = relay_url.trim_start_matches("http://");
    let host_only = match without_scheme.split_once(':') {
        Some((host, _port)) => host,
        None => without_scheme,
    };

    // --- alice: init + claim ---
    let alice = fresh_dir(alice_name);
    assert!(
        wire(&alice, &["init", alice_name, "--relay", relay_url])
            .status
            .success()
    );
    assert!(
        wire(
            &alice,
            &[
                "claim",
                alice_name,
                "--public-url",
                relay_url,
                "--json"
            ]
        )
        .status
        .success()
    );

    // --- bob: init + add alice ---
    let bob = fresh_dir(bob_name);
    assert!(
        wire(&bob, &["init", bob_name, "--relay", relay_url])
            .status
            .success()
    );
    // This is the handle bob uses to address alice.
    let alice_handle = format!("{alice_name}@{host_only}");
    let add_args = ["add", alice_handle.as_str(), "--relay", relay_url, "--json"];
    let add_out = wire(&bob, &add_args);
    assert!(
        add_out.status.success(),
        "bob `wire add` failed: {}",
        String::from_utf8_lossy(&add_out.stderr)
    );

    // --- alice: wait for the inbound request, then accept it ---
    let inbound_deadline = Instant::now() + Duration::from_secs(15);
    let alice_has_pending = wait_until(inbound_deadline, || {
        let _ = wire(&alice, &["pull", "--json"]);
        let listing = wire(&alice, &["pair-list-inbound", "--json"]);
        let text = String::from_utf8_lossy(&listing.stdout);
        text.contains(bob_name)
    });
    assert!(
        alice_has_pending,
        "alice never saw pending-inbound from {bob_name}"
    );
    assert!(
        wire(&alice, &["pair-accept", bob_name, "--json"])
            .status
            .success()
    );

    // --- bob: wait until alice appears among his peers ---
    let pin_deadline = Instant::now() + Duration::from_secs(15);
    let bob_pinned_alice = wait_until(pin_deadline, || {
        let _ = wire(&bob, &["pull", "--json"]);
        let listing = wire(&bob, &["peers", "--json"]);
        let text = String::from_utf8_lossy(&listing.stdout);
        text.contains(alice_name)
    });
    assert!(bob_pinned_alice, "bob never pinned alice via pair_drop_ack");

    // --- alice: bob must be listed on her side as well ---
    let alice_peers = wire(&alice, &["peers", "--json"]);
    let body = String::from_utf8_lossy(&alice_peers.stdout);
    assert!(
        body.contains(bob_name),
        "alice should have {bob_name} pinned, got: {body}"
    );

    (alice, bob)
}
/// Counts the non-blank lines of `<home>/state/wire/inbox/<peer>.jsonl`.
///
/// Returns 0 when the file cannot be read (for example because it does not
/// exist yet), which lets callers poll this before anything has arrived.
fn count_inbox_lines(home: &PathBuf, peer: &str) -> usize {
    let mut inbox = home.clone();
    for segment in ["state", "wire", "inbox"] {
        inbox.push(segment);
    }
    inbox.push(format!("{peer}.jsonl"));

    match std::fs::read_to_string(&inbox) {
        Ok(text) => text
            .lines()
            .filter(|line| !line.trim().is_empty())
            .count(),
        Err(_) => 0,
    }
}
/// Queues `FLOOD_COUNT` sequential sends from alice to one peer, pushes once,
/// and checks that bob's inbox ends up with at least that many well-formed
/// JSONL events, each carrying an `event_id`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn stress_outbox_flood_500_messages_single_peer() {
    let relay_url = spawn_federation_relay().await;
    let (alice, bob) = pair_two_homes(&relay_url, "stress-alice-a", "stress-bob-a").await;

    // Phase 1: queue the messages, one CLI invocation each.
    let start = Instant::now();
    for i in 0..FLOOD_COUNT {
        let body = format!("flood msg {i}");
        let send_args = ["send", "stress-bob-a", "claim", body.as_str()];
        let out = wire(&alice, &send_args);
        assert!(
            out.status.success(),
            "send {i} failed: {}",
            String::from_utf8_lossy(&out.stderr)
        );
    }
    eprintln!(
        "stress: queued {} sends in {:?}",
        FLOOD_COUNT,
        start.elapsed()
    );

    // Phase 2: a single push must account for every queued event.
    let push_start = Instant::now();
    let push_out = wire(&alice, &["push", "--json"]);
    assert!(push_out.status.success(), "push failed");
    let push: Value = serde_json::from_slice(&push_out.stdout).unwrap();
    // Length of the JSON array under `key`, or 0 when it is absent / not an array.
    let array_len = |key: &str| push[key].as_array().map_or(0, |items| items.len());
    let pushed_count = array_len("pushed");
    let skipped_count = array_len("skipped");
    eprintln!(
        "stress: push #1 delivered {pushed_count} pushed + {skipped_count} skipped \
         in {:?}",
        push_start.elapsed()
    );
    assert_eq!(
        pushed_count + skipped_count,
        FLOOD_COUNT,
        "push did not enumerate all {FLOOD_COUNT} events: pushed={pushed_count} \
         skipped={skipped_count} (sum should equal FLOOD_COUNT)"
    );

    // Phase 3: bob pulls until his inbox holds everything (60 s budget).
    let pull_start = Instant::now();
    let pull_deadline = Instant::now() + Duration::from_secs(60);
    let bob_received = wait_until(pull_deadline, || {
        let _ = wire(&bob, &["pull", "--json"]);
        count_inbox_lines(&bob, "stress-alice-a") >= FLOOD_COUNT
    });
    let final_count = count_inbox_lines(&bob, "stress-alice-a");
    eprintln!(
        "stress: bob received {final_count}/{FLOOD_COUNT} events in {:?}",
        pull_start.elapsed()
    );
    assert!(
        bob_received,
        "bob received only {final_count}/{FLOOD_COUNT} events"
    );

    // Phase 4: every non-blank inbox line must be valid JSON with an `event_id`.
    let inbox = bob
        .join("state")
        .join("wire")
        .join("inbox")
        .join("stress-alice-a.jsonl");
    let body = std::fs::read_to_string(&inbox).unwrap();
    let mut parsed_ok = 0;
    for line in body.lines().filter(|raw| !raw.trim().is_empty()) {
        let v: Value = match serde_json::from_str(line) {
            Ok(value) => value,
            Err(e) => panic!("torn JSONL line in bob's inbox: {e}\nline: {line}"),
        };
        assert!(v.get("event_id").is_some(), "missing event_id: {line}");
        parsed_ok += 1;
    }
    assert_eq!(parsed_ok, final_count);
}
/// Sends from `CONCURRENT_THREADS` OS threads at once into the same outbox and
/// checks that the outbox file contains exactly one parseable JSONL line per
/// send, that a single push accounts for all of them, and that bob eventually
/// receives them.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn stress_concurrent_sends_no_torn_writes() {
    let relay_url = spawn_federation_relay().await;
    let (alice, bob) = pair_two_homes(&relay_url, "stress-alice-b", "stress-bob-b").await;
    let total = CONCURRENT_THREADS * CONCURRENT_PER_THREAD;

    // Start every sender thread before joining any of them.
    let start = Instant::now();
    let mut senders = Vec::with_capacity(CONCURRENT_THREADS);
    for tid in 0..CONCURRENT_THREADS {
        let home = alice.clone();
        senders.push(std::thread::spawn(move || {
            for i in 0..CONCURRENT_PER_THREAD {
                let body = format!("thread {tid} msg {i}");
                let out = wire(&home, &["send", "stress-bob-b", "claim", &body]);
                assert!(
                    out.status.success(),
                    "thread {tid} send {i} failed: {}",
                    String::from_utf8_lossy(&out.stderr)
                );
            }
        }));
    }
    for sender in senders {
        sender.join().expect("sender thread panicked");
    }
    eprintln!(
        "stress: {CONCURRENT_THREADS} threads × {CONCURRENT_PER_THREAD} sends = {total} in {:?}",
        start.elapsed()
    );

    // Every non-blank outbox line must be a complete JSON object with an `event_id`.
    let mut outbox = alice.join("state");
    outbox.push("wire");
    outbox.push("outbox");
    outbox.push("stress-bob-b.jsonl");
    let body = std::fs::read_to_string(&outbox).expect("outbox missing");
    let mut parsed_ok = 0;
    for line in body.lines().filter(|raw| !raw.trim().is_empty()) {
        let v: Value = match serde_json::from_str(line) {
            Ok(value) => value,
            Err(e) => panic!("torn JSONL in alice's outbox: {e}\nline: {line}"),
        };
        assert!(v.get("event_id").is_some());
        parsed_ok += 1;
    }
    assert_eq!(
        parsed_ok, total,
        "expected {total} parseable lines in outbox, got {parsed_ok}"
    );

    // One push must enumerate all of them as either pushed or skipped.
    let push_out = wire(&alice, &["push", "--json"]);
    assert!(push_out.status.success(), "push failed");
    let push: Value = serde_json::from_slice(&push_out.stdout).unwrap();
    let array_len = |key: &str| push[key].as_array().map_or(0, |items| items.len());
    let pushed_count = array_len("pushed");
    let skipped_count = array_len("skipped");
    assert_eq!(
        pushed_count + skipped_count,
        total,
        "push didn't enumerate all {total} events: pushed={pushed_count} skipped={skipped_count}"
    );

    // Bob pulls until his inbox holds at least `total` events (60 s budget).
    let pull_deadline = Instant::now() + Duration::from_secs(60);
    let delivered = wait_until(pull_deadline, || {
        let _ = wire(&bob, &["pull", "--json"]);
        count_inbox_lines(&bob, "stress-alice-b") >= total
    });
    assert!(delivered, "bob never received {total} events");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn stress_bind_relay_warns_on_pinned_peers_issue_7() {
let relay_url = spawn_federation_relay().await;
let (alice, _bob) = pair_two_homes(&relay_url, "stress-alice-c", "stress-bob-c").await;
let new_relay_url = spawn_federation_relay().await;
let migrate_out = wire(
&alice,
&["bind-relay", &new_relay_url, "--json"],
);
let failed_loudly = !migrate_out.status.success();
let stderr = String::from_utf8_lossy(&migrate_out.stderr).into_owned();
let stdout = String::from_utf8_lossy(&migrate_out.stdout).into_owned();
let combined = format!("{stderr}\n{stdout}");
let warned_about_peers = combined.to_lowercase().contains("pinned")
|| combined.to_lowercase().contains("black-hole")
|| combined.to_lowercase().contains("rotate-slot")
|| combined.to_lowercase().contains("wire_close")
|| combined.to_lowercase().contains("notify peer");
assert!(
failed_loudly || warned_about_peers,
"ISSUE #7 STILL OPEN: bind-relay silently migrated alice with a pinned peer (stress-bob-c). \
No warning emitted, no failure. Peers will push to a dead slot.\n\
migrate_out.status: {:?}\n\
stdout: {stdout}\n\
stderr: {stderr}",
migrate_out.status
);
}
/// Rewrites alice's stored relay state so that bob's slot id (top-level and in
/// every listed endpoint) is a made-up value, then sends and pushes. The push
/// must either exit unsuccessfully or contain one of a small list of error
/// markers in its output (matched case-insensitively).
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn stress_send_to_nonexistent_slot_surfaces_error() {
    let relay_url = spawn_federation_relay().await;
    let (alice, _bob) = pair_two_homes(&relay_url, "stress-alice-d", "stress-bob-d").await;

    // Load alice's relay state from disk.
    let mut relay_state_path = alice.join("config");
    relay_state_path.push("wire");
    relay_state_path.push("relay.json");
    let bytes = std::fs::read(&relay_state_path).expect("relay.json missing");
    let mut state: Value = serde_json::from_slice(&bytes).expect("relay.json malformed");

    // Point bob's entry, and each of its endpoints, at a slot id that was never issued.
    let fake_slot_id = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff";
    let peer_entry = &mut state["peers"]["stress-bob-d"];
    peer_entry["slot_id"] = serde_json::json!(fake_slot_id);
    if let Some(endpoints) = peer_entry["endpoints"].as_array_mut() {
        for endpoint in endpoints {
            endpoint["slot_id"] = serde_json::json!(fake_slot_id);
        }
    }
    let rewritten = serde_json::to_vec_pretty(&state).unwrap();
    std::fs::write(&relay_state_path, rewritten).unwrap();

    // Queueing the message is expected to succeed; only the push can notice the bad slot.
    assert!(wire(&alice, &["send", "stress-bob-d", "claim", "to the void"])
        .status
        .success());

    let push_out = wire(&alice, &["push", "--json"]);
    let stdout = String::from_utf8_lossy(&push_out.stdout).into_owned();
    let stderr = String::from_utf8_lossy(&push_out.stderr).into_owned();
    let combined = format!("{stdout}\n{stderr}").to_lowercase();

    let error_markers = [
        "404",
        "not found",
        "slot not found",
        "\"failed\"",
        "\"errors\"",
        "dead slot",
    ];
    let surfaced = !push_out.status.success()
        || error_markers
            .iter()
            .any(|marker| combined.contains(*marker));

    assert!(
        surfaced,
        "ISSUE #7 OTHER HALF: push to a nonexistent slot reported success and emitted no \
         operator-visible signal.\n\
         status: {:?}\nstdout: {stdout}\nstderr: {stderr}",
        push_out.status
    );
}