use chrono::Utc;
use cortex_core::{MemoryId, TraceId};
use cortex_llm::{LlmAdapter, LlmError, LlmMessage, LlmRequest, LlmResponse, LlmRole};
use cortex_store::repo::{EventRepo, MemoryCandidate as StoreMemoryCandidate, MemoryRepo};
use cortex_store::{Pool, StoreError};
use serde::{Deserialize, Serialize};
use serde_json::json;
use thiserror::Error;
use crate::admission::validate_reflection_admission;
use crate::authority::validate_reflection_authority;
use crate::parse::{parse_reflection, session_reflection_json_schema};
use crate::quarantine::quarantine_record;
use crate::schema::{MemoryCandidate, MemoryType, SessionReflection};
use crate::ReflectError;
pub const DEFAULT_REFLECTION_MODEL: &str = "replay-reflection-v1";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReflectionReportStatus {
CandidateOnly,
Quarantined,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PersistedMemoryCandidate {
pub id: MemoryId,
pub status: String,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ReflectionReport {
pub trace_id: TraceId,
pub status: ReflectionReportStatus,
pub adapter_id: String,
pub model: String,
pub raw_hash: String,
pub reflection: Option<SessionReflection>,
pub persisted_memory_candidates: Vec<PersistedMemoryCandidate>,
pub quarantine_audit_id: Option<cortex_core::AuditRecordId>,
pub error: Option<ReflectionRejection>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReflectionRejection {
pub reason: String,
pub detail: String,
}
#[derive(Debug, Error)]
pub enum ReflectOrchestrationError {
#[error("llm adapter failed: {0}")]
Adapter(#[from] LlmError),
#[error("store failed: {0}")]
Store(#[from] StoreError),
}
pub async fn reflect(
trace_id: TraceId,
adapter: &dyn LlmAdapter,
pool: &Pool,
) -> Result<ReflectionReport, ReflectOrchestrationError> {
let response = adapter.complete(reflection_request(trace_id)).await?;
let adapter_id = adapter.adapter_id().to_string();
let output = response_output(&response);
match parse_reflection(&output) {
Ok(reflection) => {
if reflection.trace_id != trace_id {
let err = ReflectError::InvalidSchema {
raw: output,
message: format!(
"reflection trace_id {} did not match requested trace_id {trace_id}",
reflection.trace_id
),
};
let entry =
quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
return Ok(quarantined_report(
trace_id, adapter_id, response, err, entry.id,
));
}
if let Err(err) =
validate_reflection_admission(&reflection, &adapter_id, &response.raw_hash)
{
let err = ReflectError::AdmissionRejected {
raw: output,
message: err.to_string(),
};
let entry =
quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
return Ok(quarantined_report(
trace_id, adapter_id, response, err, entry.id,
));
}
let events = EventRepo::new(pool);
if let Err(err) = validate_reflection_authority(&reflection, |event_id| {
events
.get_by_id(event_id)
.map(|event| event.map(|event| event.source))
}) {
let err = ReflectError::AuthorityRejected {
raw: output,
message: err.to_string(),
};
let entry =
quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
return Ok(quarantined_report(
trace_id, adapter_id, response, err, entry.id,
));
}
let persisted_memory_candidates = persist_memory_candidates(&reflection, pool)?;
Ok(ReflectionReport {
trace_id,
status: ReflectionReportStatus::CandidateOnly,
adapter_id,
model: response.model,
raw_hash: response.raw_hash,
reflection: Some(reflection),
persisted_memory_candidates,
quarantine_audit_id: None,
error: None,
})
}
Err(err) => {
let entry = quarantine_record(err.quarantine_payload(), err.quarantine_reason(), pool)?;
Ok(quarantined_report(
trace_id, adapter_id, response, err, entry.id,
))
}
}
}
fn reflection_request(trace_id: TraceId) -> LlmRequest {
LlmRequest {
model: DEFAULT_REFLECTION_MODEL.to_string(),
system: "Return SessionReflection JSON matching the supplied schema.".to_string(),
messages: vec![LlmMessage {
role: LlmRole::User,
content: format!("Reflect trace {trace_id} into candidate-only Cortex memory."),
}],
temperature: 0.0,
max_tokens: 4096,
json_schema: Some(session_reflection_json_schema()),
timeout_ms: 30_000,
}
}
fn response_output(response: &LlmResponse) -> String {
response
.parsed_json
.as_ref()
.map_or_else(|| response.text.clone(), serde_json::Value::to_string)
}
fn quarantined_report(
trace_id: TraceId,
adapter_id: String,
response: LlmResponse,
err: ReflectError,
quarantine_audit_id: cortex_core::AuditRecordId,
) -> ReflectionReport {
ReflectionReport {
trace_id,
status: ReflectionReportStatus::Quarantined,
adapter_id,
model: response.model,
raw_hash: response.raw_hash,
reflection: None,
persisted_memory_candidates: Vec::new(),
quarantine_audit_id: Some(quarantine_audit_id),
error: Some(ReflectionRejection {
reason: err.quarantine_reason().to_string(),
detail: err.detail().to_string(),
}),
}
}
fn persist_memory_candidates(
reflection: &SessionReflection,
pool: &Pool,
) -> Result<Vec<PersistedMemoryCandidate>, StoreError> {
let repo = MemoryRepo::new(pool);
let now = Utc::now();
let mut persisted = Vec::with_capacity(reflection.memory_candidates.len());
for memory in &reflection.memory_candidates {
let id = MemoryId::new();
let source_events = source_event_ids(reflection, memory);
let store_memory = StoreMemoryCandidate {
id,
memory_type: memory_type_wire(memory.memory_type).to_string(),
claim: memory.claim.clone(),
source_episodes_json: json!([]),
source_events_json: json!(source_events),
domains_json: json!(domains_for_memory(reflection, memory)),
salience_json: json!({
"reusability": memory.initial_salience.reusability,
"consequence": memory.initial_salience.consequence,
"emotional_charge": memory.initial_salience.emotional_charge
}),
confidence: memory.confidence,
authority: "candidate".to_string(),
applies_when_json: json!(memory.applies_when),
does_not_apply_when_json: json!(memory.does_not_apply_when),
created_at: now,
updated_at: now,
};
repo.insert_candidate(&store_memory)?;
persisted.push(PersistedMemoryCandidate {
id,
status: "candidate".to_string(),
});
}
Ok(persisted)
}
fn source_event_ids(reflection: &SessionReflection, memory: &MemoryCandidate) -> Vec<String> {
memory
.source_episode_indexes
.iter()
.flat_map(|idx| &reflection.episode_candidates[*idx].source_event_ids)
.map(ToString::to_string)
.collect()
}
fn domains_for_memory(reflection: &SessionReflection, memory: &MemoryCandidate) -> Vec<String> {
let mut domains = Vec::new();
for idx in &memory.source_episode_indexes {
for domain in &reflection.episode_candidates[*idx].domains {
if !domains.contains(domain) {
domains.push(domain.clone());
}
}
}
domains
}
fn memory_type_wire(memory_type: MemoryType) -> &'static str {
match memory_type {
MemoryType::Semantic => "semantic",
MemoryType::Episodic => "episodic",
MemoryType::Procedural => "procedural",
MemoryType::Strategic => "strategic",
MemoryType::Affective => "affective",
MemoryType::Correction => "correction",
}
}