use std::fmt::Write as _;
use std::path::PathBuf;
use clap::Args;
use cortex_core::{
compose_policy_outcomes, AuthorityClass, PolicyContribution, PolicyDecision, PolicyOutcome,
TraceId,
};
use cortex_llm::{
blake3_hex, LlmAdapter, LlmError, LlmMessage, LlmRequest, LlmRole, ReplayAdapter,
};
use cortex_memory::AdmissionDecision;
use cortex_reflect::{admission_request_for_memory, parse_reflection, SessionReflection};
use serde::Serialize;
use serde_json::json;
use crate::exit::Exit;
pub const RULE_ADMISSION_DECISION: &str = "reflect.admission_decision";
pub const RULE_FIXTURE_INTEGRITY: &str = "reflect.fixture_integrity";
pub const RULE_ADAPTER_AUTHORITY_CLASS: &str = "reflect.adapter_authority_class";
#[derive(Debug, Args)]
pub struct ReflectArgs {
#[arg(long, value_name = "TRACE_ID")]
pub trace: TraceId,
#[arg(long, value_name = "MODEL")]
pub model: String,
#[arg(long = "fixtures-dir", value_name = "DIR")]
pub fixtures_dir: Option<PathBuf>,
}
#[derive(Debug, Serialize)]
struct ReflectEnvelope {
#[serde(skip_serializing_if = "Option::is_none")]
payload: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
diagnostic: Option<serde_json::Value>,
policy_outcome: PolicyDecision,
}
pub fn run(args: ReflectArgs) -> Exit {
if args.model != "replay" {
eprintln!(
"cortex reflect: unsupported --model `{}`. Only `replay` is wired at lane 1.C.",
args.model
);
return Exit::Usage;
}
let fixtures_dir = resolve_fixtures_dir(args.fixtures_dir);
let adapter = match ReplayAdapter::new(&fixtures_dir) {
Ok(a) => a,
Err(LlmError::FixtureIntegrityFailed(msg)) => {
eprintln!("cortex reflect: fixture integrity failed: {msg}");
return Exit::QuarantinedInput;
}
Err(e) => {
eprintln!("cortex reflect: replay adapter setup failed: {e}");
return Exit::Internal;
}
};
let req = build_request(&args.trace);
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(r) => r,
Err(e) => {
eprintln!("cortex reflect: tokio runtime: {e}");
return Exit::Internal;
}
};
let resp = match rt.block_on(adapter.complete(req)) {
Ok(r) => r,
Err(LlmError::FixtureIntegrityFailed(msg)) => {
eprintln!("cortex reflect: fixture integrity failed: {msg}");
return Exit::QuarantinedInput;
}
Err(LlmError::NoFixture { model, prompt_hash }) => {
eprintln!(
"cortex reflect: no replay fixture for model={model} prompt_hash={prompt_hash}"
);
return Exit::PreconditionUnmet;
}
Err(e) => {
eprintln!("cortex reflect: adapter error: {e}");
return Exit::Internal;
}
};
let adapter_id = adapter.adapter_id();
let raw_hash = blake3_hex(resp.text.as_bytes());
let payload_value = parsed_payload(&resp);
let admission_summary = evaluate_admission(&payload_value, adapter_id, &raw_hash);
let policy_outcome = compose_reflect_policy(&admission_summary);
let envelope = match policy_outcome.final_outcome {
PolicyOutcome::Allow | PolicyOutcome::Warn | PolicyOutcome::BreakGlass => ReflectEnvelope {
payload: Some(payload_value),
diagnostic: None,
policy_outcome,
},
PolicyOutcome::Quarantine => ReflectEnvelope {
payload: None,
diagnostic: Some(admission_diagnostic_json(&admission_summary)),
policy_outcome,
},
PolicyOutcome::Reject => {
eprintln!(
"cortex reflect: admission rejected: {}",
admission_summary.reason_label()
);
return Exit::QuarantinedInput;
}
};
let mut out = String::new();
writeln!(
&mut out,
"{}",
serde_json::to_string_pretty(&envelope)
.unwrap_or_else(|_| "{\"error\":\"serialize\"}".into())
)
.ok();
print!("{out}");
Exit::Ok
}
pub fn build_request(trace: &TraceId) -> LlmRequest {
LlmRequest {
model: "claude-3-5-sonnet-20240620".into(),
system: "Cortex reflection v1: produce a SessionReflection JSON for the given trace."
.into(),
messages: vec![LlmMessage {
role: LlmRole::User,
content: format!("trace: {trace}"),
}],
temperature: 0.0,
max_tokens: 1024,
json_schema: None,
timeout_ms: 30_000,
}
}
fn resolve_fixtures_dir(explicit: Option<PathBuf>) -> PathBuf {
if let Some(p) = explicit {
return p;
}
if let Some(p) = std::env::var_os("CORTEX_FIXTURES_DIR") {
return PathBuf::from(p);
}
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests")
.join("fixtures")
.join("replay")
}
fn parsed_payload(resp: &cortex_llm::LlmResponse) -> serde_json::Value {
if let Some(v) = resp.parsed_json.clone() {
return v;
}
match serde_json::from_str(&resp.text) {
Ok(v) => v,
Err(_) => serde_json::Value::String(resp.text.clone()),
}
}
#[derive(Debug)]
struct AdmissionSummary {
outcome: PolicyOutcome,
reason: &'static str,
detail: Option<String>,
}
impl AdmissionSummary {
fn allow() -> Self {
Self {
outcome: PolicyOutcome::Allow,
reason: "AXIOM admission accepted reflection candidates",
detail: None,
}
}
fn quarantine(detail: String) -> Self {
Self {
outcome: PolicyOutcome::Quarantine,
reason: "AXIOM admission quarantined reflection candidates",
detail: Some(detail),
}
}
fn reject(detail: String) -> Self {
Self {
outcome: PolicyOutcome::Reject,
reason: "AXIOM admission rejected reflection candidates",
detail: Some(detail),
}
}
fn reason_label(&self) -> String {
match &self.detail {
Some(detail) => format!("{} ({detail})", self.reason),
None => self.reason.to_string(),
}
}
}
fn evaluate_admission(
payload: &serde_json::Value,
adapter_id: &str,
raw_hash: &str,
) -> AdmissionSummary {
let reflection = match parse_reflection_value(payload) {
Ok(r) => r,
Err(detail) => {
return AdmissionSummary::reject(format!("reflection parse failed: {detail}"));
}
};
let mut quarantine_detail: Option<String> = None;
for (memory_index, memory) in reflection.memory_candidates.iter().enumerate() {
let request = admission_request_for_memory(&reflection, memory, adapter_id, raw_hash);
match request.admission_decision() {
AdmissionDecision::AdmitCandidate => {}
AdmissionDecision::Reject { reasons } => {
return AdmissionSummary::reject(format!(
"memory_candidates[{memory_index}] reasons={reasons:?}"
));
}
AdmissionDecision::Quarantine { reasons } => {
let detail = format!("memory_candidates[{memory_index}] reasons={reasons:?}");
quarantine_detail.get_or_insert(detail);
}
}
}
if let Some(detail) = quarantine_detail {
AdmissionSummary::quarantine(detail)
} else {
AdmissionSummary::allow()
}
}
fn parse_reflection_value(value: &serde_json::Value) -> Result<SessionReflection, String> {
let body = serde_json::to_string(value).map_err(|err| err.to_string())?;
parse_reflection(&body).map_err(|err| err.to_string())
}
fn compose_reflect_policy(admission: &AdmissionSummary) -> PolicyDecision {
let admission_contribution =
PolicyContribution::new(RULE_ADMISSION_DECISION, admission.outcome, admission.reason)
.expect("static reflect admission contribution is valid");
let fixture_contribution = PolicyContribution::new(
RULE_FIXTURE_INTEGRITY,
PolicyOutcome::Allow,
"replay adapter fixture integrity verified at construction and per call",
)
.expect("static reflect fixture integrity contribution is valid");
let authority_class = adapter_authority_class("replay");
let authority_contribution = PolicyContribution::new(
RULE_ADAPTER_AUTHORITY_CLASS,
adapter_authority_class_outcome(authority_class),
adapter_authority_class_reason(authority_class),
)
.expect("static reflect adapter authority class contribution is valid");
compose_policy_outcomes(
vec![
admission_contribution,
fixture_contribution,
authority_contribution,
],
None,
)
}
fn adapter_authority_class(adapter_id: &str) -> AuthorityClass {
match adapter_id {
"replay" => AuthorityClass::Derived,
_ => AuthorityClass::Untrusted,
}
}
fn adapter_authority_class_outcome(class: AuthorityClass) -> PolicyOutcome {
match class {
AuthorityClass::Derived | AuthorityClass::Observed => PolicyOutcome::Allow,
AuthorityClass::Verified | AuthorityClass::Operator => PolicyOutcome::Allow,
AuthorityClass::Untrusted => PolicyOutcome::Reject,
}
}
fn adapter_authority_class_reason(class: AuthorityClass) -> &'static str {
match class {
AuthorityClass::Derived => {
"reflection adapter classified as Derived; candidate-only emission allowed"
}
AuthorityClass::Observed => {
"reflection adapter classified as Observed; candidate-only emission allowed"
}
AuthorityClass::Verified => {
"reflection adapter classified as Verified; candidate-only emission allowed"
}
AuthorityClass::Operator => {
"reflection adapter classified as Operator; candidate-only emission allowed"
}
AuthorityClass::Untrusted => {
"reflection adapter classified as Untrusted; reflection emission refused"
}
}
}
fn admission_diagnostic_json(admission: &AdmissionSummary) -> serde_json::Value {
json!({
"reason": admission.reason,
"detail": admission.detail.as_deref().unwrap_or(""),
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn build_request_is_deterministic() {
let t: TraceId = "trc_01ARZ3NDEKTSV4RRFFQ69G5FAW".parse().unwrap();
let a = build_request(&t);
let b = build_request(&t);
assert_eq!(a.prompt_hash(), b.prompt_hash());
}
#[test]
fn build_request_differs_per_trace() {
let t1: TraceId = "trc_01ARZ3NDEKTSV4RRFFQ69G5FAW".parse().unwrap();
let t2: TraceId = "trc_01ARZ3NDEKTSV4RRFFQ69G5FAX".parse().unwrap();
assert_ne!(
build_request(&t1).prompt_hash(),
build_request(&t2).prompt_hash()
);
}
#[test]
fn compose_reflect_policy_allows_when_admission_allows() {
let policy = compose_reflect_policy(&AdmissionSummary::allow());
assert_eq!(policy.final_outcome, PolicyOutcome::Allow);
let rule_ids: Vec<_> = policy
.contributing
.iter()
.chain(policy.discarded.iter())
.map(|contribution| contribution.rule_id.as_str().to_string())
.collect();
assert!(rule_ids.iter().any(|id| id == RULE_ADMISSION_DECISION));
assert!(rule_ids.iter().any(|id| id == RULE_FIXTURE_INTEGRITY));
assert!(rule_ids.iter().any(|id| id == RULE_ADAPTER_AUTHORITY_CLASS));
}
#[test]
fn compose_reflect_policy_quarantines_when_admission_quarantines() {
let policy =
compose_reflect_policy(&AdmissionSummary::quarantine("open contradiction".into()));
assert_eq!(policy.final_outcome, PolicyOutcome::Quarantine);
}
#[test]
fn compose_reflect_policy_rejects_when_admission_rejects() {
let policy =
compose_reflect_policy(&AdmissionSummary::reject("missing source anchor".into()));
assert_eq!(policy.final_outcome, PolicyOutcome::Reject);
}
#[test]
fn evaluate_admission_returns_reject_for_unparseable_payload() {
let payload = serde_json::json!({"not": "a session reflection"});
let summary = evaluate_admission(&payload, "replay", "hash_test");
assert_eq!(summary.outcome, PolicyOutcome::Reject);
}
#[test]
fn evaluate_admission_allows_empty_session_reflection() {
let payload = serde_json::json!({
"trace_id": "trc_01ARZ3NDEKTSV4RRFFQ69G5FAW",
"episode_candidates": [],
"memory_candidates": [],
"contradictions": [],
"doctrine_suggestions": []
});
let summary = evaluate_admission(&payload, "replay", "hash_test");
assert_eq!(summary.outcome, PolicyOutcome::Allow);
}
#[test]
fn evaluate_admission_quarantines_open_contradiction() {
let payload = serde_json::json!({
"trace_id": "trc_01ARZ3NDEKTSV4RRFFQ69G5FAW",
"episode_candidates": [{
"summary": "demo",
"source_event_ids": ["evt_01ARZ3NDEKTSV4RRFFQ69G5FAV"],
"domains": ["agents"],
"entities": ["Cortex"],
"candidate_meaning": null,
"confidence": 0.5
}],
"memory_candidates": [{
"memory_type": "strategic",
"claim": "Reflection memory remains candidate-only.",
"source_episode_indexes": [0],
"applies_when": ["reflecting"],
"does_not_apply_when": ["promoting"],
"confidence": 0.8,
"initial_salience": {
"reusability": 0.5,
"consequence": 0.5,
"emotional_charge": 0.0
}
}],
"contradictions": [{"claim": "conflict"}],
"doctrine_suggestions": []
});
let summary = evaluate_admission(&payload, "replay", "hash_test");
assert_eq!(summary.outcome, PolicyOutcome::Quarantine);
}
#[test]
fn adapter_authority_class_classifies_replay_as_derived() {
assert_eq!(adapter_authority_class("replay"), AuthorityClass::Derived);
}
#[test]
fn adapter_authority_class_defaults_unknown_adapters_to_untrusted() {
assert_eq!(
adapter_authority_class("unknown-future-adapter"),
AuthorityClass::Untrusted
);
assert_eq!(
adapter_authority_class_outcome(AuthorityClass::Untrusted),
PolicyOutcome::Reject
);
}
}