1#![warn(missing_docs)]
3use std::fmt;
33use std::path::PathBuf;
34
35use chrono::Utc;
36use cortex_core::{
37 compose_policy_outcomes, Event, EventSource, PolicyContribution, PolicyOutcome, TraceId,
38};
39use cortex_ledger::{
40 JsonlError, JsonlLog, APPEND_ATTESTATION_REQUIRED_RULE_ID,
41 APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID, APPEND_RUNTIME_MODE_RULE_ID,
42};
43use cortex_reflect::ReflectionReportStatus;
44use cortex_retrieval::{EmbedRecord, Embedder, LocalStubEmbedder, STUB_BACKEND_ID};
45use cortex_store::repo::{EmbeddingRepo, EventRepo, MemoryRepo};
46use cortex_store::Pool;
47
/// Summary of what a single session close accomplished.
///
/// `Clone`, `PartialEq` and `Eq` are derived so callers and tests can copy
/// and compare outcomes directly; every field is a plain std type.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CloseOutcome {
    /// Number of events newly appended to the event log by this close.
    pub ingested: usize,
    /// Number of memory candidates persisted by the reflection step.
    pub reflected: usize,
    /// Number of candidates counted as pending an MCP commit.
    pub pending_commit: usize,
    /// The session's trace id rendered as a string, or `"no_trace_id"` when
    /// none of the submitted events carried one.
    pub receipt_id: String,
}
64
/// Errors that can abort a session close.
#[derive(Debug)]
pub enum SessionError {
    /// Parsing the payload or appending to the event log failed; carries the
    /// rendered error message.
    Ingest(String),
    /// The reflection step failed; carries the rendered error message.
    Reflect(String),
    /// A store operation returned an error.
    Store(cortex_store::StoreError),
    /// A filesystem operation failed.
    Io(std::io::Error),
}
77
78impl fmt::Display for SessionError {
79 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80 match self {
81 Self::Ingest(msg) => write!(f, "session ingest failed: {msg}"),
82 Self::Reflect(msg) => write!(f, "session reflect failed: {msg}"),
83 Self::Store(err) => write!(f, "session store error: {err}"),
84 Self::Io(err) => write!(f, "session io error: {err}"),
85 }
86 }
87}
88
89impl std::error::Error for SessionError {
90 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
91 match self {
92 Self::Store(err) => Some(err),
93 Self::Io(err) => Some(err),
94 _ => None,
95 }
96 }
97}
98
99impl From<cortex_store::StoreError> for SessionError {
100 fn from(err: cortex_store::StoreError) -> Self {
101 Self::Store(err)
102 }
103}
104
105impl From<std::io::Error> for SessionError {
106 fn from(err: std::io::Error) -> Self {
107 Self::Io(err)
108 }
109}
110
111impl From<JsonlError> for SessionError {
112 fn from(err: JsonlError) -> Self {
113 Self::Ingest(err.to_string())
114 }
115}
116
117pub fn close_from_bytes(
137 raw: &[u8],
138 pool: &Pool,
139 event_log: PathBuf,
140 fixtures_dir: &std::path::Path,
141) -> Result<CloseOutcome, SessionError> {
142 let events = parse_events(raw).map_err(SessionError::Ingest)?;
144
145 let trace_id = extract_trace_id(&events);
147
148 let ingested = ingest_events(&events, &event_log, pool)?;
153
154 let trace_id_for_reflect = match trace_id {
155 Some(tid) => tid,
156 None => {
157 tracing::debug!("session-close: no trace_id in events; no candidates proposed");
158 return Ok(CloseOutcome {
159 ingested,
160 reflected: 0,
161 pending_commit: 0,
162 receipt_id: "no_trace_id".to_string(),
163 });
164 }
165 };
166
167 let reflect_report =
169 run_reflect(trace_id_for_reflect, fixtures_dir, pool).map_err(SessionError::Reflect)?;
170
171 if reflect_report.status == ReflectionReportStatus::Quarantined {
172 tracing::debug!("session-close: reflection quarantined; no candidates proposed");
173 return Ok(CloseOutcome {
174 ingested,
175 reflected: 0,
176 pending_commit: 0,
177 receipt_id: trace_id_for_reflect.to_string(),
178 });
179 }
180
181 let candidate_ids: Vec<cortex_core::MemoryId> = reflect_report
182 .persisted_memory_candidates
183 .iter()
184 .map(|c| c.id)
185 .collect();
186 let reflected = candidate_ids.len();
187
188 if reflected == 0 {
189 tracing::debug!("session-close: no candidates proposed");
190 return Ok(CloseOutcome {
191 ingested,
192 reflected: 0,
193 pending_commit: 0,
194 receipt_id: trace_id_for_reflect.to_string(),
195 });
196 }
197
198 let repo = MemoryRepo::new(pool);
204 let now = Utc::now();
205 let mut pending_ids = Vec::new();
206
207 for memory_id in &candidate_ids {
208 match repo.set_pending_mcp_commit(memory_id, now) {
209 Ok(()) => {
210 pending_ids.push(*memory_id);
211 }
212 Err(err) => {
213 let err_str = err.to_string();
214 if err_str.contains("not a candidate") {
216 tracing::debug!(
217 memory_id = %memory_id,
218 "session-close: memory not a candidate (already transitioned); treating as pending"
219 );
220 pending_ids.push(*memory_id);
221 } else {
222 tracing::warn!(
223 memory_id = %memory_id,
224 error = %err_str,
225 "session-close: failed to set pending_mcp_commit for memory"
226 );
227 }
228 }
229 }
230 }
231
232 let embed_repo = EmbeddingRepo::new(pool);
234 let embedder = LocalStubEmbedder::new();
235
236 for memory_id in &pending_ids {
237 let memory = match repo.get_by_id(memory_id) {
238 Ok(Some(m)) => m,
239 Ok(None) => {
240 tracing::warn!(memory_id = %memory_id, "session-close: memory not found for embedding");
241 continue;
242 }
243 Err(err) => {
244 tracing::warn!(memory_id = %memory_id, error = %err, "session-close: failed to read memory for embedding");
245 continue;
246 }
247 };
248
249 let tags: Vec<String> = memory
250 .domains_json
251 .as_array()
252 .map(|arr| {
253 arr.iter()
254 .filter_map(|v| v.as_str().map(ToString::to_string))
255 .collect()
256 })
257 .unwrap_or_default();
258
259 let vec = match embedder.embed(&memory.claim, &tags) {
260 Ok(v) => v,
261 Err(err) => {
262 tracing::warn!(memory_id = %memory_id, error = %err, "session-close: embed failed");
263 continue;
264 }
265 };
266
267 let record = match EmbedRecord::new(*memory_id, STUB_BACKEND_ID, vec, now) {
268 Ok(r) => r,
269 Err(err) => {
270 tracing::warn!(memory_id = %memory_id, error = %err, "session-close: failed to build embed record");
271 continue;
272 }
273 };
274
275 if let Err(err) = embed_repo.write(&record) {
276 tracing::warn!(memory_id = %memory_id, error = %err, "session-close: failed to write embedding");
277 }
278 }
279
280 Ok(CloseOutcome {
281 ingested,
282 reflected,
283 pending_commit: pending_ids.len(),
284 receipt_id: trace_id_for_reflect.to_string(),
285 })
286}
287
288fn parse_events(raw: &[u8]) -> Result<Vec<Event>, String> {
295 let value: serde_json::Value =
296 serde_json::from_slice(raw).map_err(|err| format!("invalid JSON: {err}"))?;
297
298 let events: Vec<serde_json::Value> = match &value {
299 serde_json::Value::Object(map) => {
300 if let Some(events_field) = map.get("events") {
301 events_field
302 .as_array()
303 .ok_or_else(|| "events field is not an array".to_string())?
304 .to_owned()
305 } else {
306 vec![value.clone()]
307 }
308 }
309 serde_json::Value::Array(arr) => arr.to_owned(),
310 _ => return Err("unexpected JSON shape: must be object or array".to_string()),
311 };
312
313 let mut parsed = Vec::with_capacity(events.len());
314 for (i, ev) in events.iter().enumerate() {
315 let event: Event = serde_json::from_value(ev.clone())
316 .map_err(|err| format!("event[{i}] failed to deserialize: {err}"))?;
317 parsed.push(event);
318 }
319 Ok(parsed)
320}
321
322fn extract_trace_id(events: &[Event]) -> Option<TraceId> {
324 events.iter().find_map(|ev| ev.trace_id)
325}
326
327fn ingest_events(
338 events: &[Event],
339 event_log: &std::path::Path,
340 pool: &Pool,
341) -> Result<usize, SessionError> {
342 for event in events {
344 if matches!(event.source, EventSource::User) {
345 return Err(SessionError::Ingest(format!(
346 "EventSource::User event {} cannot be ingested without operator attestation; \
347 use `cortex ingest --user-attestation` for user-sourced events",
348 event.id,
349 )));
350 }
351 }
352
353 if let Some(parent) = event_log.parent() {
355 if !parent.as_os_str().is_empty() {
356 std::fs::create_dir_all(parent)?;
357 }
358 }
359
360 let mut log = JsonlLog::open(event_log)?;
361
362 let existing_ids = collect_existing_event_ids(event_log)?;
367
368 let event_repo = EventRepo::new(pool);
369 let mut appended = 0usize;
370 for event in events {
371 if existing_ids.contains(&event.id) {
372 event_repo.append(event).map_err(SessionError::Store)?;
375 continue;
376 }
377
378 let policy = session_append_policy(&event.source);
381 log.append(event.clone(), &policy)?;
382
383 event_repo.append(event).map_err(SessionError::Store)?;
385
386 appended += 1;
387 }
388
389 Ok(appended)
390}
391
392fn collect_existing_event_ids(
394 path: &std::path::Path,
395) -> Result<std::collections::HashSet<cortex_core::EventId>, SessionError> {
396 use std::io::BufRead;
397
398 if !path.exists() {
399 return Ok(std::collections::HashSet::new());
400 }
401
402 let file = std::fs::File::open(path)?;
403 let reader = std::io::BufReader::new(file);
404 let mut ids = std::collections::HashSet::new();
405
406 for line in reader.lines() {
407 let line = line?;
408 if line.trim().is_empty() {
409 continue;
410 }
411 if let Ok(value) = serde_json::from_str::<serde_json::Value>(&line) {
413 if let Some(id_str) = value
414 .get("event")
415 .and_then(|e| e.get("id"))
416 .and_then(|v| v.as_str())
417 {
418 if let Ok(event_id) = id_str.parse::<cortex_core::EventId>() {
419 ids.insert(event_id);
420 }
421 }
422 }
423 }
424
425 Ok(ids)
426}
427
428fn session_append_policy(source: &EventSource) -> cortex_core::PolicyDecision {
435 let (tier_outcome, tier_reason): (PolicyOutcome, &str) = match source {
442 EventSource::User | EventSource::ManualCorrection => {
443 (
446 PolicyOutcome::Reject,
447 "event source User/ManualCorrection requires operator attestation; refused by ingest_events",
448 )
449 }
450 EventSource::ChildAgent { .. }
451 | EventSource::Tool { .. }
452 | EventSource::Runtime
453 | EventSource::ExternalOutcome => (
454 PolicyOutcome::Allow,
455 "event source meets ingest floor of Observed or above",
456 ),
457 };
458 let tier_contribution = PolicyContribution::new(
459 APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
460 tier_outcome,
461 tier_reason,
462 )
463 .expect("static session append tier contribution is valid");
464
465 let attestation_contribution = PolicyContribution::new(
467 APPEND_ATTESTATION_REQUIRED_RULE_ID,
468 PolicyOutcome::Warn,
469 "session-close ingest: no User events in batch; operator attestation not required",
470 )
471 .expect("static session append attestation contribution is valid");
472
473 let runtime_mode_contribution = PolicyContribution::new(
475 APPEND_RUNTIME_MODE_RULE_ID,
476 PolicyOutcome::Warn,
477 "session-close ingest: unsigned append (local-development ledger path)",
478 )
479 .expect("static session append runtime mode contribution is valid");
480
481 compose_policy_outcomes(
482 vec![
483 tier_contribution,
484 attestation_contribution,
485 runtime_mode_contribution,
486 ],
487 None,
488 )
489}
490
491fn run_reflect(
493 trace_id: TraceId,
494 fixtures_dir: &std::path::Path,
495 pool: &Pool,
496) -> Result<cortex_reflect::ReflectionReport, String> {
497 use cortex_llm::ReplayAdapter;
498
499 let adapter = ReplayAdapter::new(fixtures_dir).map_err(|err| format!("{err}"))?;
500
501 let rt = tokio::runtime::Builder::new_current_thread()
502 .enable_all()
503 .build()
504 .map_err(|err| format!("failed to create tokio runtime: {err}"))?;
505
506 rt.block_on(cortex_reflect::reflect(trace_id, &adapter, pool))
507 .map_err(|err| format!("{err}"))
508}