use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{SystemTime, UNIX_EPOCH};
use chrono::{TimeZone, Utc};
use cortex_core::{AuditRecordId, Event, EventId, EventSource, EventType, TraceId, SCHEMA_VERSION};
use cortex_llm::{blake3_hex, LlmMessage, LlmRequest, LlmRole};
use cortex_reflect::{session_reflection_json_schema, DEFAULT_REFLECTION_MODEL};
use cortex_store::migrate::apply_pending;
use cortex_store::repo::memories::accept_candidate_policy_decision_test_allow;
use cortex_store::repo::{EventRepo, MemoryAcceptanceAudit, MemoryCandidate, MemoryRepo};
use rusqlite::Connection;
use serde_json::json;
fn cortex_bin() -> PathBuf {
PathBuf::from(env!("CARGO_BIN_EXE_cortex"))
}
fn run_in(cwd: &Path, args: &[&str]) -> std::process::Output {
Command::new(cortex_bin())
.current_dir(cwd)
.env("XDG_DATA_HOME", cwd.join("xdg"))
.env("HOME", cwd)
.args(args)
.output()
.expect("spawn cortex")
}
fn assert_exit(out: &std::process::Output, expected: i32) {
let code = out.status.code().expect("process exited via signal");
assert_eq!(
code,
expected,
"expected exit {expected}, got {code}\nstdout: {}\nstderr: {}",
String::from_utf8_lossy(&out.stdout),
String::from_utf8_lossy(&out.stderr),
);
}
fn init(tmp: &Path) -> PathBuf {
let out = run_in(tmp, &["init"]);
assert_exit(&out, 0);
let stdout = String::from_utf8_lossy(&out.stdout);
let db_line = stdout
.lines()
.find(|line| line.starts_with("cortex init: db"))
.expect("init stdout must include db path line");
let path = db_line
.split_once('=')
.expect("db line must have '='")
.1
.trim()
.split_once(" (")
.expect("db line must have status suffix")
.0;
PathBuf::from(path)
}
fn tmp_dir(suffix: &str) -> tempfile::TempDir {
tempfile::Builder::new()
.prefix(&format!("cortex-e2e-full-{suffix}-"))
.tempdir()
.expect("create temp dir")
}
fn write_session_fixture(
dir: &Path,
name: &str,
trace_id: &str,
event_id_a: &str,
event_id_b: &str,
) -> PathBuf {
let path = dir.join(format!("{name}.json"));
let events = json!({
"events": [
{
"id": event_id_a,
"schema_version": 1,
"observed_at": "2026-05-13T10:00:00Z",
"recorded_at": "2026-05-13T10:00:00Z",
"source": { "type": "child_agent", "model": "replay" },
"event_type": "cortex.event.agent_response.v1",
"trace_id": trace_id,
"session_id": "e2e-full-loop-session",
"domain_tags": ["testing"],
"payload": { "text": "E2E full loop test event one." },
"payload_hash": "",
"prev_event_hash": null,
"event_hash": ""
},
{
"id": event_id_b,
"schema_version": 1,
"observed_at": "2026-05-13T10:00:05Z",
"recorded_at": "2026-05-13T10:00:05Z",
"source": { "type": "child_agent", "model": "replay" },
"event_type": "cortex.event.agent_response.v1",
"trace_id": trace_id,
"session_id": "e2e-full-loop-session",
"domain_tags": ["testing"],
"payload": { "text": "E2E full loop test event two." },
"payload_hash": "",
"prev_event_hash": null,
"event_hash": ""
}
]
});
fs::write(&path, serde_json::to_string_pretty(&events).unwrap())
.expect("write session fixture");
path
}
fn reflection_json(trace_id: &str, source_event_id: &str) -> String {
json!({
"trace_id": trace_id,
"episode_candidates": [
{
"summary": "E2E full loop produced a test memory.",
"source_event_ids": [source_event_id],
"domains": ["testing"],
"entities": ["Cortex"],
"candidate_meaning": "Full loop end-to-end works.",
"confidence": 0.85
}
],
"memory_candidates": [
{
"memory_type": "episodic",
"claim": "cortex session close activates memories for the full product loop.",
"source_episode_indexes": [0],
"applies_when": ["end-to-end testing"],
"does_not_apply_when": [],
"confidence": 0.85,
"initial_salience": {
"reusability": 0.8,
"consequence": 0.7,
"emotional_charge": 0.0
}
}
],
"contradictions": [],
"doctrine_suggestions": []
})
.to_string()
}
fn write_replay_fixtures(base_dir: &Path, trace_id: &str, reflection_text: &str) -> PathBuf {
let unique = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time after epoch")
.as_nanos();
let fixtures_dir = base_dir.join(format!("fixtures-{unique}"));
fs::create_dir(&fixtures_dir).expect("create fixtures dir");
let trace: TraceId = trace_id.parse().expect("valid trace id");
let req = LlmRequest {
model: DEFAULT_REFLECTION_MODEL.to_string(),
system: "Return SessionReflection JSON matching the supplied schema.".to_string(),
messages: vec![LlmMessage {
role: LlmRole::User,
content: format!("Reflect trace {trace} into candidate-only Cortex memory."),
}],
temperature: 0.0,
max_tokens: 4096,
json_schema: Some(session_reflection_json_schema()),
timeout_ms: 30_000,
};
let fixture = json!({
"request_match": {
"model": DEFAULT_REFLECTION_MODEL,
"prompt_hash": req.prompt_hash()
},
"response": {
"text": reflection_text
}
});
let fixture_path = fixtures_dir.join("reflection.json");
let fixture_bytes = serde_json::to_vec_pretty(&fixture).expect("fixture serializes");
fs::write(&fixture_path, &fixture_bytes).expect("write fixture");
fs::write(
fixtures_dir.join("INDEX.toml"),
format!(
"[[fixture]]\npath = \"reflection.json\"\nblake3 = \"{}\"\n",
blake3_hex(&fixture_bytes)
),
)
.expect("write INDEX.toml");
fixtures_dir
}
fn insert_active_memory_with_lineage(
db_path: &Path,
memory_id: &str,
source_event_id: &str,
claim: &str,
domains: &[&str],
second: u32,
) {
let pool = Connection::open(db_path).expect("open db");
apply_pending(&pool).expect("apply migrations");
let event_repo = EventRepo::new(&pool);
if event_repo
.get_by_id(&source_event_id.parse::<EventId>().expect("valid event id"))
.expect("check event")
.is_none()
{
let ts = Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, second).unwrap();
event_repo
.append(&Event {
id: source_event_id.parse().expect("valid event id"),
schema_version: SCHEMA_VERSION,
observed_at: ts,
recorded_at: ts,
source: EventSource::Tool {
name: "e2e-full-loop-test".into(),
},
event_type: EventType::ToolResult,
trace_id: None,
session_id: Some("e2e-full-loop-test".into()),
domain_tags: domains.iter().map(|d| (*d).to_string()).collect(),
payload: json!({"source": "e2e-full-loop-test", "second": second}),
payload_hash: format!("payload-full-e2e-{second}"),
prev_event_hash: None,
event_hash: format!("event-full-e2e-{second}"),
})
.expect("append source event to SQLite events table");
}
let repo = MemoryRepo::new(&pool);
let created_at = Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, second).unwrap();
let candidate = MemoryCandidate {
id: memory_id.parse().expect("valid memory id"),
memory_type: "semantic".into(),
claim: claim.into(),
source_episodes_json: json!([]),
source_events_json: json!([source_event_id]),
domains_json: json!(domains),
salience_json: json!({"score": 0.85}),
confidence: 0.9,
authority: "user".into(),
applies_when_json: json!([]),
does_not_apply_when_json: json!([]),
created_at,
updated_at: created_at,
};
let id_str = candidate.id.to_string();
repo.insert_candidate(&candidate).expect("insert candidate");
let audit = MemoryAcceptanceAudit {
id: AuditRecordId::new(),
actor_json: json!({"kind": "e2e-full-loop-test"}),
reason: "e2e full loop search-eligible memory".into(),
source_refs_json: json!([id_str]),
created_at: Utc
.with_ymd_and_hms(2026, 5, 13, 12, 0, second + 1)
.unwrap(),
};
repo.accept_candidate(
&memory_id.parse().expect("valid memory id"),
Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, second + 2)
.unwrap(),
&audit,
&accept_candidate_policy_decision_test_allow(),
)
.expect("accept candidate");
}
#[test]
fn full_product_loop_init_close_search_context_outcome_health() {
let session_tmp = tmp_dir("session");
init(session_tmp.path());
let trace_id = TraceId::new().to_string();
let event_id_a = EventId::new().to_string();
let event_id_b = EventId::new().to_string();
let session_path = write_session_fixture(
session_tmp.path(),
"session-full-loop",
&trace_id,
&event_id_a,
&event_id_b,
);
let reflection = reflection_json(&trace_id, &event_id_a);
let fixtures_dir = write_replay_fixtures(session_tmp.path(), &trace_id, &reflection);
let close_out = run_in(
session_tmp.path(),
&[
"--json",
"session",
"close",
session_path.to_str().unwrap(),
"--fixtures-dir",
fixtures_dir.to_str().unwrap(),
],
);
assert_exit(&close_out, 0);
let close_stdout = String::from_utf8_lossy(&close_out.stdout);
let close_json: serde_json::Value = serde_json::from_str(&close_stdout)
.expect("session close --json stdout must be valid JSON");
assert_eq!(
close_json["command"].as_str(),
Some("cortex.session.close"),
"session close envelope command must be cortex.session.close: {close_json}"
);
let close_report = &close_json["report"];
let activated_count = close_report["activated_count"]
.as_u64()
.expect("activated_count must be a number");
assert!(
activated_count >= 1,
"session close must activate at least one memory; report: {close_report}"
);
assert_eq!(
close_report["no_candidates"].as_bool(),
Some(false),
"session close must not report no_candidates: {close_report}"
);
let activated_ids = close_report["activated_memory_ids"]
.as_array()
.expect("activated_memory_ids must be an array");
assert!(
!activated_ids.is_empty(),
"activated_memory_ids must not be empty: {close_report}"
);
let session_memory_id = activated_ids[0]
.as_str()
.expect("memory id must be a string")
.to_string();
let list_out = run_in(session_tmp.path(), &["--json", "memory", "list"]);
assert_exit(&list_out, 0);
let list_stdout = String::from_utf8_lossy(&list_out.stdout);
let list_json: serde_json::Value =
serde_json::from_str(&list_stdout).expect("memory list --json must be valid JSON");
let list_count = list_json["report"]["match_count"]
.as_u64()
.expect("match_count must be a number");
assert!(
list_count >= 1,
"memory list must return at least one active memory after session close: {list_json}"
);
let health_pre_out = run_in(session_tmp.path(), &["--json", "memory", "health"]);
assert_exit(&health_pre_out, 0);
let health_pre_stdout = String::from_utf8_lossy(&health_pre_out.stdout);
let health_pre_json: serde_json::Value = serde_json::from_str(&health_pre_stdout)
.expect("memory health --json (pre) must be valid JSON");
let health_pre = &health_pre_json["report"];
let total_active_pre = health_pre["total_active"]
.as_u64()
.expect("total_active must be a number");
assert!(
total_active_pre >= 1,
"memory health total_active must be >= 1 after session close: {health_pre}"
);
assert!(
health_pre["low_confidence_count"].is_number(),
"health report must have low_confidence_count: {health_pre}"
);
assert!(
health_pre["never_validated_count"].is_number(),
"health report must have never_validated_count: {health_pre}"
);
assert!(
health_pre["no_outcome_count"].is_number(),
"health report must have no_outcome_count: {health_pre}"
);
let never_validated_pre = health_pre["never_validated_count"]
.as_u64()
.expect("never_validated_count must be a number");
assert!(
never_validated_pre >= 1,
"newly activated memories have validation_epoch=NULL so never_validated_count >= 1: {health_pre}"
);
let no_outcome_pre = health_pre["no_outcome_count"]
.as_u64()
.expect("no_outcome_count must be a number");
assert!(
no_outcome_pre >= 1,
"newly activated memories have no outcome records yet so no_outcome_count >= 1: {health_pre}"
);
let outcome_out = run_in(
session_tmp.path(),
&[
"memory",
"outcome",
"record",
"--memory-id",
&session_memory_id,
"--session",
"e2e-full-loop-session",
"--result",
"helpful",
],
);
assert_exit(&outcome_out, 0);
let outcome_stdout = String::from_utf8_lossy(&outcome_out.stdout);
assert!(
outcome_stdout.contains("outcome recorded"),
"outcome record must confirm success: stdout={outcome_stdout} stderr={}",
String::from_utf8_lossy(&outcome_out.stderr)
);
assert!(
outcome_stdout.contains(&session_memory_id),
"outcome record must echo the memory id: {outcome_stdout}"
);
let health_post_out = run_in(session_tmp.path(), &["--json", "memory", "health"]);
assert_exit(&health_post_out, 0);
let health_post_stdout = String::from_utf8_lossy(&health_post_out.stdout);
let health_post_json: serde_json::Value = serde_json::from_str(&health_post_stdout)
.expect("memory health --json (post) must be valid JSON");
let health_post = &health_post_json["report"];
let no_outcome_post = health_post["no_outcome_count"]
.as_u64()
.expect("no_outcome_count must be a number");
assert!(
no_outcome_post < no_outcome_pre,
"no_outcome_count must decrease after recording an outcome: \
before={no_outcome_pre} after={no_outcome_post}; {health_post}"
);
assert!(
health_post["never_validated_count"].is_number(),
"health report must still have never_validated_count after outcome: {health_post}"
);
let total_active_post = health_post["total_active"]
.as_u64()
.expect("total_active must be a number");
assert!(
total_active_post >= total_active_pre,
"total_active must not decrease; before={total_active_pre} after={total_active_post}"
);
let session_search_out = run_in(
session_tmp.path(),
&["--json", "memory", "search", "session"],
);
assert_exit(&session_search_out, 0);
let session_search_stdout = String::from_utf8_lossy(&session_search_out.stdout);
let session_search_json: serde_json::Value = serde_json::from_str(&session_search_stdout)
.expect("session-store memory search --json must be valid JSON");
let session_search_count = session_search_json["report"]["match_count"]
.as_u64()
.expect("match_count must be a number");
assert!(
session_search_count >= 1,
"memory search must return at least one match for 'session' in the session store \
after proof-closure fix: {session_search_json}"
);
let search_tmp = tmp_dir("search");
let search_db = init(search_tmp.path());
let search_mem_id = "mem_01ARZ3NDEKTSV4RRFFQ69G5FAE";
let search_event_id = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV";
let search_claim = "cortex full product loop is verified end-to-end without Ollama.";
insert_active_memory_with_lineage(
&search_db,
search_mem_id,
search_event_id,
search_claim,
&["testing"],
10,
);
let search_out = run_in(search_tmp.path(), &["--json", "memory", "search", "cortex"]);
assert_exit(&search_out, 0);
let search_stdout = String::from_utf8_lossy(&search_out.stdout);
let search_json: serde_json::Value =
serde_json::from_str(&search_stdout).expect("memory search --json must be valid JSON");
let search_count = search_json["report"]["match_count"]
.as_u64()
.expect("match_count must be a number");
assert!(
search_count >= 1,
"memory search must return at least one match for 'cortex': {search_json}"
);
let matches = search_json["report"]["matches"]
.as_array()
.expect("matches must be an array");
let found = matches
.iter()
.any(|m| m["memory_id"].as_str() == Some(search_mem_id));
assert!(
found,
"search results must include the lineage-valid memory {search_mem_id}: {search_json}"
);
let ctx_out = run_in(
search_tmp.path(),
&["--json", "context", "build", "--task", "cortex"],
);
assert_exit(&ctx_out, 0);
let ctx_stdout = String::from_utf8_lossy(&ctx_out.stdout);
let ctx_json: serde_json::Value =
serde_json::from_str(&ctx_stdout).expect("context build --json must be valid JSON");
assert_eq!(
ctx_json["command"].as_str(),
Some("cortex.context.build"),
"context build envelope command must be cortex.context.build: {ctx_json}"
);
let selected_refs = ctx_json["report"]["selected_refs"]
.as_array()
.expect("context pack report must have selected_refs array");
assert!(
!selected_refs.is_empty(),
"context build must include at least one selected ref when active memories exist: {ctx_json}"
);
let search_outcome_out = run_in(
search_tmp.path(),
&[
"memory",
"outcome",
"record",
"--memory-id",
search_mem_id,
"--session",
"e2e-full-loop-search-session",
"--result",
"helpful",
],
);
assert_exit(&search_outcome_out, 0);
let search_outcome_stdout = String::from_utf8_lossy(&search_outcome_out.stdout);
assert!(
search_outcome_stdout.contains("outcome recorded"),
"search-store outcome record must confirm success: stdout={search_outcome_stdout} stderr={}",
String::from_utf8_lossy(&search_outcome_out.stderr)
);
assert!(
search_outcome_stdout.contains(search_mem_id),
"search-store outcome record must echo the memory id: {search_outcome_stdout}"
);
}