use std::time::Duration;
use assert_cmd::Command as AssertCommand;
use net_aggregator_daemon::{boot, drain_registry, BootedDaemon, Cli};
use tempfile::{NamedTempFile, TempDir};
use tokio::io::AsyncWriteExt;
const PSK_HEX: &str = "4242424242424242424242424242424242424242424242424242424242424242";
async fn write_temp_config(toml: &str) -> NamedTempFile {
let mut f = NamedTempFile::new().expect("tempfile");
let path = f.path().to_path_buf();
{
let mut handle = tokio::fs::OpenOptions::new()
.write(true)
.truncate(true)
.open(&path)
.await
.expect("open tempfile");
handle
.write_all(toml.as_bytes())
.await
.expect("write tempfile");
handle.flush().await.expect("flush tempfile");
}
let _ = &mut f;
f
}
fn cli_cmd(_booted: &BootedDaemon, home_dir: &TempDir) -> AssertCommand {
let mut cmd = AssertCommand::cargo_bin("net-mesh").expect("cargo_bin");
cmd.env("HOME", home_dir.path())
.env("XDG_CONFIG_HOME", home_dir.path())
.env("USERPROFILE", home_dir.path());
cmd.arg("aggregator");
cmd
}
fn attach_args(booted: &BootedDaemon, vec: &mut Vec<String>) {
vec.push("--node-addr".into());
vec.push(booted.bound_addr.to_string());
vec.push("--node-pubkey".into());
vec.push(hex::encode(booted.public_key));
vec.push("--node-id".into());
vec.push(booted.mesh.node_id().to_string());
vec.push("--psk-hex".into());
vec.push(PSK_HEX.into());
}
async fn run_cli(
booted: &BootedDaemon,
home_dir: &TempDir,
verb: &str,
extra: &[&str],
) -> (i32, String, String) {
let mut argv: Vec<String> = vec![verb.into()];
for s in extra {
argv.push((*s).into());
}
attach_args(booted, &mut argv);
let bin_cmd = cli_cmd(booted, home_dir);
let argv_owned = argv.clone();
tokio::task::spawn_blocking(move || {
let mut cmd = bin_cmd;
cmd.args(&argv_owned);
let output = cmd.output().expect("invoke net-mesh");
let code = output.status.code().unwrap_or(-1);
let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
(code, stdout, stderr)
})
.await
.expect("spawn_blocking")
}
async fn boot_daemon(toml: &str) -> (BootedDaemon, NamedTempFile) {
let cfg = write_temp_config(toml).await;
let cli = Cli {
config: cfg.path().to_path_buf(),
listen: None,
verbose: 0,
print_bootstrap: false,
};
let booted = boot(cli).await.expect("daemon boot");
booted.mesh.start();
(booted, cfg)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn ls_remote_lists_configured_groups() {
let toml = format!(
r#"
listen = "127.0.0.1:0"
psk_hex = "{PSK_HEX}"
[[group]]
name = "alpha"
source_subnet = "3.7"
fold_kinds = [1]
replica_count = 2
summary_interval_ms = 50
[[group]]
name = "beta"
source_subnet = "3.8"
fold_kinds = [1]
replica_count = 1
summary_interval_ms = 50
"#
);
let home = TempDir::new().expect("home tempdir");
let (booted, _cfg) = boot_daemon(&toml).await;
let (code, stdout, stderr) = run_cli(&booted, &home, "ls", &["--remote"]).await;
assert_eq!(code, 0, "ls --remote failed: stderr={stderr}");
let parsed: serde_json::Value =
serde_json::from_str(&stdout).unwrap_or_else(|e| panic!("non-JSON stdout ({e}): {stdout}"));
assert_eq!(parsed["group_count"], 2, "stdout={stdout}");
let names: Vec<&str> = parsed["groups"]
.as_array()
.expect("groups array")
.iter()
.map(|g| g["name"].as_str().expect("name string"))
.collect();
assert_eq!(names, vec!["alpha", "beta"]);
drain_registry(&booted.registry).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn spawn_against_template_adds_a_group() {
let toml = format!(
r#"
listen = "127.0.0.1:0"
psk_hex = "{PSK_HEX}"
[[template]]
name = "primary"
source_subnet = "3.7"
fold_kinds = [1]
summary_interval_ms = 50
"#
);
let home = TempDir::new().expect("home tempdir");
let (booted, _cfg) = boot_daemon(&toml).await;
let (code, stdout, stderr) = run_cli(
&booted,
&home,
"spawn",
&[
"--template",
"primary",
"--name",
"dynamic",
"--replica-count",
"2",
],
)
.await;
assert_eq!(code, 0, "spawn failed: stderr={stderr}");
let parsed: serde_json::Value =
serde_json::from_str(&stdout).unwrap_or_else(|e| panic!("non-JSON stdout ({e}): {stdout}"));
assert_eq!(parsed["name"], "dynamic");
assert_eq!(parsed["replica_count"], 2);
let (code, stdout, _) = run_cli(&booted, &home, "ls", &["--remote"]).await;
assert_eq!(code, 0);
let parsed: serde_json::Value = serde_json::from_str(&stdout).expect("parse ls");
assert_eq!(parsed["group_count"], 1);
assert_eq!(parsed["groups"][0]["name"], "dynamic");
drain_registry(&booted.registry).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn scale_resizes_existing_group() {
let toml = format!(
r#"
listen = "127.0.0.1:0"
psk_hex = "{PSK_HEX}"
[[template]]
name = "primary"
source_subnet = "3.7"
fold_kinds = [1]
summary_interval_ms = 50
[[group]]
name = "alpha"
source_subnet = "3.7"
fold_kinds = [1]
replica_count = 2
summary_interval_ms = 50
"#
);
let home = TempDir::new().expect("home tempdir");
let (booted, _cfg) = boot_daemon(&toml).await;
let (code, stdout, stderr) = run_cli(
&booted,
&home,
"scale",
&[
"--name",
"alpha",
"--template",
"primary",
"--replica-count",
"4",
],
)
.await;
assert_eq!(code, 0, "scale failed: stderr={stderr}");
let parsed: serde_json::Value = serde_json::from_str(&stdout).expect("parse scale");
assert_eq!(parsed["replica_count"], 4);
assert_eq!(parsed["name"], "alpha");
drain_registry(&booted.registry).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "needs replica node_id discovery — the fold.query service \
handler is keyed on each replica's keypair-derived node_id, \
not on the daemon's main mesh node_id. Either expose a \
`booted.registry.first_replica_node_id(group)` helper or \
spawn a single-node registry mode."]
async fn query_returns_summary_for_configured_group() {
let toml = format!(
r#"
listen = "127.0.0.1:0"
psk_hex = "{PSK_HEX}"
[[group]]
name = "alpha"
source_subnet = "3.7"
fold_kinds = [1]
replica_count = 1
summary_interval_ms = 50
"#
);
let home = TempDir::new().expect("home tempdir");
let (booted, _cfg) = boot_daemon(&toml).await;
tokio::time::sleep(Duration::from_millis(150)).await;
let target = booted.mesh.node_id().to_string();
let (code, stdout, stderr) = run_cli(
&booted,
&home,
"query",
&[&target, "--kind", "0x0001", "--fresh"],
)
.await;
assert_eq!(code, 0, "query failed: stderr={stderr}");
let parsed: serde_json::Value =
serde_json::from_str(&stdout).unwrap_or_else(|e| panic!("non-JSON stdout ({e}): {stdout}"));
assert_eq!(parsed["fresh"], true);
assert_eq!(parsed["fold_kind"], "0x0001");
drain_registry(&booted.registry).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn missing_pubkey_exits_with_invalid_args() {
let mut cmd = AssertCommand::cargo_bin("net-mesh").expect("cargo_bin");
let home = TempDir::new().expect("home tempdir");
cmd.env("HOME", home.path())
.env("XDG_CONFIG_HOME", home.path())
.env("USERPROFILE", home.path())
.args([
"aggregator",
"query",
"0x1",
"--kind",
"0x0001",
"--node-addr",
"127.0.0.1:1",
"--node-id",
"1",
"--psk-hex",
PSK_HEX,
]);
let result = tokio::task::spawn_blocking(move || cmd.output())
.await
.expect("spawn_blocking")
.expect("invoke");
assert_eq!(
result.status.code(),
Some(2),
"expected exit code 2 (InvalidArgs); stderr={}",
String::from_utf8_lossy(&result.stderr)
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn bad_pubkey_hex_exits_with_invalid_args() {
let mut cmd = AssertCommand::cargo_bin("net-mesh").expect("cargo_bin");
let home = TempDir::new().expect("home tempdir");
cmd.env("HOME", home.path())
.env("XDG_CONFIG_HOME", home.path())
.env("USERPROFILE", home.path())
.args([
"aggregator",
"query",
"0x1",
"--kind",
"0x0001",
"--node-addr",
"127.0.0.1:1",
"--node-pubkey",
"0xnotvalidhex",
"--node-id",
"1",
"--psk-hex",
PSK_HEX,
]);
let result = tokio::task::spawn_blocking(move || cmd.output())
.await
.expect("spawn_blocking")
.expect("invoke");
assert_eq!(result.status.code(), Some(2));
}