cortex_mcp/tools/session_close.rs
1//! `cortex_session_close` MCP tool handler.
2//!
3//! Schema (ADR 0045 §4, amended by ADR 0047 §4):
4//! ```text
5//! cortex_session_close(events_json: string, skip_reflect?: bool, live_reflect?: bool)
6//! → { ingested: int, reflected: int, pending_commit: int, receipt_id: string }
7//! ```
8//!
9//! ## `live_reflect`
10//!
11//! When `live_reflect: true` the session-close pipeline is instructed to run
12//! reflection against a live LLM backend rather than the deterministic replay
13//! fixtures. The flag is forwarded to the underlying `close_from_bytes`
14//! invocation as a future-facing parameter; at present `close_from_bytes` uses
15//! the fixtures directory unconditionally, so `live_reflect` is recorded in the
16//! tracing span for observability but does not alter the code path until the
17//! reflect pipeline gains a live-adapter branch (tracked separately).
18//!
19//! Operators should call `cortex_config` first to confirm an LLM backend is
20//! configured before setting `live_reflect: true`.
21//!
22//! The response field is `pending_commit` (not `activated`). Memories written
23//! by this tool are tagged `pending_mcp_commit` and are not searchable within
24//! the same MCP session until the operator calls `cortex_session_commit` or
25//! the server restarts (ADR 0047 §1–2).
26//!
27//! # ADR 0038 EXEMPTION
28//!
29//! The ADR 0038 session-event admission envelope requires field-level
30//! validation of incoming session events. At this stage the
31//! `cortex_session_close` path delegates all structural field validation
32//! to `cortex_session::close_from_bytes`, which calls the same ingest
33//! pipeline as `cortex session close`. That pipeline performs the ADR 0038
34//! checks (trace_id presence, event shape, source authority). There is no
35//! additional field-level exemption: the full envelope applies via the
36//! delegated call.
37//!
38//! # BreakGlass exclusion
39//!
40//! No `BreakGlassAuthorization` parameter exists on this tool. Per ADR 0045
41//! §3, a `BreakGlass` composed outcome from the PolicyEngine is treated
42//! identically to `Reject` at the MCP transport boundary: the tool returns
43//! `ToolError::PolicyRejected` and no write occurs. The `close_from_bytes`
44//! pipeline must honour this contract.
45
46use std::path::PathBuf;
47use std::sync::{Arc, Mutex};
48
49use cortex_session::SessionError;
50use cortex_store::Pool;
51use tracing::warn;
52
53use crate::tool_handler::{GateId, ToolError, ToolHandler};
54
55/// Hard byte ceiling for `events_json` payloads (RT-5, ADR 0045 §3).
56///
57/// Configurable at server startup via `CORTEX_MCP_MAX_EVENTS_BYTES`;
58/// this constant is the default (5 MiB).
59pub const DEFAULT_MAX_EVENTS_BYTES: usize = 5_242_880;
60
61/// `cortex_session_close` tool handler.
62///
63/// Calls `cortex_session::close_from_bytes` after enforcing the hard size
64/// cap (RT-5). Returns `pending_commit` — the count of memories now in
65/// `pending_mcp_commit` state — rather than an `activated` count (ADR 0047).
66///
67/// `rusqlite::Connection` is `Send` but not `Sync`; the `Mutex` provides the
68/// `Sync` bound required by `ToolHandler` while keeping the connection
69/// single-threaded at the call site (the stdio loop is single-threaded).
70#[derive(Debug)]
71pub struct CortexSessionCloseTool {
72 /// SQLite connection guarded by a mutex so the struct satisfies `Sync`.
73 pub pool: Arc<Mutex<Pool>>,
74 /// Path to the JSONL event-log file.
75 pub event_log: PathBuf,
76 /// Path to the LLM replay-adapter fixtures directory.
77 pub fixtures_dir: PathBuf,
78 /// Maximum accepted byte length for `events_json`. Defaults to
79 /// [`DEFAULT_MAX_EVENTS_BYTES`].
80 pub max_events_bytes: usize,
81}
82
83impl CortexSessionCloseTool {
84 /// Construct with the default size cap.
85 #[must_use]
86 pub fn new(pool: Arc<Mutex<Pool>>, event_log: PathBuf, fixtures_dir: PathBuf) -> Self {
87 Self {
88 pool,
89 event_log,
90 fixtures_dir,
91 max_events_bytes: DEFAULT_MAX_EVENTS_BYTES,
92 }
93 }
94}
95
96impl ToolHandler for CortexSessionCloseTool {
97 fn name(&self) -> &'static str {
98 "cortex_session_close"
99 }
100
101 fn gate_set(&self) -> &'static [GateId] {
102 &[GateId::SessionWrite]
103 }
104
105 fn call(&self, params: serde_json::Value) -> Result<serde_json::Value, ToolError> {
106 // ── 1. Extract events_json from params ────────────────────────────
107 let events_json = params
108 .get("events_json")
109 .and_then(|v| v.as_str())
110 .ok_or_else(|| {
111 ToolError::InvalidParams(
112 "required parameter `events_json` is missing or not a string".into(),
113 )
114 })?
115 .to_owned();
116
117 // ── 1a. Extract optional flags ────────────────────────────────────
118 let live_reflect = params
119 .get("live_reflect")
120 .and_then(|v| v.as_bool())
121 .unwrap_or(false);
122
123 let skip_reflect = params
124 .get("skip_reflect")
125 .and_then(|v| v.as_bool())
126 .unwrap_or(false);
127
128 if live_reflect {
129 // Forward-facing: the flag is logged for observability. The
130 // close_from_bytes pipeline does not yet have a live-reflect
131 // branch; a full backend must be configured separately.
132 tracing::info!(
133 live_reflect = true,
134 skip_reflect = skip_reflect,
135 "cortex_session_close: live_reflect=true (observability only — \
136 replay-fixture path in use until live-reflect branch lands)"
137 );
138 }
139
140 // ── 2. Hard size cap (RT-5, ADR 0045 §3) ─────────────────────────
141 // Enforce before any parsing or I/O. The cap is checked on the byte
142 // length of the UTF-8 string, not the character count, matching the
143 // ADR's intent of bounding the data volume arriving at the pipeline.
144 if events_json.len() > self.max_events_bytes {
145 warn!(
146 bytes = events_json.len(),
147 limit = self.max_events_bytes,
148 "cortex_session_close: events_json payload rejected — exceeds size cap"
149 );
150 return Err(ToolError::SizeLimitExceeded(
151 "events_json exceeds 5MB limit".into(),
152 ));
153 }
154
155 // ── 3. Delegate to the session-close pipeline ─────────────────────
156 // `close_from_bytes` runs the full ADR 0026 policy composition,
157 // ADR 0038 admission-envelope validation, ingest, reflect, and
158 // `pending_mcp_commit` write. Any BreakGlass composed outcome from
159 // the PolicyEngine is returned as an Err by the pipeline (ADR 0045
160 // §3) and surfaces here as ToolError::PolicyRejected.
161 let pool_guard = self
162 .pool
163 .lock()
164 .map_err(|_| ToolError::Internal("pool lock poisoned".into()))?;
165 let outcome = cortex_session::close_from_bytes(
166 events_json.as_bytes(),
167 &pool_guard,
168 self.event_log.clone(),
169 &self.fixtures_dir,
170 )
171 .map_err(map_session_error)?;
172
173 // ── 4. Return the ADR 0047 schema ─────────────────────────────────
174 // `pending_commit` — not `activated` — per ADR 0047 §4.
175 Ok(serde_json::json!({
176 "ingested": outcome.ingested,
177 "reflected": outcome.reflected,
178 "pending_commit": outcome.pending_commit,
179 "receipt_id": outcome.receipt_id,
180 }))
181 }
182}
183
184/// Map a [`SessionError`] to a [`ToolError`].
185///
186/// Policy rejections produced by the ADR 0026 pipeline — including any
187/// `BreakGlass` composed outcome, which is treated as `Reject` at the MCP
188/// boundary (ADR 0045 §3) — must surface as `PolicyRejected` so the
189/// transport layer emits a JSON-RPC `-32000` error without writing ledger
190/// state.
191fn map_session_error(err: SessionError) -> ToolError {
192 let msg = err.to_string();
193 // The session pipeline surfaces policy rejections with these stable
194 // prefixes. Any variant that mentions "policy" or "reject" maps to
195 // PolicyRejected; everything else is Internal.
196 if msg.contains("Reject")
197 || msg.contains("Quarantine")
198 || msg.contains("BreakGlass")
199 || msg.contains("policy rejected")
200 {
201 ToolError::PolicyRejected(msg)
202 } else {
203 ToolError::Internal(msg)
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use std::sync::Mutex;
210
211 use super::*;
212
213 fn make_pool() -> Arc<Mutex<Pool>> {
214 Arc::new(Mutex::new(
215 cortex_store::Pool::open_in_memory().expect("in-memory sqlite"),
216 ))
217 }
218
219 fn make_tool_tiny_cap() -> CortexSessionCloseTool {
220 CortexSessionCloseTool {
221 pool: make_pool(),
222 event_log: PathBuf::from("/tmp/test.jsonl"),
223 fixtures_dir: PathBuf::from("/tmp/fixtures"),
224 max_events_bytes: 10,
225 }
226 }
227
228 fn make_tool() -> CortexSessionCloseTool {
229 CortexSessionCloseTool::new(
230 make_pool(),
231 PathBuf::from("/tmp/test.jsonl"),
232 PathBuf::from("/tmp/fixtures"),
233 )
234 }
235
236 /// Size-cap check must fire before any parsing (RT-5).
237 #[test]
238 fn size_cap_rejects_oversized_payload() {
239 let tool = make_tool_tiny_cap();
240 let oversized = "x".repeat(11);
241 let params = serde_json::json!({ "events_json": oversized });
242 let err = tool
243 .call(params)
244 .expect_err("must reject oversized payload");
245
246 assert!(
247 matches!(err, ToolError::SizeLimitExceeded(_)),
248 "expected SizeLimitExceeded, got: {err:?}"
249 );
250 }
251
252 /// Missing `events_json` parameter must be rejected.
253 #[test]
254 fn missing_events_json_returns_invalid_params() {
255 let tool = make_tool();
256 let err = tool
257 .call(serde_json::json!({}))
258 .expect_err("must reject missing events_json");
259
260 assert!(
261 matches!(err, ToolError::InvalidParams(_)),
262 "expected InvalidParams, got: {err:?}"
263 );
264 }
265
266 /// gate_set must declare SessionWrite.
267 #[test]
268 fn gate_set_declares_session_write() {
269 let tool = make_tool();
270 assert!(
271 tool.gate_set().contains(&GateId::SessionWrite),
272 "gate_set must include SessionWrite"
273 );
274 }
275
276 /// Tool name matches the ADR 0045 §4 schema contract.
277 #[test]
278 fn tool_name_matches_schema_contract() {
279 let tool = make_tool();
280 assert_eq!(tool.name(), "cortex_session_close");
281 }
282}