1use chrono::Utc;
4use cortex_core::{MemoryId, TraceId};
5use cortex_llm::{LlmAdapter, LlmError, LlmMessage, LlmRequest, LlmResponse, LlmRole};
6use cortex_store::repo::{EventRepo, MemoryCandidate as StoreMemoryCandidate, MemoryRepo};
7use cortex_store::{Pool, StoreError};
8use serde::{Deserialize, Serialize};
9use serde_json::json;
10use thiserror::Error;
11
12use crate::admission::validate_reflection_admission;
13use crate::authority::validate_reflection_authority;
14use crate::parse::{parse_reflection, session_reflection_json_schema};
15use crate::quarantine::quarantine_record;
16use crate::schema::{MemoryCandidate, MemoryType, SessionReflection};
17use crate::ReflectError;
18
19pub const DEFAULT_REFLECTION_MODEL: &str = "replay-reflection-v1";
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum ReflectionReportStatus {
26 CandidateOnly,
28 Quarantined,
30}
31
32#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
34pub struct PersistedMemoryCandidate {
35 pub id: MemoryId,
37 pub status: String,
39}
40
41#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
43pub struct ReflectionReport {
44 pub trace_id: TraceId,
46 pub status: ReflectionReportStatus,
48 pub adapter_id: String,
50 pub model: String,
52 pub raw_hash: String,
54 pub reflection: Option<SessionReflection>,
56 pub persisted_memory_candidates: Vec<PersistedMemoryCandidate>,
58 pub quarantine_audit_id: Option<cortex_core::AuditRecordId>,
60 pub error: Option<ReflectionRejection>,
62}
63
64#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
66pub struct ReflectionRejection {
67 pub reason: String,
69 pub detail: String,
71}
72
73#[derive(Debug, Error)]
75pub enum ReflectOrchestrationError {
76 #[error("llm adapter failed: {0}")]
78 Adapter(#[from] LlmError),
79 #[error("store failed: {0}")]
81 Store(#[from] StoreError),
82}
83
84pub async fn reflect(
90 trace_id: TraceId,
91 adapter: &dyn LlmAdapter,
92 pool: &Pool,
93) -> Result<ReflectionReport, ReflectOrchestrationError> {
94 let response = adapter.complete(reflection_request(trace_id)).await?;
95 let adapter_id = adapter.adapter_id().to_string();
96 let output = response_output(&response);
97
98 match parse_reflection(&output) {
99 Ok(reflection) => {
100 if reflection.trace_id != trace_id {
101 let err = ReflectError::InvalidSchema {
102 raw: output,
103 message: format!(
104 "reflection trace_id {} did not match requested trace_id {trace_id}",
105 reflection.trace_id
106 ),
107 };
108 let entry =
109 quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
110 return Ok(quarantined_report(
111 trace_id, adapter_id, response, err, entry.id,
112 ));
113 }
114 if let Err(err) =
115 validate_reflection_admission(&reflection, &adapter_id, &response.raw_hash)
116 {
117 let err = ReflectError::AdmissionRejected {
118 raw: output,
119 message: err.to_string(),
120 };
121 let entry =
122 quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
123 return Ok(quarantined_report(
124 trace_id, adapter_id, response, err, entry.id,
125 ));
126 }
127 let events = EventRepo::new(pool);
128 if let Err(err) = validate_reflection_authority(&reflection, |event_id| {
129 events
130 .get_by_id(event_id)
131 .map(|event| event.map(|event| event.source))
132 }) {
133 let err = ReflectError::AuthorityRejected {
134 raw: output,
135 message: err.to_string(),
136 };
137 let entry =
138 quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
139 return Ok(quarantined_report(
140 trace_id, adapter_id, response, err, entry.id,
141 ));
142 }
143 let persisted_memory_candidates = persist_memory_candidates(&reflection, pool)?;
144 Ok(ReflectionReport {
145 trace_id,
146 status: ReflectionReportStatus::CandidateOnly,
147 adapter_id,
148 model: response.model,
149 raw_hash: response.raw_hash,
150 reflection: Some(reflection),
151 persisted_memory_candidates,
152 quarantine_audit_id: None,
153 error: None,
154 })
155 }
156 Err(err) => {
157 let entry = quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
158 Ok(quarantined_report(
159 trace_id, adapter_id, response, err, entry.id,
160 ))
161 }
162 }
163}
164
165fn reflection_request(trace_id: TraceId) -> LlmRequest {
166 LlmRequest {
167 model: DEFAULT_REFLECTION_MODEL.to_string(),
168 system: "Return SessionReflection JSON matching the supplied schema.".to_string(),
169 messages: vec![LlmMessage {
170 role: LlmRole::User,
171 content: format!("Reflect trace {trace_id} into candidate-only Cortex memory."),
172 }],
173 temperature: 0.0,
174 max_tokens: 4096,
175 json_schema: Some(session_reflection_json_schema()),
176 timeout_ms: 30_000,
177 }
178}
179
180fn response_output(response: &LlmResponse) -> String {
181 response
182 .parsed_json
183 .as_ref()
184 .map_or_else(|| response.text.clone(), serde_json::Value::to_string)
185}
186
187fn quarantined_report(
188 trace_id: TraceId,
189 adapter_id: String,
190 response: LlmResponse,
191 err: ReflectError,
192 quarantine_audit_id: cortex_core::AuditRecordId,
193) -> ReflectionReport {
194 ReflectionReport {
195 trace_id,
196 status: ReflectionReportStatus::Quarantined,
197 adapter_id,
198 model: response.model,
199 raw_hash: response.raw_hash,
200 reflection: None,
201 persisted_memory_candidates: Vec::new(),
202 quarantine_audit_id: Some(quarantine_audit_id),
203 error: Some(ReflectionRejection {
204 reason: err.quarantine_reason().to_string(),
205 detail: err.detail().to_string(),
206 }),
207 }
208}
209
210fn persist_memory_candidates(
211 reflection: &SessionReflection,
212 pool: &Pool,
213) -> Result<Vec<PersistedMemoryCandidate>, StoreError> {
214 let repo = MemoryRepo::new(pool);
215 let now = Utc::now();
216 let mut persisted = Vec::with_capacity(reflection.memory_candidates.len());
217
218 for memory in &reflection.memory_candidates {
219 let id = MemoryId::new();
220 let source_events = source_event_ids(reflection, memory);
221 let store_memory = StoreMemoryCandidate {
222 id,
223 memory_type: memory_type_wire(memory.memory_type).to_string(),
224 claim: memory.claim.clone(),
225 source_episodes_json: json!([]),
226 source_events_json: json!(source_events),
227 domains_json: json!(domains_for_memory(reflection, memory)),
228 salience_json: json!({
229 "reusability": memory.initial_salience.reusability,
230 "consequence": memory.initial_salience.consequence,
231 "emotional_charge": memory.initial_salience.emotional_charge
232 }),
233 confidence: memory.confidence,
234 authority: "candidate".to_string(),
235 applies_when_json: json!(memory.applies_when),
236 does_not_apply_when_json: json!(memory.does_not_apply_when),
237 created_at: now,
238 updated_at: now,
239 };
240 repo.insert_candidate(&store_memory)?;
241 persisted.push(PersistedMemoryCandidate {
242 id,
243 status: "candidate".to_string(),
244 });
245 }
246
247 Ok(persisted)
248}
249
250fn source_event_ids(reflection: &SessionReflection, memory: &MemoryCandidate) -> Vec<String> {
251 memory
252 .source_episode_indexes
253 .iter()
254 .flat_map(|idx| &reflection.episode_candidates[*idx].source_event_ids)
255 .map(ToString::to_string)
256 .collect()
257}
258
259fn domains_for_memory(reflection: &SessionReflection, memory: &MemoryCandidate) -> Vec<String> {
260 let mut domains = Vec::new();
261 for idx in &memory.source_episode_indexes {
262 for domain in &reflection.episode_candidates[*idx].domains {
263 if !domains.contains(domain) {
264 domains.push(domain.clone());
265 }
266 }
267 }
268 domains
269}
270
271fn memory_type_wire(memory_type: MemoryType) -> &'static str {
272 match memory_type {
273 MemoryType::Semantic => "semantic",
274 MemoryType::Episodic => "episodic",
275 MemoryType::Procedural => "procedural",
276 MemoryType::Strategic => "strategic",
277 MemoryType::Affective => "affective",
278 MemoryType::Correction => "correction",
279 }
280}