1use std::collections::HashSet;
24
25use regex::Regex;
26use serde::{Deserialize, Serialize};
27use uuid::Uuid;
28
29use crate::error::Result;
30use crate::hash::{compute_chain_hash, compute_content_hash};
31use crate::model::event::{AgentEvent, EventType};
32use crate::model::memory::{ConsolidationState, MemoryRecord};
33use crate::model::relation::Relation;
34use crate::query::MnemoEngine;
35use crate::query::conflict::ResolutionStrategy;
36use crate::query::lifecycle::effective_importance;
37use crate::storage::MemoryFilter;
38
/// Cosine-similarity threshold at/above which two memories are treated as duplicates.
const DEFAULT_DEDUP_THRESHOLD: f32 = 0.92;
/// Importance below which BOTH sides of a conflict may be auto-resolved.
const DEFAULT_LOW_IMPORTANCE_CUTOFF: f32 = 0.3;
/// Effective-importance cutoff below which a never-accessed record may be archived.
const DEFAULT_ARCHIVE_IMPORTANCE: f32 = 0.2;
/// Minimum record age before archival: one week, expressed in hours.
const DEFAULT_ARCHIVE_AGE_HOURS: f64 = 24.0 * 7.0;
43
/// How a reflection pass decides whether it should actually run.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum ReflectionMode {
    /// Run only when the coordination checks pass (enough time since the last
    /// run and enough new records) — see `coordinated_skip_reason`.
    #[default]
    Coordinated,
    /// Run unconditionally.
    Always,
}
63
/// Minimum number of records created since the last reflection for a
/// coordinated run to proceed.
pub const MIN_NEW_RECORDS_FOR_COORDINATED_RUN: usize = 5;

/// Minimum gap, in hours, between two coordinated reflection runs.
pub const MIN_HOURS_BETWEEN_COORDINATED_RUNS: i64 = 24;
71
/// Why a coordinated reflection pass was skipped.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SkipReason {
    /// Less than `MIN_HOURS_BETWEEN_COORDINATED_RUNS` since the last run.
    TooSoon,
    /// Fewer than `MIN_NEW_RECORDS_FOR_COORDINATED_RUN` records created since
    /// the last run.
    NotEnoughNewRecords,
    /// NOTE(review): never constructed in this file — presumably produced by a
    /// caller or another module; confirm before assuming it is dead.
    AutoDreamAlreadyRan,
}
80
/// Summary of the work performed by one reflection pass.
#[non_exhaustive]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReflectionReport {
    // Records merged away as semantic duplicates.
    pub consolidated: usize,
    // Records whose relative dates ("yesterday", "in 2 weeks", …) were rewritten.
    pub absolutized_dates: usize,
    // Records flagged `dreamed_at` that were accepted (re-hashed / re-embedded).
    pub dreamed_accepted: usize,
    // Low-importance, never-accessed records moved to the Archived state.
    pub archived: usize,
    // Conflicts auto-resolved with the keep-newest strategy.
    pub conflicts_resolved: usize,
    // Total records examined for this agent.
    pub total_scanned: usize,
    // Set when a coordinated run decided not to execute; all counters are zero.
    pub skipped: Option<SkipReason>,
    // "Organization report" blocks parsed out of memory contents and ingested.
    pub dream_report_ingested: usize,
}
109
110pub async fn run_reflection_pass_with_mode(
117 engine: &MnemoEngine,
118 agent_id: &str,
119 mode: ReflectionMode,
120 force: bool,
121) -> Result<ReflectionReport> {
122 if mode == ReflectionMode::Coordinated
123 && !force
124 && let Some(reason) = coordinated_skip_reason(engine, agent_id).await?
125 {
126 return Ok(ReflectionReport {
127 skipped: Some(reason),
128 ..Default::default()
129 });
130 }
131 let mut report = run_reflection_pass_inner(engine, agent_id).await?;
132 emit_reflection_completed(engine, agent_id, &report).await;
133 report.dream_report_ingested = ingest_dream_reports(engine, agent_id).await?;
136 Ok(report)
137}
138
/// Convenience wrapper: runs an unconditional (`Always`, forced) reflection pass.
pub async fn run_reflection_pass(engine: &MnemoEngine, agent_id: &str) -> Result<ReflectionReport> {
    run_reflection_pass_with_mode(engine, agent_id, ReflectionMode::Always, true).await
}
144
145async fn run_reflection_pass_inner(
148 engine: &MnemoEngine,
149 agent_id: &str,
150) -> Result<ReflectionReport> {
151 let filter = MemoryFilter {
152 agent_id: Some(agent_id.to_string()),
153 include_deleted: false,
154 ..Default::default()
155 };
156 let records = engine
157 .storage
158 .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
159 .await?;
160
161 let total_scanned = records.len();
162 let mut report = ReflectionReport {
163 total_scanned,
164 ..Default::default()
165 };
166
167 let mut after_absolutization: Vec<MemoryRecord> = Vec::with_capacity(records.len());
169 for mut record in records {
170 let rewritten = absolutize_dates(&record.content, &record.created_at);
171 if let Some(new_content) = rewritten {
172 let prev_hash = record.content_hash.clone();
173 record.content = new_content;
174 record.updated_at = chrono::Utc::now().to_rfc3339();
175 record.content_hash =
176 compute_content_hash(&record.content, &record.agent_id, &record.updated_at);
177 if let Ok(emb) = engine.embedding.embed(&record.content).await {
180 record.embedding = Some(emb.clone());
181 let _ = engine.index.add(record.id, &emb);
182 }
183 engine.storage.update_memory(&record).await?;
184 emit_rewrite_event(
185 engine,
186 agent_id,
187 record.id,
188 "date_absolutization",
189 &prev_hash,
190 &record.content_hash,
191 )
192 .await;
193 report.absolutized_dates += 1;
194 }
195 after_absolutization.push(record);
196 }
197
198 for record in &mut after_absolutization {
204 if record
205 .metadata
206 .get("dreamed_at")
207 .and_then(|v| v.as_str())
208 .is_some()
209 && !record
210 .metadata
211 .get("dreamed_processed")
212 .and_then(|v| v.as_bool())
213 .unwrap_or(false)
214 {
215 let prev_hash = record.content_hash.clone();
216 record.content_hash =
217 compute_content_hash(&record.content, &record.agent_id, &record.updated_at);
218 if let Ok(emb) = engine.embedding.embed(&record.content).await {
219 record.embedding = Some(emb.clone());
220 let _ = engine.index.add(record.id, &emb);
221 }
222 if let Some(obj) = record.metadata.as_object_mut() {
223 obj.insert(
224 "dreamed_processed".to_string(),
225 serde_json::Value::Bool(true),
226 );
227 }
228 engine.storage.update_memory(record).await?;
229 emit_rewrite_event(
230 engine,
231 agent_id,
232 record.id,
233 "auto_dream",
234 &prev_hash,
235 &record.content_hash,
236 )
237 .await;
238 report.dreamed_accepted += 1;
239 }
240 }
241
242 let consolidated_ids = consolidate_duplicates(engine, &mut after_absolutization).await?;
244 report.consolidated = consolidated_ids.len();
245
246 let conflicts = engine
248 .detect_conflicts(Some(agent_id.to_string()), DEFAULT_DEDUP_THRESHOLD)
249 .await?;
250 for pair in &conflicts.conflicts {
251 let (a, b) = match (
252 after_absolutization.iter().find(|r| r.id == pair.memory_a),
253 after_absolutization.iter().find(|r| r.id == pair.memory_b),
254 ) {
255 (Some(a), Some(b)) => (a, b),
256 _ => continue,
257 };
258 if a.importance < DEFAULT_LOW_IMPORTANCE_CUTOFF
259 && b.importance < DEFAULT_LOW_IMPORTANCE_CUTOFF
260 && engine
261 .resolve_conflict(pair, ResolutionStrategy::KeepNewest)
262 .await
263 .is_ok()
264 {
265 report.conflicts_resolved += 1;
266 }
267 }
268 let _ = &conflicts; let now = chrono::Utc::now();
272 for record in after_absolutization {
273 if consolidated_ids.contains(&record.id) {
274 continue;
275 }
276 if record.consolidation_state == ConsolidationState::Archived {
277 continue;
278 }
279 if record.access_count > 0 {
280 continue;
281 }
282 if effective_importance(&record) >= DEFAULT_ARCHIVE_IMPORTANCE {
283 continue;
284 }
285 let Ok(created) = chrono::DateTime::parse_from_rfc3339(&record.created_at) else {
286 continue;
287 };
288 let age_hours = (now - created.with_timezone(&chrono::Utc)).num_seconds() as f64 / 3600.0;
289 if age_hours < DEFAULT_ARCHIVE_AGE_HOURS {
290 continue;
291 }
292 let mut updated = record.clone();
293 updated.consolidation_state = ConsolidationState::Archived;
294 updated.updated_at = now.to_rfc3339();
295 if engine.storage.update_memory(&updated).await.is_ok() {
296 report.archived += 1;
297 }
298 }
299
300 Ok(report)
301}
302
303pub fn absolutize_dates(content: &str, created_at_rfc3339: &str) -> Option<String> {
306 let anchor = chrono::DateTime::parse_from_rfc3339(created_at_rfc3339)
307 .ok()?
308 .with_timezone(&chrono::Utc);
309 let mut out = content.to_string();
310 let mut modified = false;
311
312 let simple: &[(&str, i64)] = &[
314 ("yesterday", -1),
315 ("today", 0),
316 ("tomorrow", 1),
317 ("last week", -7),
318 ("next week", 7),
319 ];
320 for (needle, days) in simple {
321 let re = Regex::new(&format!(r"(?i)\b{}\b", regex::escape(needle))).ok()?;
322 if re.is_match(&out) {
323 let target = anchor + chrono::Duration::days(*days);
324 out = re
325 .replace_all(&out, target.format("%Y-%m-%d").to_string())
326 .into_owned();
327 modified = true;
328 }
329 }
330
331 let re_ago = Regex::new(r"(?i)\b(\d+)\s+(day|days|week|weeks)\s+ago\b").ok()?;
333 out = re_ago
334 .replace_all(&out, |caps: ®ex::Captures<'_>| {
335 let n: i64 = caps[1].parse().unwrap_or(0);
336 let unit = caps[2].to_lowercase();
337 let days = if unit.starts_with("week") { n * 7 } else { n };
338 let target = anchor - chrono::Duration::days(days);
339 modified = true;
340 target.format("%Y-%m-%d").to_string()
341 })
342 .into_owned();
343
344 let re_in = Regex::new(r"(?i)\bin\s+(\d+)\s+(day|days|week|weeks)\b").ok()?;
345 out = re_in
346 .replace_all(&out, |caps: ®ex::Captures<'_>| {
347 let n: i64 = caps[1].parse().unwrap_or(0);
348 let unit = caps[2].to_lowercase();
349 let days = if unit.starts_with("week") { n * 7 } else { n };
350 let target = anchor + chrono::Duration::days(days);
351 modified = true;
352 target.format("%Y-%m-%d").to_string()
353 })
354 .into_owned();
355
356 if modified { Some(out) } else { None }
357}
358
/// Cosine similarity of two equal-length vectors.
///
/// Returns 0.0 for mismatched lengths, empty inputs, or zero-norm vectors
/// (instead of dividing by zero).
fn cosine(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() || a.is_empty() {
        return 0.0;
    }
    // Accumulate dot product and both squared norms in one pass, in the same
    // left-to-right order an index loop would use.
    let (dot, norm_a, norm_b) = a
        .iter()
        .zip(b)
        .fold((0.0f32, 0.0f32, 0.0f32), |(d, na, nb), (&x, &y)| {
            (d + x * y, na + x * x, nb + y * y)
        });
    if norm_a == 0.0 || norm_b == 0.0 {
        0.0
    } else {
        dot / (norm_a.sqrt() * norm_b.sqrt())
    }
}
379
/// Merges near-duplicate records in place.
///
/// For every pair whose embeddings have cosine similarity of at least
/// `DEFAULT_DEDUP_THRESHOLD`, the newer record ("keeper") absorbs the older
/// one's tags and access count; the older record ("victim") is marked
/// `Consolidated` and linked to the keeper with a `consolidated_from`
/// relation. Returns the set of victim ids.
///
/// NOTE(review): O(n²) pairwise scan with per-pair clones — presumably
/// acceptable while batches stay bounded by MAX_BATCH_QUERY_LIMIT; confirm.
async fn consolidate_duplicates(
    engine: &MnemoEngine,
    records: &mut [MemoryRecord],
) -> Result<HashSet<Uuid>> {
    let mut consolidated: HashSet<Uuid> = HashSet::new();

    for i in 0..records.len() {
        if consolidated.contains(&records[i].id) {
            continue;
        }
        for j in (i + 1)..records.len() {
            if consolidated.contains(&records[j].id) {
                continue;
            }
            // Records without embeddings cannot be compared; skip them.
            let (Some(emb_i), Some(emb_j)) =
                (records[i].embedding.as_ref(), records[j].embedding.as_ref())
            else {
                continue;
            };
            if cosine(emb_i, emb_j) < DEFAULT_DEDUP_THRESHOLD {
                continue;
            }

            // Keep the record with the later created_at (ties keep `i`).
            // created_at values are compared as strings.
            let (keeper_idx, victim_idx) = match records[i].created_at.cmp(&records[j].created_at) {
                std::cmp::Ordering::Less => (j, i),
                _ => (i, j),
            };

            let mut keeper = records[keeper_idx].clone();
            let victim = records[victim_idx].clone();
            // Merge tags without introducing duplicates.
            for tag in &victim.tags {
                if !keeper.tags.contains(tag) {
                    keeper.tags.push(tag.clone());
                }
            }
            keeper.access_count = keeper.access_count.saturating_add(victim.access_count);
            keeper.updated_at = chrono::Utc::now().to_rfc3339();
            engine.storage.update_memory(&keeper).await?;

            let mut v_updated = victim.clone();
            v_updated.consolidation_state = ConsolidationState::Consolidated;
            v_updated.updated_at = keeper.updated_at.clone();
            engine.storage.update_memory(&v_updated).await?;

            // Provenance edge from keeper to the record it absorbed.
            let rel = Relation {
                id: Uuid::now_v7(),
                source_id: keeper.id,
                target_id: victim.id,
                relation_type: "consolidated_from".to_string(),
                weight: 1.0,
                metadata: serde_json::json!({"reason": "semantic_dedup"}),
                created_at: keeper.updated_at.clone(),
            };
            // Relation insertion is best effort; failures are ignored.
            let _ = engine.storage.insert_relation(&rel).await;

            consolidated.insert(victim.id);
            // Write the merged keeper back so later pairs see absorbed tags/counts.
            records[keeper_idx] = keeper;
        }
    }

    Ok(consolidated)
}
451
/// Appends a hash-chained event recording that `memory_id` was rewritten.
///
/// `reason == "auto_dream"` is logged as `MemoryRedact`; any other reason
/// (e.g. "date_absolutization") as `MemoryWrite`. The old and new content
/// hashes are recorded as hex in the payload. Insertion is best effort:
/// failures are silently ignored.
async fn emit_rewrite_event(
    engine: &MnemoEngine,
    agent_id: &str,
    memory_id: Uuid,
    reason: &str,
    prev_hash: &[u8],
    new_hash: &[u8],
) {
    let now = chrono::Utc::now().to_rfc3339();
    let content_hash =
        compute_content_hash(&format!("rewrite:{memory_id}:{reason}"), agent_id, &now);
    // Link into the agent's event hash chain (None when this is the first event).
    let prev_event_hash = engine
        .storage
        .get_latest_event_hash(agent_id, None)
        .await
        .ok()
        .flatten();
    let event = AgentEvent {
        id: Uuid::now_v7(),
        agent_id: agent_id.to_string(),
        thread_id: None,
        run_id: None,
        parent_event_id: None,
        event_type: if reason == "auto_dream" {
            EventType::MemoryRedact
        } else {
            EventType::MemoryWrite
        },
        payload: serde_json::json!({
            "memory_id": memory_id.to_string(),
            "reason": reason,
            "prev_hash": hex_encode(prev_hash),
            "new_hash": hex_encode(new_hash),
        }),
        trace_id: None,
        span_id: None,
        model: None,
        tokens_input: None,
        tokens_output: None,
        latency_ms: None,
        cost_usd: None,
        timestamp: now,
        logical_clock: 0,
        content_hash: content_hash.clone(),
        prev_hash: Some(compute_chain_hash(
            &content_hash,
            prev_event_hash.as_deref(),
        )),
        embedding: None,
    };
    let _ = engine.storage.insert_event(&event).await;
}
504
/// Encodes `bytes` as a lowercase hexadecimal string ("deadbeef" style).
fn hex_encode(bytes: &[u8]) -> String {
    use std::fmt::Write as _;
    let mut s = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        // write! appends into the preallocated buffer directly; the old
        // per-byte format!() allocated a temporary String for every byte.
        // Writing to a String is infallible, so the Result is safely ignored.
        let _ = write!(s, "{b:02x}");
    }
    s
}
512
/// Returns the timestamp of the most recent `ReflectionCompleted` event for
/// `agent_id`, or `None` if no such event is found.
///
/// NOTE(review): scans at most the first 1000 listed events and returns the
/// first match — this is only "most recent" if `list_events` yields
/// newest-first; confirm the storage ordering.
async fn last_reflection_at(
    engine: &MnemoEngine,
    agent_id: &str,
) -> Result<Option<chrono::DateTime<chrono::Utc>>> {
    let events = engine.storage.list_events(agent_id, 1000, 0).await?;
    for event in events {
        // Events with unparseable timestamps are skipped, not treated as errors.
        if event.event_type == EventType::ReflectionCompleted
            && let Ok(ts) = chrono::DateTime::parse_from_rfc3339(&event.timestamp)
        {
            return Ok(Some(ts.with_timezone(&chrono::Utc)));
        }
    }
    Ok(None)
}
529
/// Decides whether a coordinated reflection run should be skipped.
///
/// Returns `Some(TooSoon)` when the last run was less than
/// `MIN_HOURS_BETWEEN_COORDINATED_RUNS` ago, `Some(NotEnoughNewRecords)` when
/// fewer than `MIN_NEW_RECORDS_FOR_COORDINATED_RUN` records were created since
/// then, and `None` when the run should proceed.
async fn coordinated_skip_reason(
    engine: &MnemoEngine,
    agent_id: &str,
) -> Result<Option<SkipReason>> {
    let last = last_reflection_at(engine, agent_id).await?;
    let now = chrono::Utc::now();
    if let Some(last_ts) = last
        && (now - last_ts).num_hours() < MIN_HOURS_BETWEEN_COORDINATED_RUNS
    {
        return Ok(Some(SkipReason::TooSoon));
    }

    // Cutoff for "new" records; None means no previous run, so every record counts.
    let since = last.map(|t| t.to_rfc3339());
    let filter = MemoryFilter {
        agent_id: Some(agent_id.to_string()),
        include_deleted: false,
        ..Default::default()
    };
    let records = engine
        .storage
        .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
        .await?;
    // NOTE(review): this is a lexicographic string comparison of RFC 3339
    // timestamps — correct only if all writers use the same format and UTC
    // offset; confirm how created_at is produced.
    let new_count = records
        .iter()
        .filter(|r| match since.as_deref() {
            None => true,
            Some(cutoff) => r.created_at.as_str() > cutoff,
        })
        .count();
    if new_count < MIN_NEW_RECORDS_FOR_COORDINATED_RUN {
        return Ok(Some(SkipReason::NotEnoughNewRecords));
    }

    Ok(None)
}
575
/// Appends a hash-chained `ReflectionCompleted` event carrying the report's
/// counters. Insertion is best effort: failures are silently ignored.
async fn emit_reflection_completed(
    engine: &MnemoEngine,
    agent_id: &str,
    report: &ReflectionReport,
) {
    let now = chrono::Utc::now().to_rfc3339();
    let payload = serde_json::json!({
        "consolidated": report.consolidated,
        "absolutized_dates": report.absolutized_dates,
        "dreamed_accepted": report.dreamed_accepted,
        "archived": report.archived,
        "conflicts_resolved": report.conflicts_resolved,
        "total_scanned": report.total_scanned,
    });
    let content_hash = compute_content_hash(&payload.to_string(), agent_id, &now);
    // Link into the agent's event hash chain (None when this is the first event).
    let prev_event_hash = engine
        .storage
        .get_latest_event_hash(agent_id, None)
        .await
        .ok()
        .flatten();
    let event = AgentEvent {
        id: Uuid::now_v7(),
        agent_id: agent_id.to_string(),
        thread_id: None,
        run_id: None,
        parent_event_id: None,
        event_type: EventType::ReflectionCompleted,
        payload,
        trace_id: None,
        span_id: None,
        model: None,
        tokens_input: None,
        tokens_output: None,
        latency_ms: None,
        cost_usd: None,
        timestamp: now,
        logical_clock: 0,
        content_hash: content_hash.clone(),
        prev_hash: Some(compute_chain_hash(
            &content_hash,
            prev_event_hash.as_deref(),
        )),
        embedding: None,
    };
    let _ = engine.storage.insert_event(&event).await;
}
625
/// Counters parsed out of a "## organization report" block embedded in a
/// memory's content (see `parse_organization_report`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DreamReport {
    // Value following "consolidated:" (or "consolidated ="); 0 when absent.
    pub consolidated: u32,
    // Value following "removed:"; 0 when absent.
    pub removed: u32,
    // Value following "reindexed:" (also "re-indexed"/"re_indexed"); 0 when absent.
    pub reindexed: u32,
}
636
637pub fn parse_organization_report(text: &str) -> Option<DreamReport> {
640 let lower = text.to_lowercase();
641 let marker = "## organization report";
642 let start = lower.find(marker)?;
643 let trailer = &text[start + marker.len()..];
644 let re_consolidated = regex::Regex::new(r"(?i)\bconsolidated\b\s*[:=]\s*(\d+)").ok()?;
645 let re_removed = regex::Regex::new(r"(?i)\bremoved\b\s*[:=]\s*(\d+)").ok()?;
646 let re_reindexed = regex::Regex::new(r"(?i)\bre[-_ ]?indexed\b\s*[:=]\s*(\d+)").ok()?;
647 let consolidated = re_consolidated
648 .captures(trailer)
649 .and_then(|c| c.get(1).and_then(|m| m.as_str().parse().ok()))
650 .unwrap_or(0);
651 let removed = re_removed
652 .captures(trailer)
653 .and_then(|c| c.get(1).and_then(|m| m.as_str().parse().ok()))
654 .unwrap_or(0);
655 let reindexed = re_reindexed
656 .captures(trailer)
657 .and_then(|c| c.get(1).and_then(|m| m.as_str().parse().ok()))
658 .unwrap_or(0);
659 Some(DreamReport {
660 consolidated,
661 removed,
662 reindexed,
663 })
664}
665
/// Scans the agent's memories for embedded "organization report" blocks,
/// ingests each at most once, and emits a `DreamReportIngested` event per
/// record. Returns the number of records ingested in this pass.
async fn ingest_dream_reports(engine: &MnemoEngine, agent_id: &str) -> Result<usize> {
    let filter = MemoryFilter {
        agent_id: Some(agent_id.to_string()),
        include_deleted: false,
        ..Default::default()
    };
    let records = engine
        .storage
        .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
        .await?;
    let mut ingested = 0usize;
    for mut record in records {
        // Idempotency guard: skip records already marked as ingested.
        if record
            .metadata
            .get("dream_report_ingested_at")
            .and_then(|v| v.as_str())
            .is_some()
        {
            continue;
        }
        let Some(report) = parse_organization_report(&record.content) else {
            continue;
        };
        let now = chrono::Utc::now().to_rfc3339();
        // Mark the record so subsequent passes do not re-ingest it.
        if let Some(obj) = record.metadata.as_object_mut() {
            obj.insert(
                "dream_report_ingested_at".to_string(),
                serde_json::Value::String(now.clone()),
            );
        }
        record.updated_at = now.clone();
        // Only count (and emit an event for) records whose update persisted.
        if engine.storage.update_memory(&record).await.is_ok() {
            ingested += 1;
            let payload = serde_json::json!({
                "memory_id": record.id.to_string(),
                "consolidated": report.consolidated,
                "removed": report.removed,
                "reindexed": report.reindexed,
            });
            let content_hash = compute_content_hash(&payload.to_string(), agent_id, &now);
            // Link into the agent's event hash chain (None when first event).
            let prev_event_hash = engine
                .storage
                .get_latest_event_hash(agent_id, None)
                .await
                .ok()
                .flatten();
            let event = AgentEvent {
                id: Uuid::now_v7(),
                agent_id: agent_id.to_string(),
                thread_id: None,
                run_id: None,
                parent_event_id: None,
                event_type: EventType::DreamReportIngested,
                payload,
                trace_id: None,
                span_id: None,
                model: None,
                tokens_input: None,
                tokens_output: None,
                latency_ms: None,
                cost_usd: None,
                timestamp: now,
                logical_clock: 0,
                content_hash: content_hash.clone(),
                prev_hash: Some(compute_chain_hash(
                    &content_hash,
                    prev_event_hash.as_deref(),
                )),
                embedding: None,
            };
            // Event insertion is best effort; failures are ignored.
            let _ = engine.storage.insert_event(&event).await;
        }
    }
    Ok(ingested)
}
745
746