use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{SystemTime, UNIX_EPOCH};
use chrono::{TimeZone, Utc};
use cortex_core::{AuditRecordId, Event, EventId, EventSource, EventType, TraceId};
use cortex_llm::{blake3_hex, LlmMessage, LlmRequest, LlmRole};
use cortex_reflect::{session_reflection_json_schema, DEFAULT_REFLECTION_MODEL};
use cortex_store::migrate::apply_pending;
use cortex_store::repo::memories::accept_candidate_policy_decision_test_allow;
use cortex_store::repo::{EventRepo, MemoryAcceptanceAudit, MemoryCandidate, MemoryRepo};
use rusqlite::Connection;
use serde_json::json;
/// Returns the path to the `cortex` executable under test.
///
/// The path is read at compile time from the `CARGO_BIN_EXE_cortex`
/// environment variable.
fn cortex_bin() -> PathBuf {
    let exe: &str = env!("CARGO_BIN_EXE_cortex");
    PathBuf::from(exe)
}
/// Runs the `cortex` binary with `args` and waits for it to finish.
///
/// The child uses `cwd` as its working directory, with `HOME` set to `cwd`
/// and `XDG_DATA_HOME` set to `cwd/xdg`.
///
/// # Panics
///
/// Panics with "spawn cortex" if the process cannot be run.
fn run_in(cwd: &Path, args: &[&str]) -> std::process::Output {
    let xdg_data_home = cwd.join("xdg");
    let mut cmd = Command::new(cortex_bin());
    cmd.current_dir(cwd)
        .env("XDG_DATA_HOME", xdg_data_home)
        .env("HOME", cwd)
        .args(args);
    cmd.output().expect("spawn cortex")
}
/// Asserts that a finished process exited with the status code `expected`.
///
/// On mismatch the panic message includes the captured stdout and stderr
/// (lossily decoded as UTF-8).
///
/// # Panics
///
/// Panics with "process exited via signal" when the status carries no exit
/// code, and with the assertion message when the code differs from `expected`.
fn assert_exit(out: &std::process::Output, expected: i32) {
    let code = match out.status.code() {
        Some(code) => code,
        None => panic!("process exited via signal"),
    };
    let stdout = String::from_utf8_lossy(&out.stdout);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert_eq!(
        code,
        expected,
        "expected exit {expected}, got {code}\nstdout: {}\nstderr: {}",
        stdout,
        stderr,
    );
}
/// Runs `cortex init` inside `tmp` and returns the database path it prints.
///
/// The path is taken from the stdout line that starts with
/// `cortex init: db`: everything after the first `=`, trimmed, up to the
/// first ` (` that introduces the status suffix.
///
/// # Panics
///
/// Panics if the command does not exit with 0 or if the expected line is
/// missing or not in that shape.
fn init(tmp: &Path) -> PathBuf {
    let out = run_in(tmp, &["init"]);
    assert_exit(&out, 0);

    let stdout = String::from_utf8_lossy(&out.stdout);
    let db_line = stdout
        .lines()
        .find(|line| line.starts_with("cortex init: db"))
        .expect("init stdout includes db path");

    let (_label, after_equals) = db_line.split_once('=').expect("db line has equals");
    let (path, _status) = after_equals
        .trim()
        .split_once(" (")
        .expect("db line has status suffix");
    PathBuf::from(path)
}
/// Creates a temporary directory whose name is prefixed with
/// `cortex-e2e-loop-{test_name}-` and returns its `tempfile::TempDir` handle.
///
/// # Panics
///
/// Panics with "create temp dir" if the directory cannot be created.
fn tmp_dir(test_name: &str) -> tempfile::TempDir {
    let prefix = format!("cortex-e2e-loop-{test_name}-");
    let mut builder = tempfile::Builder::new();
    builder.prefix(&prefix);
    builder.tempdir().expect("create temp dir")
}
/// Writes a two-event session fixture to `dir/{name}.json` and returns the
/// path of the written file.
///
/// Both events carry `trace_id` and the session id `e2e-loop-session`. They
/// differ only in their id (`event_id_a` / `event_id_b`), timestamps, and
/// payload text. The hash fields are written as empty strings and
/// `prev_event_hash` as `null`.
///
/// # Panics
///
/// Panics if the document cannot be serialized or the file cannot be written.
fn write_session_fixture(
    dir: &Path,
    name: &str,
    trace_id: &str,
    event_id_a: &str,
    event_id_b: &str,
) -> PathBuf {
    // The two events are identical apart from id, timestamp and payload text,
    // so build them from one template.
    let event = |id: &str, timestamp: &str, text: &str| {
        json!({
            "id": id,
            "schema_version": 1,
            "observed_at": timestamp,
            "recorded_at": timestamp,
            "source": { "type": "child_agent", "model": "replay" },
            "event_type": "cortex.event.agent_response.v1",
            "trace_id": trace_id,
            "session_id": "e2e-loop-session",
            "domain_tags": ["testing"],
            "payload": { "text": text },
            "payload_hash": "",
            "prev_event_hash": null,
            "event_hash": ""
        })
    };
    let first = event(
        event_id_a,
        "2026-05-13T10:00:00Z",
        "E2E memory loop test event one.",
    );
    let second = event(
        event_id_b,
        "2026-05-13T10:00:05Z",
        "E2E memory loop test event two.",
    );
    let document = json!({ "events": [first, second] });

    let path = dir.join(format!("{name}.json"));
    let pretty = serde_json::to_string_pretty(&document).unwrap();
    fs::write(&path, pretty).expect("write session fixture");
    path
}
fn reflection_json(trace_id: &str, source_event_id: &str, claim: &str) -> String {
json!({
"trace_id": trace_id,
"episode_candidates": [
{
"summary": "E2E memory loop produced a test memory.",
"source_event_ids": [source_event_id],
"domains": ["testing"],
"entities": ["Cortex"],
"candidate_meaning": "End-to-end loop works.",
"confidence": 0.85
}
],
"memory_candidates": [
{
"memory_type": "episodic",
"claim": claim,
"source_episode_indexes": [0],
"applies_when": ["testing"],
"does_not_apply_when": [],
"confidence": 0.85,
"initial_salience": {
"reusability": 0.8,
"consequence": 0.7,
"emotional_charge": 0.0
}
}
],
"contradictions": [],
"doctrine_suggestions": []
})
.to_string()
}
/// Creates a replay-fixture directory under `base_dir` and returns its path.
///
/// The directory is named `fixtures-{nanoseconds since the Unix epoch}` and
/// receives two files:
///
/// * `reflection.json` — a fixture pairing a request matcher (the default
///   reflection model plus the prompt hash of the request built here) with
///   a response whose text is `reflection_text`;
/// * `INDEX.toml` — a single `[[fixture]]` entry listing `reflection.json`
///   together with the BLAKE3 hex digest of its bytes.
///
/// # Panics
///
/// Panics if the directory or files cannot be created, if `trace_id` does
/// not parse as a `TraceId`, or if the fixture fails to serialize.
fn write_replay_fixtures(base_dir: &Path, trace_id: &str, reflection_text: &str) -> PathBuf {
    let nanos_since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time after epoch")
        .as_nanos();
    let fixtures_dir = base_dir.join(format!("fixtures-{nanos_since_epoch}"));
    fs::create_dir(&fixtures_dir).expect("create fixtures dir");

    // NOTE(review): this request presumably has to mirror the one the CLI
    // issues so that the prompt hash matches during replay — confirm.
    let trace: TraceId = trace_id.parse().expect("valid trace id");
    let user_message = LlmMessage {
        role: LlmRole::User,
        content: format!("Reflect trace {trace} into candidate-only Cortex memory."),
    };
    let req = LlmRequest {
        model: DEFAULT_REFLECTION_MODEL.to_string(),
        system: "Return SessionReflection JSON matching the supplied schema.".to_string(),
        messages: vec![user_message],
        temperature: 0.0,
        max_tokens: 4096,
        json_schema: Some(session_reflection_json_schema()),
        timeout_ms: 30_000,
    };

    let fixture = json!({
        "request_match": {
            "model": DEFAULT_REFLECTION_MODEL,
            "prompt_hash": req.prompt_hash()
        },
        "response": {
            "text": reflection_text
        }
    });
    let fixture_bytes = serde_json::to_vec_pretty(&fixture).expect("fixture serializes");
    let fixture_path = fixtures_dir.join("reflection.json");
    fs::write(&fixture_path, &fixture_bytes).expect("write fixture");

    // The index records the digest of the exact bytes written above.
    let index_toml = format!(
        "[[fixture]]\npath = \"reflection.json\"\nblake3 = \"{}\"\n",
        blake3_hex(&fixture_bytes)
    );
    fs::write(fixtures_dir.join("INDEX.toml"), index_toml).expect("write INDEX.toml");

    fixtures_dir
}
fn insert_active_memory_with_lineage(
db_path: &Path,
memory_id: &str,
claim: &str,
domains: &[&str],
second: u32,
) {
let pool = Connection::open(db_path).expect("open db");
apply_pending(&pool).expect("apply migrations");
let event_id = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV";
let event_repo = EventRepo::new(&pool);
if event_repo
.get_by_id(&event_id.parse::<EventId>().expect("valid event id"))
.expect("check event")
.is_none()
{
let ts = Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, second).unwrap();
event_repo
.append(&Event {
id: event_id.parse().expect("valid event id"),
schema_version: cortex_core::SCHEMA_VERSION,
observed_at: ts,
recorded_at: ts,
source: EventSource::Tool {
name: "e2e-loop-test".into(),
},
event_type: EventType::ToolResult,
trace_id: None,
session_id: Some("e2e-loop-test".into()),
domain_tags: domains.iter().map(|d| (*d).to_string()).collect(),
payload: json!({"source": "e2e-loop-test", "second": second}),
payload_hash: format!("payload-e2e-{second}"),
prev_event_hash: None,
event_hash: format!("event-e2e-{second}"),
})
.expect("append source event to SQLite events table");
}
let repo = MemoryRepo::new(&pool);
let candidate = MemoryCandidate {
id: memory_id.parse().expect("valid memory id"),
memory_type: "semantic".into(),
claim: claim.into(),
source_episodes_json: json!([]),
source_events_json: json!([event_id]),
domains_json: json!(domains),
salience_json: json!({"score": 0.8}),
confidence: 0.85,
authority: "user".into(),
applies_when_json: json!([]),
does_not_apply_when_json: json!([]),
created_at: Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, second).unwrap(),
updated_at: Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, second).unwrap(),
};
let id = candidate.id.to_string();
repo.insert_candidate(&candidate).expect("insert candidate");
let audit = MemoryAcceptanceAudit {
id: AuditRecordId::new(),
actor_json: json!({"kind": "e2e-loop-test"}),
reason: "e2e loop test memory".into(),
source_refs_json: json!([id]),
created_at: Utc
.with_ymd_and_hms(2026, 5, 13, 12, 0, second + 1)
.unwrap(),
};
repo.accept_candidate(
&memory_id.parse().expect("valid memory id"),
Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, second + 2)
.unwrap(),
&audit,
&accept_candidate_policy_decision_test_allow(),
)
.expect("accept candidate");
}
/// End-to-end test that drives the `cortex` CLI through one temp workspace:
/// `init`, a first `session close`, `memory list`, `memory health`,
/// `memory outcome record`, a second `memory health`, a second
/// `session close`, and a final `memory list`.
///
/// Every step runs against the same workspace, so the order of the CLI
/// invocations below is significant.
#[test]
fn full_memory_loop_session_close_commit_search_outcome() {
// --- Setup: fresh workspace; the DB path returned by `init` is unused here.
let tmp = tmp_dir("full_loop");
init(tmp.path());
// --- Session 1: write a session fixture plus a matching replay fixture.
let trace_id_1 = TraceId::new().to_string();
let event_id_1a = EventId::new().to_string();
let event_id_1b = EventId::new().to_string();
let session_path_1 = write_session_fixture(
tmp.path(),
"session-1",
&trace_id_1,
&event_id_1a,
&event_id_1b,
);
let claim_1 = "cortex session close activates memories immediately via the CLI path.";
let reflection_1 = reflection_json(&trace_id_1, &event_id_1a, claim_1);
let fixtures_dir_1 = write_replay_fixtures(tmp.path(), &trace_id_1, &reflection_1);
// Close session 1 through the CLI, asking for JSON output.
let close_1 = run_in(
tmp.path(),
&[
"--json",
"session",
"close",
session_path_1.to_str().unwrap(),
"--fixtures-dir",
fixtures_dir_1.to_str().unwrap(),
],
);
assert_exit(&close_1, 0);
let close_1_stdout = String::from_utf8_lossy(&close_1.stdout);
let close_1_json: serde_json::Value =
serde_json::from_str(&close_1_stdout).expect("session close JSON must be valid");
assert_eq!(
close_1_json["command"].as_str(),
Some("cortex.session.close"),
"envelope command field must be cortex.session.close: {close_1_json}"
);
// The close report must show at least one activated memory.
let close_1_report = &close_1_json["report"];
let activated_count_1 = close_1_report["activated_count"]
.as_u64()
.expect("activated_count must be a number");
assert!(
activated_count_1 >= 1,
"first session close must activate at least one memory; report: {close_1_report}"
);
assert_eq!(
close_1_report["no_candidates"].as_bool(),
Some(false),
"first session close must not report no_candidates: {close_1_report}"
);
// --- `memory list`: remember the count to compare after session 2.
let list_out = run_in(tmp.path(), &["--json", "memory", "list"]);
assert_exit(&list_out, 0);
let list_stdout = String::from_utf8_lossy(&list_out.stdout);
let list_json: serde_json::Value =
serde_json::from_str(&list_stdout).expect("memory list JSON must be valid");
let list_count = list_json["report"]["match_count"]
.as_u64()
.expect("match_count must be a number");
assert!(
list_count >= 1,
"memory list must return at least one active memory after session close: {list_json}"
);
// --- `memory health` before any outcome is recorded.
let health_out = run_in(tmp.path(), &["--json", "memory", "health"]);
assert_exit(&health_out, 0);
let health_stdout = String::from_utf8_lossy(&health_out.stdout);
let health_json: serde_json::Value =
serde_json::from_str(&health_stdout).expect("memory health JSON must be valid");
let health_report = &health_json["report"];
let total_active = health_report["total_active"]
.as_u64()
.expect("total_active must be a number");
assert!(
total_active >= 1,
"memory health total_active must be >= 1 after session close: {health_report}"
);
// Shape checks: these counters must be present and numeric.
assert!(
health_report["low_confidence_count"].is_number(),
"health report must have low_confidence_count: {health_report}"
);
assert!(
health_report["never_validated_count"].is_number(),
"health report must have never_validated_count: {health_report}"
);
assert!(
health_report["no_outcome_count"].is_number(),
"health report must have no_outcome_count: {health_report}"
);
let no_outcome_count = health_report["no_outcome_count"]
.as_u64()
.expect("no_outcome_count must be a number");
assert!(
no_outcome_count >= 1,
"newly activated memories have no outcome records yet: {health_report}"
);
// --- `memory outcome record` for the first memory activated by session 1.
let activated_ids = close_1_report["activated_memory_ids"]
.as_array()
.expect("activated_memory_ids must be an array");
assert!(
!activated_ids.is_empty(),
"activated_memory_ids must not be empty: {close_1_report}"
);
// Indexing is safe: the assertion above guarantees a first element.
let memory_id = activated_ids[0]
.as_str()
.expect("memory id must be a string");
// This command runs without `--json`, so its stdout is checked as plain text.
let outcome_out = run_in(
tmp.path(),
&[
"memory",
"outcome",
"record",
"--memory-id",
memory_id,
"--session",
"e2e-loop-session",
"--result",
"helpful",
],
);
assert_exit(&outcome_out, 0);
let outcome_stdout = String::from_utf8_lossy(&outcome_out.stdout);
assert!(
outcome_stdout.contains("outcome recorded"),
"outcome record must confirm success: stdout={outcome_stdout} stderr={}",
String::from_utf8_lossy(&outcome_out.stderr)
);
assert!(
outcome_stdout.contains(memory_id),
"outcome record must echo the memory id: {outcome_stdout}"
);
// --- `memory health` again, after the outcome was recorded.
let health_after_out = run_in(tmp.path(), &["--json", "memory", "health"]);
assert_exit(&health_after_out, 0);
let health_after_stdout = String::from_utf8_lossy(&health_after_out.stdout);
let health_after_json: serde_json::Value =
serde_json::from_str(&health_after_stdout).expect("memory health after JSON must be valid");
let no_outcome_after = health_after_json["report"]["no_outcome_count"]
.as_u64()
.expect("no_outcome_count must be a number");
// Only checked when exactly one memory was active: with more than one,
// the others may still lack outcome records.
if total_active == 1 {
assert_eq!(
no_outcome_after, 0,
"after recording the only memory's outcome, no_outcome_count must be 0: {health_after_json}"
);
}
// --- Session 2: fresh trace and event ids, closed the same way.
let trace_id_2 = TraceId::new().to_string();
let event_id_2a = EventId::new().to_string();
let event_id_2b = EventId::new().to_string();
let session_path_2 = write_session_fixture(
tmp.path(),
"session-2",
&trace_id_2,
&event_id_2a,
&event_id_2b,
);
let claim_2 = "cortex second session adds more memories to the active store.";
let reflection_2 = reflection_json(&trace_id_2, &event_id_2a, claim_2);
let fixtures_dir_2 = write_replay_fixtures(tmp.path(), &trace_id_2, &reflection_2);
let close_2 = run_in(
tmp.path(),
&[
"--json",
"session",
"close",
session_path_2.to_str().unwrap(),
"--fixtures-dir",
fixtures_dir_2.to_str().unwrap(),
],
);
assert_exit(&close_2, 0);
let close_2_stdout = String::from_utf8_lossy(&close_2.stdout);
let close_2_json: serde_json::Value =
serde_json::from_str(&close_2_stdout).expect("second session close JSON must be valid");
let close_2_report = &close_2_json["report"];
let activated_count_2 = close_2_report["activated_count"]
.as_u64()
.expect("activated_count must be a number");
assert!(
activated_count_2 >= 1,
"second session close must activate at least one memory: {close_2_report}"
);
// --- Final `memory list`: the count must be strictly larger than after session 1.
let list_final = run_in(tmp.path(), &["--json", "memory", "list"]);
assert_exit(&list_final, 0);
let list_final_stdout = String::from_utf8_lossy(&list_final.stdout);
let list_final_json: serde_json::Value =
serde_json::from_str(&list_final_stdout).expect("final memory list JSON must be valid");
let list_final_count = list_final_json["report"]["match_count"]
.as_u64()
.expect("match_count must be a number");
assert!(
list_final_count > list_count,
"second session close must grow the active memory count; before={list_count} after={list_final_count}"
);
}
/// Checks that `cortex memory search cortex` returns a memory that was
/// inserted and accepted directly through the store, both in `--json`
/// output and in plain-text output.
#[test]
fn memory_search_returns_active_properly_lineaged_memories() {
    let workspace = tmp_dir("search_lineage");
    let root = workspace.path();
    let db_path = init(root);

    // Seed one accepted memory whose claim contains the search term.
    let search_mem_id = "mem_01ARZ3NDEKTSV4RRFFQ69G5FA5";
    insert_active_memory_with_lineage(
        &db_path,
        search_mem_id,
        "cortex search validates that active memories with proper lineage are returned.",
        &["testing"],
        10,
    );

    // JSON output: the match count is positive and the seeded id is listed.
    let json_run = run_in(root, &["--json", "memory", "search", "cortex"]);
    assert_exit(&json_run, 0);
    let search_json: serde_json::Value =
        serde_json::from_str(&String::from_utf8_lossy(&json_run.stdout))
            .expect("memory search JSON must be valid");
    let hit_count = search_json["report"]["match_count"]
        .as_u64()
        .expect("match_count must be a number");
    assert!(
        hit_count >= 1,
        "memory search must return the properly-lineaged active memory containing 'cortex': {search_json}"
    );
    let hits = search_json["report"]["matches"]
        .as_array()
        .expect("matches must be an array");
    let wanted = Some(search_mem_id);
    assert!(
        hits.iter().any(|hit| hit["memory_id"].as_str() == wanted),
        "search results must include the memory with proper lineage {search_mem_id}: {search_json}"
    );

    // Plain-text output: no "no matches" notice, and the id is printed.
    let plain_run = run_in(root, &["memory", "search", "cortex"]);
    assert_exit(&plain_run, 0);
    let plain_stdout = String::from_utf8_lossy(&plain_run.stdout);
    assert!(
        !plain_stdout.contains("no matches"),
        "plain-text search must not say no-matches for an active memory: {plain_stdout}"
    );
    assert!(
        plain_stdout.contains(search_mem_id),
        "plain-text search must include the memory id: {plain_stdout}"
    );
}