// mnemo_core/query/reflection.rs
1//! Reflection pass — Auto-Dream-compatible semantic housekeeping.
2//!
3//! `run_reflection_pass` walks a single agent's memories and applies four
4//! normalising sweeps in order:
5//!
6//! 1. **Date absolutization** (Task 10) — rewrite relative temporal phrases
7//!    (`"yesterday"`, `"last week"`, `"N days ago"`, `"tomorrow"`) into
8//!    ISO-8601 dates anchored on each record's `created_at`.
9//! 2. **Semantic dedup** — any two memories whose embeddings have cosine
10//!    similarity ≥ 0.92 collapse into a single record that unions their
11//!    tags and sums their `access_count`. The older record is moved to the
12//!    `Consolidated` state and a `consolidated_from` relation is inserted.
13//! 3. **Low-importance conflict resolution** — run `detect_conflicts` and,
14//!    for any conflict where *both* sides have `importance < 0.3`, apply
15//!    `ResolutionStrategy::KeepNewest`.
16//! 4. **Stale archival** — mark records `Archived` when their
17//!    `effective_importance < 0.2`, their `access_count == 0`, and their
18//!    age exceeds the configured threshold (default 7 days).
19//!
20//! The pass is exposed as `MnemoEngine::run_reflection_pass` and is safe to
21//! schedule on a periodic tick (default cadence 24h, driven from the CLI).
22
23use std::collections::HashSet;
24
25use regex::Regex;
26use serde::{Deserialize, Serialize};
27use uuid::Uuid;
28
29use crate::error::Result;
30use crate::hash::{compute_chain_hash, compute_content_hash};
31use crate::model::event::{AgentEvent, EventType};
32use crate::model::memory::{ConsolidationState, MemoryRecord};
33use crate::model::relation::Relation;
34use crate::query::MnemoEngine;
35use crate::query::conflict::ResolutionStrategy;
36use crate::query::lifecycle::effective_importance;
37use crate::storage::MemoryFilter;
38
/// Cosine-similarity floor at or above which two memories are treated as
/// semantic duplicates. Also passed to `detect_conflicts` as its
/// similarity threshold in phase 4 of the pass.
const DEFAULT_DEDUP_THRESHOLD: f32 = 0.92;
/// Both sides of a conflict must have `importance` strictly below this
/// value for the pass to auto-resolve it with `KeepNewest`.
const DEFAULT_LOW_IMPORTANCE_CUTOFF: f32 = 0.3;
/// Records with `effective_importance` below this value are archival
/// candidates (subject to the access-count and age checks).
const DEFAULT_ARCHIVE_IMPORTANCE: f32 = 0.2;
/// Minimum age in hours before an unused, low-importance record is
/// archived — 7 days.
const DEFAULT_ARCHIVE_AGE_HOURS: f64 = 24.0 * 7.0;
43
/// Controls when the reflection pass runs its expensive phases.
///
/// `Coordinated` is the new default in v0.3.1 and honours the same trigger
/// heuristics Anthropic's Auto Dream uses:
///   * skip if fewer than `MIN_NEW_RECORDS_FOR_COORDINATED_RUN` new records
///     have accumulated since the last successful pass;
///   * skip if fewer than `MIN_HOURS_BETWEEN_COORDINATED_RUNS` have elapsed
///     since the last successful pass for this agent;
///   * accept any Auto-Dream-flagged records (`metadata.dreamed_at`)
///     unconditionally — Auto Dream already did the consolidation work.
///
/// `Always` is the pre-v0.3.1 behaviour: run every phase every time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum ReflectionMode {
    /// Gate on cadence and new-record count via `coordinated_skip_reason`.
    #[default]
    Coordinated,
    /// Run every phase unconditionally (back-compat behaviour).
    Always,
}
63
/// Minimum number of newly-written records (since `last_reflection_at`) that
/// must exist before a `Coordinated` pass runs. Matches Auto Dream's floor.
pub const MIN_NEW_RECORDS_FOR_COORDINATED_RUN: usize = 5;

/// Minimum interval (in hours) between `Coordinated` passes for one agent.
/// Matches Auto Dream's 24h cadence.
pub const MIN_HOURS_BETWEEN_COORDINATED_RUNS: i64 = 24;

/// Why a `Coordinated` pass decided to skip, when it did.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SkipReason {
    /// Fewer than `MIN_HOURS_BETWEEN_COORDINATED_RUNS` hours since the last
    /// successful pass.
    TooSoon,
    /// Fewer than `MIN_NEW_RECORDS_FOR_COORDINATED_RUN` records written
    /// since the last successful pass.
    NotEnoughNewRecords,
    /// NOTE(review): declared for the Auto-Dream overlap case but never
    /// returned by `coordinated_skip_reason` in this file — confirm whether
    /// any other call site still produces it.
    AutoDreamAlreadyRan,
}
80
/// Result of a single reflection pass — per-phase counters plus the
/// skip marker for gated `Coordinated` runs.
#[non_exhaustive]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReflectionReport {
    /// Number of duplicate pairs consolidated into a surviving record.
    pub consolidated: usize,
    /// Number of records whose content had at least one relative date
    /// rewritten to ISO-8601.
    pub absolutized_dates: usize,
    /// Number of externally-rewritten records (e.g. Auto Dream) accepted
    /// and re-embedded during this pass.
    pub dreamed_accepted: usize,
    /// Number of records moved to `Archived` state.
    pub archived: usize,
    /// Number of conflict pairs auto-resolved (both sides low-importance).
    pub conflicts_resolved: usize,
    /// Total records scanned (before any phase mutated them).
    pub total_scanned: usize,
    /// Populated when `Coordinated` skipped the run; `None` means the pass
    /// actually executed.
    pub skipped: Option<SkipReason>,
    /// Merged/removed/re-indexed counts pulled from an Auto Dream
    /// organization-report trailer, when present. Emitted as a single
    /// `DreamReportIngested` event per agent; subsequent passes see the
    /// idempotent `dream_report_ingested_at` marker in metadata and skip
    /// re-parsing.
    pub dream_report_ingested: usize,
}
109
110/// Run a reflection pass honouring `mode`.
111///
112/// `ReflectionMode::Coordinated` (the default) checks three gates before
113/// running the expensive phases; see [`ReflectionMode`] for details.
114/// When a gate trips, the returned report has `skipped = Some(reason)`
115/// and no state is mutated. Pass `force=true` to override cadence gates.
116pub async fn run_reflection_pass_with_mode(
117    engine: &MnemoEngine,
118    agent_id: &str,
119    mode: ReflectionMode,
120    force: bool,
121) -> Result<ReflectionReport> {
122    if mode == ReflectionMode::Coordinated
123        && !force
124        && let Some(reason) = coordinated_skip_reason(engine, agent_id).await?
125    {
126        return Ok(ReflectionReport {
127            skipped: Some(reason),
128            ..Default::default()
129        });
130    }
131    let mut report = run_reflection_pass_inner(engine, agent_id).await?;
132    emit_reflection_completed(engine, agent_id, &report).await;
133    // Ingest any Auto Dream organization-report trailer now that the
134    // record list reflects this pass. Idempotent via metadata.
135    report.dream_report_ingested = ingest_dream_reports(engine, agent_id).await?;
136    Ok(report)
137}
138
/// Back-compat entry point — runs the reflection pass unconditionally
/// (equivalent to `run_reflection_pass_with_mode(_, _, Always, true)`).
///
/// Prefer `run_reflection_pass_with_mode` in new code so the `Coordinated`
/// cadence gates can apply.
pub async fn run_reflection_pass(engine: &MnemoEngine, agent_id: &str) -> Result<ReflectionReport> {
    run_reflection_pass_with_mode(engine, agent_id, ReflectionMode::Always, true).await
}
144
145/// The original pass body, now private. `run_reflection_pass_with_mode`
146/// wraps this with the Coordinated gates.
147async fn run_reflection_pass_inner(
148    engine: &MnemoEngine,
149    agent_id: &str,
150) -> Result<ReflectionReport> {
151    let filter = MemoryFilter {
152        agent_id: Some(agent_id.to_string()),
153        include_deleted: false,
154        ..Default::default()
155    };
156    let records = engine
157        .storage
158        .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
159        .await?;
160
161    let total_scanned = records.len();
162    let mut report = ReflectionReport {
163        total_scanned,
164        ..Default::default()
165    };
166
167    // -- 1. Date absolutization ---------------------------------------------
168    let mut after_absolutization: Vec<MemoryRecord> = Vec::with_capacity(records.len());
169    for mut record in records {
170        let rewritten = absolutize_dates(&record.content, &record.created_at);
171        if let Some(new_content) = rewritten {
172            let prev_hash = record.content_hash.clone();
173            record.content = new_content;
174            record.updated_at = chrono::Utc::now().to_rfc3339();
175            record.content_hash =
176                compute_content_hash(&record.content, &record.agent_id, &record.updated_at);
177            // Re-embed on content change. Embedding failure is non-fatal —
178            // the cached embedding still beats a skipped reflection.
179            if let Ok(emb) = engine.embedding.embed(&record.content).await {
180                record.embedding = Some(emb.clone());
181                let _ = engine.index.add(record.id, &emb);
182            }
183            engine.storage.update_memory(&record).await?;
184            emit_rewrite_event(
185                engine,
186                agent_id,
187                record.id,
188                "date_absolutization",
189                &prev_hash,
190                &record.content_hash,
191            )
192            .await;
193            report.absolutized_dates += 1;
194        }
195        after_absolutization.push(record);
196    }
197
198    // -- 2. Auto-Dream accept ----------------------------------------------
199    // An external rewrite is signalled by `metadata.dreamed_at`: the Claude
200    // Agent SDK bridge writes this when it detects an Opus-driven edit on a
201    // memory file. We re-embed and emit a MemoryDreamed audit event but do
202    // NOT rewrite the content — the bridge already did that.
203    for record in &mut after_absolutization {
204        if record
205            .metadata
206            .get("dreamed_at")
207            .and_then(|v| v.as_str())
208            .is_some()
209            && !record
210                .metadata
211                .get("dreamed_processed")
212                .and_then(|v| v.as_bool())
213                .unwrap_or(false)
214        {
215            let prev_hash = record.content_hash.clone();
216            record.content_hash =
217                compute_content_hash(&record.content, &record.agent_id, &record.updated_at);
218            if let Ok(emb) = engine.embedding.embed(&record.content).await {
219                record.embedding = Some(emb.clone());
220                let _ = engine.index.add(record.id, &emb);
221            }
222            if let Some(obj) = record.metadata.as_object_mut() {
223                obj.insert(
224                    "dreamed_processed".to_string(),
225                    serde_json::Value::Bool(true),
226                );
227            }
228            engine.storage.update_memory(record).await?;
229            emit_rewrite_event(
230                engine,
231                agent_id,
232                record.id,
233                "auto_dream",
234                &prev_hash,
235                &record.content_hash,
236            )
237            .await;
238            report.dreamed_accepted += 1;
239        }
240    }
241
242    // -- 3. Semantic dedup --------------------------------------------------
243    let consolidated_ids = consolidate_duplicates(engine, &mut after_absolutization).await?;
244    report.consolidated = consolidated_ids.len();
245
246    // -- 4. Low-importance conflict resolution ------------------------------
247    let conflicts = engine
248        .detect_conflicts(Some(agent_id.to_string()), DEFAULT_DEDUP_THRESHOLD)
249        .await?;
250    for pair in &conflicts.conflicts {
251        let (a, b) = match (
252            after_absolutization.iter().find(|r| r.id == pair.memory_a),
253            after_absolutization.iter().find(|r| r.id == pair.memory_b),
254        ) {
255            (Some(a), Some(b)) => (a, b),
256            _ => continue,
257        };
258        if a.importance < DEFAULT_LOW_IMPORTANCE_CUTOFF
259            && b.importance < DEFAULT_LOW_IMPORTANCE_CUTOFF
260            && engine
261                .resolve_conflict(pair, ResolutionStrategy::KeepNewest)
262                .await
263                .is_ok()
264        {
265            report.conflicts_resolved += 1;
266        }
267    }
268    let _ = &conflicts; // keep the borrow alive for the closure above
269
270    // -- 5. Stale archival --------------------------------------------------
271    let now = chrono::Utc::now();
272    for record in after_absolutization {
273        if consolidated_ids.contains(&record.id) {
274            continue;
275        }
276        if record.consolidation_state == ConsolidationState::Archived {
277            continue;
278        }
279        if record.access_count > 0 {
280            continue;
281        }
282        if effective_importance(&record) >= DEFAULT_ARCHIVE_IMPORTANCE {
283            continue;
284        }
285        let Ok(created) = chrono::DateTime::parse_from_rfc3339(&record.created_at) else {
286            continue;
287        };
288        let age_hours = (now - created.with_timezone(&chrono::Utc)).num_seconds() as f64 / 3600.0;
289        if age_hours < DEFAULT_ARCHIVE_AGE_HOURS {
290            continue;
291        }
292        let mut updated = record.clone();
293        updated.consolidation_state = ConsolidationState::Archived;
294        updated.updated_at = now.to_rfc3339();
295        if engine.storage.update_memory(&updated).await.is_ok() {
296            report.archived += 1;
297        }
298    }
299
300    Ok(report)
301}
302
303/// Absolutize relative temporal expressions in `content`. Returns `Some` when
304/// the content was modified.
305pub fn absolutize_dates(content: &str, created_at_rfc3339: &str) -> Option<String> {
306    let anchor = chrono::DateTime::parse_from_rfc3339(created_at_rfc3339)
307        .ok()?
308        .with_timezone(&chrono::Utc);
309    let mut out = content.to_string();
310    let mut modified = false;
311
312    // Whole-word replacements.
313    let simple: &[(&str, i64)] = &[
314        ("yesterday", -1),
315        ("today", 0),
316        ("tomorrow", 1),
317        ("last week", -7),
318        ("next week", 7),
319    ];
320    for (needle, days) in simple {
321        let re = Regex::new(&format!(r"(?i)\b{}\b", regex::escape(needle))).ok()?;
322        if re.is_match(&out) {
323            let target = anchor + chrono::Duration::days(*days);
324            out = re
325                .replace_all(&out, target.format("%Y-%m-%d").to_string())
326                .into_owned();
327            modified = true;
328        }
329    }
330
331    // "<N> days/weeks ago" and "in <N> days/weeks"
332    let re_ago = Regex::new(r"(?i)\b(\d+)\s+(day|days|week|weeks)\s+ago\b").ok()?;
333    out = re_ago
334        .replace_all(&out, |caps: &regex::Captures<'_>| {
335            let n: i64 = caps[1].parse().unwrap_or(0);
336            let unit = caps[2].to_lowercase();
337            let days = if unit.starts_with("week") { n * 7 } else { n };
338            let target = anchor - chrono::Duration::days(days);
339            modified = true;
340            target.format("%Y-%m-%d").to_string()
341        })
342        .into_owned();
343
344    let re_in = Regex::new(r"(?i)\bin\s+(\d+)\s+(day|days|week|weeks)\b").ok()?;
345    out = re_in
346        .replace_all(&out, |caps: &regex::Captures<'_>| {
347            let n: i64 = caps[1].parse().unwrap_or(0);
348            let unit = caps[2].to_lowercase();
349            let days = if unit.starts_with("week") { n * 7 } else { n };
350            let target = anchor + chrono::Duration::days(days);
351            modified = true;
352            target.format("%Y-%m-%d").to_string()
353        })
354        .into_owned();
355
356    if modified { Some(out) } else { None }
357}
358
/// Cosine similarity between two float vectors. Returns 0.0 when the
/// lengths differ, the vectors are empty, or either norm is zero.
fn cosine(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() || a.is_empty() {
        return 0.0;
    }
    // Left-to-right accumulation, same order as a manual index loop.
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let na: f32 = a.iter().map(|x| x * x).sum();
    let nb: f32 = b.iter().map(|y| y * y).sum();
    if na == 0.0 || nb == 0.0 {
        0.0
    } else {
        dot / (na.sqrt() * nb.sqrt())
    }
}
379
380/// Consolidate near-duplicate memories. The newer record wins; the older
381/// one is marked `Consolidated` and a `consolidated_from` relation linking
382/// them is stored for audit. Returns the ids that were collapsed (older
383/// sides).
384async fn consolidate_duplicates(
385    engine: &MnemoEngine,
386    records: &mut [MemoryRecord],
387) -> Result<HashSet<Uuid>> {
388    let mut consolidated: HashSet<Uuid> = HashSet::new();
389
390    for i in 0..records.len() {
391        if consolidated.contains(&records[i].id) {
392            continue;
393        }
394        for j in (i + 1)..records.len() {
395            if consolidated.contains(&records[j].id) {
396                continue;
397            }
398            let (Some(emb_i), Some(emb_j)) =
399                (records[i].embedding.as_ref(), records[j].embedding.as_ref())
400            else {
401                continue;
402            };
403            if cosine(emb_i, emb_j) < DEFAULT_DEDUP_THRESHOLD {
404                continue;
405            }
406
407            // Newer side wins. Ties break toward `records[i]` so the scan is
408            // deterministic.
409            let (keeper_idx, victim_idx) = match records[i].created_at.cmp(&records[j].created_at) {
410                std::cmp::Ordering::Less => (j, i),
411                _ => (i, j),
412            };
413
414            // Union of tags; sum of access_count.
415            let mut keeper = records[keeper_idx].clone();
416            let victim = records[victim_idx].clone();
417            for tag in &victim.tags {
418                if !keeper.tags.contains(tag) {
419                    keeper.tags.push(tag.clone());
420                }
421            }
422            keeper.access_count = keeper.access_count.saturating_add(victim.access_count);
423            keeper.updated_at = chrono::Utc::now().to_rfc3339();
424            engine.storage.update_memory(&keeper).await?;
425
426            let mut v_updated = victim.clone();
427            v_updated.consolidation_state = ConsolidationState::Consolidated;
428            v_updated.updated_at = keeper.updated_at.clone();
429            engine.storage.update_memory(&v_updated).await?;
430
431            let rel = Relation {
432                id: Uuid::now_v7(),
433                source_id: keeper.id,
434                target_id: victim.id,
435                relation_type: "consolidated_from".to_string(),
436                weight: 1.0,
437                metadata: serde_json::json!({"reason": "semantic_dedup"}),
438                created_at: keeper.updated_at.clone(),
439            };
440            let _ = engine.storage.insert_relation(&rel).await;
441
442            consolidated.insert(victim.id);
443            // Replace the slice entry so subsequent iterations see the
444            // merged state.
445            records[keeper_idx] = keeper;
446        }
447    }
448
449    Ok(consolidated)
450}
451
/// Persist a hash-chained audit event recording that `memory_id`'s content
/// was rewritten during reflection.
///
/// * `reason` — the phase that rewrote it (`"date_absolutization"` or
///   `"auto_dream"` in this file).
/// * `prev_hash` / `new_hash` — content hashes before/after the rewrite,
///   hex-encoded into the event payload.
///
/// Best-effort: both the latest-hash lookup and the insert swallow errors —
/// an audit gap is preferred over failing the reflection pass.
async fn emit_rewrite_event(
    engine: &MnemoEngine,
    agent_id: &str,
    memory_id: Uuid,
    reason: &str,
    prev_hash: &[u8],
    new_hash: &[u8],
) {
    let now = chrono::Utc::now().to_rfc3339();
    let content_hash =
        compute_content_hash(&format!("rewrite:{memory_id}:{reason}"), agent_id, &now);
    // Chain onto the agent's latest event hash; `None` means this is the
    // first link in the chain.
    let prev_event_hash = engine
        .storage
        .get_latest_event_hash(agent_id, None)
        .await
        .ok()
        .flatten();
    let event = AgentEvent {
        id: Uuid::now_v7(),
        agent_id: agent_id.to_string(),
        thread_id: None,
        run_id: None,
        parent_event_id: None,
        // NOTE(review): auto_dream rewrites are logged as MemoryRedact while
        // everything else is MemoryWrite — confirm this mapping is intended.
        event_type: if reason == "auto_dream" {
            EventType::MemoryRedact
        } else {
            EventType::MemoryWrite
        },
        payload: serde_json::json!({
            "memory_id": memory_id.to_string(),
            "reason": reason,
            "prev_hash": hex_encode(prev_hash),
            "new_hash": hex_encode(new_hash),
        }),
        trace_id: None,
        span_id: None,
        model: None,
        tokens_input: None,
        tokens_output: None,
        latency_ms: None,
        cost_usd: None,
        timestamp: now,
        logical_clock: 0,
        content_hash: content_hash.clone(),
        prev_hash: Some(compute_chain_hash(
            &content_hash,
            prev_event_hash.as_deref(),
        )),
        embedding: None,
    };
    let _ = engine.storage.insert_event(&event).await;
}
504
/// Lowercase hex encoding of `bytes`, two digits per byte.
fn hex_encode(bytes: &[u8]) -> String {
    use std::fmt::Write;

    let mut s = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        // `write!` into a String is infallible and avoids the temporary
        // String that a per-byte `format!` would allocate.
        let _ = write!(s, "{b:02x}");
    }
    s
}
512
513/// Read the last `ReflectionCompleted` event for `agent_id` and return its
514/// timestamp. When no such event exists, returns `None`.
515async fn last_reflection_at(
516    engine: &MnemoEngine,
517    agent_id: &str,
518) -> Result<Option<chrono::DateTime<chrono::Utc>>> {
519    let events = engine.storage.list_events(agent_id, 1000, 0).await?;
520    for event in events {
521        if event.event_type == EventType::ReflectionCompleted
522            && let Ok(ts) = chrono::DateTime::parse_from_rfc3339(&event.timestamp)
523        {
524            return Ok(Some(ts.with_timezone(&chrono::Utc)));
525        }
526    }
527    Ok(None)
528}
529
530/// Gate the Coordinated path on cadence + new-record floor. Returns
531/// `Some(reason)` when the pass should skip.
532async fn coordinated_skip_reason(
533    engine: &MnemoEngine,
534    agent_id: &str,
535) -> Result<Option<SkipReason>> {
536    let last = last_reflection_at(engine, agent_id).await?;
537    let now = chrono::Utc::now();
538    if let Some(last_ts) = last
539        && (now - last_ts).num_hours() < MIN_HOURS_BETWEEN_COORDINATED_RUNS
540    {
541        return Ok(Some(SkipReason::TooSoon));
542    }
543
544    // Count records created since the last reflection (or all records if
545    // this is the first pass).
546    let since = last.map(|t| t.to_rfc3339());
547    let filter = MemoryFilter {
548        agent_id: Some(agent_id.to_string()),
549        include_deleted: false,
550        ..Default::default()
551    };
552    let records = engine
553        .storage
554        .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
555        .await?;
556    let new_count = records
557        .iter()
558        .filter(|r| match since.as_deref() {
559            None => true,
560            Some(cutoff) => r.created_at.as_str() > cutoff,
561        })
562        .count();
563    if new_count < MIN_NEW_RECORDS_FOR_COORDINATED_RUN {
564        return Ok(Some(SkipReason::NotEnoughNewRecords));
565    }
566
567    // If Auto Dream already rewrote records we'd otherwise consolidate
568    // this cycle, skip our own consolidation to avoid double-work. The
569    // signal is any record whose metadata.dreamed_at is newer than
570    // `last_reflection_at` AND hasn't been dreamed-processed yet. We
571    // still want the pass to RUN (to re-embed + ingest the report), so
572    // this branch doesn't actually skip — return None.
573    Ok(None)
574}
575
/// Emit a hash-linked `ReflectionCompleted` event so the next Coordinated
/// run can read its timestamp.
///
/// Best-effort: lookup and insert errors are swallowed. The payload omits
/// `dream_report_ingested` because the caller emits this event before
/// running `ingest_dream_reports`.
async fn emit_reflection_completed(
    engine: &MnemoEngine,
    agent_id: &str,
    report: &ReflectionReport,
) {
    let now = chrono::Utc::now().to_rfc3339();
    let payload = serde_json::json!({
        "consolidated": report.consolidated,
        "absolutized_dates": report.absolutized_dates,
        "dreamed_accepted": report.dreamed_accepted,
        "archived": report.archived,
        "conflicts_resolved": report.conflicts_resolved,
        "total_scanned": report.total_scanned,
    });
    let content_hash = compute_content_hash(&payload.to_string(), agent_id, &now);
    // Chain onto the agent's latest event hash; `None` means first link.
    let prev_event_hash = engine
        .storage
        .get_latest_event_hash(agent_id, None)
        .await
        .ok()
        .flatten();
    let event = AgentEvent {
        id: Uuid::now_v7(),
        agent_id: agent_id.to_string(),
        thread_id: None,
        run_id: None,
        parent_event_id: None,
        event_type: EventType::ReflectionCompleted,
        payload,
        trace_id: None,
        span_id: None,
        model: None,
        tokens_input: None,
        tokens_output: None,
        latency_ms: None,
        cost_usd: None,
        timestamp: now,
        logical_clock: 0,
        content_hash: content_hash.clone(),
        prev_hash: Some(compute_chain_hash(
            &content_hash,
            prev_event_hash.as_deref(),
        )),
        embedding: None,
    };
    let _ = engine.storage.insert_event(&event).await;
}
625
/// Auto Dream organization-report trailer. Parser is permissive — matches
/// whichever of these three keys appears: `consolidated`, `removed`,
/// `reindexed` (case-insensitive, values are base-10 integers after a
/// colon or equals sign). Missing keys default to zero.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DreamReport {
    /// Count parsed from a `consolidated: N` / `consolidated = N` line.
    pub consolidated: u32,
    /// Count parsed from a `removed: N` / `removed = N` line.
    pub removed: u32,
    /// Count parsed from a `reindexed` / `re-indexed` / `re_indexed` line.
    pub reindexed: u32,
}
636
637/// Parse an Auto Dream organization-report trailer from free text. Returns
638/// `None` when no `## Organization Report` header is present.
639pub fn parse_organization_report(text: &str) -> Option<DreamReport> {
640    let lower = text.to_lowercase();
641    let marker = "## organization report";
642    let start = lower.find(marker)?;
643    let trailer = &text[start + marker.len()..];
644    let re_consolidated = regex::Regex::new(r"(?i)\bconsolidated\b\s*[:=]\s*(\d+)").ok()?;
645    let re_removed = regex::Regex::new(r"(?i)\bremoved\b\s*[:=]\s*(\d+)").ok()?;
646    let re_reindexed = regex::Regex::new(r"(?i)\bre[-_ ]?indexed\b\s*[:=]\s*(\d+)").ok()?;
647    let consolidated = re_consolidated
648        .captures(trailer)
649        .and_then(|c| c.get(1).and_then(|m| m.as_str().parse().ok()))
650        .unwrap_or(0);
651    let removed = re_removed
652        .captures(trailer)
653        .and_then(|c| c.get(1).and_then(|m| m.as_str().parse().ok()))
654        .unwrap_or(0);
655    let reindexed = re_reindexed
656        .captures(trailer)
657        .and_then(|c| c.get(1).and_then(|m| m.as_str().parse().ok()))
658        .unwrap_or(0);
659    Some(DreamReport {
660        consolidated,
661        removed,
662        reindexed,
663    })
664}
665
/// Walk records whose content carries an Auto Dream organization-report
/// trailer; extract the counts, emit `DreamReportIngested`, and mark the
/// record so the next pass skips it. Returns the number of records
/// processed this pass.
///
/// Idempotency: a `dream_report_ingested_at` metadata marker is written on
/// success; records carrying it are skipped on subsequent passes. The event
/// insert itself is best-effort (errors swallowed), but the record update
/// must succeed before the record counts as ingested.
async fn ingest_dream_reports(engine: &MnemoEngine, agent_id: &str) -> Result<usize> {
    let filter = MemoryFilter {
        agent_id: Some(agent_id.to_string()),
        include_deleted: false,
        ..Default::default()
    };
    let records = engine
        .storage
        .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
        .await?;
    let mut ingested = 0usize;
    for mut record in records {
        // Already processed on a previous pass — skip.
        if record
            .metadata
            .get("dream_report_ingested_at")
            .and_then(|v| v.as_str())
            .is_some()
        {
            continue;
        }
        // No trailer in this record's content — skip.
        let Some(report) = parse_organization_report(&record.content) else {
            continue;
        };
        let now = chrono::Utc::now().to_rfc3339();
        if let Some(obj) = record.metadata.as_object_mut() {
            obj.insert(
                "dream_report_ingested_at".to_string(),
                serde_json::Value::String(now.clone()),
            );
        }
        record.updated_at = now.clone();
        if engine.storage.update_memory(&record).await.is_ok() {
            ingested += 1;
            let payload = serde_json::json!({
                "memory_id": record.id.to_string(),
                "consolidated": report.consolidated,
                "removed": report.removed,
                "reindexed": report.reindexed,
            });
            let content_hash = compute_content_hash(&payload.to_string(), agent_id, &now);
            // Chain onto the agent's latest event hash; `None` = first link.
            let prev_event_hash = engine
                .storage
                .get_latest_event_hash(agent_id, None)
                .await
                .ok()
                .flatten();
            let event = AgentEvent {
                id: Uuid::now_v7(),
                agent_id: agent_id.to_string(),
                thread_id: None,
                run_id: None,
                parent_event_id: None,
                event_type: EventType::DreamReportIngested,
                payload,
                trace_id: None,
                span_id: None,
                model: None,
                tokens_input: None,
                tokens_output: None,
                latency_ms: None,
                cost_usd: None,
                timestamp: now,
                logical_clock: 0,
                content_hash: content_hash.clone(),
                prev_hash: Some(compute_chain_hash(
                    &content_hash,
                    prev_event_hash.as_deref(),
                )),
                embedding: None,
            };
            let _ = engine.storage.insert_event(&event).await;
        }
    }
    Ok(ingested)
}
745
746// Unit tests live alongside the engine integration suite so the reflection
747// pass can exercise the whole remember → list → reflect round-trip.