use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use cortex_session::SessionError;
use cortex_store::Pool;
use tracing::warn;
use crate::tool_handler::{GateId, ToolError, ToolHandler};
/// Default upper bound, in bytes, on the `events_json` payload accepted by
/// [`CortexSessionCloseTool`]: 5_242_880 = 5 * 1024 * 1024.
///
/// [`CortexSessionCloseTool::new`] installs this value as `max_events_bytes`.
pub const DEFAULT_MAX_EVENTS_BYTES: usize = 5_242_880;
/// Handler for the `cortex_session_close` tool.
///
/// Holds everything `call` forwards to `cortex_session::close_from_bytes`,
/// plus the size cap enforced on the incoming payload.
#[derive(Debug)]
pub struct CortexSessionCloseTool {
/// Shared store pool; locked for the duration of each close operation.
pub pool: Arc<Mutex<Pool>>,
/// Path cloned and handed to `cortex_session::close_from_bytes` on every call
/// (presumably the session event log file — confirm against that crate).
pub event_log: PathBuf,
/// Directory passed by reference to `cortex_session::close_from_bytes`
/// (presumably holds replay fixtures — confirm against that crate).
pub fixtures_dir: PathBuf,
/// Maximum accepted byte length of the `events_json` parameter; longer
/// payloads are rejected with `ToolError::SizeLimitExceeded`.
pub max_events_bytes: usize,
}
impl CortexSessionCloseTool {
    /// Builds a tool around the given pool and paths, capping `events_json`
    /// payloads at [`DEFAULT_MAX_EVENTS_BYTES`].
    ///
    /// To use a different cap, set the public `max_events_bytes` field on the
    /// returned value (or construct the struct directly).
    #[must_use]
    pub fn new(pool: Arc<Mutex<Pool>>, event_log: PathBuf, fixtures_dir: PathBuf) -> Self {
        let max_events_bytes = DEFAULT_MAX_EVENTS_BYTES;
        Self {
            pool,
            event_log,
            fixtures_dir,
            max_events_bytes,
        }
    }
}
impl ToolHandler for CortexSessionCloseTool {
    /// Returns the tool's name, `cortex_session_close`.
    fn name(&self) -> &'static str {
        "cortex_session_close"
    }

    /// Returns the gates declared by this tool: only [`GateId::SessionWrite`].
    fn gate_set(&self) -> &'static [GateId] {
        &[GateId::SessionWrite]
    }

    /// Closes a session from the JSON event payload carried in `params`.
    ///
    /// Recognised keys in `params`:
    /// * `events_json` (string, required) — the payload handed to
    ///   `cortex_session::close_from_bytes` as raw bytes.
    /// * `live_reflect` (bool, optional, default `false`) — currently only
    ///   logged; it does not change which code path runs.
    /// * `skip_reflect` (bool, optional, default `false`) — only included in
    ///   the `live_reflect` log line.
    ///
    /// On success returns a JSON object with the `ingested`, `reflected`,
    /// `pending_commit` and `receipt_id` fields of the close outcome.
    ///
    /// # Errors
    /// * [`ToolError::InvalidParams`] if `events_json` is missing or not a string.
    /// * [`ToolError::SizeLimitExceeded`] if the payload is longer than
    ///   `self.max_events_bytes` bytes.
    /// * [`ToolError::Internal`] if the pool mutex is poisoned.
    /// * Whatever `map_session_error` produces when the close itself fails.
    fn call(&self, params: serde_json::Value) -> Result<serde_json::Value, ToolError> {
        // Borrow the payload straight out of `params`: it can be several
        // megabytes, and copying it before the size check would duplicate an
        // oversized payload only to reject it.
        let events_json = params
            .get("events_json")
            .and_then(serde_json::Value::as_str)
            .ok_or_else(|| {
                ToolError::InvalidParams(
                    "required parameter `events_json` is missing or not a string".into(),
                )
            })?;

        // Optional boolean flags: absent or non-boolean values read as `false`.
        let flag = |key: &str| {
            params
                .get(key)
                .and_then(serde_json::Value::as_bool)
                .unwrap_or(false)
        };
        let live_reflect = flag("live_reflect");
        let skip_reflect = flag("skip_reflect");

        if live_reflect {
            tracing::info!(
                live_reflect = true,
                skip_reflect = skip_reflect,
                "cortex_session_close: live_reflect=true (observability only — \
                 replay-fixture path in use until live-reflect branch lands)"
            );
        }

        let size = events_json.len();
        let limit = self.max_events_bytes;
        if size > limit {
            warn!(
                bytes = size,
                limit = limit,
                "cortex_session_close: events_json payload rejected — exceeds size cap"
            );
            // Report the configured cap rather than a fixed "5MB": the limit is
            // a public, adjustable field.
            return Err(ToolError::SizeLimitExceeded(format!(
                "events_json exceeds {limit}-byte limit (got {size} bytes)"
            )));
        }

        // The guard is held until the end of the call so the close operation
        // has exclusive access to the pool.
        let pool_guard = self
            .pool
            .lock()
            .map_err(|_| ToolError::Internal("pool lock poisoned".into()))?;

        let outcome = cortex_session::close_from_bytes(
            events_json.as_bytes(),
            &pool_guard,
            self.event_log.clone(),
            &self.fixtures_dir,
        )
        .map_err(map_session_error)?;

        Ok(serde_json::json!({
            "ingested": outcome.ingested,
            "reflected": outcome.reflected,
            "pending_commit": outcome.pending_commit,
            "receipt_id": outcome.receipt_id,
        }))
    }
}
/// Converts a [`SessionError`] into the [`ToolError`] returned by the tool.
///
/// Classification is done on the error's `Display` text: if it contains any of
/// the policy markers below, the result is [`ToolError::PolicyRejected`];
/// otherwise it is [`ToolError::Internal`]. Either way the full text is carried
/// along as the message.
fn map_session_error(err: SessionError) -> ToolError {
    // Substrings that identify a policy-driven rejection.
    const POLICY_MARKERS: [&str; 4] = ["Reject", "Quarantine", "BreakGlass", "policy rejected"];

    let rendered = err.to_string();
    let is_policy_rejection = POLICY_MARKERS
        .iter()
        .any(|&marker| rendered.contains(marker));

    if is_policy_rejection {
        return ToolError::PolicyRejected(rendered);
    }
    ToolError::Internal(rendered)
}
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use super::*;

    /// Opens a fresh in-memory store and wraps it the way the tool expects.
    fn make_pool() -> Arc<Mutex<Pool>> {
        let store = cortex_store::Pool::open_in_memory().expect("in-memory sqlite");
        Arc::new(Mutex::new(store))
    }

    /// Tool with a 10-byte payload cap so the size check is easy to trip.
    fn make_tool_tiny_cap() -> CortexSessionCloseTool {
        let event_log = PathBuf::from("/tmp/test.jsonl");
        let fixtures_dir = PathBuf::from("/tmp/fixtures");
        CortexSessionCloseTool {
            pool: make_pool(),
            event_log,
            fixtures_dir,
            max_events_bytes: 10,
        }
    }

    /// Tool built through the public constructor (default payload cap).
    fn make_tool() -> CortexSessionCloseTool {
        let event_log = PathBuf::from("/tmp/test.jsonl");
        let fixtures_dir = PathBuf::from("/tmp/fixtures");
        CortexSessionCloseTool::new(make_pool(), event_log, fixtures_dir)
    }

    #[test]
    fn size_cap_rejects_oversized_payload() {
        // 11 bytes against a 10-byte cap.
        let oversized = "x".repeat(11);
        let outcome = make_tool_tiny_cap().call(serde_json::json!({ "events_json": oversized }));
        let err = outcome.expect_err("must reject oversized payload");
        assert!(
            matches!(err, ToolError::SizeLimitExceeded(_)),
            "expected SizeLimitExceeded, got: {err:?}"
        );
    }

    #[test]
    fn missing_events_json_returns_invalid_params() {
        let empty_params = serde_json::json!({});
        let outcome = make_tool().call(empty_params);
        let err = outcome.expect_err("must reject missing events_json");
        assert!(
            matches!(err, ToolError::InvalidParams(_)),
            "expected InvalidParams, got: {err:?}"
        );
    }

    #[test]
    fn gate_set_declares_session_write() {
        let tool = make_tool();
        let declares_session_write = tool
            .gate_set()
            .iter()
            .any(|gate| *gate == GateId::SessionWrite);
        assert!(declares_session_write, "gate_set must include SessionWrite");
    }

    #[test]
    fn tool_name_matches_schema_contract() {
        let reported = make_tool().name();
        assert_eq!(reported, "cortex_session_close");
    }
}