use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{SystemTime, UNIX_EPOCH};
use cortex_core::{EventId, MemoryId, TraceId};
use cortex_llm::{blake3_hex, LlmMessage, LlmRequest, LlmRole};
use cortex_reflect::{session_reflection_json_schema, DEFAULT_REFLECTION_MODEL};
use cortex_store::migrate::apply_pending;
use cortex_store::repo::{MemoryCandidate, MemoryRepo};
use rusqlite::Connection;
use serde_json::json;
fn cortex_bin() -> PathBuf {
PathBuf::from(env!("CARGO_BIN_EXE_cortex"))
}
/// Runs the `cortex` binary with `args` and returns its captured output.
///
/// The child process uses `cwd` as its working directory, with `HOME` set to
/// `cwd` and `XDG_DATA_HOME` set to `cwd/xdg`.
///
/// # Panics
///
/// Panics if the process cannot be spawned.
fn run_in(cwd: &Path, args: &[&str]) -> std::process::Output {
    let xdg_data_home = cwd.join("xdg");

    let mut command = Command::new(cortex_bin());
    command
        .current_dir(cwd)
        .env("XDG_DATA_HOME", xdg_data_home)
        .env("HOME", cwd)
        .args(args);

    command.output().expect("spawn cortex")
}
/// Asserts that the finished process `out` exited with status code `expected`.
///
/// On mismatch the failure message includes the captured stdout and stderr
/// (lossily decoded as UTF-8).
///
/// # Panics
///
/// Panics if the process has no exit code, or if the code differs from
/// `expected`.
fn assert_exit(out: &std::process::Output, expected: i32) {
    let code = match out.status.code() {
        Some(code) => code,
        None => panic!("process exited via signal"),
    };

    let captured_stdout = String::from_utf8_lossy(&out.stdout);
    let captured_stderr = String::from_utf8_lossy(&out.stderr);

    assert_eq!(
        code, expected,
        "expected exit {expected}, got {code}\nstdout: {}\nstderr: {}",
        captured_stdout, captured_stderr,
    );
}
/// Runs `cortex init` inside `tmp` and returns the database path it prints.
///
/// The path is taken from the stdout line that starts with `cortex init: db`:
/// everything after the first `=` is trimmed, and the text before the first
/// ` (` is the path.
///
/// # Panics
///
/// Panics if the command does not exit with 0 or the expected line is missing
/// or malformed.
fn init(tmp: &Path) -> PathBuf {
    let out = run_in(tmp, &["init"]);
    assert_exit(&out, 0);

    let stdout = String::from_utf8_lossy(&out.stdout);
    let db_line = stdout
        .lines()
        .find(|candidate| candidate.starts_with("cortex init: db"))
        .expect("init stdout includes db path");

    let (_, after_equals) = db_line.split_once('=').expect("db line has equals");
    let (path, _status_suffix) = after_equals
        .trim()
        .split_once(" (")
        .expect("db line has status suffix");

    PathBuf::from(path)
}
/// Creates a temporary directory whose name starts with
/// `cortex-session-<test_name>-`.
///
/// # Panics
///
/// Panics if the directory cannot be created.
fn tmp_dir(test_name: &str) -> tempfile::TempDir {
    let prefix = format!("cortex-session-{test_name}-");
    let mut builder = tempfile::Builder::new();
    builder.prefix(&prefix);
    builder.tempdir().expect("create temp dir")
}
/// Writes `session.json` into `dir` and returns its path.
///
/// The file holds an `events` array with two `cortex.event.agent_response.v1`
/// events that share `trace_id` and the session id `test-session-close`.
/// They differ only in id (`event_id_a` / `event_id_b`), timestamp and payload
/// text. Hash fields are left empty.
///
/// # Panics
///
/// Panics if the file cannot be written.
fn write_session_fixture(
    dir: &Path,
    trace_id: &str,
    event_id_a: &str,
    event_id_b: &str,
) -> PathBuf {
    // Everything except id, timestamp and payload text is common to both events.
    let agent_response = |id: &str, timestamp: &str, text: &str| {
        json!({
            "id": id,
            "schema_version": 1,
            "observed_at": timestamp,
            "recorded_at": timestamp,
            "source": { "type": "child_agent", "model": "replay" },
            "event_type": "cortex.event.agent_response.v1",
            "trace_id": trace_id,
            "session_id": "test-session-close",
            "domain_tags": ["testing"],
            "payload": { "text": text },
            "payload_hash": "",
            "prev_event_hash": null,
            "event_hash": ""
        })
    };

    let first = agent_response(
        event_id_a,
        "2026-05-13T10:00:00Z",
        "Session close test event one.",
    );
    let second = agent_response(
        event_id_b,
        "2026-05-13T10:00:05Z",
        "Session close test event two.",
    );
    let document = json!({ "events": [first, second] });

    let session_path = dir.join("session.json");
    let pretty = serde_json::to_string_pretty(&document).unwrap();
    fs::write(&session_path, pretty).expect("write session fixture");
    session_path
}
/// Writes `session-notrace.json` into `dir` and returns its path.
///
/// The file holds a single `cortex.event.agent_response.v1` event with id
/// `event_id`, session id `test-session-no-trace`, and a `null` `trace_id`.
///
/// # Panics
///
/// Panics if the file cannot be written.
fn write_notrace_session_fixture(dir: &Path, event_id: &str) -> PathBuf {
    let untraced_event = json!({
        "id": event_id,
        "schema_version": 1,
        "observed_at": "2026-05-13T10:00:00Z",
        "recorded_at": "2026-05-13T10:00:00Z",
        "source": { "type": "child_agent", "model": "replay" },
        "event_type": "cortex.event.agent_response.v1",
        "trace_id": null,
        "session_id": "test-session-no-trace",
        "domain_tags": ["testing"],
        "payload": { "text": "No trace id event." },
        "payload_hash": "",
        "prev_event_hash": null,
        "event_hash": ""
    });
    let document = json!({ "events": [untraced_event] });

    let session_path = dir.join("session-notrace.json");
    let pretty = serde_json::to_string_pretty(&document).unwrap();
    fs::write(&session_path, pretty).expect("write no-trace session fixture");
    session_path
}
fn reflection_json(trace_id: &str, source_event_id: &str) -> String {
json!({
"trace_id": trace_id,
"episode_candidates": [
{
"summary": "Session close produced a test memory.",
"source_event_ids": [source_event_id],
"domains": ["testing"],
"entities": ["Cortex"],
"candidate_meaning": "Session indexing works end-to-end.",
"confidence": 0.85
}
],
"memory_candidates": [
{
"memory_type": "episodic",
"claim": "cortex session close activates memories immediately.",
"source_episode_indexes": [0],
"applies_when": ["session has ended"],
"does_not_apply_when": ["session is ongoing"],
"confidence": 0.85,
"initial_salience": {
"reusability": 0.8,
"consequence": 0.7,
"emotional_charge": 0.0
}
}
],
"contradictions": [],
"doctrine_suggestions": []
})
.to_string()
}
/// Creates a fresh `fixtures-<nanos>` directory under `base_dir`, fills it with
/// a replay fixture, and returns the directory path.
///
/// Two files are written:
/// - `reflection.json`: a fixture whose `request_match` pairs
///   `DEFAULT_REFLECTION_MODEL` with the prompt hash of the `LlmRequest` built
///   here for `trace_id`, and whose response text is `reflection_text`.
/// - `INDEX.toml`: one `[[fixture]]` entry naming `reflection.json` together
///   with the BLAKE3 hex digest of the exact bytes written.
///
/// NOTE(review): the request built here presumably has to match the request
/// the CLI issues during `session close` — confirm against the CLI.
///
/// # Panics
///
/// Panics if `trace_id` does not parse as a `TraceId`, if the directory already
/// exists, or on any I/O failure.
fn write_replay_fixtures(base_dir: &Path, trace_id: &str, reflection_text: &str) -> PathBuf {
    // The directory name is derived from the current time in nanoseconds.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time after epoch");
    let unique = since_epoch.as_nanos();
    let fixtures_dir = base_dir.join(format!("fixtures-{unique}"));
    fs::create_dir(&fixtures_dir).expect("create fixtures dir");

    let trace: TraceId = trace_id.parse().expect("valid trace id");
    let user_message = LlmMessage {
        role: LlmRole::User,
        content: format!("Reflect trace {trace} into candidate-only Cortex memory."),
    };
    let req = LlmRequest {
        model: DEFAULT_REFLECTION_MODEL.to_string(),
        system: "Return SessionReflection JSON matching the supplied schema.".to_string(),
        messages: vec![user_message],
        temperature: 0.0,
        max_tokens: 4096,
        json_schema: Some(session_reflection_json_schema()),
        timeout_ms: 30_000,
    };

    let fixture = json!({
        "request_match": {
            "model": DEFAULT_REFLECTION_MODEL,
            "prompt_hash": req.prompt_hash()
        },
        "response": {
            "text": reflection_text
        }
    });

    // The index digest is computed over the same bytes that are written to disk.
    let fixture_bytes = serde_json::to_vec_pretty(&fixture).expect("fixture serializes");
    fs::write(fixtures_dir.join("reflection.json"), &fixture_bytes).expect("write fixture");

    let index_toml = format!(
        "[[fixture]]\npath = \"reflection.json\"\nblake3 = \"{}\"\n",
        blake3_hex(&fixture_bytes)
    );
    fs::write(fixtures_dir.join("INDEX.toml"), index_toml).expect("write INDEX.toml");

    fixtures_dir
}
/// `session close` with replay fixtures must exit 0, confirm success on
/// stdout, and leave at least one memory with status `active` in the database.
#[test]
fn session_close_ingests_and_activates_memories() {
    let tmp = tmp_dir("activate");
    let root = tmp.path();
    let db_path = init(root);

    // Fresh ids per run; the reflection cites the first event as its source.
    let trace_id = TraceId::new().to_string();
    let event_id_a = EventId::new().to_string();
    let event_id_b = EventId::new().to_string();
    let session_path = write_session_fixture(root, &trace_id, &event_id_a, &event_id_b);
    let reflection = reflection_json(&trace_id, &event_id_a);
    let fixtures_dir = write_replay_fixtures(root, &trace_id, &reflection);

    let args = [
        "session",
        "close",
        session_path.to_str().unwrap(),
        "--fixtures-dir",
        fixtures_dir.to_str().unwrap(),
    ];
    let out = run_in(root, &args);
    assert_exit(&out, 0);

    let stdout = String::from_utf8_lossy(&out.stdout);
    let reported_success = stdout.contains("session-close: ok") || stdout.contains("activated");
    assert!(
        reported_success,
        "stdout should confirm success: stdout={stdout} stderr={}",
        String::from_utf8_lossy(&out.stderr)
    );

    // Inspect the database directly rather than trusting CLI output alone.
    let pool = Connection::open(&db_path).expect("open db");
    apply_pending(&pool).expect("apply migrations");
    let repo = MemoryRepo::new(&pool);
    let active = repo.list_by_status("active").expect("list active");
    assert!(
        !active.is_empty(),
        "at least one memory should be active after session close"
    );
}
/// `session close --dry-run` must exit 0 and leave the database with no
/// active memories and no candidates.
#[test]
fn session_close_dry_run_makes_no_changes() {
    let tmp = tmp_dir("dry_run");
    let root = tmp.path();
    let db_path = init(root);

    let trace_id = TraceId::new().to_string();
    let event_id_a = EventId::new().to_string();
    let event_id_b = EventId::new().to_string();
    let session_path = write_session_fixture(root, &trace_id, &event_id_a, &event_id_b);
    let reflection = reflection_json(&trace_id, &event_id_a);
    let fixtures_dir = write_replay_fixtures(root, &trace_id, &reflection);

    let args = [
        "session",
        "close",
        session_path.to_str().unwrap(),
        "--dry-run",
        "--fixtures-dir",
        fixtures_dir.to_str().unwrap(),
    ];
    let out = run_in(root, &args);
    assert_exit(&out, 0);

    // Verify against the database itself: nothing may have been persisted.
    let pool = Connection::open(&db_path).expect("open db");
    apply_pending(&pool).expect("apply migrations");
    let repo = MemoryRepo::new(&pool);

    let active = repo.list_by_status("active").expect("list active");
    assert!(
        active.is_empty(),
        "dry-run must not write any active memories; found: {active:?}"
    );

    let candidates = repo.list_candidates().expect("list candidates");
    assert!(
        candidates.is_empty(),
        "dry-run must not write any candidates; found: {candidates:?}"
    );
}
/// `session close` on a session whose only event has no trace id must exit 0
/// and say so in its output.
///
/// Despite the test name, the fixture is not empty: it contains one event with
/// a `null` `trace_id`. The message is accepted on either stdout or stderr.
#[test]
fn session_close_empty_events_exits_cleanly() {
    let tmp = tmp_dir("no_trace");
    let root = tmp.path();
    init(root);

    let event_id = EventId::new().to_string();
    let session_path = write_notrace_session_fixture(root, &event_id);

    let args = ["session", "close", session_path.to_str().unwrap()];
    let out = run_in(root, &args);
    assert_exit(&out, 0);

    let stdout = String::from_utf8_lossy(&out.stdout);
    let stderr = String::from_utf8_lossy(&out.stderr);
    let combined = format!("{stdout}{stderr}");

    // Any one of these phrasings counts as the "nothing to do" signal.
    let accepted_markers = ["no_candidates", "no candidates", "no trace_id"];
    let mentions_no_candidates = accepted_markers
        .iter()
        .any(|&marker| combined.contains(marker));
    assert!(
        mentions_no_candidates,
        "output should indicate no candidates: stdout={stdout} stderr={stderr}"
    );
}
/// `--json session close` must print a JSON envelope whose `command` is
/// `cortex.session.close` and whose `report` carries `activated_count`,
/// `proposed_count` and `embedding_count`.
#[test]
fn session_close_json_envelope_has_counts() {
    let tmp = tmp_dir("json_envelope");
    let root = tmp.path();
    init(root);

    let trace_id = TraceId::new().to_string();
    let event_id_a = EventId::new().to_string();
    let event_id_b = EventId::new().to_string();
    let session_path = write_session_fixture(root, &trace_id, &event_id_a, &event_id_b);
    let reflection = reflection_json(&trace_id, &event_id_a);
    let fixtures_dir = write_replay_fixtures(root, &trace_id, &reflection);

    let args = [
        "--json",
        "session",
        "close",
        session_path.to_str().unwrap(),
        "--fixtures-dir",
        fixtures_dir.to_str().unwrap(),
    ];
    let out = run_in(root, &args);
    assert_exit(&out, 0);

    // The whole of stdout must parse as a single JSON document.
    let stdout = String::from_utf8_lossy(&out.stdout);
    let envelope: serde_json::Value =
        serde_json::from_str(&stdout).expect("stdout must be valid JSON");

    assert_eq!(
        envelope.get("command").and_then(serde_json::Value::as_str),
        Some("cortex.session.close"),
        "envelope command field"
    );

    // Only the presence of the count fields is checked, not their values.
    let report = envelope.get("report").expect("envelope has report field");
    let has_field = |name: &str| report.get(name).is_some();
    assert!(
        has_field("activated_count"),
        "report must have activated_count: {report}"
    );
    assert!(
        has_field("proposed_count"),
        "report must have proposed_count: {report}"
    );
    assert!(
        has_field("embedding_count"),
        "report must have embedding_count: {report}"
    );
}
/// A memory in `pending_mcp_commit` status must not be returned by
/// `memory search`, in either JSON or plain-text mode.
///
/// The database is seeded directly rather than through the CLI: one source
/// event is inserted with raw SQL, and one candidate memory citing it is then
/// moved to `pending_mcp_commit`. The memory's claim contains the search term
/// `cortex`.
#[test]
fn session_close_pending_memories_not_returned_by_search() {
    let tmp = tmp_dir("pending_search");
    let root = tmp.path();
    let db_path = init(root);

    let pool = Connection::open(&db_path).expect("open db");
    apply_pending(&pool).expect("apply migrations");
    let repo = MemoryRepo::new(&pool);

    // Seed the source event that the memory cites.
    let source_event_id = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV";
    let insert_event_sql = format!(
        "INSERT OR IGNORE INTO events \
         (id, schema_version, observed_at, recorded_at, source_json, event_type, \
         trace_id, session_id, domain_tags_json, payload_json, payload_hash, \
         prev_event_hash, event_hash) \
         VALUES \
         ('{source_event_id}', 1, '2026-05-13T10:00:00Z', '2026-05-13T10:00:00Z', \
         '{{\"type\":\"tool\",\"name\":\"pending-search-test\"}}', \
         'cortex.event.agent_response.v1', \
         NULL, 'pending-search-test', '[\"testing\"]', \
         '{{\"text\":\"pending search test\"}}', 'ph-pending-0', NULL, 'eh-pending-0');",
    );
    pool.execute_batch(&insert_event_sql)
        .expect("insert source event");

    // Insert the candidate, then move it to the pending status.
    let memory_id: MemoryId = "mem_01ARZ3NDEKTSV4RRFFQ69G5FA9"
        .parse()
        .expect("valid memory id");
    let candidate = MemoryCandidate {
        id: memory_id,
        memory_type: "episodic".into(),
        claim: "cortex pending_mcp_commit memory must not appear in search results.".into(),
        source_episodes_json: json!([]),
        source_events_json: json!([source_event_id]),
        domains_json: json!(["testing"]),
        salience_json: json!({"score": 0.8}),
        confidence: 0.85,
        authority: "tool".into(),
        applies_when_json: json!([]),
        does_not_apply_when_json: json!([]),
        created_at: chrono::Utc::now(),
        updated_at: chrono::Utc::now(),
    };
    repo.insert_candidate(&candidate).expect("insert candidate");
    repo.set_pending_mcp_commit(&memory_id, chrono::Utc::now())
        .expect("set pending_mcp_commit");

    // Confirm the precondition before searching.
    let status: String = pool
        .query_row(
            "SELECT status FROM memories WHERE id = ?1;",
            rusqlite::params![memory_id.to_string()],
            |row| row.get(0),
        )
        .expect("memory row must exist");
    assert_eq!(
        status, "pending_mcp_commit",
        "memory must be in pending_mcp_commit status before search"
    );

    // JSON mode: the report must count zero matches.
    let json_search = run_in(root, &["--json", "memory", "search", "cortex"]);
    assert_exit(&json_search, 0);
    let json_stdout = String::from_utf8_lossy(&json_search.stdout);
    let search_json: serde_json::Value =
        serde_json::from_str(&json_stdout).expect("memory search output must be valid JSON");
    let match_count = search_json
        .pointer("/report/match_count")
        .and_then(serde_json::Value::as_u64)
        .expect("match_count must be a number");
    assert_eq!(
        match_count, 0,
        "pending_mcp_commit memories must not appear in search results: {search_json}"
    );

    // Plain-text mode: the output must say there are no matches.
    let plain_search = run_in(root, &["memory", "search", "cortex"]);
    assert_exit(&plain_search, 0);
    let plain_stdout = String::from_utf8_lossy(&plain_search.stdout);
    assert!(
        plain_stdout.contains("no matches"),
        "plain-text search must report no matches for pending memory: {plain_stdout}"
    );
}
/// Memories activated by `session close` must be found by a subsequent
/// `--json memory search session`.
#[test]
fn session_close_memories_are_searchable() {
    let tmp = tmp_dir("searchable");
    let root = tmp.path();
    init(root);

    let trace_id = TraceId::new().to_string();
    let event_id_a = EventId::new().to_string();
    let event_id_b = EventId::new().to_string();
    let session_path = write_session_fixture(root, &trace_id, &event_id_a, &event_id_b);
    let reflection = reflection_json(&trace_id, &event_id_a);
    let fixtures_dir = write_replay_fixtures(root, &trace_id, &reflection);

    // Step 1: close the session and require that it reports activation.
    let close_args = [
        "session",
        "close",
        session_path.to_str().unwrap(),
        "--fixtures-dir",
        fixtures_dir.to_str().unwrap(),
    ];
    let close_out = run_in(root, &close_args);
    assert_exit(&close_out, 0);
    let close_stdout = String::from_utf8_lossy(&close_out.stdout);
    assert!(
        close_stdout.contains("activated"),
        "session close must report activated memories: stdout={close_stdout} stderr={}",
        String::from_utf8_lossy(&close_out.stderr)
    );

    // Step 2: search for a word that appears in the fixture's memory claim.
    let search_out = run_in(root, &["--json", "memory", "search", "session"]);
    assert_exit(&search_out, 0);
    let search_stdout = String::from_utf8_lossy(&search_out.stdout);
    let search_json: serde_json::Value =
        serde_json::from_str(&search_stdout).expect("memory search --json must be valid JSON");
    let match_count = search_json
        .pointer("/report/match_count")
        .and_then(serde_json::Value::as_u64)
        .expect("match_count must be a number");
    assert!(
        match_count >= 1,
        "session-close memories must be searchable after proof-closure fix; \
         got 0 matches — source events may not be in the SQLite events table: {search_json}"
    );
}
/// `session close` on a nonexistent events file must exit non-zero and name
/// the `session.close.ingest_failed` invariant on stderr.
#[test]
fn session_close_bad_events_file_exits_with_invariant() {
    let tmp = tmp_dir("bad_file");
    let root = tmp.path();
    init(root);

    // This path is never created.
    let nonexistent = root.join("does-not-exist.json");
    let args = ["session", "close", nonexistent.to_str().unwrap()];
    let out = run_in(root, &args);

    let code = out.status.code().expect("process exited");
    assert_ne!(code, 0, "missing events file must exit non-zero");

    let stderr = String::from_utf8_lossy(&out.stderr);
    let names_invariant = stderr.contains("session.close.ingest_failed");
    assert!(
        names_invariant,
        "stderr must contain the stable invariant: {stderr}"
    );
}