use futures::future::join_all;
use serde_json::{json, Value};
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
use tokio::process::Command;
/// Base URL of the daemon's HTTP listener; the helpers below append `/rpc` and `/health` to it.
const HTTP_BASE: &str = "http://127.0.0.1:7070";
/// Executable name of the MCP bridge that `test_bridge_concurrent` spawns as a child process.
const BRIDGE_BIN: &str = "trusty-memory-mcp-bridge";
/// Per-request timeout configured on the shared `reqwest` client built by `http_client`.
const HTTP_TIMEOUT: Duration = Duration::from_secs(30);
/// Determines which Unix-domain-socket path the tests should connect to.
///
/// Resolution order:
/// 1. The trimmed contents of the `uds_addr` file inside the `trusty-memory`
///    data directory, when that file can be read and is not blank.
/// 2. Otherwise `trusty-memory.sock` inside the first non-empty of
///    `$XDG_RUNTIME_DIR`, `$TMPDIR`, or the platform temp directory.
fn resolve_uds_path() -> PathBuf {
    // Address advertised through the data directory, if any.
    let advertised = trusty_common::resolve_data_dir("trusty-memory")
        .ok()
        .and_then(|dir| std::fs::read_to_string(dir.join("uds_addr")).ok())
        .map(|raw| raw.trim().to_owned())
        .filter(|addr| !addr.is_empty());
    if let Some(addr) = advertised {
        return PathBuf::from(addr);
    }

    // Fallback: conventional socket name inside a runtime/temp directory.
    let non_empty_env = |key: &str| std::env::var(key).ok().filter(|v| !v.is_empty());
    let base = match non_empty_env("XDG_RUNTIME_DIR").or_else(|| non_empty_env("TMPDIR")) {
        Some(dir) => dir,
        None => std::env::temp_dir().to_string_lossy().to_string(),
    };
    PathBuf::from(base).join("trusty-memory.sock")
}
/// Builds the `reqwest` client shared by all HTTP tests.
///
/// The client uses [`HTTP_TIMEOUT`] as its request timeout, enables
/// `TCP_NODELAY`, and keeps up to 64 idle pooled connections per host.
///
/// # Panics
/// Panics if the client cannot be constructed.
fn http_client() -> reqwest::Client {
    let builder = reqwest::Client::builder()
        .timeout(HTTP_TIMEOUT)
        .tcp_nodelay(true)
        .pool_max_idle_per_host(64);
    match builder.build() {
        Ok(client) => client,
        Err(e) => panic!("reqwest client build: {e:?}"),
    }
}
/// POSTs one JSON-RPC request to `{HTTP_BASE}/rpc` and returns the decoded
/// response body together with the elapsed time.
///
/// The measured duration spans sending the request through decoding the
/// response body as JSON.
///
/// # Errors
/// Returns a string prefixed with `send:` when the request fails, `http status`
/// when the response status is not a success, and `parse:` when the body is
/// not valid JSON. A JSON-RPC level `error` member is *not* treated as an
/// error here; callers inspect the returned body themselves.
async fn http_rpc(client: &reqwest::Client, req: Value) -> Result<(Value, Duration), String> {
    let url = format!("{HTTP_BASE}/rpc");
    let clock = Instant::now();
    let response = match client.post(&url).json(&req).send().await {
        Ok(r) => r,
        Err(e) => return Err(format!("send: {e}")),
    };
    let status = response.status();
    if !status.is_success() {
        return Err(format!("http status {status}"));
    }
    match response.json::<Value>().await {
        Ok(body) => Ok((body, clock.elapsed())),
        Err(e) => Err(format!("parse: {e}")),
    }
}
/// Verifies that a live daemon is reachable and returns its reported version.
///
/// Performs `GET {HTTP_BASE}/health`, reads the `version` string from the JSON
/// body (falling back to `"?"` when it is absent or not a string), and checks
/// that the path returned by [`resolve_uds_path`] exists on disk.
///
/// # Panics
/// Panics when the daemon cannot be reached, `/health` answers with a
/// non-success status, the body is not JSON, or the UDS socket path is missing.
async fn assert_daemon_alive(client: &reqwest::Client) -> String {
    let health_url = format!("{HTTP_BASE}/health");
    let resp = match client.get(&health_url).send().await {
        Ok(r) => r,
        Err(e) => panic!(
            "live daemon not reachable at {HTTP_BASE} ({e}); start it with `trusty-memory start`"
        ),
    };

    let status = resp.status();
    if !status.is_success() {
        panic!("GET /health returned {}", status);
    }

    let health: Value = resp.json().await.expect("parse /health");
    let version = match health["version"].as_str() {
        Some(v) => v.to_string(),
        None => "?".to_string(),
    };

    let socket = resolve_uds_path();
    if !socket.exists() {
        panic!(
            "UDS socket missing at {} — daemon may not have bound the UDS listener",
            socket.display()
        );
    }
    version
}
/// Fetches `{HTTP_BASE}/health` and extracts `(rss_mb, status)` from the body.
///
/// A missing or non-numeric `rss_mb` is reported as `0.0`; a missing or
/// non-string `status` is reported as `"?"`.
///
/// # Errors
/// Returns a string prefixed with `send:` on request failure, `status` on a
/// non-success HTTP status, and `parse:` when the body is not valid JSON.
async fn probe_health(client: &reqwest::Client) -> Result<(f64, String), String> {
    let resp = match client.get(format!("{HTTP_BASE}/health")).send().await {
        Ok(r) => r,
        Err(e) => return Err(format!("send: {e}")),
    };
    let code = resp.status();
    if !code.is_success() {
        return Err(format!("status {code}"));
    }
    let health = resp
        .json::<Value>()
        .await
        .map_err(|e| format!("parse: {e}"))?;
    let rss_mb = health["rss_mb"].as_f64().unwrap_or(0.0);
    let state = health["status"]
        .as_str()
        .map_or_else(|| "?".to_string(), str::to_string);
    Ok((rss_mb, state))
}
async fn provision_palace(client: &reqwest::Client, tag: &str) -> String {
let palace = format!("perf-{tag}-{}", uuid::Uuid::new_v4());
let create = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "palace_create",
"params": {"name": palace}
});
let (resp, _) = http_rpc(client, create).await.expect("palace_create");
assert!(
resp.get("error").is_none_or(|e| e.is_null()),
"palace_create failed: {resp:?}"
);
let seed = json!({
"jsonrpc": "2.0",
"id": 2,
"method": "memory_remember",
"params": {
"palace": palace,
"text": "Seed entry for concurrent perf testing: this fixture exists so recall queries against the palace return at least one result, exercising the BM25 + vector retrieval pipeline rather than the empty-index fast path.",
"force": true
}
});
let (resp, _) = http_rpc(client, seed).await.expect("seed memory_remember");
assert!(
resp.get("error").is_none_or(|e| e.is_null()),
"seed memory_remember failed: {resp:?}"
);
palace
}
/// Summarises a set of latency samples into count, min, mean, p50/p95/p99 and max.
///
/// Percentiles use the nearest-rank rule: the value at 1-based rank
/// `ceil(p * n)` of the sorted samples, clamped to the valid index range.
///
/// # Panics
/// Panics if `samples` is empty.
fn latency_stats(mut samples: Vec<Duration>) -> LatencyStats {
    if samples.is_empty() {
        panic!("latency_stats: empty sample vector");
    }
    samples.sort_unstable();

    let count = samples.len();
    let last = count - 1;
    let total: Duration = samples.iter().sum();

    // Nearest-rank percentile lookup over the sorted samples.
    let rank = |fraction: f64| -> Duration {
        let ordinal = (fraction * count as f64).ceil() as usize;
        samples[ordinal.saturating_sub(1).min(last)]
    };

    LatencyStats {
        n: count,
        min: samples[0],
        mean: total / count as u32,
        p50: rank(0.50),
        p95: rank(0.95),
        p99: rank(0.99),
        max: samples[last],
    }
}
/// Summary of a set of latency samples, as produced by `latency_stats`.
#[derive(Debug, Clone, Copy)]
struct LatencyStats {
    /// Number of samples summarised.
    n: usize,
    /// Smallest sample.
    min: Duration,
    /// Arithmetic mean of the samples.
    mean: Duration,
    /// 50th-percentile sample.
    p50: Duration,
    /// 95th-percentile sample.
    p95: Duration,
    /// 99th-percentile sample.
    p99: Duration,
    /// Largest sample.
    max: Duration,
}
/// Single-line rendering used in the test reports: the sample count
/// right-aligned to width 5, each duration `Debug`-formatted with width 7.
impl std::fmt::Display for LatencyStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            n,
            min,
            mean,
            p50,
            p95,
            p99,
            max,
        } = self;
        f.write_fmt(format_args!(
            "n={:>5} min={:>7?} mean={:>7?} p50={:>7?} p95={:>7?} p99={:>7?} max={:>7?}",
            n, min, mean, p50, p95, p99, max
        ))
    }
}
/// Read-only concurrency test: 50 tasks each issue 20 sequential requests,
/// alternating `GET /health` with a `memory_recall` JSON-RPC call.
///
/// Ignored by default because it needs a live daemon (see
/// [`assert_daemon_alive`]). The test fails if any request errors or if the
/// slowest successful request reached [`HTTP_TIMEOUT`].
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_http_concurrent_reads() {
    /// How many failure messages are echoed in the report.
    const MAX_SAMPLE_ERRORS: usize = 3;

    let client = http_client();
    let version = assert_daemon_alive(&client).await;
    let palace = provision_palace(&client, "http-reads").await;
    let n_tasks: usize = 50;
    let per_task: usize = 20;
    // Loop-invariant; built once instead of once per request.
    let health_url = format!("{HTTP_BASE}/health");

    let mut tasks = Vec::with_capacity(n_tasks);
    let started = Instant::now();
    for i in 0..n_tasks {
        let client = client.clone();
        let palace = palace.clone();
        let health_url = health_url.clone();
        tasks.push(tokio::spawn(async move {
            let mut latencies: Vec<Duration> = Vec::with_capacity(per_task);
            // Failure messages are kept (not just counted) so a red run is diagnosable.
            let mut failures: Vec<String> = Vec::new();
            for j in 0..per_task {
                let outcome: Result<Duration, String> = if j.is_multiple_of(2) {
                    let t = Instant::now();
                    match client.get(&health_url).send().await {
                        // Drain the body so the latency covers the whole response;
                        // a failed body read is a failed request, not a success.
                        Ok(r) if r.status().is_success() => r
                            .bytes()
                            .await
                            .map(|_| t.elapsed())
                            .map_err(|e| format!("body: {e}")),
                        Ok(r) => Err(format!("status {}", r.status())),
                        Err(e) => Err(format!("send: {e}")),
                    }
                } else {
                    let req = json!({
                        "jsonrpc": "2.0",
                        "id": i * 100 + j,
                        "method": "memory_recall",
                        "params": {"palace": palace, "query": "seed entry", "top_k": 5}
                    });
                    match http_rpc(&client, req).await {
                        Ok((body, _)) if body.get("error").is_some_and(|e| !e.is_null()) => {
                            Err(format!("rpc error: {}", body["error"]))
                        }
                        Ok((_, d)) => Ok(d),
                        Err(e) => Err(e),
                    }
                };
                match outcome {
                    Ok(d) => latencies.push(d),
                    Err(e) => failures.push(e),
                }
            }
            (latencies, failures)
        }));
    }

    let mut all_latencies: Vec<Duration> = Vec::with_capacity(n_tasks * per_task);
    let mut all_failures: Vec<String> = Vec::new();
    for handle in tasks {
        let (lats, fails) = handle.await.expect("task join");
        all_latencies.extend(lats);
        all_failures.extend(fails);
    }
    let total_elapsed = started.elapsed();
    let total_errors = all_failures.len();
    let ops = (n_tasks * per_task) as f64;
    let throughput = ops / total_elapsed.as_secs_f64();
    // `latency_stats` panics on an empty vector; when every request failed we
    // still want the report and the error-count assertion below to run.
    let stats = (!all_latencies.is_empty()).then(|| latency_stats(all_latencies));

    println!();
    println!("=== test_http_concurrent_reads (daemon v{version}) ===");
    println!(" tasks={n_tasks} per_task={per_task} total_ops={ops:.0} errors={total_errors}");
    println!(" wall={total_elapsed:?} throughput={throughput:.1} req/s");
    match stats {
        Some(s) => println!(" latency: {s}"),
        None => println!(" latency: n/a (no successful requests)"),
    }
    for e in all_failures.iter().take(MAX_SAMPLE_ERRORS) {
        println!(" sample error: {e}");
    }

    assert_eq!(total_errors, 0, "expected 0 errors, got {total_errors}");
    let stats = stats.expect("zero errors implies every request produced a latency sample");
    assert!(
        stats.max < HTTP_TIMEOUT,
        "max latency {:?} exceeded HTTP timeout {HTTP_TIMEOUT:?}",
        stats.max
    );
}
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_http_concurrent_rw() {
let client = http_client();
let version = assert_daemon_alive(&client).await;
let palace = provision_palace(&client, "http-rw").await;
let n_writers: usize = 20;
let n_readers: usize = 20;
let per_task: usize = 10;
let started = Instant::now();
type TaskResult = (String, Vec<Duration>, usize, Vec<String>);
let mut tasks: Vec<tokio::task::JoinHandle<TaskResult>> = Vec::new();
for i in 0..n_writers {
let client = client.clone();
let palace = palace.clone();
tasks.push(tokio::spawn(async move {
let mut latencies = Vec::with_capacity(per_task);
let mut errors = 0usize;
let mut sample_errors: Vec<String> = Vec::new();
for j in 0..per_task {
let unique = uuid::Uuid::new_v4();
let req = json!({
"jsonrpc": "2.0",
"id": 10_000 + i * 100 + j,
"method": "memory_remember",
"params": {
"palace": palace,
"text": format!("Concurrent writer {i} request {j} with unique nonce {unique} — \
long enough to satisfy the min-token gate and exercise \
the BM25 + vector embedding pipeline end-to-end."),
"force": true,
}
});
match http_rpc(&client, req).await {
Ok((body, d)) => {
if body.get("error").map(|e| !e.is_null()).unwrap_or(false) {
errors += 1;
if sample_errors.len() < 2 {
sample_errors.push(format!("{}", body["error"]));
}
} else {
latencies.push(d);
}
}
Err(e) => {
errors += 1;
if sample_errors.len() < 2 {
sample_errors.push(format!("transport: {e}"));
}
}
}
}
("write".to_string(), latencies, errors, sample_errors)
}));
}
for i in 0..n_readers {
let client = client.clone();
let palace = palace.clone();
tasks.push(tokio::spawn(async move {
let mut latencies = Vec::with_capacity(per_task);
let mut errors = 0usize;
let mut sample_errors: Vec<String> = Vec::new();
for j in 0..per_task {
let req = json!({
"jsonrpc": "2.0",
"id": 20_000 + i * 100 + j,
"method": "memory_recall",
"params": {"palace": palace, "query": "concurrent writer request", "top_k": 5}
});
match http_rpc(&client, req).await {
Ok((body, d)) => {
if body.get("error").map(|e| !e.is_null()).unwrap_or(false) {
errors += 1;
if sample_errors.len() < 2 {
sample_errors.push(format!("{}", body["error"]));
}
} else {
latencies.push(d);
}
}
Err(e) => {
errors += 1;
if sample_errors.len() < 2 {
sample_errors.push(format!("transport: {e}"));
}
}
}
}
("read".to_string(), latencies, errors, sample_errors)
}));
}
let mut write_latencies = Vec::new();
let mut read_latencies = Vec::new();
let mut write_errors = 0usize;
let mut read_errors = 0usize;
let mut write_samples: Vec<String> = Vec::new();
let mut read_samples: Vec<String> = Vec::new();
for j in tasks {
let (kind, lats, errs, samples) = j.await.expect("task join");
if kind == "write" {
write_latencies.extend(lats);
write_errors += errs;
for s in samples {
if write_samples.len() < 3 {
write_samples.push(s);
}
}
} else {
read_latencies.extend(lats);
read_errors += errs;
for s in samples {
if read_samples.len() < 3 {
read_samples.push(s);
}
}
}
}
let total_elapsed = started.elapsed();
let total_writes = n_writers * per_task;
let total_reads = n_readers * per_task;
let total_ops = total_writes + total_reads;
let throughput = total_ops as f64 / total_elapsed.as_secs_f64();
let write_success_rate = (write_latencies.len() as f64) / (total_writes as f64) * 100.0;
let read_success_rate = (read_latencies.len() as f64) / (total_reads as f64) * 100.0;
println!();
println!("=== test_http_concurrent_rw (daemon v{version}) ===");
println!(
" writers={n_writers}×{per_task}={total_writes} \
readers={n_readers}×{per_task}={total_reads} total={total_ops}"
);
println!(" wall={total_elapsed:?} throughput={throughput:.1} ops/s");
if !write_latencies.is_empty() {
println!(" WRITE: {}", latency_stats(write_latencies.clone()));
}
println!(" WRITE errors={write_errors} success_rate={write_success_rate:.2}%");
for s in &write_samples {
println!(" write sample error: {s}");
}
if !read_latencies.is_empty() {
println!(" READ : {}", latency_stats(read_latencies.clone()));
}
println!(" READ errors={read_errors} success_rate={read_success_rate:.2}%");
for s in &read_samples {
println!(" read sample error: {s}");
}
assert!(
read_success_rate >= 95.0,
"read success_rate {read_success_rate:.2}% below 95% floor"
);
assert!(
write_success_rate >= 95.0,
"write success rate {:.1}% below 95% — is #154 fix (PR #161) deployed?",
write_success_rate
);
}
/// Burst test: 500 JSON-RPC requests (alternating `memory_remember` and
/// `memory_recall`) are created up front and driven concurrently with
/// `join_all` on the test task.
///
/// Ignored by default because it needs a live daemon (see
/// [`assert_daemon_alive`]). The test fails when fewer than 95% of the
/// requests succeed, or when `/health` cannot be probed afterwards.
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_http_burst() {
    let client = http_client();
    let version = assert_daemon_alive(&client).await;
    let palace = provision_palace(&client, "burst").await;
    let n: usize = 500;

    // The futures are not spawned, so they may simply borrow `client`; `json!`
    // serialises `palace` by reference, so no per-request clones are needed.
    let mut futs = Vec::with_capacity(n);
    for i in 0..n {
        let req = if i.is_multiple_of(2) {
            json!({
                "jsonrpc": "2.0",
                "id": i,
                "method": "memory_remember",
                "params": {
                    "palace": palace,
                    "text": format!("Burst-test entry {i} with sufficient content length to satisfy \
                                     the minimum-token threshold and produce a real embedding via \
                                     the indexing pipeline."),
                    "force": true,
                }
            })
        } else {
            json!({
                "jsonrpc": "2.0",
                "id": i,
                "method": "memory_recall",
                "params": {"palace": palace, "query": "burst test entry", "top_k": 5}
            })
        };
        futs.push(http_rpc(&client, req));
    }

    // Futures are lazy: nothing is sent until `join_all` polls them.
    let started = Instant::now();
    let results = join_all(futs).await;
    let total_elapsed = started.elapsed();

    let mut latencies = Vec::with_capacity(n);
    let mut transport_errors = 0usize;
    let mut rpc_errors = 0usize;
    let mut sample_errors: Vec<String> = Vec::new();
    for outcome in results {
        let failure = match outcome {
            Ok((body, _)) if body.get("error").is_some_and(|e| !e.is_null()) => {
                rpc_errors += 1;
                format!("rpc: {}", body["error"])
            }
            Ok((_, d)) => {
                latencies.push(d);
                continue;
            }
            Err(e) => {
                transport_errors += 1;
                format!("transport: {e}")
            }
        };
        if sample_errors.len() < 3 {
            sample_errors.push(failure);
        }
    }

    let errors = transport_errors + rpc_errors;
    let error_rate = (errors as f64) / (n as f64) * 100.0;
    let throughput = n as f64 / total_elapsed.as_secs_f64();
    let n_success = latencies.len();
    // `latency_stats` panics on an empty vector, hence the guard.
    let stats = (!latencies.is_empty()).then(|| latency_stats(latencies));

    println!();
    println!("=== test_http_burst (daemon v{version}) ===");
    println!(" n={n} wall={total_elapsed:?} throughput={throughput:.1} req/s");
    println!(" errors={errors} (transport={transport_errors} rpc={rpc_errors}) error_rate={error_rate:.2}%");
    for s in &sample_errors {
        println!(" sample: {s}");
    }
    if let Some(s) = stats {
        println!(" latency: {s}");
    }
    let success_rate = (n_success as f64) / (n as f64) * 100.0;
    println!(" success_rate={success_rate:.2}%");

    assert!(
        n_success > 0,
        "burst returned zero successful responses (transport={transport_errors} rpc={rpc_errors})"
    );
    // `>=` so that exactly 95% passes, matching the "below 95%" message and
    // the floor used by the other tests in this file.
    assert!(
        success_rate >= 95.0,
        "burst success rate {:.1}% below 95% — is #154 fix (PR #161) deployed?",
        success_rate
    );
    let (_, status_after) = probe_health(&client).await.expect("post-burst health");
    println!(" post-burst /health.status = {status_after}");
}
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_uds_concurrent() {
let client = http_client();
let version = assert_daemon_alive(&client).await;
let palace = provision_palace(&client, "uds").await;
let uds_path = resolve_uds_path();
println!("UDS path: {}", uds_path.display());
let n_conns: usize = 20;
let per_conn: usize = 10;
let started = Instant::now();
let mut tasks: Vec<tokio::task::JoinHandle<(Vec<Duration>, usize)>> = Vec::new();
for i in 0..n_conns {
let path = uds_path.clone();
let palace = palace.clone();
tasks.push(tokio::spawn(async move {
let stream = match UnixStream::connect(&path).await {
Ok(s) => s,
Err(e) => {
eprintln!("connect {} failed: {e}", path.display());
return (Vec::new(), per_conn);
}
};
let (read_half, mut write_half) = stream.into_split();
let mut reader = BufReader::new(read_half);
let mut latencies = Vec::with_capacity(per_conn);
let mut errors = 0usize;
for j in 0..per_conn {
let req = json!({
"jsonrpc": "2.0",
"id": i * 1_000 + j,
"method": "memory_recall",
"params": {"palace": palace, "query": "seed entry", "top_k": 5}
});
let line = match serde_json::to_string(&req) {
Ok(s) => s + "\n",
Err(_) => {
errors += 1;
continue;
}
};
let t = Instant::now();
if write_half.write_all(line.as_bytes()).await.is_err() {
errors += 1;
continue;
}
if write_half.flush().await.is_err() {
errors += 1;
continue;
}
let mut response_line = String::new();
if reader.read_line(&mut response_line).await.is_err() {
errors += 1;
continue;
}
let elapsed = t.elapsed();
let body: Value = match serde_json::from_str(&response_line) {
Ok(v) => v,
Err(_) => {
errors += 1;
continue;
}
};
if body["jsonrpc"] != "2.0" {
errors += 1;
continue;
}
if body["id"] != json!(i * 1_000 + j) {
errors += 1;
continue;
}
if body.get("error").map(|e| !e.is_null()).unwrap_or(false) {
errors += 1;
continue;
}
latencies.push(elapsed);
}
(latencies, errors)
}));
}
let mut all_latencies = Vec::with_capacity(n_conns * per_conn);
let mut total_errors = 0usize;
for j in tasks {
let (lats, errs) = j.await.expect("task join");
all_latencies.extend(lats);
total_errors += errs;
}
let total_elapsed = started.elapsed();
let ops = (n_conns * per_conn) as f64;
let throughput = ops / total_elapsed.as_secs_f64();
let stats = latency_stats(all_latencies);
println!();
println!("=== test_uds_concurrent (daemon v{version}) ===");
println!(" conns={n_conns} per_conn={per_conn} total_ops={ops:.0} errors={total_errors}");
println!(" wall={total_elapsed:?} throughput={throughput:.1} req/s");
println!(" latency: {stats}");
assert_eq!(total_errors, 0, "expected 0 errors, got {total_errors}");
}
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_bridge_concurrent() {
let client = http_client();
let version = assert_daemon_alive(&client).await;
let n_bridges: usize = 10;
let per_bridge: usize = 5;
let started = Instant::now();
let mut tasks: Vec<tokio::task::JoinHandle<Result<Vec<Duration>, String>>> = Vec::new();
for i in 0..n_bridges {
tasks.push(tokio::spawn(async move {
let mut child = match Command::new(BRIDGE_BIN)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Ok(c) => c,
Err(e) => return Err(format!("spawn {BRIDGE_BIN}: {e}")),
};
let mut stdin = child
.stdin
.take()
.ok_or_else(|| "stdin not captured".to_string())?;
let stdout = child
.stdout
.take()
.ok_or_else(|| "stdout not captured".to_string())?;
let mut reader = BufReader::new(stdout);
let mut latencies = Vec::with_capacity(per_bridge * 2);
for j in 0..per_bridge {
let id_a = i * 1_000 + j * 2;
let id_b = i * 1_000 + j * 2 + 1;
let req_a = serde_json::to_string(&json!({
"jsonrpc": "2.0",
"id": id_a,
"method": "tools/list",
"params": {}
}))
.map_err(|e| format!("serialise: {e}"))?
+ "\n";
let req_b = serde_json::to_string(&json!({
"jsonrpc": "2.0",
"id": id_b,
"method": "ping",
"params": {}
}))
.map_err(|e| format!("serialise: {e}"))?
+ "\n";
let t = Instant::now();
stdin
.write_all(req_a.as_bytes())
.await
.map_err(|e| format!("write a: {e}"))?;
stdin
.write_all(req_b.as_bytes())
.await
.map_err(|e| format!("write b: {e}"))?;
stdin.flush().await.map_err(|e| format!("flush: {e}"))?;
let mut line_a = String::new();
reader
.read_line(&mut line_a)
.await
.map_err(|e| format!("read a: {e}"))?;
if line_a.is_empty() {
return Err("eof before response a".to_string());
}
let resp_a: Value = serde_json::from_str(line_a.trim_end())
.map_err(|e| format!("parse a: {e} (line: {line_a:?})"))?;
if resp_a["id"] != json!(id_a) {
return Err(format!(
"expected id {id_a} got {} (full: {resp_a})",
resp_a["id"]
));
}
if resp_a.get("error").map(|e| !e.is_null()).unwrap_or(false) {
return Err(format!("rpc error a: {}", resp_a["error"]));
}
let mut line_b = String::new();
reader
.read_line(&mut line_b)
.await
.map_err(|e| format!("read b: {e}"))?;
if line_b.is_empty() {
return Err("eof before response b".to_string());
}
let resp_b: Value = serde_json::from_str(line_b.trim_end())
.map_err(|e| format!("parse b: {e} (line: {line_b:?})"))?;
if resp_b["id"] != json!(id_b) {
return Err(format!("expected id {id_b} got {}", resp_b["id"]));
}
if resp_b.get("error").map(|e| !e.is_null()).unwrap_or(false) {
return Err(format!("rpc error b: {}", resp_b["error"]));
}
latencies.push(t.elapsed());
}
drop(stdin);
let exit_status = tokio::time::timeout(Duration::from_secs(10), child.wait())
.await
.map_err(|_| "timeout waiting for bridge exit".to_string())?
.map_err(|e| format!("child.wait: {e}"))?;
if !exit_status.success() {
return Err(format!("bridge exited with {exit_status:?}"));
}
Ok(latencies)
}));
}
let mut all_latencies: Vec<Duration> = Vec::new();
let mut failures: Vec<String> = Vec::new();
for (i, t) in tasks.into_iter().enumerate() {
match t.await.expect("task join") {
Ok(lats) => all_latencies.extend(lats),
Err(e) => failures.push(format!("bridge {i}: {e}")),
}
}
let total_elapsed = started.elapsed();
let exchanges = (n_bridges * per_bridge) as f64;
let throughput = exchanges / total_elapsed.as_secs_f64();
println!();
println!("=== test_bridge_concurrent (daemon v{version}) ===");
println!(
" bridges={n_bridges} per_bridge={per_bridge} total_exchanges={exchanges:.0} failures={}",
failures.len()
);
println!(" wall={total_elapsed:?} throughput={throughput:.1} exchanges/s");
if !all_latencies.is_empty() {
println!(
" per-exchange (init+ping): {}",
latency_stats(all_latencies)
);
}
for f in &failures {
println!(" FAIL: {f}");
}
assert!(failures.is_empty(), "{} bridge(s) failed", failures.len());
}
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_http_sustained_load() {
let client = http_client();
let version = assert_daemon_alive(&client).await;
let palace = provision_palace(&client, "sustained").await;
let initial_health: Value = client
.get(format!("{HTTP_BASE}/health"))
.send()
.await
.expect("initial /health")
.json()
.await
.expect("parse /health");
let initial_rss = initial_health["rss_mb"].as_f64().unwrap_or(0.0);
let duration = Duration::from_secs(10);
let n_clients: usize = 10;
let deadline = Instant::now() + duration;
let total_ops = Arc::new(AtomicU64::new(0));
let total_errors = Arc::new(AtomicU64::new(0));
let mut tasks: Vec<tokio::task::JoinHandle<()>> = Vec::new();
for i in 0..n_clients {
let client = client.clone();
let palace = palace.clone();
let total_ops = Arc::clone(&total_ops);
let total_errors = Arc::clone(&total_errors);
tasks.push(tokio::spawn(async move {
let mut j: u64 = 0;
while Instant::now() < deadline {
let req = if j.is_multiple_of(2) {
json!({
"jsonrpc": "2.0",
"id": i as u64 * 1_000_000 + j,
"method": "memory_remember",
"params": {
"palace": palace,
"text": format!("Sustained-load client {i} op {j} — long enough content to clear \
the min-token gate and exercise the embedding + KG pipelines."),
"force": true,
}
})
} else {
json!({
"jsonrpc": "2.0",
"id": i as u64 * 1_000_000 + j,
"method": "memory_recall",
"params": {"palace": palace, "query": "sustained load client", "top_k": 5}
})
};
match http_rpc(&client, req).await {
Ok((body, _)) => {
if body.get("error").map(|e| !e.is_null()).unwrap_or(false) {
total_errors.fetch_add(1, Ordering::Relaxed);
} else {
total_ops.fetch_add(1, Ordering::Relaxed);
}
}
Err(_) => {
total_errors.fetch_add(1, Ordering::Relaxed);
}
}
j += 1;
}
}));
}
let started = Instant::now();
for t in tasks {
let _ = t.await;
}
let wall = started.elapsed();
let ops = total_ops.load(Ordering::Relaxed);
let errs = total_errors.load(Ordering::Relaxed);
let throughput = ops as f64 / wall.as_secs_f64();
let error_rate = if ops + errs == 0 {
0.0
} else {
(errs as f64) / ((ops + errs) as f64) * 100.0
};
tokio::time::sleep(Duration::from_secs(2)).await;
let (final_rss, final_status) = probe_health(&client).await.expect("final /health");
let liveness_req = json!({
"jsonrpc": "2.0",
"id": 999_999,
"method": "palace_list",
"params": {}
});
let (live_body, _) = http_rpc(&client, liveness_req)
.await
.expect("post-load liveness palace_list");
let live_ok = live_body.get("error").is_none_or(|e| e.is_null())
&& live_body["result"]["palaces"].is_array();
println!();
println!("=== test_http_sustained_load (daemon v{version}) ===");
println!(" clients={n_clients} wall={wall:?} ops={ops} errors={errs}");
println!(" throughput={throughput:.1} ops/s error_rate={error_rate:.2}%");
println!(
" RSS: start={initial_rss:.0} MB end={final_rss:.0} MB delta={:+.0} MB",
final_rss - initial_rss
);
println!(" final /health.status = {final_status}");
println!(" post-load palace_list ok = {live_ok}");
assert!(
live_ok,
"post-load liveness call (palace_list) must succeed; body = {live_body:?}"
);
assert!(
error_rate < 5.0,
"sustained error rate {:.1}% above 5% — is #154 fix (PR #161) deployed?",
error_rate
);
}