Skip to main content

cortex_mcp/tools/
session_close.rs

1//! `cortex_session_close` MCP tool handler.
2//!
3//! Schema (ADR 0045 §4, amended by ADR 0047 §4):
4//! ```text
5//! cortex_session_close(events_json: string, skip_reflect?: bool, live_reflect?: bool)
6//!   → { ingested: int, reflected: int, pending_commit: int, receipt_id: string }
7//! ```
8//!
9//! ## `live_reflect`
10//!
11//! When `live_reflect: true` the session-close pipeline is instructed to run
12//! reflection against a live LLM backend rather than the deterministic replay
13//! fixtures. The flag is forwarded to the underlying `close_from_bytes`
14//! invocation as a future-facing parameter; at present `close_from_bytes` uses
15//! the fixtures directory unconditionally, so `live_reflect` is recorded in the
16//! tracing span for observability but does not alter the code path until the
17//! reflect pipeline gains a live-adapter branch (tracked separately).
18//!
19//! Operators should call `cortex_config` first to confirm an LLM backend is
20//! configured before setting `live_reflect: true`.
21//!
22//! The response field is `pending_commit` (not `activated`). Memories written
23//! by this tool are tagged `pending_mcp_commit` and are not searchable within
24//! the same MCP session until the operator calls `cortex_session_commit` or
25//! the server restarts (ADR 0047 §1–2).
26//!
27//! # ADR 0038 EXEMPTION
28//!
29//! The ADR 0038 session-event admission envelope requires field-level
30//! validation of incoming session events. At this stage the
31//! `cortex_session_close` path delegates all structural field validation
32//! to `cortex_session::close_from_bytes`, which calls the same ingest
33//! pipeline as `cortex session close`. That pipeline performs the ADR 0038
34//! checks (trace_id presence, event shape, source authority). There is no
35//! additional field-level exemption: the full envelope applies via the
36//! delegated call.
37//!
38//! # BreakGlass exclusion
39//!
40//! No `BreakGlassAuthorization` parameter exists on this tool. Per ADR 0045
41//! §3, a `BreakGlass` composed outcome from the PolicyEngine is treated
42//! identically to `Reject` at the MCP transport boundary: the tool returns
43//! `ToolError::PolicyRejected` and no write occurs. The `close_from_bytes`
44//! pipeline must honour this contract.
45
46use std::path::PathBuf;
47use std::sync::{Arc, Mutex};
48
49use cortex_session::SessionError;
50use cortex_store::Pool;
51use tracing::warn;
52
53use crate::tool_handler::{GateId, ToolError, ToolHandler};
54
55/// Hard byte ceiling for `events_json` payloads (RT-5, ADR 0045 §3).
56///
57/// Configurable at server startup via `CORTEX_MCP_MAX_EVENTS_BYTES`;
58/// this constant is the default (5 MiB).
59pub const DEFAULT_MAX_EVENTS_BYTES: usize = 5_242_880;
60
61/// `cortex_session_close` tool handler.
62///
63/// Calls `cortex_session::close_from_bytes` after enforcing the hard size
64/// cap (RT-5). Returns `pending_commit` — the count of memories now in
65/// `pending_mcp_commit` state — rather than an `activated` count (ADR 0047).
66///
67/// `rusqlite::Connection` is `Send` but not `Sync`; the `Mutex` provides the
68/// `Sync` bound required by `ToolHandler` while keeping the connection
69/// single-threaded at the call site (the stdio loop is single-threaded).
70#[derive(Debug)]
71pub struct CortexSessionCloseTool {
72    /// SQLite connection guarded by a mutex so the struct satisfies `Sync`.
73    pub pool: Arc<Mutex<Pool>>,
74    /// Path to the JSONL event-log file.
75    pub event_log: PathBuf,
76    /// Path to the LLM replay-adapter fixtures directory.
77    pub fixtures_dir: PathBuf,
78    /// Maximum accepted byte length for `events_json`. Defaults to
79    /// [`DEFAULT_MAX_EVENTS_BYTES`].
80    pub max_events_bytes: usize,
81}
82
83impl CortexSessionCloseTool {
84    /// Construct with the default size cap.
85    #[must_use]
86    pub fn new(pool: Arc<Mutex<Pool>>, event_log: PathBuf, fixtures_dir: PathBuf) -> Self {
87        Self {
88            pool,
89            event_log,
90            fixtures_dir,
91            max_events_bytes: DEFAULT_MAX_EVENTS_BYTES,
92        }
93    }
94}
95
96impl ToolHandler for CortexSessionCloseTool {
97    fn name(&self) -> &'static str {
98        "cortex_session_close"
99    }
100
101    fn gate_set(&self) -> &'static [GateId] {
102        &[GateId::SessionWrite]
103    }
104
105    fn call(&self, params: serde_json::Value) -> Result<serde_json::Value, ToolError> {
106        // ── 1. Extract events_json from params ────────────────────────────
107        let events_json = params
108            .get("events_json")
109            .and_then(|v| v.as_str())
110            .ok_or_else(|| {
111                ToolError::InvalidParams(
112                    "required parameter `events_json` is missing or not a string".into(),
113                )
114            })?
115            .to_owned();
116
117        // ── 1a. Extract optional flags ────────────────────────────────────
118        let live_reflect = params
119            .get("live_reflect")
120            .and_then(|v| v.as_bool())
121            .unwrap_or(false);
122
123        let skip_reflect = params
124            .get("skip_reflect")
125            .and_then(|v| v.as_bool())
126            .unwrap_or(false);
127
128        if live_reflect {
129            // Forward-facing: the flag is logged for observability. The
130            // close_from_bytes pipeline does not yet have a live-reflect
131            // branch; a full backend must be configured separately.
132            tracing::info!(
133                live_reflect = true,
134                skip_reflect = skip_reflect,
135                "cortex_session_close: live_reflect=true (observability only — \
136                 replay-fixture path in use until live-reflect branch lands)"
137            );
138        }
139
140        // ── 2. Hard size cap (RT-5, ADR 0045 §3) ─────────────────────────
141        // Enforce before any parsing or I/O. The cap is checked on the byte
142        // length of the UTF-8 string, not the character count, matching the
143        // ADR's intent of bounding the data volume arriving at the pipeline.
144        if events_json.len() > self.max_events_bytes {
145            warn!(
146                bytes = events_json.len(),
147                limit = self.max_events_bytes,
148                "cortex_session_close: events_json payload rejected — exceeds size cap"
149            );
150            return Err(ToolError::SizeLimitExceeded(
151                "events_json exceeds 5MB limit".into(),
152            ));
153        }
154
155        // ── 3. Delegate to the session-close pipeline ─────────────────────
156        // `close_from_bytes` runs the full ADR 0026 policy composition,
157        // ADR 0038 admission-envelope validation, ingest, reflect, and
158        // `pending_mcp_commit` write. Any BreakGlass composed outcome from
159        // the PolicyEngine is returned as an Err by the pipeline (ADR 0045
160        // §3) and surfaces here as ToolError::PolicyRejected.
161        let pool_guard = self
162            .pool
163            .lock()
164            .map_err(|_| ToolError::Internal("pool lock poisoned".into()))?;
165        let outcome = cortex_session::close_from_bytes(
166            events_json.as_bytes(),
167            &pool_guard,
168            self.event_log.clone(),
169            &self.fixtures_dir,
170        )
171        .map_err(map_session_error)?;
172
173        // ── 4. Return the ADR 0047 schema ─────────────────────────────────
174        // `pending_commit` — not `activated` — per ADR 0047 §4.
175        Ok(serde_json::json!({
176            "ingested":       outcome.ingested,
177            "reflected":      outcome.reflected,
178            "pending_commit": outcome.pending_commit,
179            "receipt_id":     outcome.receipt_id,
180        }))
181    }
182}
183
184/// Map a [`SessionError`] to a [`ToolError`].
185///
186/// Policy rejections produced by the ADR 0026 pipeline — including any
187/// `BreakGlass` composed outcome, which is treated as `Reject` at the MCP
188/// boundary (ADR 0045 §3) — must surface as `PolicyRejected` so the
189/// transport layer emits a JSON-RPC `-32000` error without writing ledger
190/// state.
191fn map_session_error(err: SessionError) -> ToolError {
192    let msg = err.to_string();
193    // The session pipeline surfaces policy rejections with these stable
194    // prefixes. Any variant that mentions "policy" or "reject" maps to
195    // PolicyRejected; everything else is Internal.
196    if msg.contains("Reject")
197        || msg.contains("Quarantine")
198        || msg.contains("BreakGlass")
199        || msg.contains("policy rejected")
200    {
201        ToolError::PolicyRejected(msg)
202    } else {
203        ToolError::Internal(msg)
204    }
205}
206
207#[cfg(test)]
208mod tests {
209    use std::sync::Mutex;
210
211    use super::*;
212
213    fn make_pool() -> Arc<Mutex<Pool>> {
214        Arc::new(Mutex::new(
215            cortex_store::Pool::open_in_memory().expect("in-memory sqlite"),
216        ))
217    }
218
219    fn make_tool_tiny_cap() -> CortexSessionCloseTool {
220        CortexSessionCloseTool {
221            pool: make_pool(),
222            event_log: PathBuf::from("/tmp/test.jsonl"),
223            fixtures_dir: PathBuf::from("/tmp/fixtures"),
224            max_events_bytes: 10,
225        }
226    }
227
228    fn make_tool() -> CortexSessionCloseTool {
229        CortexSessionCloseTool::new(
230            make_pool(),
231            PathBuf::from("/tmp/test.jsonl"),
232            PathBuf::from("/tmp/fixtures"),
233        )
234    }
235
236    /// Size-cap check must fire before any parsing (RT-5).
237    #[test]
238    fn size_cap_rejects_oversized_payload() {
239        let tool = make_tool_tiny_cap();
240        let oversized = "x".repeat(11);
241        let params = serde_json::json!({ "events_json": oversized });
242        let err = tool
243            .call(params)
244            .expect_err("must reject oversized payload");
245
246        assert!(
247            matches!(err, ToolError::SizeLimitExceeded(_)),
248            "expected SizeLimitExceeded, got: {err:?}"
249        );
250    }
251
252    /// Missing `events_json` parameter must be rejected.
253    #[test]
254    fn missing_events_json_returns_invalid_params() {
255        let tool = make_tool();
256        let err = tool
257            .call(serde_json::json!({}))
258            .expect_err("must reject missing events_json");
259
260        assert!(
261            matches!(err, ToolError::InvalidParams(_)),
262            "expected InvalidParams, got: {err:?}"
263        );
264    }
265
266    /// gate_set must declare SessionWrite.
267    #[test]
268    fn gate_set_declares_session_write() {
269        let tool = make_tool();
270        assert!(
271            tool.gate_set().contains(&GateId::SessionWrite),
272            "gate_set must include SessionWrite"
273        );
274    }
275
276    /// Tool name matches the ADR 0045 §4 schema contract.
277    #[test]
278    fn tool_name_matches_schema_contract() {
279        let tool = make_tool();
280        assert_eq!(tool.name(), "cortex_session_close");
281    }
282}