use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use cortex_context::{ContextPackBuilder, ContextRefCandidate, ContextRefId, Sensitivity};
use cortex_core::{
compose_policy_outcomes, AuthorityClass, ClaimCeiling, PolicyContribution, PolicyOutcome,
RuntimeMode,
};
use cortex_ledger::{
JsonlLog, APPEND_ATTESTATION_REQUIRED_RULE_ID, APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
APPEND_RUNTIME_MODE_RULE_ID,
};
use cortex_llm::{
blake3_hex, LlmAdapter, LlmError, LlmRequest, LlmResponse, MaxSensitivity, OllamaHttpAdapter,
};
use cortex_retrieval::{
AuthorityLevel, AuthorityProofHint, ConflictingMemoryInput, ProofClosureHint,
};
use cortex_runtime::{run_configured, Run};
use cortex_store::mirror::{self, MIRROR_APPEND_PARITY_INVARIANT_RULE_ID};
use cortex_store::proof::verify_memory_proof_closure;
use cortex_store::repo::{ContradictionRepo, MemoryRepo};
use cortex_store::Pool;
use tracing::warn;
use crate::tool_handler::{GateId, ToolError, ToolHandler};
/// MCP tool handler behind `cortex_run`.
///
/// It dispatches a task to the configured LLM backend together with a context
/// pack built from active memories. It then records the resulting AgentResponse
/// event in the JSONL event log.
#[derive(Debug)]
pub struct CortexRunTool {
    /// Shared store handle.
    ///
    /// Locked while the context pack is built, and again while the
    /// AgentResponse event is appended.
    pub pool: Arc<Mutex<Pool>>,
    /// Path of the JSONL event log that AgentResponse events are appended to.
    pub event_log: PathBuf,
}

impl CortexRunTool {
    /// Creates a handler over `pool` that writes ledger rows to `event_log`.
    #[must_use]
    pub fn new(pool: Arc<Mutex<Pool>>, event_log: PathBuf) -> Self {
        CortexRunTool { event_log, pool }
    }
}
impl ToolHandler for CortexRunTool {
fn name(&self) -> &'static str {
"cortex_run"
}
fn gate_set(&self) -> &'static [GateId] {
&[GateId::SessionWrite]
}
fn call(&self, params: serde_json::Value) -> Result<serde_json::Value, ToolError> {
let task = params
.get("task")
.and_then(|v| v.as_str())
.ok_or_else(|| {
ToolError::InvalidParams(
"required parameter `task` is missing or not a string".into(),
)
})?
.to_owned();
if task.trim().is_empty() {
return Err(ToolError::InvalidParams("`task` must not be empty".into()));
}
let model_override: Option<String> = params
.get("model")
.and_then(|v| v.as_str())
.map(|s| s.to_owned());
if let Some(ref m) = model_override {
if m.trim().is_empty() {
return Err(ToolError::InvalidParams(
"`model` must not be empty when supplied".into(),
));
}
}
let backend = resolve_backend(model_override.as_deref());
let model_name = backend_model_name(&backend);
tracing::info!(
task = %task,
model = %model_name,
"cortex_run via MCP: dispatching task"
);
let pool_guard = self
.pool
.lock()
.map_err(|_| ToolError::Internal("pool lock poisoned".into()))?;
if let ResolvedBackend::Claude { max_sensitivity, .. } = &backend {
let configured_max = max_sensitivity
.parse::<MaxSensitivity>()
.unwrap_or(MaxSensitivity::Medium);
let repo = MemoryRepo::new(&pool_guard);
match repo.max_sensitivity_for_active_memories() {
Ok(memory_max_str) => {
let gate = cortex_llm::SensitivityGateResult::evaluate(
&memory_max_str,
configured_max,
);
tracing::info!(
max_memory_sensitivity = %gate.max_memory_sensitivity,
configured_max = ?gate.configured_max,
allowed = gate.allowed,
"cortex_run MCP: remote prompt domain-tag sensitivity gate"
);
if !gate.allowed {
return Err(ToolError::PolicyRejected(format!(
"sensitivity_exceeds_remote_threshold: memories at {} exceed configured max_sensitivity {}; remote dispatch refused",
gate.max_memory_sensitivity,
max_sensitivity,
)));
}
}
Err(err) => {
return Err(ToolError::Internal(format!(
"sensitivity gate store query failed: {err}; refusing remote dispatch"
)));
}
}
}
let pack = build_context_pack(&pool_guard, &task)?;
let context_memories_used = pack.selected_refs.len();
drop(pool_guard);
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|err| ToolError::Internal(format!("failed to create tokio runtime: {err}")))?;
let report = runtime.block_on(async {
match &backend {
ResolvedBackend::Offline => {
let adapter = MpcOfflineAdapter;
let mut run = Run::new(task.clone(), pack)
.map_err(|err| ToolError::Internal(err.to_string()))?;
run.model = model_name.clone();
run_configured(run, &adapter)
.await
.map_err(|err| ToolError::Internal(err.to_string()))
}
ResolvedBackend::Claude {
model,
max_sensitivity,
} => {
let sensitivity = max_sensitivity
.parse::<MaxSensitivity>()
.unwrap_or(MaxSensitivity::Medium);
match cortex_llm::ClaudeHttpAdapter::new(model.clone(), Some(sensitivity)) {
Ok(adapter) => {
let mut run = Run::new(task.clone(), pack)
.map_err(|err| ToolError::Internal(err.to_string()))?;
run.model = model.clone();
run.runtime_mode = RuntimeMode::RemoteUnsigned;
run_configured(run, &adapter)
.await
.map_err(|err| ToolError::Internal(err.to_string()))
}
Err(err) => Err(ToolError::Internal(format!(
"ClaudeHttpAdapter init failed: {err}"
))),
}
}
ResolvedBackend::Ollama { endpoint, model } => {
use cortex_llm::OllamaConfig;
let config = OllamaConfig {
endpoint_url: endpoint.clone(),
model: model.clone(),
};
match OllamaHttpAdapter::new(config) {
Ok(adapter) => {
let mut run = Run::new(task.clone(), pack)
.map_err(|err| ToolError::Internal(err.to_string()))?;
run.model = model.clone();
run_configured(run, &adapter)
.await
.map_err(|err| ToolError::Internal(err.to_string()))
}
Err(err) => {
warn!(
"cortex_run: invalid ollama config: {err}; falling back to offline"
);
let adapter = MpcOfflineAdapter;
let mut run = Run::new(task.clone(), pack)
.map_err(|err| ToolError::Internal(err.to_string()))?;
run.model = model_name.clone();
run_configured(run, &adapter)
.await
.map_err(|err| ToolError::Internal(err.to_string()))
}
}
}
}
})?;
let response_text = report
.agent_response_event
.payload
.get("text")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_owned();
tracing::info!(
task = %task,
model = %report.model,
response_len = response_text.len(),
raw_hash = %report.raw_hash,
"cortex_run via MCP: task='{}' model='{}' response_len={}",
task,
report.model,
response_text.len()
);
let session_indexed =
persist_agent_response_event_mcp(&self.pool, &self.event_log, &report);
Ok(serde_json::json!({
"task": task,
"model": report.model,
"response_text": response_text,
"raw_hash": report.raw_hash,
"persisted": session_indexed,
"session_indexed": session_indexed,
"context_memories_used": context_memories_used,
}))
}
}
/// Appends the run's AgentResponse event to the JSONL ledger through `mirror::append_event`.
///
/// This is best-effort. The following are each logged and reported as `false` rather
/// than failing the tool call:
/// - the ledger policy not permitting the unsigned append;
/// - the log failing to open;
/// - a poisoned pool lock;
/// - the mirrored append itself failing.
///
/// Returns `true` only when `mirror::append_event` succeeded.
fn persist_agent_response_event_mcp(
    pool: &Arc<Mutex<Pool>>,
    event_log: &std::path::Path,
    report: &cortex_runtime::RunReport,
) -> bool {
    let ledger_policy = mcp_local_development_ledger_policy();
    let append_permitted = matches!(
        ledger_policy.final_outcome,
        PolicyOutcome::Allow | PolicyOutcome::Warn
    );
    if !append_permitted {
        tracing::info!(
            raw_hash = %report.raw_hash,
            runtime_mode = ?report.runtime_mode,
            "cortex_run: ledger write skipped: policy outcome {:?} does not permit unsigned append",
            ledger_policy.final_outcome,
        );
        return false;
    }
    let mirror_policy = mcp_mirror_parity_satisfied_policy();

    let mut log = match JsonlLog::open(event_log) {
        Ok(opened) => opened,
        Err(err) => {
            warn!(
                event_log = %event_log.display(),
                error = %err,
                "cortex_run: failed to open JSONL event log for ledger write"
            );
            return false;
        }
    };
    let Ok(mut pool_guard) = pool.lock() else {
        warn!("cortex_run: pool lock poisoned; skipping ledger write");
        return false;
    };

    let appended = mirror::append_event(
        &mut log,
        &mut pool_guard,
        report.agent_response_event.clone(),
        &ledger_policy,
        &mirror_policy,
    );
    match appended {
        Err(err) => {
            warn!(
                error = %err,
                raw_hash = %report.raw_hash,
                "cortex_run: failed to persist AgentResponse event to ledger"
            );
            false
        }
        Ok(sealed) => {
            tracing::info!(
                event_hash = %sealed.event_hash,
                raw_hash = %report.raw_hash,
                "cortex_run: AgentResponse event written to JSONL ledger"
            );
            true
        }
    }
}
/// Builds the policy decision that authorises the unsigned AgentResponse ledger append.
///
/// It is composed from three static contributions:
/// - the source-tier gate, which allows;
/// - the attestation rule, which allows;
/// - the runtime-mode rule, which warns because this is an unsigned
///   local-development row (ADR 0037 §2 DevOnly).
///
/// `persist_agent_response_event_mcp` checks the composed `final_outcome` before writing.
///
/// # Panics
/// Only if `PolicyContribution::new` rejects one of the static contributions below,
/// which would be a programming error.
fn mcp_local_development_ledger_policy() -> cortex_core::PolicyDecision {
    let source_tier = PolicyContribution::new(
        APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
        PolicyOutcome::Allow,
        "cortex_run MCP: agent-response source tier gate satisfied",
    )
    .expect("static policy contribution is valid");
    let attestation = PolicyContribution::new(
        APPEND_ATTESTATION_REQUIRED_RULE_ID,
        PolicyOutcome::Allow,
        "cortex_run MCP: non-user agent-response does not require user attestation",
    )
    .expect("static policy contribution is valid");
    let runtime_mode = PolicyContribution::new(
        APPEND_RUNTIME_MODE_RULE_ID,
        PolicyOutcome::Warn,
        "cortex_run MCP: unsigned local-development ledger row (ADR 0037 §2 DevOnly)",
    )
    .expect("static policy contribution is valid");

    compose_policy_outcomes(vec![source_tier, attestation, runtime_mode], None)
}
/// Builds the policy decision for the store-mirror append parity invariant.
///
/// It consists of a single static `Allow` contribution for
/// `MIRROR_APPEND_PARITY_INVARIANT_RULE_ID`. Its reason text records the assumption
/// that the ledger is empty or consistent.
///
/// # Panics
/// Only if `PolicyContribution::new` rejects the static contribution (a programming error).
fn mcp_mirror_parity_satisfied_policy() -> cortex_core::PolicyDecision {
    let parity = PolicyContribution::new(
        MIRROR_APPEND_PARITY_INVARIANT_RULE_ID,
        PolicyOutcome::Allow,
        "cortex_run MCP: mirror parity preflight passes for empty-or-consistent ledger",
    )
    .expect("static policy contribution is valid");

    compose_policy_outcomes(vec![parity], None)
}
/// LLM backend selected for a single `cortex_run` call (see `resolve_backend`).
#[derive(Debug, Clone, PartialEq, Eq)]
enum ResolvedBackend {
    /// In-process `MpcOfflineAdapter`, which synthesises a canned response.
    Offline,
    /// Remote Claude dispatch, subject to the sensitivity gate in `call`.
    Claude {
        /// Model name handed to `ClaudeHttpAdapter`.
        model: String,
        /// Raw `max_sensitivity` setting, parsed into `MaxSensitivity` at dispatch time.
        max_sensitivity: String,
    },
    /// Ollama HTTP dispatch.
    Ollama {
        /// Base URL of the Ollama server.
        endpoint: String,
        /// Ollama model name, without the `ollama:` prefix.
        model: String,
    },
}
/// Resolves which LLM backend a `cortex_run` call should use.
///
/// Resolution order:
/// 1. A `model_override` of the form `ollama:<model>` or `claude:<model>` selects
///    that backend directly. Any other override value is ignored.
/// 2. Otherwise the backend comes from the environment:
///    - `CORTEX_LLM_BACKEND`: `ollama` or `claude`; unset or anything else means offline.
///    - `CORTEX_LLM_MODEL`: the model name; defaults to empty.
///
/// In both paths:
/// - `CORTEX_LLM_ENDPOINT` supplies the Ollama endpoint
///   (default `http://localhost:11434`).
/// - `CORTEX_LLM_MAX_SENSITIVITY` supplies the Claude sensitivity ceiling
///   (default `medium`).
///
/// Unset and empty environment variables are treated alike.
///
/// The ceiling is operator configuration. A caller-supplied `claude:` override must
/// not be able to replace it with a more permissive default, so the override path
/// reads the same variable as the environment path.
fn resolve_backend(model_override: Option<&str>) -> ResolvedBackend {
    const DEFAULT_OLLAMA_ENDPOINT: &str = "http://localhost:11434";
    const DEFAULT_MAX_SENSITIVITY: &str = "medium";

    // Reads an environment variable, treating unset and empty values the same way.
    fn env_nonempty(key: &str) -> Option<String> {
        std::env::var(key).ok().filter(|value| !value.is_empty())
    }
    fn ollama_endpoint() -> String {
        env_nonempty("CORTEX_LLM_ENDPOINT").unwrap_or_else(|| DEFAULT_OLLAMA_ENDPOINT.to_string())
    }
    fn claude_max_sensitivity() -> String {
        env_nonempty("CORTEX_LLM_MAX_SENSITIVITY")
            .unwrap_or_else(|| DEFAULT_MAX_SENSITIVITY.to_string())
    }

    if let Some(spec) = model_override {
        if let Some(model) = spec.strip_prefix("ollama:") {
            return ResolvedBackend::Ollama {
                endpoint: ollama_endpoint(),
                model: model.to_string(),
            };
        }
        if let Some(model) = spec.strip_prefix("claude:") {
            return ResolvedBackend::Claude {
                model: model.to_string(),
                max_sensitivity: claude_max_sensitivity(),
            };
        }
    }

    let model = env_nonempty("CORTEX_LLM_MODEL").unwrap_or_default();
    match env_nonempty("CORTEX_LLM_BACKEND").as_deref() {
        Some("ollama") => ResolvedBackend::Ollama {
            endpoint: ollama_endpoint(),
            model,
        },
        Some("claude") => ResolvedBackend::Claude {
            model,
            max_sensitivity: claude_max_sensitivity(),
        },
        _ => ResolvedBackend::Offline,
    }
}
/// Returns the model label reported for `backend`.
///
/// - Offline: `"offline"`.
/// - Claude: the bare model name.
/// - Ollama: `ollama:<model>`.
fn backend_model_name(backend: &ResolvedBackend) -> String {
    match backend {
        ResolvedBackend::Ollama { model, .. } => ["ollama:", model.as_str()].concat(),
        ResolvedBackend::Claude { model, .. } => model.to_owned(),
        ResolvedBackend::Offline => String::from("offline"),
    }
}
/// Builds the context pack for `task` from the store's `active` memories.
///
/// Memories whose proof closure does not currently allow use are skipped. Every
/// other memory is selected as a `LocalUnsigned` / `Derived` claim with `Internal`
/// sensitivity, within a 4096 budget passed to `ContextPackBuilder::new`.
/// `gate_open_contradictions` then checks open contradictions among all active
/// memories before the pack is built.
///
/// # Errors
/// - `ToolError::Internal`: store, proof-closure, or builder failures.
/// - `ToolError::PolicyRejected`: an open contradiction blocks default context-pack use.
fn build_context_pack(pool: &Pool, task: &str) -> Result<cortex_context::ContextPack, ToolError> {
    let repo = MemoryRepo::new(pool);
    let active = repo
        .list_by_status("active")
        .map_err(|err| ToolError::Internal(format!("failed to read active memories: {err}")))?;

    let builder = active.iter().try_fold(
        ContextPackBuilder::new(task, 4096_usize),
        |builder, memory| -> Result<_, ToolError> {
            let proof = verify_memory_proof_closure(pool, &memory.id).map_err(|err| {
                ToolError::Internal(format!(
                    "failed to verify memory {} proof closure: {err}",
                    memory.id
                ))
            })?;
            // A memory whose proof closure forbids current use is left out of the pack.
            if proof.require_current_use_allowed().is_err() {
                return Ok(builder);
            }
            let candidate = ContextRefCandidate::new(
                ContextRefId::Memory {
                    memory_id: memory.id,
                },
                memory.claim.clone(),
            )
            .with_claim_metadata(
                RuntimeMode::LocalUnsigned,
                AuthorityClass::Derived,
                proof.state().into(),
                ClaimCeiling::LocalUnsigned,
            )
            .with_sensitivity(Sensitivity::Internal);
            Ok(builder.select_ref(candidate))
        },
    )?;

    gate_open_contradictions(pool, &active)?;
    builder
        .build()
        .map_err(|err| ToolError::Internal(format!("context pack build failed: {err}")))
}
/// Rejects the run when open contradictions involving `memories` cannot be resolved
/// for default context-pack use.
///
/// - Contradictions that touch no memory in `memories` are ignored.
/// - A contradiction with exactly one side in `memories` is rejected outright with
///   `PolicyRejected`, because the other side is unavailable for resolution.
/// - Otherwise, every affected memory and its conflict edges are passed to
///   `resolve_conflicts`, whose output must allow default use.
///
/// NOTE(review): every input is marked `ProofClosureHint::FullChainVerified`, even
/// though callers may pass memories whose proof closure failed. Confirm this is intended.
///
/// # Errors
/// - `ToolError::Internal`: reading open contradictions fails.
/// - `ToolError::PolicyRejected`: the contradiction gate blocks use.
fn gate_open_contradictions(
    pool: &Pool,
    memories: &[cortex_store::repo::MemoryRecord],
) -> Result<(), ToolError> {
    use cortex_retrieval::resolve_conflicts;
    use std::collections::{BTreeMap, BTreeSet};

    let active_by_id: BTreeMap<String, &cortex_store::repo::MemoryRecord> = memories
        .iter()
        .map(|record| (record.id.to_string(), record))
        .collect();
    let open_contradictions = ContradictionRepo::new(pool)
        .list_open()
        .map_err(|err| ToolError::Internal(format!("failed to read open contradictions: {err}")))?;

    let mut affected = BTreeSet::new();
    let mut edges: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
    for contradiction in open_contradictions {
        match (
            active_by_id.contains_key(&contradiction.left_ref),
            active_by_id.contains_key(&contradiction.right_ref),
        ) {
            (false, false) => continue,
            (true, true) => {}
            _ => {
                return Err(ToolError::PolicyRejected(format!(
                    "open contradiction {} references unavailable memory and cannot be resolved for default context-pack use",
                    contradiction.id
                )));
            }
        }
        let left = contradiction.left_ref;
        let right = contradiction.right_ref;
        affected.insert(left.clone());
        affected.insert(right.clone());
        edges.entry(left.clone()).or_default().insert(right.clone());
        edges.entry(right).or_default().insert(left);
    }

    if affected.is_empty() {
        return Ok(());
    }

    let mut inputs = Vec::with_capacity(affected.len());
    for id in &affected {
        let Some(memory) = active_by_id.get(id.as_str()).copied() else {
            continue;
        };
        inputs.push(
            ConflictingMemoryInput::new(
                memory.id.to_string(),
                Some(memory.id.to_string()),
                memory.claim.clone(),
                AuthorityProofHint {
                    authority: authority_level(&memory.authority),
                    proof: ProofClosureHint::FullChainVerified,
                },
            )
            .with_conflicts(
                edges
                    .get(&memory.id.to_string())
                    .map(|ids| ids.iter().cloned().collect())
                    .unwrap_or_default(),
            ),
        );
    }

    let resolution = resolve_conflicts(&inputs, &[]);
    resolution.require_default_use_allowed().map_err(|err| {
        ToolError::PolicyRejected(format!(
            "open contradiction blocks default context-pack use: {err}"
        ))
    })
}
/// Maps a memory's `authority` string onto a retrieval [`AuthorityLevel`].
///
/// - `user` and `operator` map to `High`.
/// - `tool` and `system` map to `Medium`.
/// - Anything else, including unknown values, maps to `Low`.
fn authority_level(authority: &str) -> AuthorityLevel {
    if matches!(authority, "user" | "operator") {
        AuthorityLevel::High
    } else if matches!(authority, "tool" | "system") {
        AuthorityLevel::Medium
    } else {
        AuthorityLevel::Low
    }
}
/// Network-free LLM adapter used by the offline backend and as the Ollama fallback.
///
/// It answers every request with the deterministic text
/// `"offline response for <model>"`, hashed with `blake3_hex`.
#[derive(Debug)]
struct MpcOfflineAdapter;

#[async_trait]
impl LlmAdapter for MpcOfflineAdapter {
    fn adapter_id(&self) -> &'static str {
        "mcp-offline"
    }

    async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
        let text = format!("offline response for {}", req.model);
        // Hash before moving `text` into the response, so no clone is needed.
        let raw_hash = blake3_hex(text.as_bytes());
        Ok(LlmResponse {
            model: req.model,
            text,
            raw_hash,
            parsed_json: None,
            usage: None,
        })
    }
}
#[cfg(test)]
mod tests {
// Unit and integration tests for `CortexRunTool`.
// These tests presumably expect `CORTEX_LLM_BACKEND` to be unset (or not
// `ollama`/`claude`) so that `resolve_backend` picks the offline adapter and no
// network access happens. Confirm this in CI configuration.
use super::*;
// Opens a fresh in-memory store with all migrations applied, wrapped for sharing
// with `CortexRunTool`.
fn make_pool() -> Arc<Mutex<Pool>> {
let pool = cortex_store::Pool::open_in_memory().expect("in-memory sqlite");
cortex_store::migrate::apply_pending(&pool).expect("in-memory migrations");
Arc::new(Mutex::new(pool))
}
// Builds a tool over a fresh in-memory store that writes to `event_log`.
fn make_tool_with_log(event_log: PathBuf) -> CortexRunTool {
CortexRunTool::new(make_pool(), event_log)
}
// Builds a tool whose event log lives in a temp dir. The `TempDir` is returned so
// the caller keeps it alive; dropping it deletes the directory.
fn make_tool() -> (CortexRunTool, tempfile::TempDir) {
let dir = tempfile::tempdir().expect("temp dir");
let log_path = dir.path().join("events.jsonl");
let tool = make_tool_with_log(log_path);
(tool, dir)
}
#[test]
fn tool_name_matches_schema_contract() {
let (tool, _dir) = make_tool();
assert_eq!(tool.name(), "cortex_run");
}
#[test]
fn gate_set_declares_session_write() {
let (tool, _dir) = make_tool();
assert!(
tool.gate_set().contains(&GateId::SessionWrite),
"gate_set must include SessionWrite"
);
}
#[test]
fn missing_task_returns_invalid_params() {
let (tool, _dir) = make_tool();
let err = tool
.call(serde_json::json!({}))
.expect_err("must reject missing task");
assert!(
matches!(err, ToolError::InvalidParams(_)),
"expected InvalidParams, got: {err:?}"
);
}
#[test]
fn empty_task_returns_invalid_params() {
// A whitespace-only task counts as empty because `call` trims before checking.
let (tool, _dir) = make_tool();
let err = tool
.call(serde_json::json!({ "task": " " }))
.expect_err("must reject empty task");
assert!(
matches!(err, ToolError::InvalidParams(_)),
"expected InvalidParams, got: {err:?}"
);
}
#[test]
fn empty_model_override_returns_invalid_params() {
let (tool, _dir) = make_tool();
let err = tool
.call(serde_json::json!({ "task": "hello", "model": "" }))
.expect_err("must reject empty model");
assert!(
matches!(err, ToolError::InvalidParams(_)),
"expected InvalidParams, got: {err:?}"
);
}
#[test]
fn valid_task_writes_event_to_ledger() {
let dir = tempfile::tempdir().expect("temp dir");
let log_path = dir.path().join("events.jsonl");
let tool = make_tool_with_log(log_path.clone());
let result = tool
.call(serde_json::json!({ "task": "diagnose the system" }))
.expect("offline run must succeed");
assert_eq!(result["task"], "diagnose the system");
assert!(result["raw_hash"].as_str().is_some());
assert!(result["response_text"].as_str().is_some());
assert!(result["model"].as_str().is_some());
let session_indexed = result["session_indexed"]
.as_bool()
.expect("session_indexed must be a boolean");
// `persisted` and `session_indexed` are both set from the same value in `call`.
assert_eq!(result["persisted"], result["session_indexed"]);
// The ledger write is best-effort, so the log-length check only runs when the
// tool reports that the append succeeded.
if session_indexed {
let log =
cortex_ledger::JsonlLog::open(&log_path).expect("JSONL log opens after tool call");
assert_eq!(
log.len(),
1,
"one AgentResponse event must be in the JSONL log"
);
}
}
#[test]
fn context_memories_used_is_zero_for_empty_store() {
let (tool, _dir) = make_tool();
let result = tool
.call(serde_json::json!({ "task": "summarise project status" }))
.expect("offline run must succeed");
let used = result["context_memories_used"]
.as_u64()
.expect("context_memories_used must be a non-negative integer");
assert_eq!(used, 0, "empty store produces no context memories");
}
#[test]
fn context_memories_used_reflects_active_memory_count_and_pack_is_non_empty() {
let pool_inner = cortex_store::Pool::open_in_memory().expect("in-memory sqlite");
cortex_store::migrate::apply_pending(&pool_inner).expect("apply migrations");
let event_id = cortex_core::EventId::new().to_string();
let memory_id = cortex_core::MemoryId::new().to_string();
// Seed a source event row so the memory below can reference it through
// `source_events_json`. This presumably lets the memory pass the proof-closure
// check in `build_context_pack`; verify against `verify_memory_proof_closure`.
pool_inner
.execute(
"INSERT INTO events (
id, schema_version, observed_at, recorded_at, source_json,
event_type, trace_id, session_id, domain_tags_json, payload_json,
payload_hash, prev_event_hash, event_hash
) VALUES (
?1, 1, '2026-05-13T00:00:00Z', '2026-05-13T00:00:00Z',
'{\"type\":\"tool\",\"name\":\"test\"}', 'cortex.event.tool_result.v1',
NULL, NULL, '[]', '{\"fixture\":true}',
'pp_test', NULL, 'eh_test'
);",
rusqlite::params![event_id],
)
.expect("insert source event fixture");
let source_events_json = serde_json::json!([event_id]).to_string();
// Seed one `active` memory with `user` authority that cites the event above.
pool_inner
.execute(
"INSERT INTO memories (
id, memory_type, status, claim, source_episodes_json,
source_events_json, domains_json, salience_json, confidence,
authority, applies_when_json, does_not_apply_when_json,
created_at, updated_at
) VALUES (
?1, 'semantic', 'active',
'Cortex injects memory context into every LLM request.',
'[]', ?2, '[]',
json_object('score', 0.9), 0.9, 'user',
'[]', '[]',
'2026-05-13T00:00:00Z', '2026-05-13T00:00:00Z'
);",
rusqlite::params![memory_id, source_events_json],
)
.expect("insert active memory fixture");
let dir = tempfile::tempdir().expect("temp dir");
let log_path = dir.path().join("events.jsonl");
let pool = Arc::new(Mutex::new(pool_inner));
let tool = CortexRunTool::new(Arc::clone(&pool), log_path);
let result = tool
.call(serde_json::json!({ "task": "summarise project status" }))
.expect("offline run with seeded memories must succeed");
let used = result["context_memories_used"]
.as_u64()
.expect("context_memories_used must be a non-negative integer");
assert!(
used >= 1,
"one active memory must produce context_memories_used >= 1; got {used}"
);
}
}