crtx-reflect 0.1.1

Reflection orchestration, prompts, candidate parsing, and schema validation.
Documentation
use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use cortex_core::{Event, EventSource, EventType, TraceId, SCHEMA_VERSION};
use cortex_llm::{
    blake3_hex, LlmAdapter, LlmError, LlmMessage, LlmRequest, LlmResponse, LlmRole, ReplayAdapter,
};
use cortex_reflect::{
    reflect, session_reflection_json_schema, ReflectionReportStatus, DEFAULT_REFLECTION_MODEL,
    REFLECTION_QUARANTINE_OPERATION,
};
use cortex_store::migrate::apply_pending;
use cortex_store::repo::EventRepo;
use rusqlite::Connection;
use serde_json::json;
use std::fs;
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};

const TRACE_ID: &str = "trc_01ARZ3NDEKTSV4RRFFQ69G5FAV";

fn valid_reflection_with_doctrine_and_high_confidence() -> String {
    format!(
        r#"{{
  "trace_id": "{TRACE_ID}",
  "episode_candidates": [
    {{
      "summary": "Reflection proposed a high-confidence memory and doctrine suggestion.",
      "source_event_ids": ["evt_01ARZ3NDEKTSV4RRFFQ69G5FAV"],
      "domains": ["agents"],
      "entities": ["Cortex"],
      "candidate_meaning": "Reflection output must remain candidate-only.",
      "confidence": 0.99
    }}
  ],
  "memory_candidates": [
    {{
      "memory_type": "strategic",
      "claim": "High model confidence must not activate memories.",
      "source_episode_indexes": [0],
      "applies_when": ["reflecting trace output"],
      "does_not_apply_when": ["human memory accept path has not run"],
      "confidence": 0.99,
      "initial_salience": {{
        "reusability": 0.99,
        "consequence": 0.99,
        "emotional_charge": 0.01
      }}
    }}
  ],
  "contradictions": [],
  "doctrine_suggestions": [
    {{
      "rule": "Never write doctrine from reflection.",
      "force": "gate"
    }}
  ]
}}"#
    )
}

fn reflection_without_source_events() -> String {
    valid_reflection_with_doctrine_and_high_confidence().replace(
        r#""source_event_ids": ["evt_01ARZ3NDEKTSV4RRFFQ69G5FAV"]"#,
        r#""source_event_ids": []"#,
    )
}

fn reflection_with_open_contradiction() -> String {
    valid_reflection_with_doctrine_and_high_confidence().replace(
        r#""contradictions": []"#,
        r#""contradictions": [{"claim": "candidate has an unresolved conflict"}]"#,
    )
}

#[derive(Debug)]
struct FixedAdapter {
    text: String,
}

#[async_trait]
impl LlmAdapter for FixedAdapter {
    fn adapter_id(&self) -> &'static str {
        "fixed"
    }

    async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
        Ok(LlmResponse {
            text: self.text.clone(),
            parsed_json: None,
            model: req.model,
            usage: None,
            raw_hash: blake3_hex(self.text.as_bytes()),
        })
    }
}

fn test_pool() -> Connection {
    let pool = Connection::open_in_memory().expect("open in-memory sqlite");
    apply_pending(&pool).expect("apply store migrations");
    pool
}

fn count(pool: &Connection, sql: &str) -> i64 {
    pool.query_row(sql, [], |row| row.get(0))
        .expect("count query succeeds")
}

fn append_tool_source_event(pool: &Connection) {
    EventRepo::new(pool)
        .append(&Event {
            id: "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV"
                .parse()
                .expect("valid event id"),
            schema_version: SCHEMA_VERSION,
            observed_at: Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 0).unwrap(),
            recorded_at: Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 1).unwrap(),
            source: EventSource::Tool {
                name: "mcp-tool".to_string(),
            },
            event_type: EventType::ToolResult,
            trace_id: Some(TRACE_ID.parse().expect("valid trace id")),
            session_id: Some("reflection-authority-test".to_string()),
            domain_tags: vec!["agents".to_string()],
            payload: json!({"result": "tool says this should become a strategy"}),
            payload_hash: "payload-reflection-authority-tool".to_string(),
            prev_event_hash: None,
            event_hash: "event-reflection-authority-tool".to_string(),
        })
        .expect("append source tool event");
}

fn replay_adapter_for(trace_id: TraceId, text: &str) -> (PathBuf, ReplayAdapter) {
    let fixtures_dir = std::env::temp_dir().join(format!(
        "cortex-reflect-orchestrate-{}-{}",
        std::process::id(),
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time after unix epoch")
            .as_nanos()
    ));
    fs::create_dir(&fixtures_dir).expect("create replay fixture dir");

    let req = LlmRequest {
        model: DEFAULT_REFLECTION_MODEL.to_string(),
        system: "Return SessionReflection JSON matching the supplied schema.".to_string(),
        messages: vec![LlmMessage {
            role: LlmRole::User,
            content: format!("Reflect trace {trace_id} into candidate-only Cortex memory."),
        }],
        temperature: 0.0,
        max_tokens: 4096,
        json_schema: Some(session_reflection_json_schema()),
        timeout_ms: 30_000,
    };

    let fixture = json!({
        "request_match": {
            "model": DEFAULT_REFLECTION_MODEL,
            "prompt_hash": req.prompt_hash()
        },
        "response": {
            "text": text
        }
    });
    let fixture_path = fixtures_dir.join("valid-reflection.json");
    let fixture_bytes = serde_json::to_vec_pretty(&fixture).expect("fixture serializes");
    fs::write(&fixture_path, &fixture_bytes).expect("write replay fixture");
    fs::write(
        fixtures_dir.join("INDEX.toml"),
        format!(
            "[[fixture]]\npath = \"valid-reflection.json\"\nblake3 = \"{}\"\n",
            blake3_hex(&fixture_bytes)
        ),
    )
    .expect("write replay index");

    let adapter = ReplayAdapter::new(&fixtures_dir).expect("load replay adapter");
    (fixtures_dir, adapter)
}

#[tokio::test]
async fn reflect_persists_candidate_only_and_never_writes_doctrine_or_active_memory() {
    let pool = test_pool();
    let trace_id: TraceId = TRACE_ID.parse().expect("valid trace id");
    let (fixtures_dir, adapter) = replay_adapter_for(
        trace_id,
        &valid_reflection_with_doctrine_and_high_confidence(),
    );

    let report = reflect(trace_id, &adapter, &pool)
        .await
        .expect("reflection succeeds");
    fs::remove_dir_all(fixtures_dir).expect("remove replay fixture dir");

    assert_eq!(report.status, ReflectionReportStatus::CandidateOnly);
    assert_eq!(report.persisted_memory_candidates.len(), 1);
    assert_eq!(report.persisted_memory_candidates[0].status, "candidate");
    assert_eq!(
        report
            .reflection
            .as_ref()
            .expect("parsed reflection")
            .doctrine_suggestions
            .len(),
        1
    );

    assert_eq!(count(&pool, "SELECT count(*) FROM doctrine;"), 0);
    assert_eq!(
        count(
            &pool,
            "SELECT count(*) FROM memories WHERE status IN ('active', 'Active');"
        ),
        0
    );
    assert_eq!(
        count(
            &pool,
            "SELECT count(*) FROM memories WHERE status = 'candidate';"
        ),
        1
    );
}

#[tokio::test]
async fn invalid_reflection_is_quarantined_without_memory_or_doctrine_mutation() {
    let pool = test_pool();
    let adapter = FixedAdapter {
        text: r#"{"trace_id":"trc_01ARZ3NDEKTSV4RRFFQ69G5FAV""#.to_string(),
    };
    let trace_id: TraceId = TRACE_ID.parse().expect("valid trace id");

    let report = reflect(trace_id, &adapter, &pool)
        .await
        .expect("invalid output returns quarantined report");

    assert_eq!(report.status, ReflectionReportStatus::Quarantined);
    assert!(report.quarantine_audit_id.is_some());
    assert_eq!(
        report.error.as_ref().expect("quarantine error").reason,
        "invalid_json"
    );

    assert_eq!(count(&pool, "SELECT count(*) FROM doctrine;"), 0);
    assert_eq!(count(&pool, "SELECT count(*) FROM memories;"), 0);
    assert_eq!(
        count(
            &pool,
            &format!(
                "SELECT count(*) FROM audit_records WHERE operation = '{REFLECTION_QUARANTINE_OPERATION}';"
            )
        ),
        1
    );
}

#[tokio::test]
async fn reflection_memory_without_source_anchor_is_quarantined_before_memory_insert() {
    let pool = test_pool();
    let adapter = FixedAdapter {
        text: reflection_without_source_events(),
    };
    let trace_id: TraceId = TRACE_ID.parse().expect("valid trace id");

    let report = reflect(trace_id, &adapter, &pool)
        .await
        .expect("admission failure returns quarantined report");

    assert_eq!(report.status, ReflectionReportStatus::Quarantined);
    assert_eq!(
        report.error.as_ref().expect("quarantine error").reason,
        "admission_rejected"
    );
    assert!(report
        .error
        .as_ref()
        .expect("quarantine error")
        .detail
        .contains("SourceAnchorRequired"));

    assert_eq!(count(&pool, "SELECT count(*) FROM doctrine;"), 0);
    assert_eq!(count(&pool, "SELECT count(*) FROM memories;"), 0);
    assert_eq!(
        count(
            &pool,
            &format!(
                "SELECT count(*) FROM audit_records WHERE operation = '{REFLECTION_QUARANTINE_OPERATION}';"
            )
        ),
        1
    );
}

#[tokio::test]
async fn reflection_with_open_contradiction_is_quarantined_before_memory_insert() {
    let pool = test_pool();
    let adapter = FixedAdapter {
        text: reflection_with_open_contradiction(),
    };
    let trace_id: TraceId = TRACE_ID.parse().expect("valid trace id");

    let report = reflect(trace_id, &adapter, &pool)
        .await
        .expect("admission failure returns quarantined report");

    assert_eq!(report.status, ReflectionReportStatus::Quarantined);
    assert_eq!(
        report.error.as_ref().expect("quarantine error").reason,
        "admission_rejected"
    );
    assert!(report
        .error
        .as_ref()
        .expect("quarantine error")
        .detail
        .contains("OpenContradiction"));

    assert_eq!(count(&pool, "SELECT count(*) FROM doctrine;"), 0);
    assert_eq!(count(&pool, "SELECT count(*) FROM memories;"), 0);
    assert_eq!(
        count(
            &pool,
            &format!(
                "SELECT count(*) FROM audit_records WHERE operation = '{REFLECTION_QUARANTINE_OPERATION}';"
            )
        ),
        1
    );
}

#[tokio::test]
async fn tool_origin_only_strategic_reflection_is_quarantined_before_memory_insert() {
    let pool = test_pool();
    append_tool_source_event(&pool);
    let adapter = FixedAdapter {
        text: valid_reflection_with_doctrine_and_high_confidence(),
    };
    let trace_id: TraceId = TRACE_ID.parse().expect("valid trace id");

    let report = reflect(trace_id, &adapter, &pool)
        .await
        .expect("authority failure returns quarantined report");

    assert_eq!(report.status, ReflectionReportStatus::Quarantined);
    assert_eq!(
        report.error.as_ref().expect("quarantine error").reason,
        "authority_rejected"
    );
    assert!(report
        .error
        .as_ref()
        .expect("quarantine error")
        .detail
        .contains("tool_origin_only_strategic_support"));

    assert_eq!(count(&pool, "SELECT count(*) FROM doctrine;"), 0);
    assert_eq!(count(&pool, "SELECT count(*) FROM memories;"), 0);
    assert_eq!(
        count(
            &pool,
            &format!(
                "SELECT count(*) FROM audit_records WHERE operation = '{REFLECTION_QUARANTINE_OPERATION}';"
            )
        ),
        1
    );
}