#![warn(missing_docs)]
use std::fmt;
use std::path::PathBuf;
use chrono::Utc;
use cortex_core::{
compose_policy_outcomes, Event, EventSource, PolicyContribution, PolicyOutcome, TraceId,
};
use cortex_ledger::{
JsonlError, JsonlLog, APPEND_ATTESTATION_REQUIRED_RULE_ID,
APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID, APPEND_RUNTIME_MODE_RULE_ID,
};
use cortex_reflect::ReflectionReportStatus;
use cortex_retrieval::{EmbedRecord, Embedder, LocalStubEmbedder, STUB_BACKEND_ID};
use cortex_store::repo::{EmbeddingRepo, EventRepo, MemoryRepo};
use cortex_store::Pool;
/// Summary of one session-close run, as returned by [`close_from_bytes`].
#[derive(Debug, Default)]
pub struct CloseOutcome {
/// Number of events newly appended to the event log during ingest
/// (events whose id was already present in the log are not counted).
pub ingested: usize,
/// Number of memory candidates persisted by the reflection step.
pub reflected: usize,
/// Number of candidates counted as pending commit: those successfully
/// marked, plus those whose marking failed with a "not a candidate" error.
pub pending_commit: usize,
/// The trace id used for reflection rendered as a string, or the literal
/// `"no_trace_id"` when no ingested event carried a trace id.
pub receipt_id: String,
}
/// Errors surfaced by the session-close pipeline.
#[derive(Debug)]
pub enum SessionError {
/// Payload parsing or event ingest failed; carries a human-readable message
/// (also used for stringified `JsonlError`s).
Ingest(String),
/// The reflection step failed; carries the stringified underlying error.
Reflect(String),
/// An error returned by the `cortex_store` layer.
Store(cortex_store::StoreError),
/// A filesystem error, e.g. while creating the log directory or reading the log.
Io(std::io::Error),
}
impl fmt::Display for SessionError {
    /// Renders a one-line message: a `session <stage>:` prefix naming what
    /// failed, followed by the wrapped message or error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the stage label and the payload once, then format in one place.
        let (stage, detail): (&str, &dyn fmt::Display) = match self {
            Self::Ingest(msg) => ("ingest failed", msg),
            Self::Reflect(msg) => ("reflect failed", msg),
            Self::Store(err) => ("store error", err),
            Self::Io(err) => ("io error", err),
        };
        write!(f, "session {stage}: {detail}")
    }
}
impl std::error::Error for SessionError {
    /// Exposes the wrapped error for the `Store` and `Io` variants; the
    /// string-carrying variants have no underlying source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Ingest(_) | Self::Reflect(_) => None,
            Self::Store(store_err) => Some(store_err),
            Self::Io(io_err) => Some(io_err),
        }
    }
}
impl From<cortex_store::StoreError> for SessionError {
fn from(err: cortex_store::StoreError) -> Self {
Self::Store(err)
}
}
impl From<std::io::Error> for SessionError {
fn from(err: std::io::Error) -> Self {
Self::Io(err)
}
}
impl From<JsonlError> for SessionError {
fn from(err: JsonlError) -> Self {
Self::Ingest(err.to_string())
}
}
/// Runs the session-close pipeline over a raw JSON payload.
///
/// Steps, in order:
/// 1. Parse `raw` into events and ingest them into `event_log` and the store.
/// 2. If any event carries a trace id, run reflection for the first one found.
/// 3. Try to mark every persisted candidate as pending commit.
/// 4. Write a stub-backend embedding for each pending memory (best effort).
///
/// The returned [`CloseOutcome`] reports zero `reflected`/`pending_commit`
/// when there is no trace id, when reflection is quarantined, or when no
/// candidates were persisted.
///
/// # Errors
///
/// Returns [`SessionError`] when parsing, ingest, or reflection fails.
/// Failures while marking candidates or writing embeddings are only logged.
pub fn close_from_bytes(
    raw: &[u8],
    pool: &Pool,
    event_log: PathBuf,
    fixtures_dir: &std::path::Path,
) -> Result<CloseOutcome, SessionError> {
    let events = parse_events(raw).map_err(SessionError::Ingest)?;
    let trace_id = extract_trace_id(&events);
    let ingested = ingest_events(&events, &event_log, pool)?;

    // Every early exit reports the ingest count with nothing reflected or pending.
    let nothing_proposed = |receipt_id: String| CloseOutcome {
        ingested,
        reflected: 0,
        pending_commit: 0,
        receipt_id,
    };

    let trace_id_for_reflect = if let Some(tid) = trace_id {
        tid
    } else {
        tracing::debug!("session-close: no trace_id in events; no candidates proposed");
        return Ok(nothing_proposed("no_trace_id".to_string()));
    };

    let report =
        run_reflect(trace_id_for_reflect, fixtures_dir, pool).map_err(SessionError::Reflect)?;
    if report.status == ReflectionReportStatus::Quarantined {
        tracing::debug!("session-close: reflection quarantined; no candidates proposed");
        return Ok(nothing_proposed(trace_id_for_reflect.to_string()));
    }

    let candidate_ids: Vec<cortex_core::MemoryId> = report
        .persisted_memory_candidates
        .iter()
        .map(|candidate| candidate.id)
        .collect();
    if candidate_ids.is_empty() {
        tracing::debug!("session-close: no candidates proposed");
        return Ok(nothing_proposed(trace_id_for_reflect.to_string()));
    }
    let reflected = candidate_ids.len();

    // Keep the ids that are (or already were) moved past the candidate state.
    let repo = MemoryRepo::new(pool);
    let now = Utc::now();
    let pending_ids: Vec<cortex_core::MemoryId> = candidate_ids
        .iter()
        .copied()
        .filter(
            |memory_id| match repo.set_pending_mcp_commit(memory_id, now) {
                Ok(()) => true,
                Err(err) => {
                    let err_str = err.to_string();
                    // NOTE(review): relies on the store error's display text.
                    let already_transitioned = err_str.contains("not a candidate");
                    if already_transitioned {
                        tracing::debug!(
                            memory_id = %memory_id,
                            "session-close: memory not a candidate (already transitioned); treating as pending"
                        );
                    } else {
                        tracing::warn!(
                            memory_id = %memory_id,
                            error = %err_str,
                            "session-close: failed to set pending_mcp_commit for memory"
                        );
                    }
                    already_transitioned
                }
            },
        )
        .collect();

    // Best-effort embedding pass: any failure is logged and that memory skipped.
    let embed_repo = EmbeddingRepo::new(pool);
    let embedder = LocalStubEmbedder::new();
    for memory_id in &pending_ids {
        let memory = match repo.get_by_id(memory_id) {
            Ok(Some(found)) => found,
            Ok(None) => {
                tracing::warn!(memory_id = %memory_id, "session-close: memory not found for embedding");
                continue;
            }
            Err(err) => {
                tracing::warn!(memory_id = %memory_id, error = %err, "session-close: failed to read memory for embedding");
                continue;
            }
        };

        // Only string entries of the domains array become tags; a non-array
        // value yields no tags.
        let tags: Vec<String> = memory
            .domains_json
            .as_array()
            .into_iter()
            .flatten()
            .filter_map(|value| value.as_str())
            .map(str::to_owned)
            .collect();

        let vector = match embedder.embed(&memory.claim, &tags) {
            Ok(vector) => vector,
            Err(err) => {
                tracing::warn!(memory_id = %memory_id, error = %err, "session-close: embed failed");
                continue;
            }
        };
        let record = match EmbedRecord::new(*memory_id, STUB_BACKEND_ID, vector, now) {
            Ok(record) => record,
            Err(err) => {
                tracing::warn!(memory_id = %memory_id, error = %err, "session-close: failed to build embed record");
                continue;
            }
        };
        if let Err(err) = embed_repo.write(&record) {
            tracing::warn!(memory_id = %memory_id, error = %err, "session-close: failed to write embedding");
        }
    }

    Ok(CloseOutcome {
        ingested,
        reflected,
        pending_commit: pending_ids.len(),
        receipt_id: trace_id_for_reflect.to_string(),
    })
}
/// Parses a session payload into a list of events.
///
/// Three JSON shapes are accepted:
/// - an object with an `events` array: each element is one event;
/// - an object without an `events` key: the object itself is a single event;
/// - a top-level array: each element is one event.
///
/// The parsed JSON tree is consumed rather than cloned, so each event value is
/// moved straight into deserialization.
///
/// # Errors
///
/// Returns a message when the bytes are not valid JSON, when the shape is none
/// of the above, when `events` is present but not an array, or when an element
/// fails to deserialize (the message includes the element index).
fn parse_events(raw: &[u8]) -> Result<Vec<Event>, String> {
    let payload: serde_json::Value =
        serde_json::from_slice(raw).map_err(|err| format!("invalid JSON: {err}"))?;

    let events: Vec<serde_json::Value> = match payload {
        serde_json::Value::Object(mut map) => match map.remove("events") {
            Some(serde_json::Value::Array(items)) => items,
            Some(_) => return Err("events field is not an array".to_string()),
            // No `events` key: nothing was removed, the object is the event.
            None => vec![serde_json::Value::Object(map)],
        },
        serde_json::Value::Array(items) => items,
        _ => return Err("unexpected JSON shape: must be object or array".to_string()),
    };

    // Collecting into `Result` stops at the first element that fails.
    events
        .into_iter()
        .enumerate()
        .map(|(i, ev)| {
            serde_json::from_value::<Event>(ev)
                .map_err(|err| format!("event[{i}] failed to deserialize: {err}"))
        })
        .collect()
}
/// Returns the trace id of the first event that carries one, or `None` when
/// no event in `events` has a trace id.
fn extract_trace_id(events: &[Event]) -> Option<TraceId> {
    for event in events {
        if let Some(trace_id) = event.trace_id {
            return Some(trace_id);
        }
    }
    None
}
/// Appends `events` to the JSONL log at `event_log` and mirrors them into the
/// store, returning how many events were newly appended to the log.
///
/// The whole batch is refused up front, before any write, if it contains an
/// event whose source `session_append_policy` would reject (`User` or
/// `ManualCorrection`). Checking here keeps a rejected event in the middle of
/// a batch from leaving the earlier events already written.
///
/// Events whose id is already in the log, or that repeat an id seen earlier in
/// the same batch, are not appended to the log again; they are still passed to
/// the store.
///
/// # Errors
///
/// Returns `SessionError::Ingest` for a refused source or a ledger failure,
/// `SessionError::Io` for filesystem failures, and `SessionError::Store` when
/// the store append fails.
fn ingest_events(
    events: &[Event],
    event_log: &std::path::Path,
    pool: &Pool,
) -> Result<usize, SessionError> {
    for event in events {
        if matches!(event.source, EventSource::User) {
            return Err(SessionError::Ingest(format!(
                "EventSource::User event {} cannot be ingested without operator attestation; \
                 use `cortex ingest --user-attestation` for user-sourced events",
                event.id,
            )));
        }
        if matches!(event.source, EventSource::ManualCorrection) {
            return Err(SessionError::Ingest(format!(
                "EventSource::ManualCorrection event {} cannot be ingested without operator attestation",
                event.id,
            )));
        }
    }

    // Create the log's directory unless the path has no parent component.
    if let Some(parent) = event_log.parent() {
        if !parent.as_os_str().is_empty() {
            std::fs::create_dir_all(parent)?;
        }
    }

    let mut log = JsonlLog::open(event_log)?;
    let mut known_ids = collect_existing_event_ids(event_log)?;
    let event_repo = EventRepo::new(pool);
    let mut appended = 0usize;
    for event in events {
        if known_ids.contains(&event.id) {
            // Already in the log: only make sure the store has it.
            event_repo.append(event).map_err(SessionError::Store)?;
            continue;
        }
        let policy = session_append_policy(&event.source);
        log.append(event.clone(), &policy)?;
        // Record the id so a repeat later in this batch is not appended twice.
        known_ids.insert(event.id.clone());
        event_repo.append(event).map_err(SessionError::Store)?;
        appended += 1;
    }
    Ok(appended)
}
fn collect_existing_event_ids(
path: &std::path::Path,
) -> Result<std::collections::HashSet<cortex_core::EventId>, SessionError> {
use std::io::BufRead;
if !path.exists() {
return Ok(std::collections::HashSet::new());
}
let file = std::fs::File::open(path)?;
let reader = std::io::BufReader::new(file);
let mut ids = std::collections::HashSet::new();
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
if let Ok(value) = serde_json::from_str::<serde_json::Value>(&line) {
if let Some(id_str) = value
.get("event")
.and_then(|e| e.get("id"))
.and_then(|v| v.as_str())
{
if let Ok(event_id) = id_str.parse::<cortex_core::EventId>() {
ids.insert(event_id);
}
}
}
}
Ok(ids)
}
/// Builds the policy decision attached to a session-close log append.
///
/// Three contributions are composed, in this order:
/// - the event-source tier gate: `Reject` for `User`/`ManualCorrection`,
///   `Allow` for every other source;
/// - the attestation rule, always `Warn`;
/// - the runtime-mode rule, always `Warn`.
///
/// # Panics
///
/// Panics if `PolicyContribution::new` rejects one of the static inputs.
fn session_append_policy(source: &EventSource) -> cortex_core::PolicyDecision {
    // Exhaustive on purpose: a new source variant must be classified here.
    let tier_verdict: (PolicyOutcome, &str) = match source {
        EventSource::User | EventSource::ManualCorrection => (
            PolicyOutcome::Reject,
            "event source User/ManualCorrection requires operator attestation; refused by ingest_events",
        ),
        EventSource::ChildAgent { .. }
        | EventSource::Tool { .. }
        | EventSource::Runtime
        | EventSource::ExternalOutcome => (
            PolicyOutcome::Allow,
            "event source meets ingest floor of Observed or above",
        ),
    };

    let contributions = vec![
        PolicyContribution::new(
            APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
            tier_verdict.0,
            tier_verdict.1,
        )
        .expect("static session append tier contribution is valid"),
        PolicyContribution::new(
            APPEND_ATTESTATION_REQUIRED_RULE_ID,
            PolicyOutcome::Warn,
            "session-close ingest: no User events in batch; operator attestation not required",
        )
        .expect("static session append attestation contribution is valid"),
        PolicyContribution::new(
            APPEND_RUNTIME_MODE_RULE_ID,
            PolicyOutcome::Warn,
            "session-close ingest: unsigned append (local-development ledger path)",
        )
        .expect("static session append runtime mode contribution is valid"),
    ];
    compose_policy_outcomes(contributions, None)
}
/// Runs reflection for `trace_id` using a replay adapter built from
/// `fixtures_dir`, blocking on a freshly created current-thread tokio runtime.
///
/// # Errors
///
/// Returns the display text of the failure when the adapter cannot be built,
/// the runtime cannot be created, or reflection itself fails.
fn run_reflect(
    trace_id: TraceId,
    fixtures_dir: &std::path::Path,
    pool: &Pool,
) -> Result<cortex_reflect::ReflectionReport, String> {
    use cortex_llm::ReplayAdapter;

    let adapter = match ReplayAdapter::new(fixtures_dir) {
        Ok(adapter) => adapter,
        Err(err) => return Err(err.to_string()),
    };
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(err) => return Err(format!("failed to create tokio runtime: {err}")),
    };
    let outcome = runtime.block_on(cortex_reflect::reflect(trace_id, &adapter, pool));
    outcome.map_err(|err| err.to_string())
}