Skip to main content

cortex_reflect/
orchestrate.rs

1//! Minimal reflection orchestration for Lane 2.B.
2
3use chrono::Utc;
4use cortex_core::{MemoryId, TraceId};
5use cortex_llm::{LlmAdapter, LlmError, LlmMessage, LlmRequest, LlmResponse, LlmRole};
6use cortex_store::repo::{EventRepo, MemoryCandidate as StoreMemoryCandidate, MemoryRepo};
7use cortex_store::{Pool, StoreError};
8use serde::{Deserialize, Serialize};
9use serde_json::json;
10use thiserror::Error;
11
12use crate::admission::validate_reflection_admission;
13use crate::authority::validate_reflection_authority;
14use crate::parse::{parse_reflection, session_reflection_json_schema};
15use crate::quarantine::quarantine_record;
16use crate::schema::{MemoryCandidate, MemoryType, SessionReflection};
17use crate::ReflectError;
18
/// Model used by replay fixtures unless a caller needs a different adapter model.
///
/// This is the model name placed in the request that [`reflect`] sends to the
/// adapter.
pub const DEFAULT_REFLECTION_MODEL: &str = "replay-reflection-v1";
21
/// Stable status for reflect outputs.
///
/// Serialized in `snake_case`, i.e. `candidate_only` and `quarantined`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReflectionReportStatus {
    /// Model output parsed and any durable memory rows are still candidates.
    ///
    /// Reports with this status carry the parsed reflection and no quarantine
    /// audit id.
    CandidateOnly,
    /// Model output was rejected and a quarantine audit row was written.
    ///
    /// Reports with this status carry the audit row id and a rejection
    /// detail, and list no persisted candidates.
    Quarantined,
}
31
/// Candidate persistence detail returned by [`reflect`].
///
/// One value is produced per memory candidate row inserted into the store.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PersistedMemoryCandidate {
    /// Inserted memory id (freshly generated for the inserted row).
    pub id: MemoryId,
    /// Store status. Reflect may only create `candidate`, so this module
    /// always sets the literal string `"candidate"`.
    pub status: String,
}
40
/// Non-panicking report shape for reflection orchestration.
///
/// The same shape is used for both outcomes of [`reflect`]; which optional
/// fields are populated depends on [`ReflectionReport::status`]:
///
/// * `CandidateOnly`: `reflection` is `Some`, `quarantine_audit_id` and
///   `error` are `None`.
/// * `Quarantined`: `reflection` is `None`, `persisted_memory_candidates` is
///   empty, and both `quarantine_audit_id` and `error` are `Some`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ReflectionReport {
    /// Trace requested by the caller.
    pub trace_id: TraceId,
    /// Overall report status.
    pub status: ReflectionReportStatus,
    /// Adapter id that produced the output.
    pub adapter_id: String,
    /// Model echoed by the adapter (copied from the adapter response).
    pub model: String,
    /// Hash of the raw adapter response (copied from the adapter response).
    pub raw_hash: String,
    /// Parsed reflection, when candidate-only.
    pub reflection: Option<SessionReflection>,
    /// Durable candidate memory rows inserted by this call.
    pub persisted_memory_candidates: Vec<PersistedMemoryCandidate>,
    /// Quarantine audit row id, when quarantined.
    pub quarantine_audit_id: Option<cortex_core::AuditRecordId>,
    /// Rejection reason and detail, when quarantined.
    pub error: Option<ReflectionRejection>,
}
63
/// Rejection detail returned inside a quarantined report.
///
/// Besides parser failures, [`reflect`] also produces this for a trace id
/// mismatch and for admission or authority validation failures.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReflectionRejection {
    /// Stable reason code (the string form of the error's quarantine reason).
    pub reason: String,
    /// Operator-facing detail, without raw model output.
    pub detail: String,
}
72
/// Failures outside normal parse/schema quarantine.
///
/// Rejected model output is not reported through this type: [`reflect`]
/// returns it as an `Ok` report with quarantined status. Both variants
/// convert from their source error via `From`, so `?` can be used on adapter
/// and store results.
#[derive(Debug, Error)]
pub enum ReflectOrchestrationError {
    /// LLM adapter failed before producing output.
    #[error("llm adapter failed: {0}")]
    Adapter(#[from] LlmError),
    /// Store operation failed.
    #[error("store failed: {0}")]
    Store(#[from] StoreError),
}
83
84/// Reflect a trace through an LLM adapter and return candidate-only state.
85///
86/// Invalid JSON/schema output writes a quarantine audit row and returns a
87/// quarantined report. Valid output may persist memory candidates, but never
88/// writes doctrine and never promotes memories to `active`.
89pub async fn reflect(
90    trace_id: TraceId,
91    adapter: &dyn LlmAdapter,
92    pool: &Pool,
93) -> Result<ReflectionReport, ReflectOrchestrationError> {
94    let response = adapter.complete(reflection_request(trace_id)).await?;
95    let adapter_id = adapter.adapter_id().to_string();
96    let output = response_output(&response);
97
98    match parse_reflection(&output) {
99        Ok(reflection) => {
100            if reflection.trace_id != trace_id {
101                let err = ReflectError::InvalidSchema {
102                    raw: output,
103                    message: format!(
104                        "reflection trace_id {} did not match requested trace_id {trace_id}",
105                        reflection.trace_id
106                    ),
107                };
108                let entry =
109                    quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
110                return Ok(quarantined_report(
111                    trace_id, adapter_id, response, err, entry.id,
112                ));
113            }
114            if let Err(err) =
115                validate_reflection_admission(&reflection, &adapter_id, &response.raw_hash)
116            {
117                let err = ReflectError::AdmissionRejected {
118                    raw: output,
119                    message: err.to_string(),
120                };
121                let entry =
122                    quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
123                return Ok(quarantined_report(
124                    trace_id, adapter_id, response, err, entry.id,
125                ));
126            }
127            let events = EventRepo::new(pool);
128            if let Err(err) = validate_reflection_authority(&reflection, |event_id| {
129                events
130                    .get_by_id(event_id)
131                    .map(|event| event.map(|event| event.source))
132            }) {
133                let err = ReflectError::AuthorityRejected {
134                    raw: output,
135                    message: err.to_string(),
136                };
137                let entry =
138                    quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
139                return Ok(quarantined_report(
140                    trace_id, adapter_id, response, err, entry.id,
141                ));
142            }
143            let persisted_memory_candidates = persist_memory_candidates(&reflection, pool)?;
144            Ok(ReflectionReport {
145                trace_id,
146                status: ReflectionReportStatus::CandidateOnly,
147                adapter_id,
148                model: response.model,
149                raw_hash: response.raw_hash,
150                reflection: Some(reflection),
151                persisted_memory_candidates,
152                quarantine_audit_id: None,
153                error: None,
154            })
155        }
156        Err(err) => {
157            let entry = quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
158            Ok(quarantined_report(
159                trace_id, adapter_id, response, err, entry.id,
160            ))
161        }
162    }
163}
164
165fn reflection_request(trace_id: TraceId) -> LlmRequest {
166    LlmRequest {
167        model: DEFAULT_REFLECTION_MODEL.to_string(),
168        system: "Return SessionReflection JSON matching the supplied schema.".to_string(),
169        messages: vec![LlmMessage {
170            role: LlmRole::User,
171            content: format!("Reflect trace {trace_id} into candidate-only Cortex memory."),
172        }],
173        temperature: 0.0,
174        max_tokens: 4096,
175        json_schema: Some(session_reflection_json_schema()),
176        timeout_ms: 30_000,
177    }
178}
179
180fn response_output(response: &LlmResponse) -> String {
181    response
182        .parsed_json
183        .as_ref()
184        .map_or_else(|| response.text.clone(), serde_json::Value::to_string)
185}
186
187fn quarantined_report(
188    trace_id: TraceId,
189    adapter_id: String,
190    response: LlmResponse,
191    err: ReflectError,
192    quarantine_audit_id: cortex_core::AuditRecordId,
193) -> ReflectionReport {
194    ReflectionReport {
195        trace_id,
196        status: ReflectionReportStatus::Quarantined,
197        adapter_id,
198        model: response.model,
199        raw_hash: response.raw_hash,
200        reflection: None,
201        persisted_memory_candidates: Vec::new(),
202        quarantine_audit_id: Some(quarantine_audit_id),
203        error: Some(ReflectionRejection {
204            reason: err.quarantine_reason().to_string(),
205            detail: err.detail().to_string(),
206        }),
207    }
208}
209
210fn persist_memory_candidates(
211    reflection: &SessionReflection,
212    pool: &Pool,
213) -> Result<Vec<PersistedMemoryCandidate>, StoreError> {
214    let repo = MemoryRepo::new(pool);
215    let now = Utc::now();
216    let mut persisted = Vec::with_capacity(reflection.memory_candidates.len());
217
218    for memory in &reflection.memory_candidates {
219        let id = MemoryId::new();
220        let source_events = source_event_ids(reflection, memory);
221        let store_memory = StoreMemoryCandidate {
222            id,
223            memory_type: memory_type_wire(memory.memory_type).to_string(),
224            claim: memory.claim.clone(),
225            source_episodes_json: json!([]),
226            source_events_json: json!(source_events),
227            domains_json: json!(domains_for_memory(reflection, memory)),
228            salience_json: json!({
229                "reusability": memory.initial_salience.reusability,
230                "consequence": memory.initial_salience.consequence,
231                "emotional_charge": memory.initial_salience.emotional_charge
232            }),
233            confidence: memory.confidence,
234            authority: "candidate".to_string(),
235            applies_when_json: json!(memory.applies_when),
236            does_not_apply_when_json: json!(memory.does_not_apply_when),
237            created_at: now,
238            updated_at: now,
239        };
240        repo.insert_candidate(&store_memory)?;
241        persisted.push(PersistedMemoryCandidate {
242            id,
243            status: "candidate".to_string(),
244        });
245    }
246
247    Ok(persisted)
248}
249
250fn source_event_ids(reflection: &SessionReflection, memory: &MemoryCandidate) -> Vec<String> {
251    memory
252        .source_episode_indexes
253        .iter()
254        .flat_map(|idx| &reflection.episode_candidates[*idx].source_event_ids)
255        .map(ToString::to_string)
256        .collect()
257}
258
259fn domains_for_memory(reflection: &SessionReflection, memory: &MemoryCandidate) -> Vec<String> {
260    let mut domains = Vec::new();
261    for idx in &memory.source_episode_indexes {
262        for domain in &reflection.episode_candidates[*idx].domains {
263            if !domains.contains(domain) {
264                domains.push(domain.clone());
265            }
266        }
267    }
268    domains
269}
270
271fn memory_type_wire(memory_type: MemoryType) -> &'static str {
272    match memory_type {
273        MemoryType::Semantic => "semantic",
274        MemoryType::Episodic => "episodic",
275        MemoryType::Procedural => "procedural",
276        MemoryType::Strategic => "strategic",
277        MemoryType::Affective => "affective",
278        MemoryType::Correction => "correction",
279    }
280}