use car_memgine::{MemgineEngine, SkillOutcome, SkillTrigger};
use car_server_core::{run_dispatch, ServerState, ServerStateConfig};
use chrono::Utc;
use futures::{SinkExt, StreamExt};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{accept_async, connect_async, MaybeTlsStream, WebSocketStream};
/// Builds a shared [`ServerState`] for a test, wired to the supplied engine.
///
/// `journal_dir` is handed to [`ServerStateConfig::new`]; the approval journal
/// path is `approvals.jsonl` inside that same directory. The engine is wrapped
/// in `Arc<Mutex<_>>` and installed through `with_shared_memgine`.
fn state_with_engine(
    journal_dir: std::path::PathBuf,
    engine: MemgineEngine,
) -> Arc<ServerState> {
    // Derive the approval journal path before `journal_dir` is moved into the config.
    let approval_journal = journal_dir.join("approvals.jsonl");
    let shared_engine = Arc::new(Mutex::new(engine));

    let config = ServerStateConfig::new(journal_dir)
        .with_shared_memgine(shared_engine)
        .with_approval_journal(approval_journal);

    let state = ServerState::with_config(config);
    Arc::new(state)
}
/// Creates a [`MemgineEngine`] pre-loaded with fixture data for the evolution tests.
///
/// The fixture holds:
/// - 25 facts with ids `f0`..`f24`, the first 8 of which are reported outdated;
/// - two skills, `good` and `bad`, with 4 failures reported for `bad` followed by
///   10 successes reported for `good`;
/// - 12 ingested conversation entries.
///
/// The tests in this file that use this fixture expect it to produce `memory`,
/// `skills` and `context` decisions in an evolution plan.
fn pressured_engine() -> MemgineEngine {
    let mut engine = MemgineEngine::new(None);

    for i in 0..25 {
        let fact_id = format!("f{i}");
        let key = format!("k{i}");
        let value = format!("value {i}");
        engine.ingest_fact(
            &fact_id,
            &key,
            &value,
            "test",
            "user",
            Utc::now(),
            "global",
            None,
            vec![],
            false,
        );
    }

    // Mark the first 8 of the facts ingested above as outdated.
    (0..8).map(|i| format!("f{i}")).for_each(|fact_id| {
        engine.report_fact_outdated(&fact_id);
    });

    for skill_name in ["good", "bad"] {
        engine.ingest_skill(
            skill_name,
            "",
            "shell",
            SkillTrigger::default(),
            "s",
            None,
            vec![],
            vec![],
        );
    }

    // Outcome history: failures for `bad` are reported before successes for `good`.
    (0..4).for_each(|_| {
        engine.report_outcome("bad", SkillOutcome::Fail);
    });
    (0..10).for_each(|_| {
        engine.report_outcome("good", SkillOutcome::Success);
    });

    for turn in 0..12 {
        let text = format!("turn {turn}: context signal");
        engine.ingest_conversation("user", &text, Utc::now());
    }

    engine
}
/// Starts a one-shot WebSocket server for `state` on loopback and returns a
/// connected client stream.
///
/// A listener is bound to `127.0.0.1` with port `0`, and a spawned task accepts
/// exactly one TCP connection, performs the WebSocket handshake, and hands the
/// split stream to [`run_dispatch`] together with a clone of `state`. The
/// client then connects to the listener's actual address.
///
/// # Panics
///
/// Panics if binding, reading the local address, or the client connection
/// fails. Failures inside the spawned server task panic in that task.
async fn connect(
    state: &Arc<ServerState>,
) -> WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>> {
    // Port 0 asks for any free port; the real one is read back from the listener.
    let loopback = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0));
    let listener = TcpListener::bind(loopback).await.expect("bind loopback");
    let bound_addr = listener.local_addr().expect("local_addr");

    let server_state = Arc::clone(state);
    tokio::spawn(async move {
        let (tcp, peer_addr) = listener.accept().await.expect("accept");
        let server_ws = accept_async(tcp).await.expect("ws handshake");
        let (sink, incoming) = server_ws.split();
        // The dispatch result is deliberately ignored.
        let _ = run_dispatch(
            incoming,
            Box::pin(sink),
            peer_addr.to_string(),
            server_state,
        )
        .await;
    });

    let url = format!("ws://{}", bound_addr);
    let (client, _response) = connect_async(&url).await.expect("ws client connect");
    client
}
/// Sends `request` as a JSON text frame and returns the next frame parsed as JSON.
///
/// # Panics
///
/// Panics if the request cannot be serialized, the send fails, the stream ends
/// or yields an error, the next frame is not a text frame, or the text is not
/// valid JSON.
async fn send_recv(
    ws: &mut WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>,
    request: serde_json::Value,
) -> serde_json::Value {
    let payload = serde_json::to_string(&request).expect("request to_string");
    ws.send(Message::Text(payload.into())).await.expect("send");

    // Exactly one frame is read; it is taken to be the reply to the request above.
    let reply = ws.next().await.expect("frame").expect("frame ok");
    match reply {
        Message::Text(raw) => {
            let raw = raw.to_string();
            serde_json::from_str(&raw).expect("parse response JSON")
        }
        unexpected => panic!("expected Text frame, got {:?}", unexpected),
    }
}
/// Collects the `component` string of every entry in `plan["decisions"]`, in order.
///
/// # Panics
///
/// Panics if `decisions` is not an array or if any entry's `component` is not
/// a string.
fn decision_components(plan: &serde_json::Value) -> Vec<String> {
    let decisions = plan["decisions"].as_array().expect("decisions");

    let mut components = Vec::with_capacity(decisions.len());
    for decision in decisions {
        let component = decision["component"].as_str().unwrap();
        components.push(component.to_owned());
    }
    components
}
/// `evolution.plan` against the pressured fixture must list `memory`, `skills`
/// and `context` decisions, must not list `harness`, and must put `memory` and
/// `skills` in `evolve_now`.
#[tokio::test]
async fn evolution_plan_populates_live_components_and_omits_absent_sources() {
    let scratch = TempDir::new().unwrap();
    let state = state_with_engine(scratch.path().join("journals"), pressured_engine());
    let mut ws = connect(&state).await;

    let request = serde_json::json!({
        "jsonrpc": "2.0", "id": "p1", "method": "evolution.plan", "params": {}
    });
    let resp = send_recv(&mut ws, request).await;
    let plan = resp.get("result").expect("result");

    let components = decision_components(plan);
    let has_component = |wanted: &str| components.iter().any(|have| have.as_str() == wanted);

    for c in ["memory", "skills", "context"] {
        assert!(has_component(c), "{c} in {components:?}");
    }
    assert!(
        !has_component("harness"),
        "harness must be absent on an empty session log: {components:?}"
    );

    let evolve_now: Vec<&str> = plan["evolve_now"]
        .as_array()
        .unwrap()
        .iter()
        .map(|entry| entry.as_str().unwrap())
        .collect();
    for wanted in ["memory", "skills"] {
        assert!(evolve_now.contains(&wanted), "{evolve_now:?}");
    }
}
/// A dry-run `evolution.run` reports what the memory step would do, reports the
/// skills step as not run ("no inference engine"), and leaves no
/// `evolution_triggered` event behind.
#[tokio::test]
async fn evolution_run_dry_run_reports_without_side_effects() {
    let scratch = TempDir::new().unwrap();
    let state = state_with_engine(scratch.path().join("journals"), pressured_engine());
    let mut ws = connect(&state).await;

    let run_request = serde_json::json!({
        "jsonrpc": "2.0", "id": "r1", "method": "evolution.run",
        "params": { "dry_run": true }
    });
    let resp = send_recv(&mut ws, run_request).await;
    let result = resp.get("result").expect("result");
    let steps = result["steps"].as_array().expect("steps");
    let step_for = |component: &str| steps.iter().find(|step| step["component"] == component);

    // Memory step: marked as ran, with a dry-run description as its outcome.
    let memory = step_for("memory").expect("memory step");
    assert_eq!(memory["ran"], true);
    let outcome = memory["outcome"].as_str().unwrap();
    assert!(outcome.starts_with("dry_run: would consolidate"), "{outcome}");
    assert!(outcome.contains("maintenance:"), "{outcome}");

    // Skills step: present but not run.
    let skills = step_for("skills").expect("skills step");
    assert_eq!(skills["ran"], false);
    assert_eq!(skills["outcome"], "no inference engine");

    // The dry run must not have appended an audit event.
    let query = serde_json::json!({
        "jsonrpc": "2.0", "id": "q1", "method": "events.query",
        "params": { "kinds": ["evolution_triggered"] }
    });
    let events = send_recv(&mut ws, query).await;
    assert_eq!(events["result"]["count"], 0, "{events}");
}
/// A real (non-dry-run) `evolution.run` runs the memory step, lists `memory` in
/// `evolved`, and records exactly one `evolution_triggered` event whose
/// `data.source` is `evolution.run`.
#[tokio::test]
async fn evolution_run_real_memory_consolidates_and_audits() {
    let scratch = TempDir::new().unwrap();
    let state = state_with_engine(scratch.path().join("journals"), pressured_engine());
    let mut ws = connect(&state).await;

    let run_request = serde_json::json!({
        "jsonrpc": "2.0", "id": "r2", "method": "evolution.run", "params": {}
    });
    let resp = send_recv(&mut ws, run_request).await;
    let result = resp.get("result").expect("result");
    let steps = result["steps"].as_array().expect("steps");

    let memory = steps
        .iter()
        .find(|step| step["component"] == "memory")
        .expect("memory step");
    assert_eq!(memory["ran"], true);

    // The outcome is a string; these checks look for JSON fragments embedded in it.
    let outcome = memory["outcome"].as_str().unwrap();
    assert!(outcome.contains("\"mechanism\":\"consolidate\""), "{outcome}");
    assert!(outcome.contains("\"maintenance\""), "{outcome}");

    let evolved: Vec<&str> = result["evolved"]
        .as_array()
        .unwrap()
        .iter()
        .map(|entry| entry.as_str().unwrap())
        .collect();
    assert!(evolved.contains(&"memory"), "{evolved:?}");

    let query = serde_json::json!({
        "jsonrpc": "2.0", "id": "q2", "method": "events.query",
        "params": { "kinds": ["evolution_triggered"] }
    });
    let events = send_recv(&mut ws, query).await;
    assert_eq!(events["result"]["count"], 1, "{events}");

    let first_event = &events["result"]["events"][0];
    assert_eq!(first_event["data"]["source"], "evolution.run");
}
/// Returns the fixed harness baseline metrics sent with `evolution.run` in the
/// harness tests: 20 attempts, 8 succeeded actions, 12 failed attempts, a
/// success rate of 0.4, and 12 retries.
fn harness_baseline() -> serde_json::Value {
    let trajectory_efficiency = serde_json::json!({
        "attempts_total": 20,
        "actions_succeeded": 8,
        "failed_attempts": 12,
        "success_rate": 0.4
    });
    let recovery = serde_json::json!({ "retries": 12 });

    serde_json::json!({
        "trajectory_efficiency": trajectory_efficiency,
        "recovery": recovery
    })
}
/// Sends an `evolution.run` request with JSON-RPC id `id`, passing
/// [`harness_baseline`] as `harness_baseline_metrics`, and returns the full
/// JSON response.
async fn run_evolution(
    ws: &mut WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>,
    id: &str,
) -> serde_json::Value {
    let request = serde_json::json!({
        "jsonrpc": "2.0", "id": id, "method": "evolution.run",
        "params": { "harness_baseline_metrics": harness_baseline() }
    });
    send_recv(ws, request).await
}
/// Returns a clone of the first entry in `result["steps"]` whose `component`
/// is `"harness"`.
///
/// # Panics
///
/// Panics if `steps` is not an array or no harness step is present.
fn harness_step(result: &serde_json::Value) -> serde_json::Value {
    let steps = result["steps"].as_array().expect("steps");
    let harness = steps.iter().find(|step| step["component"] == "harness");
    harness.cloned().expect("harness step")
}
/// A harness change left pending by a run on one connection can be approved on
/// a second connection and is then applied by a run on a third connection, all
/// three sharing the same [`ServerState`].
///
/// Sequence:
/// 1. `evolution.run` with baseline metrics: the harness step runs but is not
///    applied, `harness` is not in `evolved`, and a pending approval with a
///    `harness:retry:` fingerprint is surfaced.
/// 2. `permission.approve` for that fingerprint on a separate connection.
/// 3. `evolution.run` again on a fresh connection: the harness step is applied
///    and `harness` is listed in `evolved`.
#[tokio::test]
async fn approval_on_one_connection_applies_on_another() {
    let tmp = TempDir::new().unwrap();
    let state = state_with_engine(tmp.path().join("journals"), MemgineEngine::new(None));

    // Step 1: the first run only proposes the harness change.
    let mut runner = connect(&state).await;
    let r1 = run_evolution(&mut runner, "e1").await;
    // `expect` takes a plain `&str` and does not interpolate `{r1}`, so the
    // response is formatted into the failure message through `panic!` instead.
    let result1 = r1
        .get("result")
        .unwrap_or_else(|| panic!("result: {r1}"));
    let step1 = harness_step(result1);
    assert_eq!(step1["ran"], true);
    assert_eq!(step1["applied"], false, "{step1}");
    assert!(
        !result1["evolved"]
            .as_array()
            .unwrap()
            .iter()
            .any(|v| v == "harness"),
        "pending-only run must not claim harness evolved (S2): {result1}"
    );
    let pending = result1["pending_approvals"]
        .as_array()
        .expect("pending_approvals surfaced");
    // Fail with a descriptive message, not an index-out-of-bounds, when the list is empty.
    let first_pending = pending
        .first()
        .unwrap_or_else(|| panic!("pending_approvals must not be empty: {result1}"));
    let fingerprint = first_pending["fingerprint"].as_str().unwrap().to_string();
    assert!(fingerprint.starts_with("harness:retry:"), "{fingerprint}");

    // Step 2: approve the fingerprint from a different connection.
    let mut approver = connect(&state).await;
    let a = send_recv(
        &mut approver,
        serde_json::json!({
            "jsonrpc": "2.0", "id": "ap1", "method": "permission.approve",
            "params": { "fingerprint": fingerprint, "reason": "reviewed retry tuning" }
        }),
    )
    .await;
    assert!(a.get("result").is_some(), "approve failed: {a}");

    // Step 3: a run on yet another connection now applies the change.
    let mut runner2 = connect(&state).await;
    let r2 = run_evolution(&mut runner2, "e2").await;
    let result2 = r2.get("result").expect("result");
    let step2 = harness_step(result2);
    assert_eq!(step2["ran"], true);
    assert_eq!(step2["applied"], true, "{step2}");
    assert!(
        result2["evolved"]
            .as_array()
            .unwrap()
            .iter()
            .any(|v| v == "harness"),
        "{result2}"
    );
    let outcome2 = step2["outcome"].as_str().unwrap();
    assert!(outcome2.contains("\"applied\":1"), "{outcome2}");
    assert!(outcome2.contains("human_approved"), "{outcome2}");
}
/// A dry-run `evolution.run` with baseline metrics still lists pending
/// approvals (with a reason mentioning `harness_candidate_metrics`) and
/// appends no `evolution_triggered` event.
#[tokio::test]
async fn dry_run_lists_pending_approvals_without_side_effects() {
    let scratch = TempDir::new().unwrap();
    let state = state_with_engine(scratch.path().join("journals"), MemgineEngine::new(None));
    let mut ws = connect(&state).await;

    let run_request = serde_json::json!({
        "jsonrpc": "2.0", "id": "d1", "method": "evolution.run",
        "params": { "dry_run": true, "harness_baseline_metrics": harness_baseline() }
    });
    let r = send_recv(&mut ws, run_request).await;
    let result = r.get("result").expect("result");

    let pending = result["pending_approvals"]
        .as_array()
        .expect("dry_run must still list what needs approval");
    assert!(!pending.is_empty());

    // A missing or non-string reason is treated as empty, which fails the check below.
    let first_reason = pending[0]["reason"].as_str().unwrap_or("");
    assert!(
        first_reason.contains("harness_candidate_metrics"),
        "the no-candidate-metrics reason is stated: {pending:?}"
    );

    let query = serde_json::json!({
        "jsonrpc": "2.0", "id": "q", "method": "events.query",
        "params": { "kinds": ["evolution_triggered"] }
    });
    let events = send_recv(&mut ws, query).await;
    assert_eq!(events["result"]["count"], 0, "dry_run appends no audit event");
}
/// An approval granted through one [`ServerState`] is honoured by a second
/// state built over the same journal directory: the harness step is applied
/// and the approved fingerprint is not listed as pending again.
#[tokio::test]
async fn approval_survives_daemon_restart() {
    let scratch = TempDir::new().unwrap();

    // First state: surface the pending approval and approve it. The state and
    // its connection are scoped to this block, so both bindings are dropped
    // before the second state is created.
    let fingerprint = {
        let state = state_with_engine(scratch.path().join("journals"), MemgineEngine::new(None));
        let mut ws = connect(&state).await;

        let first_run = run_evolution(&mut ws, "e1").await;
        let fp = first_run["result"]["pending_approvals"][0]["fingerprint"]
            .as_str()
            .unwrap()
            .to_string();

        let approve_request = serde_json::json!({
            "jsonrpc": "2.0", "id": "ap", "method": "permission.approve",
            "params": { "fingerprint": fp, "reason": "ok" }
        });
        let a = send_recv(&mut ws, approve_request).await;
        assert!(a.get("result").is_some(), "{a}");
        fp
    };

    // Second state: a fresh engine and state over the same journal directory.
    let state2 = state_with_engine(scratch.path().join("journals"), MemgineEngine::new(None));
    let mut ws2 = connect(&state2).await;
    let r2 = run_evolution(&mut ws2, "e2").await;
    let result2 = r2.get("result").expect("result");

    let step2 = harness_step(result2);
    assert_eq!(step2["applied"], true, "{step2}");

    // `pending_approvals` may be absent; if present, it must not contain the fingerprint.
    let resurfaced = match result2.get("pending_approvals") {
        None => false,
        Some(pending) => pending
            .as_array()
            .unwrap()
            .iter()
            .any(|entry| entry["fingerprint"] == fingerprint.as_str()),
    };
    assert!(
        !resurfaced,
        "an approved fingerprint must not re-surface as pending: {result2}"
    );
}