crtx-reflect 0.1.1

Reflection orchestration, prompts, candidate parsing, and schema validation.
Documentation
//! Minimal reflection orchestration for Lane 2.B.

use chrono::Utc;
use cortex_core::{MemoryId, TraceId};
use cortex_llm::{LlmAdapter, LlmError, LlmMessage, LlmRequest, LlmResponse, LlmRole};
use cortex_store::repo::{EventRepo, MemoryCandidate as StoreMemoryCandidate, MemoryRepo};
use cortex_store::{Pool, StoreError};
use serde::{Deserialize, Serialize};
use serde_json::json;
use thiserror::Error;

use crate::admission::validate_reflection_admission;
use crate::authority::validate_reflection_authority;
use crate::parse::{parse_reflection, session_reflection_json_schema};
use crate::quarantine::quarantine_record;
use crate::schema::{MemoryCandidate, MemoryType, SessionReflection};
use crate::ReflectError;

/// Model used by replay fixtures unless a caller needs a different adapter model.
///
/// This is the value placed in the `model` field of the request that this
/// module builds for the adapter.
pub const DEFAULT_REFLECTION_MODEL: &str = "replay-reflection-v1";

/// Stable status for reflect outputs.
///
/// Serialized with `snake_case` variant names (`candidate_only`,
/// `quarantined`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReflectionReportStatus {
    /// Model output parsed and any durable memory rows are still candidates.
    ///
    /// Set by [`reflect`] once the output has passed parsing, the trace id
    /// check, admission validation, and authority validation.
    CandidateOnly,
    /// Model output was rejected and a quarantine audit row was written.
    ///
    /// The audit row id is carried in `ReflectionReport::quarantine_audit_id`.
    Quarantined,
}

/// Candidate persistence detail returned by [`reflect`].
///
/// One value is produced per memory candidate row inserted into the store.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PersistedMemoryCandidate {
    /// Inserted memory id, freshly generated for the row by this module.
    pub id: MemoryId,
    /// Store status. Reflect may only create `candidate`, and this module
    /// always writes the literal string `"candidate"` here.
    pub status: String,
}

/// Non-panicking report shape for reflection orchestration.
///
/// Returned by [`reflect`] for both accepted and rejected model output; the
/// `status` field tells the two apart, and the optional fields below are
/// populated accordingly.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ReflectionReport {
    /// Trace requested by the caller.
    pub trace_id: TraceId,
    /// Overall report status.
    pub status: ReflectionReportStatus,
    /// Adapter id that produced the output.
    pub adapter_id: String,
    /// Model echoed by the adapter (copied from the adapter response).
    pub model: String,
    /// Hash of the raw adapter response (copied from the adapter response).
    pub raw_hash: String,
    /// Parsed reflection, when candidate-only. `None` when quarantined.
    pub reflection: Option<SessionReflection>,
    /// Durable candidate memory rows inserted by this call. Empty when
    /// quarantined.
    pub persisted_memory_candidates: Vec<PersistedMemoryCandidate>,
    /// Quarantine audit row id, when quarantined. `None` when candidate-only.
    pub quarantine_audit_id: Option<cortex_core::AuditRecordId>,
    /// Parser reason, when quarantined. `None` when candidate-only.
    pub error: Option<ReflectionRejection>,
}

/// Parser rejection detail returned inside a quarantined report.
///
/// Besides parse failures, [`reflect`] also quarantines a trace id mismatch,
/// an admission rejection, and an authority rejection; all of them are
/// reported through this shape.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReflectionRejection {
    /// Stable parser reason code, taken from `ReflectError::quarantine_reason`.
    pub reason: String,
    /// Operator-facing parser detail, without raw model output. Taken from
    /// `ReflectError::detail`.
    pub detail: String,
}

/// Failures outside normal parse/schema quarantine.
///
/// These are returned as the `Err` side of [`reflect`]. Rejected model output
/// is not an error in this sense: it is reported through a quarantined
/// [`ReflectionReport`] instead.
#[derive(Debug, Error)]
pub enum ReflectOrchestrationError {
    /// LLM adapter failed before producing output.
    #[error("llm adapter failed: {0}")]
    Adapter(#[from] LlmError),
    /// Store operation failed.
    #[error("store failed: {0}")]
    Store(#[from] StoreError),
}

/// Reflect a trace through an LLM adapter and return candidate-only state.
///
/// Invalid JSON/schema output writes a quarantine audit row and returns a
/// quarantined report. Valid output may persist memory candidates, but never
/// writes doctrine and never promotes memories to `active`.
pub async fn reflect(
    trace_id: TraceId,
    adapter: &dyn LlmAdapter,
    pool: &Pool,
) -> Result<ReflectionReport, ReflectOrchestrationError> {
    let response = adapter.complete(reflection_request(trace_id)).await?;
    let adapter_id = adapter.adapter_id().to_string();
    let output = response_output(&response);

    match parse_reflection(&output) {
        Ok(reflection) => {
            if reflection.trace_id != trace_id {
                let err = ReflectError::InvalidSchema {
                    raw: output,
                    message: format!(
                        "reflection trace_id {} did not match requested trace_id {trace_id}",
                        reflection.trace_id
                    ),
                };
                let entry =
                    quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
                return Ok(quarantined_report(
                    trace_id, adapter_id, response, err, entry.id,
                ));
            }
            if let Err(err) =
                validate_reflection_admission(&reflection, &adapter_id, &response.raw_hash)
            {
                let err = ReflectError::AdmissionRejected {
                    raw: output,
                    message: err.to_string(),
                };
                let entry =
                    quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
                return Ok(quarantined_report(
                    trace_id, adapter_id, response, err, entry.id,
                ));
            }
            let events = EventRepo::new(pool);
            if let Err(err) = validate_reflection_authority(&reflection, |event_id| {
                events
                    .get_by_id(event_id)
                    .map(|event| event.map(|event| event.source))
            }) {
                let err = ReflectError::AuthorityRejected {
                    raw: output,
                    message: err.to_string(),
                };
                let entry =
                    quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
                return Ok(quarantined_report(
                    trace_id, adapter_id, response, err, entry.id,
                ));
            }
            let persisted_memory_candidates = persist_memory_candidates(&reflection, pool)?;
            Ok(ReflectionReport {
                trace_id,
                status: ReflectionReportStatus::CandidateOnly,
                adapter_id,
                model: response.model,
                raw_hash: response.raw_hash,
                reflection: Some(reflection),
                persisted_memory_candidates,
                quarantine_audit_id: None,
                error: None,
            })
        }
        Err(err) => {
            let entry = quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
            Ok(quarantined_report(
                trace_id, adapter_id, response, err, entry.id,
            ))
        }
    }
}

/// Builds the adapter request that asks for a reflection of `trace_id`.
///
/// The request targets [`DEFAULT_REFLECTION_MODEL`], carries a single user
/// message naming the trace, and attaches the session reflection JSON schema.
/// It uses temperature `0.0`, a 4096-token limit, and a 30 000 ms timeout.
fn reflection_request(trace_id: TraceId) -> LlmRequest {
    let user_prompt = LlmMessage {
        role: LlmRole::User,
        content: format!("Reflect trace {trace_id} into candidate-only Cortex memory."),
    };

    LlmRequest {
        model: DEFAULT_REFLECTION_MODEL.to_owned(),
        system: String::from("Return SessionReflection JSON matching the supplied schema."),
        messages: vec![user_prompt],
        temperature: 0.0,
        max_tokens: 4096,
        json_schema: Some(session_reflection_json_schema()),
        timeout_ms: 30_000,
    }
}

fn response_output(response: &LlmResponse) -> String {
    response
        .parsed_json
        .as_ref()
        .map_or_else(|| response.text.clone(), serde_json::Value::to_string)
}

/// Assembles the report returned for rejected model output.
///
/// The report has status `Quarantined`, no parsed reflection, and no
/// persisted candidates. It carries the audit row id plus the rejection's
/// reason code and detail, and echoes the adapter id together with the
/// model and raw hash taken from `response`.
fn quarantined_report(
    trace_id: TraceId,
    adapter_id: String,
    response: LlmResponse,
    err: ReflectError,
    quarantine_audit_id: cortex_core::AuditRecordId,
) -> ReflectionReport {
    let rejection = ReflectionRejection {
        reason: err.quarantine_reason().to_string(),
        detail: err.detail().to_string(),
    };

    ReflectionReport {
        trace_id,
        status: ReflectionReportStatus::Quarantined,
        adapter_id,
        model: response.model,
        raw_hash: response.raw_hash,
        reflection: None,
        persisted_memory_candidates: Vec::new(),
        quarantine_audit_id: Some(quarantine_audit_id),
        error: Some(rejection),
    }
}

fn persist_memory_candidates(
    reflection: &SessionReflection,
    pool: &Pool,
) -> Result<Vec<PersistedMemoryCandidate>, StoreError> {
    let repo = MemoryRepo::new(pool);
    let now = Utc::now();
    let mut persisted = Vec::with_capacity(reflection.memory_candidates.len());

    for memory in &reflection.memory_candidates {
        let id = MemoryId::new();
        let source_events = source_event_ids(reflection, memory);
        let store_memory = StoreMemoryCandidate {
            id,
            memory_type: memory_type_wire(memory.memory_type).to_string(),
            claim: memory.claim.clone(),
            source_episodes_json: json!([]),
            source_events_json: json!(source_events),
            domains_json: json!(domains_for_memory(reflection, memory)),
            salience_json: json!({
                "reusability": memory.initial_salience.reusability,
                "consequence": memory.initial_salience.consequence,
                "emotional_charge": memory.initial_salience.emotional_charge
            }),
            confidence: memory.confidence,
            authority: "candidate".to_string(),
            applies_when_json: json!(memory.applies_when),
            does_not_apply_when_json: json!(memory.does_not_apply_when),
            created_at: now,
            updated_at: now,
        };
        repo.insert_candidate(&store_memory)?;
        persisted.push(PersistedMemoryCandidate {
            id,
            status: "candidate".to_string(),
        });
    }

    Ok(persisted)
}

/// Collects the source event ids of every episode a memory candidate cites.
///
/// Walks `memory.source_episode_indexes` in order, looks each index up in
/// `reflection.episode_candidates`, and returns the stringified
/// `source_event_ids` of those episodes. Duplicates are kept.
///
/// An index with no matching episode is skipped instead of being indexed
/// directly, so a malformed reflection cannot panic here.
/// NOTE(review): earlier validation presumably rejects such indexes already;
/// confirm, and decide whether a skipped index should be surfaced.
fn source_event_ids(reflection: &SessionReflection, memory: &MemoryCandidate) -> Vec<String> {
    memory
        .source_episode_indexes
        .iter()
        // Checked lookup: the indexes originate from model output.
        .filter_map(|idx| reflection.episode_candidates.get(*idx))
        .flat_map(|episode| &episode.source_event_ids)
        .map(ToString::to_string)
        .collect()
}

/// Collects the distinct domains of every episode a memory candidate cites.
///
/// Walks `memory.source_episode_indexes` in order and gathers the `domains`
/// of the matching entries in `reflection.episode_candidates`. Each domain
/// appears once, in first-seen order.
///
/// An index with no matching episode is skipped instead of being indexed
/// directly, so a malformed reflection cannot panic here.
/// NOTE(review): earlier validation presumably rejects such indexes already;
/// confirm, and decide whether a skipped index should be surfaced.
fn domains_for_memory(reflection: &SessionReflection, memory: &MemoryCandidate) -> Vec<String> {
    // Checked lookup: the indexes originate from model output.
    let cited_episodes = memory
        .source_episode_indexes
        .iter()
        .filter_map(|idx| reflection.episode_candidates.get(*idx));

    let mut domains = Vec::new();
    for domain in cited_episodes.flat_map(|episode| &episode.domains) {
        // A linear scan keeps first-seen order without needing a second
        // collection.
        if !domains.contains(domain) {
            domains.push(domain.clone());
        }
    }
    domains
}

/// Maps a [`MemoryType`] to the lowercase string written to the store's
/// `memory_type` column.
///
/// The match is exhaustive on purpose: adding a variant to [`MemoryType`]
/// fails compilation here until its wire name is chosen.
fn memory_type_wire(memory_type: MemoryType) -> &'static str {
    match memory_type {
        MemoryType::Affective => "affective",
        MemoryType::Correction => "correction",
        MemoryType::Episodic => "episodic",
        MemoryType::Procedural => "procedural",
        MemoryType::Semantic => "semantic",
        MemoryType::Strategic => "strategic",
    }
}