Skip to main content

cortex_session/
lib.rs

1//! `cortex-session` — session-close pipeline extracted from `cortex-cli`.
2#![warn(missing_docs)]
3//!
4//! ## Why this crate exists
5//!
6//! `cortex-mcp` cannot depend on `cortex-cli` (that would be circular:
7//! `cortex-cli` will depend on `cortex-mcp` for the `Serve` subcommand).
8//! This crate exposes [`close_from_bytes`] so both `cortex-cli` and
9//! `cortex-mcp` can run the session-close pipeline.
10//!
11//! ## Pipeline
12//!
13//! 1. **Parse** — parse `raw` as session events JSON (envelope `{"events":[...]}`
14//!    or bare array `[...]` or single object).
15//! 2. **Ingest** — append non-duplicate events to the JSONL ledger at
16//!    `event_log`. User-sourced events are refused (no attestor available in
17//!    the library context; operator must call CLI `cortex ingest` for those).
//! 3. **Extract trace** — extract the first `trace_id` from the parsed events.
//!    If none, returns early with `reflected: 0`, `pending_commit: 0`, and
//!    `receipt_id: "no_trace_id"` (ingested events are still counted).
20//! 4. **Reflect** — run `cortex_reflect::reflect()` over the trace via a
21//!    `ReplayAdapter` built from `fixtures_dir` (see [`close_from_bytes`]).
22//! 5. **Tag as pending** — for each reflected candidate, set lifecycle state
23//!    to `pending_mcp_commit` (ADR 0047). The caller (MCP server) promotes to
24//!    `active` on `cortex_session_commit`; the CLI wrapper calls
25//!    [`MemoryRepo::set_active`] itself after receiving the `CloseOutcome`.
26//! 6. **Pre-compute embeddings** — BLAKE3 stub embeddings via
27//!    [`LocalStubEmbedder`], written to the store so semantic search is
28//!    immediately warm after promotion.
29//! 7. **Return `CloseOutcome`** — `pending_commit` is the count of memories
30//!    in `pending_mcp_commit` state; callers decide whether to promote them.
31
32use std::fmt;
33use std::path::PathBuf;
34
35use chrono::Utc;
36use cortex_core::{
37    compose_policy_outcomes, Event, EventSource, PolicyContribution, PolicyOutcome, TraceId,
38};
39use cortex_ledger::{
40    JsonlError, JsonlLog, APPEND_ATTESTATION_REQUIRED_RULE_ID,
41    APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID, APPEND_RUNTIME_MODE_RULE_ID,
42};
43use cortex_reflect::ReflectionReportStatus;
44use cortex_retrieval::{EmbedRecord, Embedder, LocalStubEmbedder, STUB_BACKEND_ID};
45use cortex_store::repo::{EmbeddingRepo, EventRepo, MemoryRepo};
46use cortex_store::Pool;
47
/// Outcome returned from [`close_from_bytes`].
///
/// Every early exit of the pipeline (no `trace_id`, quarantined reflection,
/// no candidates) still reports `ingested`; `reflected` and `pending_commit`
/// are then `0`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CloseOutcome {
    /// Number of events newly appended to the JSONL ledger by this call.
    /// Events whose ids were already present in the ledger are not counted.
    pub ingested: usize,
    /// Number of memory candidates produced by reflection (`0` when the
    /// reflection report was quarantined).
    pub reflected: usize,
    /// Number of memories written with `pending_mcp_commit` lifecycle state
    /// (ADR 0047). Callers promote these to `active` via
    /// `cortex_session_commit` (MCP path) or directly via
    /// `MemoryRepo::set_active` (CLI path).
    pub pending_commit: usize,
    /// Receipt identifier for this close operation: the trace id rendered as a
    /// string, or the literal marker `"no_trace_id"` when no event carried a
    /// trace.
    pub receipt_id: String,
}
64
65/// Errors raised by the session-close pipeline.
66#[derive(Debug)]
67pub enum SessionError {
68    /// Ingest step failed (parse error or ledger I/O).
69    Ingest(String),
70    /// Reflection step failed.
71    Reflect(String),
72    /// Store operation failed.
73    Store(cortex_store::StoreError),
74    /// I/O error outside of the ledger path.
75    Io(std::io::Error),
76}
77
78impl fmt::Display for SessionError {
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        match self {
81            Self::Ingest(msg) => write!(f, "session ingest failed: {msg}"),
82            Self::Reflect(msg) => write!(f, "session reflect failed: {msg}"),
83            Self::Store(err) => write!(f, "session store error: {err}"),
84            Self::Io(err) => write!(f, "session io error: {err}"),
85        }
86    }
87}
88
89impl std::error::Error for SessionError {
90    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
91        match self {
92            Self::Store(err) => Some(err),
93            Self::Io(err) => Some(err),
94            _ => None,
95        }
96    }
97}
98
99impl From<cortex_store::StoreError> for SessionError {
100    fn from(err: cortex_store::StoreError) -> Self {
101        Self::Store(err)
102    }
103}
104
105impl From<std::io::Error> for SessionError {
106    fn from(err: std::io::Error) -> Self {
107        Self::Io(err)
108    }
109}
110
111impl From<JsonlError> for SessionError {
112    fn from(err: JsonlError) -> Self {
113        Self::Ingest(err.to_string())
114    }
115}
116
117/// Run the full session-close pipeline.
118///
119/// # Arguments
120///
121/// - `raw` — raw bytes of the session events JSON file.
122/// - `pool` — open SQLite connection pool with all migrations applied.
123/// - `event_log` — path to the JSONL ledger file. Created if absent.
124/// - `fixtures_dir` — directory containing `INDEX.toml` and replay fixtures
125///   for `cortex_reflect::reflect()`. Must be supplied even if reflection is
126///   expected to produce no candidates (the adapter must initialise).
127///
128/// # Lifecycle state
129///
130/// Reflected candidates are placed in `pending_mcp_commit` status, NOT
131/// `active`. MCP callers leave them there until `cortex_session_commit` is
132/// called with the operator's confirmation token. CLI callers may call
133/// `MemoryRepo::set_active` on each `memory_id` in `CloseOutcome` directly
134/// after this function returns, since the CLI close is operator-initiated and
135/// synchronous (ADR 0047 §2 Alternatives).
136pub fn close_from_bytes(
137    raw: &[u8],
138    pool: &Pool,
139    event_log: PathBuf,
140    fixtures_dir: &std::path::Path,
141) -> Result<CloseOutcome, SessionError> {
142    // ─── Step 1: parse events ────────────────────────────────────────────────
143    let events = parse_events(raw).map_err(SessionError::Ingest)?;
144
145    // Extract trace_id before ingest (needed for reflect step).
146    let trace_id = extract_trace_id(&events);
147
148    // ─── Step 2: ingest events to JSONL ledger + SQLite events table ─────────
149    // Dual-write: append to JSONL first (existing path), then to the SQLite
150    // `events` table so `verify_memory_proof_closure` can find the source events
151    // and session-close memories are not quarantined in search results.
152    let ingested = ingest_events(&events, &event_log, pool)?;
153
154    let trace_id_for_reflect = match trace_id {
155        Some(tid) => tid,
156        None => {
157            tracing::debug!("session-close: no trace_id in events; no candidates proposed");
158            return Ok(CloseOutcome {
159                ingested,
160                reflected: 0,
161                pending_commit: 0,
162                receipt_id: "no_trace_id".to_string(),
163            });
164        }
165    };
166
167    // ─── Step 3: reflect ─────────────────────────────────────────────────────
168    let reflect_report =
169        run_reflect(trace_id_for_reflect, fixtures_dir, pool).map_err(SessionError::Reflect)?;
170
171    if reflect_report.status == ReflectionReportStatus::Quarantined {
172        tracing::debug!("session-close: reflection quarantined; no candidates proposed");
173        return Ok(CloseOutcome {
174            ingested,
175            reflected: 0,
176            pending_commit: 0,
177            receipt_id: trace_id_for_reflect.to_string(),
178        });
179    }
180
181    let candidate_ids: Vec<cortex_core::MemoryId> = reflect_report
182        .persisted_memory_candidates
183        .iter()
184        .map(|c| c.id)
185        .collect();
186    let reflected = candidate_ids.len();
187
188    if reflected == 0 {
189        tracing::debug!("session-close: no candidates proposed");
190        return Ok(CloseOutcome {
191            ingested,
192            reflected: 0,
193            pending_commit: 0,
194            receipt_id: trace_id_for_reflect.to_string(),
195        });
196    }
197
198    // ─── Step 4: tag candidates as pending_mcp_commit (ADR 0047) ─────────────
199    // `reflect()` already called `MemoryRepo::insert_candidate` for each
200    // candidate (see `persist_memory_candidates` in orchestrate.rs). The
201    // candidates are already in the store as `candidate`; we transition them
202    // to `pending_mcp_commit` here instead of `active`.
203    let repo = MemoryRepo::new(pool);
204    let now = Utc::now();
205    let mut pending_ids = Vec::new();
206
207    for memory_id in &candidate_ids {
208        match repo.set_pending_mcp_commit(memory_id, now) {
209            Ok(()) => {
210                pending_ids.push(*memory_id);
211            }
212            Err(err) => {
213                let err_str = err.to_string();
214                // If already in pending_mcp_commit, treat as idempotent.
215                if err_str.contains("not a candidate") {
216                    tracing::debug!(
217                        memory_id = %memory_id,
218                        "session-close: memory not a candidate (already transitioned); treating as pending"
219                    );
220                    pending_ids.push(*memory_id);
221                } else {
222                    tracing::warn!(
223                        memory_id = %memory_id,
224                        error = %err_str,
225                        "session-close: failed to set pending_mcp_commit for memory"
226                    );
227                }
228            }
229        }
230    }
231
232    // ─── Step 5: pre-compute embeddings ──────────────────────────────────────
233    let embed_repo = EmbeddingRepo::new(pool);
234    let embedder = LocalStubEmbedder::new();
235
236    for memory_id in &pending_ids {
237        let memory = match repo.get_by_id(memory_id) {
238            Ok(Some(m)) => m,
239            Ok(None) => {
240                tracing::warn!(memory_id = %memory_id, "session-close: memory not found for embedding");
241                continue;
242            }
243            Err(err) => {
244                tracing::warn!(memory_id = %memory_id, error = %err, "session-close: failed to read memory for embedding");
245                continue;
246            }
247        };
248
249        let tags: Vec<String> = memory
250            .domains_json
251            .as_array()
252            .map(|arr| {
253                arr.iter()
254                    .filter_map(|v| v.as_str().map(ToString::to_string))
255                    .collect()
256            })
257            .unwrap_or_default();
258
259        let vec = match embedder.embed(&memory.claim, &tags) {
260            Ok(v) => v,
261            Err(err) => {
262                tracing::warn!(memory_id = %memory_id, error = %err, "session-close: embed failed");
263                continue;
264            }
265        };
266
267        let record = match EmbedRecord::new(*memory_id, STUB_BACKEND_ID, vec, now) {
268            Ok(r) => r,
269            Err(err) => {
270                tracing::warn!(memory_id = %memory_id, error = %err, "session-close: failed to build embed record");
271                continue;
272            }
273        };
274
275        if let Err(err) = embed_repo.write(&record) {
276            tracing::warn!(memory_id = %memory_id, error = %err, "session-close: failed to write embedding");
277        }
278    }
279
280    Ok(CloseOutcome {
281        ingested,
282        reflected,
283        pending_commit: pending_ids.len(),
284        receipt_id: trace_id_for_reflect.to_string(),
285    })
286}
287
288/// Parse raw bytes as session events JSON.
289///
290/// Accepts three shapes:
291/// 1. `{"events": [Event, ...]}` — explicit envelope.
292/// 2. `[Event, ...]` — bare array.
293/// 3. `{...}` — single event object.
294fn parse_events(raw: &[u8]) -> Result<Vec<Event>, String> {
295    let value: serde_json::Value =
296        serde_json::from_slice(raw).map_err(|err| format!("invalid JSON: {err}"))?;
297
298    let events: Vec<serde_json::Value> = match &value {
299        serde_json::Value::Object(map) => {
300            if let Some(events_field) = map.get("events") {
301                events_field
302                    .as_array()
303                    .ok_or_else(|| "events field is not an array".to_string())?
304                    .to_owned()
305            } else {
306                vec![value.clone()]
307            }
308        }
309        serde_json::Value::Array(arr) => arr.to_owned(),
310        _ => return Err("unexpected JSON shape: must be object or array".to_string()),
311    };
312
313    let mut parsed = Vec::with_capacity(events.len());
314    for (i, ev) in events.iter().enumerate() {
315        let event: Event = serde_json::from_value(ev.clone())
316            .map_err(|err| format!("event[{i}] failed to deserialize: {err}"))?;
317        parsed.push(event);
318    }
319    Ok(parsed)
320}
321
322/// Extract the first `trace_id` from a parsed event list.
323fn extract_trace_id(events: &[Event]) -> Option<TraceId> {
324    events.iter().find_map(|ev| ev.trace_id)
325}
326
327/// Append events to the JSONL ledger and the SQLite `events` table, skipping
328/// duplicates.
329///
330/// Refuses `EventSource::User` events — the library has no attestor; operator
331/// must use `cortex ingest --user-attestation` for those.
332///
333/// The dual-write to SQLite ensures that `verify_memory_proof_closure` can
334/// find source events for session-close memories so they are not quarantined
335/// in `cortex memory search`. `EventRepo::append` uses `INSERT OR IGNORE` so
336/// re-running is idempotent.
337fn ingest_events(
338    events: &[Event],
339    event_log: &std::path::Path,
340    pool: &Pool,
341) -> Result<usize, SessionError> {
342    // Refuse User-sourced events: no attestor available in the library context.
343    for event in events {
344        if matches!(event.source, EventSource::User) {
345            return Err(SessionError::Ingest(format!(
346                "EventSource::User event {} cannot be ingested without operator attestation; \
347                 use `cortex ingest --user-attestation` for user-sourced events",
348                event.id,
349            )));
350        }
351    }
352
353    // Ensure parent directory exists.
354    if let Some(parent) = event_log.parent() {
355        if !parent.as_os_str().is_empty() {
356            std::fs::create_dir_all(parent)?;
357        }
358    }
359
360    let mut log = JsonlLog::open(event_log)?;
361
362    // Collect existing event hashes for deduplication. We read the log and
363    // compare event ids by reading the existing lines. JsonlLog doesn't
364    // expose an iterator, but the ingest path in cortex-cli collects ids
365    // from the JSONL lines. We scan the file directly here.
366    let existing_ids = collect_existing_event_ids(event_log)?;
367
368    let event_repo = EventRepo::new(pool);
369    let mut appended = 0usize;
370    for event in events {
371        if existing_ids.contains(&event.id) {
372            // Also ensure the SQLite row exists for pre-existing JSONL events
373            // (idempotent — INSERT OR IGNORE).
374            event_repo.append(event).map_err(SessionError::Store)?;
375            continue;
376        }
377
378        // Compose the minimal policy for a non-user, non-signed append.
379        // All three required contributors must be present (ADR 0026).
380        let policy = session_append_policy(&event.source);
381        log.append(event.clone(), &policy)?;
382
383        // Dual-write to SQLite so proof-closure verification succeeds.
384        event_repo.append(event).map_err(SessionError::Store)?;
385
386        appended += 1;
387    }
388
389    Ok(appended)
390}
391
392/// Collect event ids that already exist in the JSONL log file.
393fn collect_existing_event_ids(
394    path: &std::path::Path,
395) -> Result<std::collections::HashSet<cortex_core::EventId>, SessionError> {
396    use std::io::BufRead;
397
398    if !path.exists() {
399        return Ok(std::collections::HashSet::new());
400    }
401
402    let file = std::fs::File::open(path)?;
403    let reader = std::io::BufReader::new(file);
404    let mut ids = std::collections::HashSet::new();
405
406    for line in reader.lines() {
407        let line = line?;
408        if line.trim().is_empty() {
409            continue;
410        }
411        // Each line is a SignedRow JSON — we only need the event.id field.
412        if let Ok(value) = serde_json::from_str::<serde_json::Value>(&line) {
413            if let Some(id_str) = value
414                .get("event")
415                .and_then(|e| e.get("id"))
416                .and_then(|v| v.as_str())
417            {
418                if let Ok(event_id) = id_str.parse::<cortex_core::EventId>() {
419                    ids.insert(event_id);
420                }
421            }
422        }
423    }
424
425    Ok(ids)
426}
427
428/// Compose the ADR 0026 policy decision for a non-user session append.
429///
430/// Non-user events do not require attestation, so the attestation contributor
431/// is `Warn` (honest no-attestation floor). The tier gate is `Allow` for
432/// non-user sources. Runtime mode is `Warn` (local-development unsigned ledger
433/// path).
434fn session_append_policy(source: &EventSource) -> cortex_core::PolicyDecision {
435    // Tier gate: `ingest_events` already refuses `EventSource::User` before
436    // calling this function, so all remaining sources are at least `Observed`.
437    // Classify the source string for the policy record without importing the
438    // `event_source_trust_tier` helper from `cortex-cli` (that would be a
439    // circular dependency). ManualCorrection without an attestor is also
440    // refused upstream; every source that reaches here passes the floor.
441    let (tier_outcome, tier_reason): (PolicyOutcome, &str) = match source {
442        EventSource::User | EventSource::ManualCorrection => {
443            // Should be unreachable — ingest_events refuses these — but fail
444            // closed defensively rather than panic.
445            (
446                PolicyOutcome::Reject,
447                "event source User/ManualCorrection requires operator attestation; refused by ingest_events",
448            )
449        }
450        EventSource::ChildAgent { .. }
451        | EventSource::Tool { .. }
452        | EventSource::Runtime
453        | EventSource::ExternalOutcome => (
454            PolicyOutcome::Allow,
455            "event source meets ingest floor of Observed or above",
456        ),
457    };
458    let tier_contribution = PolicyContribution::new(
459        APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
460        tier_outcome,
461        tier_reason,
462    )
463    .expect("static session append tier contribution is valid");
464
465    // Attestation: no User events reach this path, so Warn (honest floor).
466    let attestation_contribution = PolicyContribution::new(
467        APPEND_ATTESTATION_REQUIRED_RULE_ID,
468        PolicyOutcome::Warn,
469        "session-close ingest: no User events in batch; operator attestation not required",
470    )
471    .expect("static session append attestation contribution is valid");
472
473    // Runtime mode: local-development / unsigned-ledger path.
474    let runtime_mode_contribution = PolicyContribution::new(
475        APPEND_RUNTIME_MODE_RULE_ID,
476        PolicyOutcome::Warn,
477        "session-close ingest: unsigned append (local-development ledger path)",
478    )
479    .expect("static session append runtime mode contribution is valid");
480
481    compose_policy_outcomes(
482        vec![
483            tier_contribution,
484            attestation_contribution,
485            runtime_mode_contribution,
486        ],
487        None,
488    )
489}
490
491/// Run the `cortex_reflect::reflect` pipeline.
492fn run_reflect(
493    trace_id: TraceId,
494    fixtures_dir: &std::path::Path,
495    pool: &Pool,
496) -> Result<cortex_reflect::ReflectionReport, String> {
497    use cortex_llm::ReplayAdapter;
498
499    let adapter = ReplayAdapter::new(fixtures_dir).map_err(|err| format!("{err}"))?;
500
501    let rt = tokio::runtime::Builder::new_current_thread()
502        .enable_all()
503        .build()
504        .map_err(|err| format!("failed to create tokio runtime: {err}"))?;
505
506    rt.block_on(cortex_reflect::reflect(trace_id, &adapter, pool))
507        .map_err(|err| format!("{err}"))
508}