use std::fs;
use std::path::PathBuf;
use chrono::Utc;
use clap::{Args, Subcommand};
use cortex_core::{Event, TraceId};
use cortex_llm::{ClaudeHttpAdapter, LlmAdapter, OllamaConfig, OllamaHttpAdapter};
use cortex_reflect::ReflectionReportStatus;
use cortex_retrieval::{EmbedRecord, Embedder, LocalStubEmbedder, STUB_BACKEND_ID};
use cortex_store::repo::EmbeddingRepo;
use cortex_store::repo::EventRepo;
use cortex_store::repo::MemoryRepo;
use serde_json::json;
use crate::cmd::ingest::{run_inner as ingest_run_inner, IngestArgs};
use crate::cmd::open_default_store;
use crate::config::LlmBackend;
use crate::exit::Exit;
use crate::output::{self, Envelope};
pub const SESSION_CLOSE_INGEST_FAILED_INVARIANT: &str = "session.close.ingest_failed";
pub const SESSION_CLOSE_REFLECT_FAILED_INVARIANT: &str = "session.close.reflect_failed";
pub const SESSION_CLOSE_NO_CANDIDATES_INVARIANT: &str = "session.close.no_candidates";
#[derive(Debug, Args)]
pub struct CloseArgs {
#[arg(value_name = "EVENTS_PATH")]
pub events_path: PathBuf,
#[arg(long)]
pub dry_run: bool,
#[arg(long, value_name = "TEXT")]
pub label: Option<String>,
#[arg(long = "fixtures-dir", value_name = "DIR")]
pub fixtures_dir: Option<PathBuf>,
#[arg(long, value_name = "PATH")]
pub db: Option<PathBuf>,
#[arg(long = "event-log", value_name = "PATH")]
pub event_log: Option<PathBuf>,
#[arg(long)]
pub live_reflect: bool,
}
#[derive(Debug, Subcommand)]
pub enum SessionSub {
Close(CloseArgs),
}
#[derive(Debug, Default)]
struct CloseOutcome {
events_ingested: usize,
proposed_count: usize,
activated_count: usize,
already_existed_count: usize,
failed_structural_gate_count: usize,
failed_activation_count: usize,
embedding_count: usize,
embedding_error_count: usize,
activated_memory_ids: Vec<String>,
failed_ids: Vec<String>,
no_candidates: bool,
}
pub fn run(sub: SessionSub) -> Exit {
match sub {
SessionSub::Close(args) => close(args),
}
}
fn close(args: CloseArgs) -> Exit {
match close_inner(args) {
Ok(outcome) => emit_success_receipt(&outcome),
Err(exit) => exit,
}
}
fn close_inner(args: CloseArgs) -> Result<CloseOutcome, Exit> {
let raw = fs::read(&args.events_path).map_err(|err| {
let detail = format!(
"{SESSION_CLOSE_INGEST_FAILED_INVARIANT}: cannot read events file `{}`: {err}; no state was changed",
args.events_path.display()
);
eprintln!("session close: {detail}");
if output::json_enabled() {
emit_failure_envelope(
Exit::PreconditionUnmet,
SESSION_CLOSE_INGEST_FAILED_INVARIANT,
&detail,
);
}
Exit::PreconditionUnmet
})?;
let trace_id = extract_trace_id(&raw).map_err(|detail| {
let msg =
format!("{SESSION_CLOSE_INGEST_FAILED_INVARIANT}: {detail}; no state was changed");
eprintln!("session close: {msg}");
if output::json_enabled() {
emit_failure_envelope(
Exit::PreconditionUnmet,
SESSION_CLOSE_INGEST_FAILED_INVARIANT,
&msg,
);
}
Exit::PreconditionUnmet
})?;
let parsed_events = parse_events_from_raw(&raw).map_err(|detail| {
let msg =
format!("{SESSION_CLOSE_INGEST_FAILED_INVARIANT}: {detail}; no state was changed");
eprintln!("session close: {msg}");
if output::json_enabled() {
emit_failure_envelope(
Exit::PreconditionUnmet,
SESSION_CLOSE_INGEST_FAILED_INVARIANT,
&msg,
);
}
Exit::PreconditionUnmet
})?;
let events_ingested = if args.dry_run {
count_events_in_raw(&raw)
} else {
let ingest_outcome = ingest_run_inner(IngestArgs {
session: args.events_path.clone(),
event_log: args.event_log.clone(),
db: args.db.clone(),
user_attestation: None,
})
.map_err(|ingest_err| {
let detail = format!(
"{SESSION_CLOSE_INGEST_FAILED_INVARIANT}: ingest pipeline failed: {}; no state was changed",
ingest_err.detail
);
eprintln!("session close: {detail}");
if output::json_enabled() {
emit_failure_envelope(ingest_err.exit, SESSION_CLOSE_INGEST_FAILED_INVARIANT, &detail);
}
ingest_err.exit
})?;
ingest_outcome.appended.len() + ingest_outcome.skipped.len()
};
if args.dry_run {
let outcome = CloseOutcome {
events_ingested,
..Default::default()
};
return Ok(outcome);
}
let pool = open_default_store("session close").inspect_err(|&exit| {
let detail = format!(
"{SESSION_CLOSE_REFLECT_FAILED_INVARIANT}: failed to open store; no state was changed"
);
eprintln!("session close: {detail}");
if output::json_enabled() {
emit_failure_envelope(exit, SESSION_CLOSE_REFLECT_FAILED_INVARIANT, &detail);
}
})?;
{
let event_repo = EventRepo::new(&pool);
for event in &parsed_events {
event_repo.append(event).map_err(|err| {
let detail = format!(
"{SESSION_CLOSE_INGEST_FAILED_INVARIANT}: failed to write event {} to SQLite events table: {err}",
event.id
);
eprintln!("session close: {detail}");
if output::json_enabled() {
emit_failure_envelope(
Exit::Internal,
SESSION_CLOSE_INGEST_FAILED_INVARIANT,
&detail,
);
}
Exit::Internal
})?;
}
}
let trace_id_for_reflect = match trace_id {
Some(tid) => tid,
None => {
let outcome = CloseOutcome {
events_ingested,
no_candidates: true,
..Default::default()
};
eprintln!("session-close: no trace_id in events; no candidates proposed");
return Ok(outcome);
}
};
let fixtures_dir = resolve_fixtures_dir(args.fixtures_dir);
let reflect_report = run_reflect(
trace_id_for_reflect,
&fixtures_dir,
&pool,
args.live_reflect,
)
.map_err(|err| {
let detail = format!("{SESSION_CLOSE_REFLECT_FAILED_INVARIANT}: reflection failed: {err}");
eprintln!("session close: {detail}");
if output::json_enabled() {
emit_failure_envelope(
Exit::Internal,
SESSION_CLOSE_REFLECT_FAILED_INVARIANT,
&detail,
);
}
Exit::Internal
})?;
if reflect_report.status == ReflectionReportStatus::Quarantined {
let detail = format!(
"{SESSION_CLOSE_NO_CANDIDATES_INVARIANT}: reflection quarantined; no candidates proposed"
);
eprintln!("session-close: {detail}");
let outcome = CloseOutcome {
events_ingested,
no_candidates: true,
..Default::default()
};
return Ok(outcome);
}
let candidate_ids: Vec<cortex_core::MemoryId> = reflect_report
.persisted_memory_candidates
.iter()
.map(|c| c.id)
.collect();
let proposed_count = candidate_ids.len();
if proposed_count == 0 {
eprintln!("session-close: no candidates proposed");
let outcome = CloseOutcome {
events_ingested,
no_candidates: true,
..Default::default()
};
return Ok(outcome);
}
let repo = MemoryRepo::new(&pool);
let now = Utc::now();
let mut activated_ids = Vec::new();
let mut failed_ids = Vec::new();
let mut already_existed_count = 0usize;
let mut failed_activation_count = 0usize;
for memory_id in &candidate_ids {
match repo.set_active(memory_id, now) {
Ok(()) => {
activated_ids.push(memory_id.to_string());
}
Err(err) => {
let err_str = err.to_string();
if err_str.contains("not a candidate") {
already_existed_count += 1;
activated_ids.push(memory_id.to_string());
} else {
eprintln!(
"session close: warning: failed to activate memory {memory_id}: {err_str}"
);
failed_activation_count += 1;
failed_ids.push(memory_id.to_string());
}
}
}
}
let embed_repo = EmbeddingRepo::new(&pool);
let embedder = LocalStubEmbedder::new();
let mut embedding_count = 0usize;
let mut embedding_error_count = 0usize;
for memory_id_str in &activated_ids {
let memory_id: cortex_core::MemoryId = match memory_id_str.parse() {
Ok(id) => id,
Err(_) => {
embedding_error_count += 1;
continue;
}
};
let memory = match repo.get_by_id(&memory_id) {
Ok(Some(m)) => m,
Ok(None) => {
eprintln!("session close: warning: memory {memory_id_str} not found for embedding");
embedding_error_count += 1;
continue;
}
Err(err) => {
eprintln!(
"session close: warning: failed to read memory {memory_id_str} for embedding: {err}"
);
embedding_error_count += 1;
continue;
}
};
let tags: Vec<String> = memory
.domains_json
.as_array()
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(ToString::to_string))
.collect()
})
.unwrap_or_default();
let vec = match embedder.embed(&memory.claim, &tags) {
Ok(v) => v,
Err(err) => {
eprintln!("session close: warning: embed failed for memory {memory_id_str}: {err}");
embedding_error_count += 1;
continue;
}
};
let record = match EmbedRecord::new(memory_id, STUB_BACKEND_ID, vec, now) {
Ok(r) => r,
Err(err) => {
eprintln!(
"session close: warning: failed to build embed record for memory {memory_id_str}: {err}"
);
embedding_error_count += 1;
continue;
}
};
match embed_repo.write(&record) {
Ok(()) => embedding_count += 1,
Err(err) => {
eprintln!(
"session close: warning: failed to write embedding for memory {memory_id_str}: {err}"
);
embedding_error_count += 1;
}
}
}
Ok(CloseOutcome {
events_ingested,
proposed_count,
activated_count: activated_ids.len() - already_existed_count,
already_existed_count,
failed_structural_gate_count: 0, failed_activation_count,
embedding_count,
embedding_error_count,
activated_memory_ids: activated_ids,
failed_ids,
no_candidates: false,
})
}
fn run_reflect(
trace_id: TraceId,
fixtures_dir: &std::path::Path,
pool: &cortex_store::Pool,
live_reflect: bool,
) -> Result<cortex_reflect::ReflectionReport, String> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|err| format!("failed to create tokio runtime: {err}"))?;
if live_reflect {
let backend = LlmBackend::resolve();
match build_live_adapter(backend) {
Ok(adapter) => {
return rt
.block_on(cortex_reflect::reflect(trace_id, adapter.as_ref(), pool))
.map_err(|err| format!("{err}"));
}
Err(reason) => {
tracing::warn!(
reason = %reason,
"session-close: --live-reflect requested but live adapter construction failed; \
falling back to ReplayAdapter"
);
}
}
}
use cortex_llm::ReplayAdapter;
let adapter = ReplayAdapter::new(fixtures_dir).map_err(|err| format!("{err}"))?;
rt.block_on(cortex_reflect::reflect(trace_id, &adapter, pool))
.map_err(|err| format!("{err}"))
}
fn build_live_adapter(backend: LlmBackend) -> Result<Box<dyn LlmAdapter>, String> {
match backend {
LlmBackend::Ollama {
endpoint,
model,
timeout_ms: _,
} => {
let config = OllamaConfig::new(endpoint, model);
let adapter = OllamaHttpAdapter::new(config)
.map_err(|err| format!("OllamaHttpAdapter construction failed: {err}"))?;
Ok(Box::new(adapter))
}
LlmBackend::Claude {
model,
max_tokens: _,
timeout_ms: _,
max_sensitivity,
} => {
use cortex_llm::MaxSensitivity;
let sensitivity = match max_sensitivity.as_str() {
"low" => Some(MaxSensitivity::Low),
"high" => Some(MaxSensitivity::High),
_ => Some(MaxSensitivity::Medium),
};
let adapter = ClaudeHttpAdapter::new(model, sensitivity)
.map_err(|err| format!("ClaudeHttpAdapter construction failed: {err}"))?;
Ok(Box::new(adapter))
}
LlmBackend::OpenAiCompat { .. } => Err(
"backend is OpenAiCompat; reflection via OpenAI-compat is not yet supported — \
use Ollama or Claude for --live-reflect"
.to_string(),
),
LlmBackend::Offline => Err("backend is Offline; no live adapter available".to_string()),
}
}
fn resolve_fixtures_dir(explicit: Option<PathBuf>) -> PathBuf {
if let Some(p) = explicit {
return p;
}
if let Some(p) = std::env::var_os("CORTEX_FIXTURES_DIR") {
return PathBuf::from(p);
}
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests")
.join("fixtures")
.join("replay")
}
fn parse_events_from_raw(raw: &[u8]) -> Result<Vec<Event>, String> {
let value: serde_json::Value =
serde_json::from_slice(raw).map_err(|err| format!("invalid JSON: {err}"))?;
let event_values: Vec<serde_json::Value> = match value {
serde_json::Value::Object(ref map) => {
if let Some(arr) = map.get("events").and_then(|v| v.as_array()) {
arr.to_owned()
} else {
vec![value.clone()]
}
}
serde_json::Value::Array(ref arr) => arr.to_owned(),
_ => return Err("unexpected JSON shape: must be object or array".to_string()),
};
let mut events = Vec::with_capacity(event_values.len());
for (i, ev) in event_values.iter().enumerate() {
let event: Event = serde_json::from_value(ev.clone())
.map_err(|err| format!("event[{i}] failed to deserialize: {err}"))?;
events.push(event);
}
Ok(events)
}
fn extract_trace_id(raw: &[u8]) -> Result<Option<TraceId>, String> {
let value: serde_json::Value =
serde_json::from_slice(raw).map_err(|err| format!("invalid JSON: {err}"))?;
let events_array = match &value {
serde_json::Value::Object(map) => {
if let Some(events) = map.get("events") {
events
.as_array()
.ok_or_else(|| "events field is not an array".to_string())?
.to_owned()
} else {
vec![value.clone()]
}
}
serde_json::Value::Array(arr) => arr.to_owned(),
_ => return Err("unexpected JSON shape".to_string()),
};
for event in &events_array {
if let Some(tid) = event.get("trace_id").and_then(|v| v.as_str()) {
if !tid.is_empty() {
match tid.parse::<TraceId>() {
Ok(parsed) => return Ok(Some(parsed)),
Err(_) => continue,
}
}
}
}
Ok(None)
}
fn count_events_in_raw(raw: &[u8]) -> usize {
let Ok(value) = serde_json::from_slice::<serde_json::Value>(raw) else {
return 0;
};
match value {
serde_json::Value::Object(map) => {
if let Some(events) = map.get("events").and_then(|v| v.as_array()) {
events.len()
} else {
1
}
}
serde_json::Value::Array(arr) => arr.len(),
_ => 1,
}
}
fn emit_success_receipt(outcome: &CloseOutcome) -> Exit {
if output::json_enabled() {
let payload = json!({
"ingested_count": outcome.events_ingested,
"proposed_count": outcome.proposed_count,
"activated_count": outcome.activated_count,
"already_existed_count": outcome.already_existed_count,
"failed_structural_gate_count": outcome.failed_structural_gate_count,
"failed_activation_count": outcome.failed_activation_count,
"embedding_count": outcome.embedding_count,
"embedding_error_count": outcome.embedding_error_count,
"activated_memory_ids": outcome.activated_memory_ids,
"failed_ids": outcome.failed_ids,
"no_candidates": outcome.no_candidates,
});
let envelope = Envelope::new("cortex.session.close", Exit::Ok, payload);
return output::emit(&envelope, Exit::Ok);
}
println!("session-close: ingested {} events", outcome.events_ingested);
if outcome.no_candidates {
println!("session-close: no candidates proposed");
} else {
println!(
"session-close: proposed {} candidates",
outcome.proposed_count
);
println!(
"session-close: activated {} memories ({} failed structural gates, {} already existed)",
outcome.activated_count,
outcome.failed_structural_gate_count,
outcome.already_existed_count,
);
println!(
"session-close: computed {} embeddings ({} embedding errors)",
outcome.embedding_count, outcome.embedding_error_count,
);
}
println!("session-close: ok");
Exit::Ok
}
fn emit_failure_envelope(exit: Exit, invariant: &str, detail: &str) -> Exit {
if !output::json_enabled() {
return exit;
}
let payload = json!({
"invariant": invariant,
"detail": detail,
"ingested_count": 0,
"proposed_count": 0,
"activated_count": 0,
"embedding_count": 0,
});
let envelope = Envelope::new("cortex.session.close", exit, payload);
output::emit(&envelope, exit)
}