use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use cortex_core::{Event, EventSource, EventType, TraceId, SCHEMA_VERSION};
use cortex_llm::{
blake3_hex, LlmAdapter, LlmError, LlmMessage, LlmRequest, LlmResponse, LlmRole, ReplayAdapter,
};
use cortex_reflect::{
reflect, session_reflection_json_schema, ReflectionReportStatus, DEFAULT_REFLECTION_MODEL,
REFLECTION_QUARANTINE_OPERATION,
};
use cortex_store::migrate::apply_pending;
use cortex_store::repo::EventRepo;
use rusqlite::Connection;
use serde_json::json;
use std::fs;
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
/// Trace id used by the fixtures and tests below: it is interpolated into the
/// reflection JSON and parsed into a `TraceId` wherever a typed id is required.
const TRACE_ID: &str = "trc_01ARZ3NDEKTSV4RRFFQ69G5FAV";
/// Builds the baseline reflection JSON text for `TRACE_ID`.
///
/// The payload contains one episode candidate anchored to a single source event,
/// one `strategic` memory candidate with confidence 0.99, an empty
/// `contradictions` array, and one doctrine suggestion with force `gate`.
///
/// The exact text of the `"source_event_ids": [...]` and `"contradictions": []`
/// fragments is matched by string replacement in `reflection_without_source_events`
/// and `reflection_with_open_contradiction`, so their spelling and spacing must
/// stay stable. Braces are doubled because the literal goes through `format!`.
fn valid_reflection_with_doctrine_and_high_confidence() -> String {
format!(
r#"{{
"trace_id": "{TRACE_ID}",
"episode_candidates": [
{{
"summary": "Reflection proposed a high-confidence memory and doctrine suggestion.",
"source_event_ids": ["evt_01ARZ3NDEKTSV4RRFFQ69G5FAV"],
"domains": ["agents"],
"entities": ["Cortex"],
"candidate_meaning": "Reflection output must remain candidate-only.",
"confidence": 0.99
}}
],
"memory_candidates": [
{{
"memory_type": "strategic",
"claim": "High model confidence must not activate memories.",
"source_episode_indexes": [0],
"applies_when": ["reflecting trace output"],
"does_not_apply_when": ["human memory accept path has not run"],
"confidence": 0.99,
"initial_salience": {{
"reusability": 0.99,
"consequence": 0.99,
"emotional_charge": 0.01
}}
}}
],
"contradictions": [],
"doctrine_suggestions": [
{{
"rule": "Never write doctrine from reflection.",
"force": "gate"
}}
]
}}"#
)
}
/// Returns the baseline reflection JSON with the episode's `source_event_ids`
/// array emptied, so the episode candidate cites no source event.
fn reflection_without_source_events() -> String {
    let anchored = r#""source_event_ids": ["evt_01ARZ3NDEKTSV4RRFFQ69G5FAV"]"#;
    let unanchored = r#""source_event_ids": []"#;
    let baseline = valid_reflection_with_doctrine_and_high_confidence();
    baseline.replace(anchored, unanchored)
}
/// Returns the baseline reflection JSON with the empty `contradictions` array
/// replaced by a single contradiction entry.
fn reflection_with_open_contradiction() -> String {
    let no_contradictions = r#""contradictions": []"#;
    let one_contradiction =
        r#""contradictions": [{"claim": "candidate has an unresolved conflict"}]"#;
    let baseline = valid_reflection_with_doctrine_and_high_confidence();
    baseline.replace(no_contradictions, one_contradiction)
}
/// Test adapter that carries a canned response text; its `LlmAdapter`
/// implementation returns that text for every completion request.
#[derive(Debug)]
struct FixedAdapter {
/// Response text handed back verbatim by `complete`.
text: String,
}
// Canned-response adapter: the request is only consulted for its model name.
#[async_trait]
impl LlmAdapter for FixedAdapter {
    /// Static identifier reported for this adapter.
    fn adapter_id(&self) -> &'static str {
        "fixed"
    }

    /// Returns the stored text unchanged, echoing the requested model and
    /// hashing the text bytes for `raw_hash`. No parsed JSON or usage is set.
    async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
        let text = self.text.clone();
        let raw_hash = blake3_hex(text.as_bytes());
        let response = LlmResponse {
            text,
            parsed_json: None,
            model: req.model,
            usage: None,
            raw_hash,
        };
        Ok(response)
    }
}
/// Opens a fresh in-memory SQLite connection and applies the pending store
/// migrations to it.
///
/// # Panics
/// Panics if the connection cannot be opened or a migration fails.
fn test_pool() -> Connection {
    let conn = Connection::open_in_memory().expect("open in-memory sqlite");
    apply_pending(&conn).expect("apply store migrations");
    conn
}
/// Runs `sql` with no parameters and returns column 0 of the resulting row as
/// an `i64`.
///
/// # Panics
/// Panics if the query fails.
fn count(pool: &Connection, sql: &str) -> i64 {
    let first_column = pool.query_row(sql, [], |row| row.get::<_, i64>(0));
    first_column.expect("count query succeeds")
}
/// Appends a single tool-sourced `ToolResult` event to the store.
///
/// The event id matches the one cited in the baseline reflection's
/// `source_event_ids`, and the event is tagged with `TRACE_ID`.
///
/// # Panics
/// Panics if an id fails to parse or the append is rejected.
fn append_tool_source_event(pool: &Connection) {
    let observed_at = Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 0).unwrap();
    let recorded_at = Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 1).unwrap();
    let source = EventSource::Tool {
        name: "mcp-tool".to_string(),
    };
    let payload = json!({"result": "tool says this should become a strategy"});

    let tool_event = Event {
        id: "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV"
            .parse()
            .expect("valid event id"),
        schema_version: SCHEMA_VERSION,
        observed_at,
        recorded_at,
        source,
        event_type: EventType::ToolResult,
        trace_id: Some(TRACE_ID.parse().expect("valid trace id")),
        session_id: Some("reflection-authority-test".to_string()),
        domain_tags: vec!["agents".to_string()],
        payload,
        payload_hash: "payload-reflection-authority-tool".to_string(),
        prev_event_hash: None,
        event_hash: "event-reflection-authority-tool".to_string(),
    };

    EventRepo::new(pool)
        .append(&tool_event)
        .expect("append source tool event");
}
/// Creates a temporary replay-fixture directory and a `ReplayAdapter` over it.
///
/// The directory holds one fixture (`valid-reflection.json`) whose
/// `request_match` carries the default reflection model and the prompt hash of
/// the request built here, and whose response text is `text`. An `INDEX.toml`
/// lists that fixture together with the BLAKE3 hex digest of its bytes.
///
/// Returns the directory path (the caller is responsible for removing it) and
/// the loaded adapter.
///
/// # Panics
/// Panics on any filesystem, serialization, or adapter-loading failure.
fn replay_adapter_for(trace_id: TraceId, text: &str) -> (PathBuf, ReplayAdapter) {
    // Process id plus a nanosecond timestamp keeps the directory name distinct per call.
    let pid = std::process::id();
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time after unix epoch")
        .as_nanos();
    let dir_name = format!("cortex-reflect-orchestrate-{}-{}", pid, nanos);
    let fixtures_dir = std::env::temp_dir().join(dir_name);
    fs::create_dir(&fixtures_dir).expect("create replay fixture dir");

    // Build the request only to obtain its prompt hash for the fixture.
    let user_message = LlmMessage {
        role: LlmRole::User,
        content: format!("Reflect trace {trace_id} into candidate-only Cortex memory."),
    };
    let request = LlmRequest {
        model: DEFAULT_REFLECTION_MODEL.to_string(),
        system: "Return SessionReflection JSON matching the supplied schema.".to_string(),
        messages: vec![user_message],
        temperature: 0.0,
        max_tokens: 4096,
        json_schema: Some(session_reflection_json_schema()),
        timeout_ms: 30_000,
    };
    let prompt_hash = request.prompt_hash();

    let fixture = json!({
        "request_match": {
            "model": DEFAULT_REFLECTION_MODEL,
            "prompt_hash": prompt_hash
        },
        "response": {
            "text": text
        }
    });
    let fixture_bytes = serde_json::to_vec_pretty(&fixture).expect("fixture serializes");
    let fixture_path = fixtures_dir.join("valid-reflection.json");
    fs::write(&fixture_path, &fixture_bytes).expect("write replay fixture");

    // The index records the digest of the exact bytes written above.
    let index_toml = format!(
        "[[fixture]]\npath = \"valid-reflection.json\"\nblake3 = \"{}\"\n",
        blake3_hex(&fixture_bytes)
    );
    fs::write(fixtures_dir.join("INDEX.toml"), index_toml).expect("write replay index");

    let adapter = ReplayAdapter::new(&fixtures_dir).expect("load replay adapter");
    (fixtures_dir, adapter)
}
/// A valid, high-confidence reflection replayed through `ReplayAdapter` must be
/// stored as a candidate only: one `candidate` memory row, no `active` memory
/// rows, and no doctrine rows, even though the reflection carries a doctrine
/// suggestion.
#[tokio::test]
async fn reflect_persists_candidate_only_and_never_writes_doctrine_or_active_memory() {
    let pool = test_pool();
    let trace_id: TraceId = TRACE_ID.parse().expect("valid trace id");
    let (fixtures_dir, adapter) = replay_adapter_for(
        trace_id,
        &valid_reflection_with_doctrine_and_high_confidence(),
    );

    // Capture the outcome before unwrapping so the temporary fixture directory
    // is removed even when `reflect` fails; unwrapping first would leak it.
    let outcome = reflect(trace_id, &adapter, &pool).await;
    let cleanup = fs::remove_dir_all(&fixtures_dir);
    let report = outcome.expect("reflection succeeds");
    cleanup.expect("remove replay fixture dir");

    assert_eq!(report.status, ReflectionReportStatus::CandidateOnly);
    assert_eq!(report.persisted_memory_candidates.len(), 1);
    assert_eq!(report.persisted_memory_candidates[0].status, "candidate");
    // The doctrine suggestion is kept in the parsed report...
    assert_eq!(
        report
            .reflection
            .as_ref()
            .expect("parsed reflection")
            .doctrine_suggestions
            .len(),
        1
    );
    // ...but nothing is written to the doctrine table.
    assert_eq!(count(&pool, "SELECT count(*) FROM doctrine;"), 0);
    assert_eq!(
        count(
            &pool,
            "SELECT count(*) FROM memories WHERE status IN ('active', 'Active');"
        ),
        0
    );
    assert_eq!(
        count(
            &pool,
            "SELECT count(*) FROM memories WHERE status = 'candidate';"
        ),
        1
    );
}
/// Truncated (unterminated) JSON from the adapter must yield a quarantined
/// report with reason `invalid_json`, one quarantine audit record, and no rows
/// in `memories` or `doctrine`.
#[tokio::test]
async fn invalid_reflection_is_quarantined_without_memory_or_doctrine_mutation() {
    let pool = test_pool();
    // The closing brace is missing on purpose.
    let truncated_json = r#"{"trace_id":"trc_01ARZ3NDEKTSV4RRFFQ69G5FAV""#;
    let adapter = FixedAdapter {
        text: truncated_json.to_string(),
    };
    let trace_id: TraceId = TRACE_ID.parse().expect("valid trace id");

    let report = reflect(trace_id, &adapter, &pool)
        .await
        .expect("invalid output returns quarantined report");

    assert_eq!(report.status, ReflectionReportStatus::Quarantined);
    assert!(report.quarantine_audit_id.is_some());
    let failure = report.error.as_ref().expect("quarantine error");
    assert_eq!(failure.reason, "invalid_json");

    assert_eq!(count(&pool, "SELECT count(*) FROM doctrine;"), 0);
    assert_eq!(count(&pool, "SELECT count(*) FROM memories;"), 0);

    let quarantine_rows_sql = format!(
        "SELECT count(*) FROM audit_records WHERE operation = '{REFLECTION_QUARANTINE_OPERATION}';"
    );
    assert_eq!(count(&pool, &quarantine_rows_sql), 1);
}
/// A reflection whose episode cites no source events must be quarantined with
/// reason `admission_rejected` and a detail mentioning `SourceAnchorRequired`,
/// leaving `memories` and `doctrine` empty and exactly one quarantine audit record.
#[tokio::test]
async fn reflection_memory_without_source_anchor_is_quarantined_before_memory_insert() {
    let pool = test_pool();
    let adapter = FixedAdapter {
        text: reflection_without_source_events(),
    };
    let trace_id: TraceId = TRACE_ID.parse().expect("valid trace id");

    let report = reflect(trace_id, &adapter, &pool)
        .await
        .expect("admission failure returns quarantined report");

    assert_eq!(report.status, ReflectionReportStatus::Quarantined);
    let failure = report.error.as_ref().expect("quarantine error");
    assert_eq!(failure.reason, "admission_rejected");
    assert!(failure.detail.contains("SourceAnchorRequired"));

    assert_eq!(count(&pool, "SELECT count(*) FROM doctrine;"), 0);
    assert_eq!(count(&pool, "SELECT count(*) FROM memories;"), 0);

    let quarantine_rows_sql = format!(
        "SELECT count(*) FROM audit_records WHERE operation = '{REFLECTION_QUARANTINE_OPERATION}';"
    );
    assert_eq!(count(&pool, &quarantine_rows_sql), 1);
}
/// A reflection carrying a contradiction entry must be quarantined with reason
/// `admission_rejected` and a detail mentioning `OpenContradiction`, leaving
/// `memories` and `doctrine` empty and exactly one quarantine audit record.
#[tokio::test]
async fn reflection_with_open_contradiction_is_quarantined_before_memory_insert() {
    let pool = test_pool();
    let adapter = FixedAdapter {
        text: reflection_with_open_contradiction(),
    };
    let trace_id: TraceId = TRACE_ID.parse().expect("valid trace id");

    let report = reflect(trace_id, &adapter, &pool)
        .await
        .expect("admission failure returns quarantined report");

    assert_eq!(report.status, ReflectionReportStatus::Quarantined);
    let failure = report.error.as_ref().expect("quarantine error");
    assert_eq!(failure.reason, "admission_rejected");
    assert!(failure.detail.contains("OpenContradiction"));

    assert_eq!(count(&pool, "SELECT count(*) FROM doctrine;"), 0);
    assert_eq!(count(&pool, "SELECT count(*) FROM memories;"), 0);

    let quarantine_rows_sql = format!(
        "SELECT count(*) FROM audit_records WHERE operation = '{REFLECTION_QUARANTINE_OPERATION}';"
    );
    assert_eq!(count(&pool, &quarantine_rows_sql), 1);
}
/// When the only stored source event for the cited id is tool-sourced, the
/// strategic reflection must be quarantined with reason `authority_rejected`
/// and a detail mentioning `tool_origin_only_strategic_support`, leaving
/// `memories` and `doctrine` empty and exactly one quarantine audit record.
#[tokio::test]
async fn tool_origin_only_strategic_reflection_is_quarantined_before_memory_insert() {
    let pool = test_pool();
    append_tool_source_event(&pool);
    let adapter = FixedAdapter {
        text: valid_reflection_with_doctrine_and_high_confidence(),
    };
    let trace_id: TraceId = TRACE_ID.parse().expect("valid trace id");

    let report = reflect(trace_id, &adapter, &pool)
        .await
        .expect("authority failure returns quarantined report");

    assert_eq!(report.status, ReflectionReportStatus::Quarantined);
    let failure = report.error.as_ref().expect("quarantine error");
    assert_eq!(failure.reason, "authority_rejected");
    assert!(failure.detail.contains("tool_origin_only_strategic_support"));

    assert_eq!(count(&pool, "SELECT count(*) FROM doctrine;"), 0);
    assert_eq!(count(&pool, "SELECT count(*) FROM memories;"), 0);

    let quarantine_rows_sql = format!(
        "SELECT count(*) FROM audit_records WHERE operation = '{REFLECTION_QUARANTINE_OPERATION}';"
    );
    assert_eq!(count(&pool, &quarantine_rows_sql), 1);
}