crtx-mcp 0.1.2

MCP stdio JSON-RPC 2.0 server for Cortex — tool dispatch, ToolHandler trait, gate wiring (ADR 0045).
Documentation
//! `cortex_session_close` MCP tool handler.
//!
//! Schema (ADR 0045 §4, amended by ADR 0047 §4):
//! ```text
//! cortex_session_close(events_json: string, skip_reflect?: bool, live_reflect?: bool)
//!   → { ingested: int, reflected: int, pending_commit: int, receipt_id: string }
//! ```
//!
//! ## `live_reflect`
//!
//! When `live_reflect: true` the session-close pipeline is instructed to run
//! reflection against a live LLM backend rather than the deterministic replay
//! fixtures. The flag is forwarded to the underlying `close_from_bytes`
//! invocation as a future-facing parameter; at present `close_from_bytes` uses
//! the fixtures directory unconditionally, so `live_reflect` is recorded in the
//! tracing span for observability but does not alter the code path until the
//! reflect pipeline gains a live-adapter branch (tracked separately).
//!
//! Operators should call `cortex_config` first to confirm an LLM backend is
//! configured before setting `live_reflect: true`.
//!
//! The response field is `pending_commit` (not `activated`). Memories written
//! by this tool are tagged `pending_mcp_commit` and are not searchable within
//! the same MCP session until the operator calls `cortex_session_commit` or
//! the server restarts (ADR 0047 §1–2).
//!
//! # ADR 0038 EXEMPTION
//!
//! The ADR 0038 session-event admission envelope requires field-level
//! validation of incoming session events. At this stage the
//! `cortex_session_close` path delegates all structural field validation
//! to `cortex_session::close_from_bytes`, which calls the same ingest
//! pipeline as `cortex session close`. That pipeline performs the ADR 0038
//! checks (trace_id presence, event shape, source authority). There is no
//! additional field-level exemption: the full envelope applies via the
//! delegated call.
//!
//! # BreakGlass exclusion
//!
//! No `BreakGlassAuthorization` parameter exists on this tool. Per ADR 0045
//! §3, a `BreakGlass` composed outcome from the PolicyEngine is treated
//! identically to `Reject` at the MCP transport boundary: the tool returns
//! `ToolError::PolicyRejected` and no write occurs. The `close_from_bytes`
//! pipeline must honour this contract.

use std::path::PathBuf;
use std::sync::{Arc, Mutex};

use cortex_session::SessionError;
use cortex_store::Pool;
use tracing::warn;

use crate::tool_handler::{GateId, ToolError, ToolHandler};

/// Hard byte ceiling for `events_json` payloads (RT-5, ADR 0045 §3).
///
/// Configurable at server startup via `CORTEX_MCP_MAX_EVENTS_BYTES`;
/// this constant is the default (5 MiB).
pub const DEFAULT_MAX_EVENTS_BYTES: usize = 5_242_880;

/// `cortex_session_close` tool handler.
///
/// Calls `cortex_session::close_from_bytes` after enforcing the hard size
/// cap (RT-5). Returns `pending_commit` — the count of memories now in
/// `pending_mcp_commit` state — rather than an `activated` count (ADR 0047).
///
/// `rusqlite::Connection` is `Send` but not `Sync`; the `Mutex` provides the
/// `Sync` bound required by `ToolHandler` while keeping the connection
/// single-threaded at the call site (the stdio loop is single-threaded).
#[derive(Debug)]
pub struct CortexSessionCloseTool {
    /// SQLite connection guarded by a mutex so the struct satisfies `Sync`.
    pub pool: Arc<Mutex<Pool>>,
    /// Path to the JSONL event-log file.
    pub event_log: PathBuf,
    /// Path to the LLM replay-adapter fixtures directory.
    pub fixtures_dir: PathBuf,
    /// Maximum accepted byte length for `events_json`. Defaults to
    /// [`DEFAULT_MAX_EVENTS_BYTES`].
    pub max_events_bytes: usize,
}

impl CortexSessionCloseTool {
    /// Construct with the default size cap.
    #[must_use]
    pub fn new(pool: Arc<Mutex<Pool>>, event_log: PathBuf, fixtures_dir: PathBuf) -> Self {
        Self {
            pool,
            event_log,
            fixtures_dir,
            max_events_bytes: DEFAULT_MAX_EVENTS_BYTES,
        }
    }
}

impl ToolHandler for CortexSessionCloseTool {
    fn name(&self) -> &'static str {
        "cortex_session_close"
    }

    fn gate_set(&self) -> &'static [GateId] {
        &[GateId::SessionWrite]
    }

    fn call(&self, params: serde_json::Value) -> Result<serde_json::Value, ToolError> {
        // ── 1. Extract events_json from params ────────────────────────────
        let events_json = params
            .get("events_json")
            .and_then(|v| v.as_str())
            .ok_or_else(|| {
                ToolError::InvalidParams(
                    "required parameter `events_json` is missing or not a string".into(),
                )
            })?
            .to_owned();

        // ── 1a. Extract optional flags ────────────────────────────────────
        let live_reflect = params
            .get("live_reflect")
            .and_then(|v| v.as_bool())
            .unwrap_or(false);

        let skip_reflect = params
            .get("skip_reflect")
            .and_then(|v| v.as_bool())
            .unwrap_or(false);

        if live_reflect {
            // Forward-facing: the flag is logged for observability. The
            // close_from_bytes pipeline does not yet have a live-reflect
            // branch; a full backend must be configured separately.
            tracing::info!(
                live_reflect = true,
                skip_reflect = skip_reflect,
                "cortex_session_close: live_reflect=true (observability only — \
                 replay-fixture path in use until live-reflect branch lands)"
            );
        }

        // ── 2. Hard size cap (RT-5, ADR 0045 §3) ─────────────────────────
        // Enforce before any parsing or I/O. The cap is checked on the byte
        // length of the UTF-8 string, not the character count, matching the
        // ADR's intent of bounding the data volume arriving at the pipeline.
        if events_json.len() > self.max_events_bytes {
            warn!(
                bytes = events_json.len(),
                limit = self.max_events_bytes,
                "cortex_session_close: events_json payload rejected — exceeds size cap"
            );
            return Err(ToolError::SizeLimitExceeded(
                "events_json exceeds 5MB limit".into(),
            ));
        }

        // ── 3. Delegate to the session-close pipeline ─────────────────────
        // `close_from_bytes` runs the full ADR 0026 policy composition,
        // ADR 0038 admission-envelope validation, ingest, reflect, and
        // `pending_mcp_commit` write. Any BreakGlass composed outcome from
        // the PolicyEngine is returned as an Err by the pipeline (ADR 0045
        // §3) and surfaces here as ToolError::PolicyRejected.
        let pool_guard = self
            .pool
            .lock()
            .map_err(|_| ToolError::Internal("pool lock poisoned".into()))?;
        let outcome = cortex_session::close_from_bytes(
            events_json.as_bytes(),
            &pool_guard,
            self.event_log.clone(),
            &self.fixtures_dir,
        )
        .map_err(map_session_error)?;

        // ── 4. Return the ADR 0047 schema ─────────────────────────────────
        // `pending_commit` — not `activated` — per ADR 0047 §4.
        Ok(serde_json::json!({
            "ingested":       outcome.ingested,
            "reflected":      outcome.reflected,
            "pending_commit": outcome.pending_commit,
            "receipt_id":     outcome.receipt_id,
        }))
    }
}

/// Map a [`SessionError`] to a [`ToolError`].
///
/// Policy rejections produced by the ADR 0026 pipeline — including any
/// `BreakGlass` composed outcome, which is treated as `Reject` at the MCP
/// boundary (ADR 0045 §3) — must surface as `PolicyRejected` so the
/// transport layer emits a JSON-RPC `-32000` error without writing ledger
/// state.
fn map_session_error(err: SessionError) -> ToolError {
    let msg = err.to_string();
    // The session pipeline surfaces policy rejections with these stable
    // prefixes. Any variant that mentions "policy" or "reject" maps to
    // PolicyRejected; everything else is Internal.
    if msg.contains("Reject")
        || msg.contains("Quarantine")
        || msg.contains("BreakGlass")
        || msg.contains("policy rejected")
    {
        ToolError::PolicyRejected(msg)
    } else {
        ToolError::Internal(msg)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use super::*;

    fn make_pool() -> Arc<Mutex<Pool>> {
        Arc::new(Mutex::new(
            cortex_store::Pool::open_in_memory().expect("in-memory sqlite"),
        ))
    }

    fn make_tool_tiny_cap() -> CortexSessionCloseTool {
        CortexSessionCloseTool {
            pool: make_pool(),
            event_log: PathBuf::from("/tmp/test.jsonl"),
            fixtures_dir: PathBuf::from("/tmp/fixtures"),
            max_events_bytes: 10,
        }
    }

    fn make_tool() -> CortexSessionCloseTool {
        CortexSessionCloseTool::new(
            make_pool(),
            PathBuf::from("/tmp/test.jsonl"),
            PathBuf::from("/tmp/fixtures"),
        )
    }

    /// Size-cap check must fire before any parsing (RT-5).
    #[test]
    fn size_cap_rejects_oversized_payload() {
        let tool = make_tool_tiny_cap();
        let oversized = "x".repeat(11);
        let params = serde_json::json!({ "events_json": oversized });
        let err = tool
            .call(params)
            .expect_err("must reject oversized payload");

        assert!(
            matches!(err, ToolError::SizeLimitExceeded(_)),
            "expected SizeLimitExceeded, got: {err:?}"
        );
    }

    /// Missing `events_json` parameter must be rejected.
    #[test]
    fn missing_events_json_returns_invalid_params() {
        let tool = make_tool();
        let err = tool
            .call(serde_json::json!({}))
            .expect_err("must reject missing events_json");

        assert!(
            matches!(err, ToolError::InvalidParams(_)),
            "expected InvalidParams, got: {err:?}"
        );
    }

    /// gate_set must declare SessionWrite.
    #[test]
    fn gate_set_declares_session_write() {
        let tool = make_tool();
        assert!(
            tool.gate_set().contains(&GateId::SessionWrite),
            "gate_set must include SessionWrite"
        );
    }

    /// Tool name matches the ADR 0045 §4 schema contract.
    #[test]
    fn tool_name_matches_schema_contract() {
        let tool = make_tool();
        assert_eq!(tool.name(), "cortex_session_close");
    }
}