Skip to main content

oxios_memory/memory/
dream.rs

1#![allow(missing_docs)]
2//! Dream process — 4-phase background memory consolidation.
3//!
4//! Phase 1: Orient — scan current state, build map
5//! Phase 2: Gather Signal — find patterns, auto-protect, auto-classify
6//! Phase 3: Consolidate — compress, dedupe, resolve conflicts
7//! Phase 4: Prune & Index — update ROOT, remove stale entries
8//!
9//! Supports checkpointing for crash recovery.
10
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
14use anyhow::Result;
15use chrono::{DateTime, Utc};
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use tokio::fs;
19use uuid::Uuid;
20
21use crate::memory::auto_classify::AutoClassifier;
22use crate::memory::auto_protect::AutoProtector;
23use crate::memory::compaction::CompactionTree;
24use crate::memory::decay::DecayEngine;
25use crate::memory::manager::MemoryManager;
26use crate::memory::root_index::{RootEntry, RootIndex, TopicEntry};
27use crate::memory::types::{MemoryEntry, MemoryTier, MemoryType, ProtectionLevel};
28
29// ---------------------------------------------------------------------------
30// DreamCheckpoint
31// ---------------------------------------------------------------------------
32
/// Dream execution state (checkpoint for crash recovery).
///
/// Serializable via serde. `DreamProcess::dream` reads `completed_phase`
/// and the `cached_*` fields of a loaded checkpoint to decide which phases
/// it can skip when resuming.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DreamCheckpoint {
    /// Unique dream ID.
    pub dream_id: String,
    /// When the dream started. `is_stale` measures the checkpoint's age
    /// from this timestamp.
    pub started_at: DateTime<Utc>,
    /// Last completed phase (0 = not started).
    pub completed_phase: u8,
    /// Cached signals from Phase 2.
    pub cached_signals: Option<Vec<MemorySignal>>,
    /// Cached plan from Phase 3.
    pub cached_plan: Option<ConsolidationPlan>,
    /// Cached Phase 1 (Orient) state, so resume can skip the full re-scan.
    pub cached_state: Option<DreamState>,
}
49
50impl DreamCheckpoint {
51    /// Path for the checkpoint file within a space's memory directory.
52    pub fn path(space_dir: &Path) -> PathBuf {
53        space_dir.join("memory/.dream_checkpoint.json")
54    }
55
56    /// Path for the dream lock file.
57    pub fn lock_path(space_dir: &Path) -> PathBuf {
58        space_dir.join("memory/.dream.lock")
59    }
60
61    /// Check if a checkpoint is stale (older than 1 hour).
62    pub fn is_stale(&self) -> bool {
63        let age = Utc::now() - self.started_at;
64        age.num_hours() >= 1
65    }
66}
67
68// ---------------------------------------------------------------------------
69// DreamReport
70// ---------------------------------------------------------------------------
71
/// Report from a dream (consolidation) run.
///
/// Built by `DreamProcess::dream`, which starts every counter at zero and
/// every flag at `false`, fills them in only when all phases complete, and
/// then writes the report as pretty-printed JSON to
/// [`DreamReport::report_path`]. When a phase fails, `error` is set and the
/// counters keep their initial values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DreamReport {
    /// Unique dream ID.
    pub dream_id: String,
    /// When the dream started.
    pub started_at: DateTime<Utc>,
    /// When the dream completed.
    pub completed_at: DateTime<Utc>,
    /// Whether this was resumed from a checkpoint.
    pub resumed_from_checkpoint: bool,
    /// Entry count before dream.
    pub entries_before: usize,
    /// Entry count after dream.
    pub entries_after: usize,
    /// Number of entries compacted.
    pub compacted: usize,
    /// Number of entries tier-promoted (Cold→Warm, Warm→Hot).
    pub promoted: usize,
    /// Number of entries tier-demoted (Hot→Warm, Warm→Cold).
    pub demoted: usize,
    /// Number of protection level promotions.
    pub protection_promoted: usize,
    /// Number of protection level demotions.
    pub protection_demoted: usize,
    /// Number of entries deleted.
    pub deleted: usize,
    /// Number of contradictions resolved.
    pub contradictions_resolved: usize,
    /// Number of duplicates merged.
    pub duplicates_merged: usize,
    /// Number of auto-protected entries.
    pub auto_protected: usize,
    /// Number of auto-classified entries.
    pub auto_classified: usize,
    /// Number of type promotions (e.g., Fact → Skill).
    pub type_promotions: usize,
    /// Whether ROOT index was updated.
    pub root_updated: bool,
    /// Whether LLM was used for compaction.
    pub used_llm: bool,
    /// Number of PageRank importance updates (Phase 2).
    pub pagerank_updates: usize,
    /// Number of learning patterns persisted (Phase 4).
    pub patterns_persisted: usize,
    /// Whether hyperbolic embeddings were rebuilt (Phase 5).
    pub hyperbolic_rebuilt: bool,
    /// Number of memories re-ranked by Flash Attention (Phase 6).
    pub flash_reranked: usize,
    /// Duration in milliseconds.
    pub duration_ms: u64,
    /// Error if Dream failed (None = success).
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
127
128impl DreamReport {
129    /// Path for saving dream reports.
130    pub fn report_path(space_dir: &Path, dream_id: &str) -> PathBuf {
131        space_dir
132            .join("memory/dream_reports")
133            .join(format!("{dream_id}.json"))
134    }
135}
136
137// ---------------------------------------------------------------------------
138// Internal types
139// ---------------------------------------------------------------------------
140
/// A signal detected during Dream Phase 2.
///
/// Signals are produced by `dream_gather_signal` and translated one-to-one
/// into `ConsolidationPlan` entries by `dream_consolidate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MemorySignal {
    /// A protection level change.
    ProtectionChanged(ProtectionChange),
    /// Auto-classify an entry: `id` should be reclassified as `new_type`.
    AutoClassify { id: String, new_type: MemoryType },
    /// Type promotion (e.g., Fact → Skill).
    TypePromotion(TypePromotion),
    /// Tier promotion candidate.
    ///
    /// Despite the name, the only producer visible in this file emits it
    /// for Hot → Warm overflow, and `dream_consolidate` files it under
    /// `ConsolidationPlan::demote`.
    PromotionCandidate(TierChange),
    /// Decay/deletion candidate.
    DecayCandidate(DecayCandidate),
    /// Duplicate detected.
    Duplicate {
        /// First entry of the pair; `dream_consolidate` keeps this one.
        id_a: String,
        /// Second entry of the pair; `dream_consolidate` removes this one.
        id_b: String,
        /// Similarity of the pair (scale not established by this file).
        similarity: f64,
    },
    /// Contradiction detected.
    /// `dream_consolidate` keeps `newer_id` and removes `older_id`.
    Contradiction { newer_id: String, older_id: String },
    /// PageRank-based importance boost (Phase 2).
    PageRankBoost {
        /// Row ID of the memory in the `memories` table.
        rowid: u64,
        /// Importance read from the store before the boost.
        old_importance: f32,
        /// Importance after the boost, clamped to `0.0..=1.0`.
        new_importance: f32,
        /// PageRank score that drove the boost.
        pagerank_score: f64,
    },
}
170
/// A protection level change.
///
/// `DreamProcess::dream` counts a change as a promotion when `to > from`
/// and as a demotion when `to < from`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProtectionChange {
    /// ID of the affected memory entry.
    pub id: String,
    /// Protection level the entry currently has.
    pub from: ProtectionLevel,
    /// Protection level the entry should move to.
    pub to: ProtectionLevel,
    /// Free-form explanation; Phase 2 fills it with the entry's access
    /// count, session appearances and user-corrected flag.
    pub reason: String,
}
179
/// A type promotion suggestion.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypePromotion {
    /// ID of the affected memory entry.
    pub id: String,
    /// Type the entry currently has.
    pub current_type: MemoryType,
    /// Type the entry should be reclassified as; this is what
    /// `dream_consolidate` copies into the reclassification plan.
    pub suggested_type: MemoryType,
    /// Repetition count behind the suggestion.
    /// NOTE(review): not read anywhere in the visible code — confirm meaning.
    pub repetitions: u32,
}
188
/// A tier change suggestion.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TierChange {
    /// ID of the affected memory entry.
    pub id: String,
    /// Tier the entry currently lives in.
    pub from_tier: MemoryTier,
    /// Tier the entry should move to.
    pub to_tier: MemoryTier,
    /// Free-form explanation (e.g. "Hot tier overflow").
    pub reason: String,
}
197
/// A decay/deletion candidate.
///
/// `dream_consolidate` only schedules the entry for deletion when
/// `protection` is `ProtectionLevel::Low` or lower.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DecayCandidate {
    /// ID of the affected memory entry.
    pub id: String,
    /// Decay score computed by the decay engine during Phase 2.
    pub decay_score: f32,
    /// Protection level of the entry when the signal was gathered.
    pub protection: ProtectionLevel,
    /// Memory type of the entry when the signal was gathered.
    pub memory_type: MemoryType,
}
206
/// Consolidation plan from Phase 3.
///
/// Built by `dream_consolidate` from the Phase 2 signals and consumed by
/// Phase 4. `Default` yields a plan with every list empty.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ConsolidationPlan {
    /// Protection level updates.
    pub protection_updates: Vec<ProtectionChange>,
    /// Type reclassification.
    /// Receives both `AutoClassify` and `TypePromotion` signals.
    pub reclassify: Vec<ReclassifyPlan>,
    /// Tier promotions.
    /// NOTE(review): nothing in the visible `dream_consolidate` pushes here.
    pub promote: Vec<TierChange>,
    /// Tier demotions.
    /// Receives `PromotionCandidate` signals (Hot → Warm overflow).
    pub demote: Vec<TierChange>,
    /// Entries to delete.
    pub delete: Vec<String>,
    /// Entries to merge.
    /// Receives both `Duplicate` and `Contradiction` signals.
    pub merge: Vec<MergePlan>,
    /// PageRank-based importance updates (Phase 2).
    pub pagerank_updates: Vec<PageRankUpdate>,
}
225
/// A PageRank-based importance update.
///
/// Field-for-field copy of a `MemorySignal::PageRankBoost` signal.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PageRankUpdate {
    /// Memory row ID.
    pub rowid: u64,
    /// Previous importance.
    pub old_importance: f32,
    /// New importance after PageRank boost.
    pub new_importance: f32,
    /// PageRank score (0.0–1.0).
    pub pagerank_score: f64,
}
238
239impl ConsolidationPlan {
240    /// Total number of changes in this plan.
241    pub fn total_changes(&self) -> usize {
242        self.protection_updates.len()
243            + self.reclassify.len()
244            + self.promote.len()
245            + self.demote.len()
246            + self.delete.len()
247            + self.merge.len()
248            + self.pagerank_updates.len()
249    }
250}
251
/// A type reclassification plan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReclassifyPlan {
    /// ID of the memory entry to reclassify.
    pub id: String,
    /// Memory type the entry should be given.
    pub new_type: MemoryType,
}
258
/// A merge plan for duplicate entries.
///
/// `dream_consolidate` also uses this shape for contradictions (keep the
/// newer entry, remove the older one).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MergePlan {
    /// ID of the entry that survives the merge.
    pub keep_id: String,
    /// ID of the entry that is removed by the merge.
    pub remove_id: String,
    /// Content for the surviving entry. `dream_consolidate` currently
    /// always leaves this empty.
    /// NOTE(review): confirm Phase 4 does not write the empty string back.
    pub merged_content: String,
}
266
267// ---------------------------------------------------------------------------
268// DreamState (Phase 1 output)
269// ---------------------------------------------------------------------------
/// Snapshot of the memory store produced by Phase 1 (`dream_orient`).
///
/// Cached in the checkpoint so a resumed dream can skip the re-scan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DreamState {
    /// Sum of `hot_count`, `warm_count` and `cold_count`.
    pub total_entries: usize,
    /// Number of entries returned for the Hot tier.
    pub hot_count: usize,
    /// Number of entries returned for the Warm tier.
    pub warm_count: usize,
    /// Number of entries returned for the Cold tier.
    pub cold_count: usize,
    /// Version of the ROOT index at the time of the scan.
    pub root_version: u64,
    /// Entry count per memory type; types with no entries are omitted.
    pub type_distribution: Vec<(MemoryType, usize)>,
    /// Entry count per protection level, ordered from `None` to
    /// `Permanent`; levels with no entries are omitted.
    pub protection_distribution: Vec<(ProtectionLevel, usize)>,
    /// Mean stored `decay_score` across all scanned entries (`1.0` when
    /// there are none).
    pub avg_decay: f32,
}
281
282// ---------------------------------------------------------------------------
283// DreamConfig
284// ---------------------------------------------------------------------------
285
/// Configuration extracted for Dream use.
///
/// Fields marked "not read in the visible code" are carried here but are
/// not consulted by the part of this file under review.
#[derive(Debug, Clone)]
pub struct DreamConfig {
    /// Master switch: `should_dream` returns `false` when this is `false`.
    pub dream_enabled: bool,
    /// `should_dream` fires once at least this many hours have passed
    /// since the last dream.
    pub dream_interval_hours: u64,
    /// `should_dream` also fires once at least this many sessions have
    /// happened since the last dream, regardless of elapsed time.
    pub dream_min_sessions: u32,
    /// Hot-tier entry count above which Phase 2 proposes Hot → Warm moves.
    pub hot_max_entries: usize,
    /// Not read in the visible code; presumably the Warm-tier cap — confirm.
    pub warm_max_entries: usize,
    /// Not read in the visible code; presumably the Cold-tier cap — confirm.
    pub cold_max_entries: usize,
    /// Not read in the visible code.
    pub hot_token_budget: usize,
    /// Threshold handed to the decay engine's prunability check.
    pub decay_threshold: f32,
    /// Not read in the visible code.
    pub retention_days: u32,
    /// Multiplier handed to `DecayEngine::new`.
    pub decay_multiplier: f32,
    /// Enables protection re-evaluation in Phase 2.
    pub auto_protection: bool,
    /// Access threshold passed to `AutoProtector::new` (1st argument).
    pub protection_low_access: u32,
    /// Access threshold passed to `AutoProtector::new` (2nd argument).
    pub protection_medium_access: u32,
    /// Access threshold passed to `AutoProtector::new` (3rd argument).
    pub protection_high_access: u32,
    /// Session threshold passed to `AutoProtector::new` (4th argument).
    pub protection_medium_sessions: u32,
    /// Session threshold passed to `AutoProtector::new` (5th argument).
    pub protection_high_sessions: u32,
    /// Enables the protection-demotion check in Phase 2.
    pub protection_demotion_enabled: bool,
    /// Staleness window passed to `AutoProtector::new` (6th argument).
    pub protection_demotion_stale_days: u32,
    /// Enables auto-classification of entries in Phase 2.
    pub auto_classification: bool,
    /// Not read in the visible code.
    pub type_promotion_repetitions: u32,
    /// Threshold handed to `CompactionTree::new`.
    pub compaction_line_threshold: usize,
    /// Not read in the visible code.
    pub proactive_recall_limit: usize,
    /// Not read in the visible code.
    pub proactive_recall_threshold: f32,
    /// Enable PageRank-based importance boost (Phase 2).
    pub pagerank_enabled: bool,
    /// PageRank damping factor (typically 0.85).
    pub pagerank_damping: f64,
    /// PageRank iteration count (typically 20-50).
    pub pagerank_iterations: usize,
    /// How much PageRank score influences importance (0.0–1.0).
    /// Applied as `old * (1 + factor * score)`, clamped to `0.0..=1.0`.
    pub pagerank_boost_factor: f32,
}
321
322impl Default for DreamConfig {
323    fn default() -> Self {
324        Self {
325            dream_enabled: true,
326            dream_interval_hours: 24,
327            dream_min_sessions: 5,
328            hot_max_entries: 100,
329            warm_max_entries: 1000,
330            cold_max_entries: 10000,
331            hot_token_budget: 2000,
332            decay_threshold: 0.1,
333            retention_days: 90,
334            decay_multiplier: 0.95,
335            auto_protection: true,
336            protection_low_access: 3,
337            protection_medium_access: 10,
338            protection_high_access: 50,
339            protection_medium_sessions: 3,
340            protection_high_sessions: 10,
341            protection_demotion_enabled: true,
342            protection_demotion_stale_days: 30,
343            auto_classification: true,
344            type_promotion_repetitions: 3,
345            compaction_line_threshold: 10,
346            proactive_recall_limit: 5,
347            proactive_recall_threshold: 0.6,
348            pagerank_enabled: true,
349            pagerank_damping: 0.85,
350            pagerank_iterations: 30,
351            pagerank_boost_factor: 0.3,
352        }
353    }
354}
355
356// ---------------------------------------------------------------------------
357// DreamProcess
358// ---------------------------------------------------------------------------
359
/// The Dream process — 4-phase background memory consolidation.
///
/// Runs automatically on a timer (default: every 24 hours) or can be
/// triggered manually. Uses checkpointing for crash recovery.
///
/// Construct with `DreamProcess::new`, ask `should_dream` whether a run is
/// due, then call `dream` directly or `spawn_dream_task` to run it on the
/// tokio runtime.
pub struct DreamProcess {
    /// Reference to the memory manager.
    memory_manager: Arc<MemoryManager>,
    /// Decay engine.
    /// Computes decay scores and decides prunability in Phase 2.
    decay_engine: DecayEngine,
    /// Auto-protector.
    /// Computes target protection levels in Phase 2.
    auto_protector: AutoProtector,
    /// Compaction tree.
    #[allow(dead_code)] // Reserved for future dream-compaction integration
    compaction_tree: CompactionTree,
    /// Configuration.
    config: DreamConfig,
    /// Root index (read/write).
    /// Starts out as a fresh `RootIndex::new()` in `DreamProcess::new`.
    root_index: RwLock<RootIndex>,
    /// Space directory for file storage.
    /// Dream reports are written beneath it.
    space_dir: PathBuf,
}
381
382impl std::fmt::Debug for DreamProcess {
383    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
384        f.debug_struct("DreamProcess")
385            .field("space_dir", &self.space_dir.display())
386            .finish()
387    }
388}
389
/// Result of Phase 4 (Prune & Index).
///
/// Returned by `dream_prune_and_index`; `DreamProcess::dream` copies each
/// field into the matching `DreamReport` counter.
struct Phase4Result {
    /// Number of contradictions resolved while applying the plan.
    contradictions_resolved: usize,
    /// Number of memories re-ranked by Flash Attention (Phase 6).
    flash_reranked: usize,
    /// Number of learning patterns persisted (Phase 4: SONA).
    patterns_persisted: usize,
}
398
399impl DreamProcess {
400    /// Create a new DreamProcess.
401    pub fn new(
402        memory_manager: Arc<MemoryManager>,
403        config: DreamConfig,
404        space_dir: PathBuf,
405    ) -> Self {
406        let auto_protector = AutoProtector::new(
407            config.protection_low_access,
408            config.protection_medium_access,
409            config.protection_high_access,
410            config.protection_medium_sessions,
411            config.protection_high_sessions,
412            config.protection_demotion_stale_days,
413        );
414
415        Self {
416            memory_manager,
417            decay_engine: DecayEngine::new(config.decay_multiplier),
418            auto_protector,
419            compaction_tree: CompactionTree::new(config.compaction_line_threshold),
420            config,
421            root_index: RwLock::new(RootIndex::new()),
422            space_dir,
423        }
424    }
425
426    /// Check if a dream should run now based on configuration.
427    pub fn should_dream(&self, last_dream: Option<DateTime<Utc>>, sessions_since: u32) -> bool {
428        if !self.config.dream_enabled {
429            return false;
430        }
431
432        match last_dream {
433            None => true, // Never ran before
434            Some(last) => {
435                let hours = (Utc::now() - last).num_hours() as u64;
436                hours >= self.config.dream_interval_hours
437                    || sessions_since >= self.config.dream_min_sessions
438            }
439        }
440    }
441
442    /// Run the full 4-phase dream process.
443    ///
444    /// Returns a DreamReport with statistics about what was done.
445    pub async fn dream(&self) -> DreamReport {
446        let dream_id = Uuid::new_v4().to_string();
447        let started_at = Utc::now();
448        let entries_before = self.memory_manager.total_entries().await;
449
450        // Check for existing checkpoint
451        let resumed = self.load_checkpoint().await.ok().flatten();
452        let resumed_from_checkpoint = resumed.is_some();
453
454        let start_phase = resumed.as_ref().map(|cp| cp.completed_phase).unwrap_or(0);
455
456        // Run phases
457        let mut report = DreamReport {
458            dream_id: dream_id.clone(),
459
460            started_at,
461            completed_at: started_at,
462            resumed_from_checkpoint,
463            entries_before,
464            entries_after: entries_before,
465            compacted: 0,
466            promoted: 0,
467            demoted: 0,
468            protection_promoted: 0,
469            protection_demoted: 0,
470            deleted: 0,
471            contradictions_resolved: 0,
472            duplicates_merged: 0,
473            auto_protected: 0,
474            auto_classified: 0,
475            type_promotions: 0,
476            root_updated: false,
477            used_llm: false,
478            pagerank_updates: 0,
479            patterns_persisted: 0,
480            hyperbolic_rebuilt: false,
481            flash_reranked: 0,
482            duration_ms: 0,
483            error: None,
484        };
485
486        let result = async {
487            // Phase 1: Orient
488            // On resume (start_phase >= 1), reuse the cached Phase 1 state
489            // instead of re-scanning every tier/type (the original code always
490            // re-ran dream_orient(), defeating the checkpoint). Fall back to a
491            // fresh scan only if no state was cached.
492            let state = if start_phase < 1 {
493                self.dream_orient().await?
494            } else {
495                match resumed.as_ref().and_then(|cp| cp.cached_state.clone()) {
496                    Some(s) => s,
497                    None => self.dream_orient().await?,
498                }
499            };
500            self.save_checkpoint(&dream_id, 1, None, None, Some(&state))
501                .await?;
502
503            // Phase 2: Gather Signal
504            let signals = if start_phase < 2 {
505                self.dream_gather_signal().await?
506            } else {
507                resumed
508                    .as_ref()
509                    .and_then(|cp| cp.cached_signals.clone())
510                    .unwrap_or_default()
511            };
512            self.save_checkpoint(&dream_id, 2, Some(&signals), None, Some(&state))
513                .await?;
514
515            // Phase 3: Consolidate
516            let plan = if start_phase < 3 {
517                self.dream_consolidate(&signals).await?
518            } else {
519                resumed
520                    .as_ref()
521                    .and_then(|cp| cp.cached_plan.clone())
522                    .unwrap_or_default()
523            };
524            self.save_checkpoint(&dream_id, 3, Some(&signals), Some(&plan), Some(&state))
525                .await?;
526
527            // Phase 4: Prune & Index
528            let phase4_result = self.dream_prune_and_index(&plan).await?;
529
530            // Update report counters
531            report.protection_promoted = plan
532                .protection_updates
533                .iter()
534                .filter(|c| c.to > c.from)
535                .count();
536            report.protection_demoted = plan
537                .protection_updates
538                .iter()
539                .filter(|c| c.to < c.from)
540                .count();
541            report.promoted = plan.promote.len();
542            report.demoted = plan.demote.len();
543            report.deleted = plan.delete.len();
544            report.duplicates_merged = plan.merge.len();
545            report.type_promotions = plan.reclassify.len();
546            report.auto_protected = report.protection_promoted;
547            report.auto_classified = signals
548                .iter()
549                .filter(|s| matches!(s, MemorySignal::AutoClassify { .. }))
550                .count();
551            report.contradictions_resolved = phase4_result.contradictions_resolved;
552            report.root_updated = true;
553            report.pagerank_updates = plan.pagerank_updates.len();
554            report.hyperbolic_rebuilt = true;
555            report.flash_reranked = phase4_result.flash_reranked;
556            report.patterns_persisted = phase4_result.patterns_persisted;
557
558            // Clear checkpoint on success
559            self.clear_checkpoint().await.ok();
560
561            Ok::<(), anyhow::Error>(())
562        }
563        .await;
564
565        if let Err(e) = result {
566            report.error = Some(e.to_string());
567        }
568
569        report.completed_at = Utc::now();
570        report.duration_ms = (report.completed_at - report.started_at)
571            .num_milliseconds()
572            .max(0) as u64;
573        report.entries_after = self.memory_manager.total_entries().await;
574
575        // Save report
576        let report_path = DreamReport::report_path(&self.space_dir, &dream_id);
577        if let Some(parent) = report_path.parent() {
578            let _ = fs::create_dir_all(parent).await;
579        }
580        if let Ok(data) = serde_json::to_string_pretty(&report) {
581            let _ = fs::write(&report_path, data).await;
582        }
583
584        report
585    }
586
587    /// Spawn the dream as a background task.
588    pub fn spawn_dream_task(self: &Arc<Self>) {
589        let dream = Arc::clone(self);
590        tokio::spawn(async move {
591            let report = dream.dream().await;
592            if report.error.is_some() {
593                tracing::warn!(
594                    dream_id = %report.dream_id,
595                    error = ?report.error,
596                    "Dream process completed with error"
597                );
598            } else {
599                tracing::info!(
600                    dream_id = %report.dream_id,
601                    promoted = report.promoted,
602                    demoted = report.demoted,
603                    deleted = report.deleted,
604                    auto_protected = report.auto_protected,
605                    duration_ms = report.duration_ms,
606                    "Dream process completed"
607                );
608            }
609        });
610    }
611
612    // ── Phase implementations ──────────────────────────
613
614    async fn dream_orient(&self) -> Result<DreamState> {
615        let hot = self
616            .memory_manager
617            .list_by_tier(MemoryTier::Hot, 10_000)
618            .await
619            .unwrap_or_default();
620        let warm = self
621            .memory_manager
622            .list_by_tier(MemoryTier::Warm, 10_000)
623            .await
624            .unwrap_or_default();
625        let cold = self
626            .memory_manager
627            .list_by_tier(MemoryTier::Cold, 10_000)
628            .await
629            .unwrap_or_default();
630
631        let hot_count = hot.len();
632        let warm_count = warm.len();
633        let cold_count = cold.len();
634        let total = hot_count + warm_count + cold_count;
635
636        let root = self.root_index.read().clone();
637
638        let mut type_dist: Vec<(MemoryType, usize)> = Vec::new();
639        for mt in MemoryType::all() {
640            if let Ok(entries) = self.memory_manager.list(*mt, 1_000_000).await {
641                let count = entries.len();
642                if count > 0 {
643                    type_dist.push((*mt, count));
644                }
645            }
646        }
647
648        let mut prot_dist: Vec<(ProtectionLevel, usize)> = Vec::new();
649        let all_entries: Vec<&MemoryEntry> =
650            hot.iter().chain(warm.iter()).chain(cold.iter()).collect();
651        for level in &[
652            ProtectionLevel::None,
653            ProtectionLevel::Low,
654            ProtectionLevel::Medium,
655            ProtectionLevel::High,
656            ProtectionLevel::Permanent,
657        ] {
658            let count = all_entries
659                .iter()
660                .filter(|e| e.protection == *level)
661                .count();
662            if count > 0 {
663                prot_dist.push((*level, count));
664            }
665        }
666
667        let avg_decay = if all_entries.is_empty() {
668            1.0
669        } else {
670            all_entries.iter().map(|e| e.decay_score).sum::<f32>() / all_entries.len() as f32
671        };
672
673        Ok(DreamState {
674            total_entries: total,
675            hot_count,
676            warm_count,
677            cold_count,
678            root_version: root.version,
679            type_distribution: type_dist,
680            protection_distribution: prot_dist,
681            avg_decay,
682        })
683    }
684
685    async fn dream_gather_signal(&self) -> Result<Vec<MemorySignal>> {
686        let mut signals = Vec::new();
687
688        // Gather all entries across all types
689        let mut all_entries = Vec::new();
690        for mt in MemoryType::all() {
691            if let Ok(entries) = self.memory_manager.list(*mt, 1_000_000).await {
692                all_entries.extend(entries);
693            }
694        }
695
696        let now = Utc::now();
697
698        // 1. Protection re-evaluation
699        if self.config.auto_protection {
700            for entry in &all_entries {
701                let old_protection = entry.protection;
702                let new_protection = self.auto_protector.compute_protection(entry);
703
704                // Check demotion
705                let final_protection = if self.config.protection_demotion_enabled {
706                    self.auto_protector
707                        .should_demote_protection(entry, new_protection)
708                        .unwrap_or(new_protection)
709                } else {
710                    new_protection
711                };
712
713                if old_protection != final_protection {
714                    signals.push(MemorySignal::ProtectionChanged(ProtectionChange {
715                        id: entry.id.clone(),
716                        from: old_protection,
717                        to: final_protection,
718                        reason: format!(
719                            "access_count={}, sessions={}, corrected={}",
720                            entry.access_count, entry.session_appearances, entry.user_corrected
721                        ),
722                    }));
723                }
724            }
725        }
726
727        // 2. Auto-classification for entries that haven't been classified
728        if self.config.auto_classification {
729            for entry in &all_entries {
730                if entry.auto_classified || entry.memory_type == MemoryType::Knowledge {
731                    continue; // Skip already classified or knowledge-base entries
732                }
733                let inferred = AutoClassifier::infer_memory_type(&entry.content, "");
734                if inferred != entry.memory_type {
735                    signals.push(MemorySignal::AutoClassify {
736                        id: entry.id.clone(),
737                        new_type: inferred,
738                    });
739                }
740            }
741        }
742
743        // 3. Decay computation and deletion candidates
744        for entry in &all_entries {
745            let decay = self.decay_engine.compute_decay(entry, now);
746            if self
747                .decay_engine
748                .is_prunable(entry, self.config.decay_threshold, now)
749            {
750                signals.push(MemorySignal::DecayCandidate(DecayCandidate {
751                    id: entry.id.clone(),
752                    decay_score: decay,
753                    protection: entry.protection,
754                    memory_type: entry.memory_type,
755                }));
756            }
757        }
758
759        // 4. Tier overflow checks
760        let hot_count = all_entries
761            .iter()
762            .filter(|e| e.tier == MemoryTier::Hot)
763            .count();
764        if hot_count > self.config.hot_max_entries {
765            let overflow = hot_count - self.config.hot_max_entries;
766            let mut candidates: Vec<&MemoryEntry> = all_entries
767                .iter()
768                .filter(|e| {
769                    e.tier == MemoryTier::Hot && e.protection < ProtectionLevel::High && !e.pinned
770                })
771                .collect();
772            candidates.sort_by(|a, b| {
773                a.protection.cmp(&b.protection).then(
774                    a.decay_score
775                        .partial_cmp(&b.decay_score)
776                        .unwrap_or(std::cmp::Ordering::Equal),
777                )
778            });
779            for entry in candidates.into_iter().take(overflow) {
780                signals.push(MemorySignal::PromotionCandidate(TierChange {
781                    id: entry.id.clone(),
782                    from_tier: MemoryTier::Hot,
783                    to_tier: MemoryTier::Warm,
784                    reason: "Hot tier overflow".to_string(),
785                }));
786            }
787        }
788
789        // 5. PageRank-based importance boost (Phase 2)
790        if self.config.pagerank_enabled {
791            #[cfg(feature = "sqlite-memory")]
792            if let Some(sqlite) = self.memory_manager.sqlite_store() {
793                let scores = sqlite.compute_pagerank(
794                    self.config.pagerank_damping,
795                    self.config.pagerank_iterations,
796                    None,
797                );
798
799                if !scores.is_empty() {
800                    // Get current importance for each scored memory
801                    let conn = sqlite.db().conn();
802                    for (&rowid, &pr_score) in &scores {
803                        if let Ok(old_importance) = conn.query_row(
804                            "SELECT importance FROM memories WHERE rowid = ?1",
805                            rusqlite::params![rowid as i64],
806                            |row| row.get::<_, f32>(0),
807                        ) {
808                            let new_importance = (old_importance
809                                * (1.0 + self.config.pagerank_boost_factor * pr_score as f32))
810                                .clamp(0.0, 1.0);
811
812                            if (new_importance - old_importance).abs() > 0.001 {
813                                signals.push(MemorySignal::PageRankBoost {
814                                    rowid,
815                                    old_importance,
816                                    new_importance,
817                                    pagerank_score: pr_score,
818                                });
819                            }
820                        }
821                    }
822                }
823            }
824        }
825
826        Ok(signals)
827    }
828
829    async fn dream_consolidate(&self, signals: &[MemorySignal]) -> Result<ConsolidationPlan> {
830        let mut plan = ConsolidationPlan::default();
831
832        for signal in signals {
833            match signal {
834                MemorySignal::ProtectionChanged(change) => {
835                    plan.protection_updates.push(change.clone());
836                }
837                MemorySignal::AutoClassify { id, new_type } => {
838                    plan.reclassify.push(ReclassifyPlan {
839                        id: id.clone(),
840                        new_type: *new_type,
841                    });
842                }
843                MemorySignal::TypePromotion(promo) => {
844                    plan.reclassify.push(ReclassifyPlan {
845                        id: promo.id.clone(),
846                        new_type: promo.suggested_type,
847                    });
848                }
849                MemorySignal::PromotionCandidate(tc) => {
850                    plan.demote.push(tc.clone());
851                }
852                MemorySignal::DecayCandidate(dc) => {
853                    if dc.protection <= ProtectionLevel::Low {
854                        plan.delete.push(dc.id.clone());
855                    }
856                }
857                MemorySignal::Duplicate { id_a, id_b, .. } => {
858                    plan.merge.push(MergePlan {
859                        keep_id: id_a.clone(),
860                        remove_id: id_b.clone(),
861                        merged_content: String::new(), // Would be computed in full impl
862                    });
863                }
864                MemorySignal::Contradiction { newer_id, older_id } => {
865                    // Mark older as contradicted
866                    plan.merge.push(MergePlan {
867                        keep_id: newer_id.clone(),
868                        remove_id: older_id.clone(),
869                        merged_content: String::new(),
870                    });
871                }
872                MemorySignal::PageRankBoost {
873                    rowid,
874                    old_importance,
875                    new_importance,
876                    pagerank_score,
877                } => {
878                    plan.pagerank_updates.push(PageRankUpdate {
879                        rowid: *rowid,
880                        old_importance: *old_importance,
881                        new_importance: *new_importance,
882                        pagerank_score: *pagerank_score,
883                    });
884                }
885            }
886        }
887
888        Ok(plan)
889    }
890
891    async fn dream_prune_and_index(&self, plan: &ConsolidationPlan) -> Result<Phase4Result> {
892        let mut contradictions_resolved = 0;
893
894        // 1. Apply protection updates
895        for change in &plan.protection_updates {
896            if let Ok(Some(mut entry)) = self.memory_manager.get_by_id(&change.id).await {
897                entry.protection = change.to;
898                let _ = self.memory_manager.remember(entry).await;
899            }
900        }
901
902        // 2. Apply type reclassification
903        for reclassify in &plan.reclassify {
904            if let Ok(Some(mut entry)) = self.memory_manager.get_by_id(&reclassify.id).await {
905                entry.memory_type = reclassify.new_type;
906                entry.auto_classified = true;
907                let _ = self.memory_manager.remember(entry).await;
908            }
909        }
910
911        // 3. Apply tier changes
912        for tc in &plan.demote {
913            if let Ok(Some(mut entry)) = self.memory_manager.get_by_id(&tc.id).await {
914                entry.tier = tc.to_tier;
915                let _ = self.memory_manager.remember(entry).await;
916            }
917        }
918
919        // 4. Apply merges
920        for merge in &plan.merge {
921            contradictions_resolved += 1;
922            // Actually forget the merged-away entry. The previous code built a
923            // `forget()` future inside `Option::map` and discarded it without
924            // awaiting, so duplicate/contradictory entries were never removed.
925            if let Ok(Some(e)) = self.memory_manager.get_by_id(&merge.remove_id).await {
926                let _ = self.memory_manager.forget(&e.id, e.memory_type).await;
927            }
928        }
929
930        // 5. Apply PageRank importance updates (Phase 2)
931        #[cfg(feature = "sqlite-memory")]
932        if let Some(sqlite) = self.memory_manager.sqlite_store() {
933            for update in &plan.pagerank_updates {
934                let conn = sqlite.db().conn();
935                let _ = conn.execute(
936                    "UPDATE memories SET importance = ?1 WHERE rowid = ?2",
937                    rusqlite::params![update.new_importance, update.rowid as i64],
938                );
939            }
940        }
941
942        // 6. Apply deletions (with safety checks)
943        for id in &plan.delete {
944            if let Ok(Some(entry)) = self.memory_manager.get_by_id(id).await {
945                // Safety check: never delete protected entries
946                if entry.protection <= ProtectionLevel::Low
947                    && !entry.pinned
948                    && !entry.memory_type.is_auto_protected()
949                {
950                    let _ = self.memory_manager.forget(id, entry.memory_type).await;
951                }
952            }
953        }
954
955        // 7. Rebuild ROOT index
956        self.rebuild_root_index().await?;
957
958        // 8. Rebuild Hyperbolic Embeddings (Phase 5)
959        // RFC-018 b.1: hyperbolic core moved to oxios-memory; SQLite persistence
960        // is a kernel-side adapter (`hyperbolic_persist`).
961        #[cfg(feature = "sqlite-memory")]
962        if let Some(sqlite) = self.memory_manager.sqlite_store() {
963            let config = crate::memory::hyperbolic::HyperbolicConfig::default();
964            match crate::memory::sqlite::hyperbolic_persist::restore_from_sqlite(sqlite, config) {
965                Ok(he) => {
966                    let count = he.len();
967                    if count < 10 {
968                        tracing::debug!("Hyperbolic embeddings need rebuild (count < 10)");
969                    }
970                    tracing::debug!(count, "Hyperbolic embeddings loaded");
971                }
972                Err(e) => {
973                    tracing::debug!(error = %e, "Failed to restore hyperbolic embeddings (non-fatal)");
974                }
975            }
976        }
977
978        // 9. Persist & auto-promote learning patterns (Phase 4: SONA)
979        let patterns_persisted = {
980            #[cfg(feature = "sqlite-memory")]
981            if let Some(sqlite) = self.memory_manager.sqlite_store() {
982                // Auto-promote high-quality patterns to long-term storage
983                let _ = sqlite.auto_promote_patterns(0.8, 3);
984                // Count total patterns in store as the persistence metric
985                let conn = sqlite.db().conn();
986                let total: usize = conn
987                    .query_row("SELECT COUNT(*) FROM patterns", [], |row| row.get(0))
988                    .unwrap_or(0);
989                total
990            } else {
991                0
992            }
993            #[cfg(not(feature = "sqlite-memory"))]
994            {
995                0
996            }
997        };
998
999        // 10. Flash Attention reranking (Phase 6)
1000        let flash_reranked = {
1001            #[cfg(feature = "sqlite-memory")]
1002            if let Some(sqlite) = self.memory_manager.sqlite_store() {
1003                let hot = self
1004                    .memory_manager
1005                    .list_by_tier(MemoryTier::Hot, 50)
1006                    .await
1007                    .unwrap_or_default();
1008                if !hot.is_empty() {
1009                    let query: String = hot
1010                        .iter()
1011                        .take(3)
1012                        .map(|e| e.content.as_str())
1013                        .collect::<Vec<_>>()
1014                        .join(" ");
1015                    if !query.is_empty() {
1016                        match sqlite.recall_with_rerank(&query, hot.len()).await {
1017                            Ok(reranked) => reranked.len(),
1018                            Err(_) => 0,
1019                        }
1020                    } else {
1021                        0
1022                    }
1023                } else {
1024                    0
1025                }
1026            } else {
1027                0
1028            }
1029            #[cfg(not(feature = "sqlite-memory"))]
1030            {
1031                0
1032            }
1033        };
1034
1035        Ok(Phase4Result {
1036            contradictions_resolved,
1037            flash_reranked,
1038            patterns_persisted,
1039        })
1040    }
1041
1042    // ── Helper methods ──────────────────────────────────
1043
1044    /// Rebuild the ROOT index from current memory state.
1045    async fn rebuild_root_index(&self) -> Result<()> {
1046        let mut root = RootIndex::new();
1047        root.version += 1;
1048        root.updated_at = Utc::now();
1049
1050        let now = Utc::now();
1051        let mut all_entries = Vec::new();
1052        for mt in MemoryType::all() {
1053            if let Ok(entries) = self.memory_manager.list(*mt, 1_000).await {
1054                all_entries.extend(entries);
1055            }
1056        }
1057
1058        // Build active context (recent, important)
1059        let mut recent: Vec<&MemoryEntry> = all_entries
1060            .iter()
1061            .filter(|e| (now - e.accessed_at).num_days() <= 7 && e.importance >= 0.5)
1062            .collect();
1063        recent.sort_by(|a, b| {
1064            b.importance
1065                .partial_cmp(&a.importance)
1066                .unwrap_or(std::cmp::Ordering::Equal)
1067        });
1068        recent.truncate(20);
1069
1070        for entry in &recent {
1071            root.active_context.push(RootEntry {
1072                topic: entry.content.split('.').next().unwrap_or("").to_string(),
1073                memory_type: entry.memory_type,
1074                protection: entry.protection,
1075                age_days: (now - entry.created_at).num_days() as u32,
1076                reference: entry.id.clone(),
1077            });
1078        }
1079
1080        // Build topic index
1081        for entry in &all_entries {
1082            let first_sentence = entry
1083                .content
1084                .split('.')
1085                .next()
1086                .unwrap_or(&entry.content)
1087                .to_string();
1088            root.topics.push(TopicEntry {
1089                name: first_sentence.clone(),
1090                category: entry.memory_type.label().to_string(),
1091                age_days: (now - entry.created_at).num_days() as u32,
1092                description: entry.content.chars().take(100).collect(),
1093                reference: entry.id.clone(),
1094            });
1095        }
1096
1097        *self.root_index.write() = root;
1098        Ok(())
1099    }
1100
1101    /// Load a checkpoint if one exists.
1102    async fn load_checkpoint(&self) -> Result<Option<DreamCheckpoint>> {
1103        let path = DreamCheckpoint::path(&self.space_dir);
1104        if !path.exists() {
1105            return Ok(None);
1106        }
1107        let data = fs::read_to_string(&path).await?;
1108        let checkpoint: DreamCheckpoint = serde_json::from_str(&data)?;
1109        if checkpoint.is_stale() {
1110            tracing::info!("Stale checkpoint found, ignoring");
1111            return Ok(None);
1112        }
1113        Ok(Some(checkpoint))
1114    }
1115
1116    /// Save a checkpoint after completing a phase.
1117    async fn save_checkpoint(
1118        &self,
1119        dream_id: &str,
1120        completed_phase: u8,
1121        signals: Option<&[MemorySignal]>,
1122        plan: Option<&ConsolidationPlan>,
1123        state: Option<&DreamState>,
1124    ) -> Result<()> {
1125        let checkpoint = DreamCheckpoint {
1126            dream_id: dream_id.to_string(),
1127            started_at: Utc::now(),
1128            completed_phase,
1129            cached_signals: signals.map(|s| s.to_vec()),
1130            cached_plan: plan.cloned(),
1131            cached_state: state.cloned(),
1132        };
1133        let path = DreamCheckpoint::path(&self.space_dir);
1134        if let Some(parent) = path.parent() {
1135            let _ = fs::create_dir_all(parent).await;
1136        }
1137        let data = serde_json::to_string_pretty(&checkpoint)?;
1138        fs::write(&path, data).await?;
1139        Ok(())
1140    }
1141
1142    /// Clear the checkpoint after successful dream.
1143    async fn clear_checkpoint(&self) -> Result<()> {
1144        let path = DreamCheckpoint::path(&self.space_dir);
1145        if path.exists() {
1146            let _ = fs::remove_file(&path).await;
1147        }
1148        let lock_path = DreamCheckpoint::lock_path(&self.space_dir);
1149        if lock_path.exists() {
1150            let _ = fs::remove_file(&lock_path).await;
1151        }
1152        Ok(())
1153    }
1154}
1155
1156// ---------------------------------------------------------------------------
1157// Tests
1158// ---------------------------------------------------------------------------
1159
#[cfg(test)]
mod tests {
    use super::*;

    /// Checkpoint at phase 2 with no cached data whose dream started `age` ago.
    fn checkpoint_started(age: chrono::Duration) -> DreamCheckpoint {
        DreamCheckpoint {
            dream_id: String::from("test"),
            started_at: Utc::now() - age,
            completed_phase: 2,
            cached_signals: None,
            cached_plan: None,
            cached_state: None,
        }
    }

    /// Memory manager backed by an empty in-memory store.
    fn empty_manager() -> Arc<MemoryManager> {
        let storage: Arc<dyn crate::memory::storage::MemoryStorage> =
            Arc::new(crate::memory::test_support::InMemoryStorage::default());
        Arc::new(MemoryManager::new(storage))
    }

    /// A checkpoint started two hours ago is reported as stale.
    #[test]
    fn test_dream_checkpoint_stale() {
        let checkpoint = checkpoint_started(chrono::Duration::hours(2));
        assert!(checkpoint.is_stale());
    }

    /// A checkpoint started just now is not stale.
    #[test]
    fn test_dream_checkpoint_fresh() {
        let checkpoint = checkpoint_started(chrono::Duration::zero());
        assert!(!checkpoint.is_stale());
    }

    /// One protection update plus one deletion add up to two changes.
    #[test]
    fn test_consolidation_plan_total_changes() {
        let plan = ConsolidationPlan {
            protection_updates: vec![ProtectionChange {
                id: String::from("1"),
                from: ProtectionLevel::None,
                to: ProtectionLevel::Low,
                reason: String::from("test"),
            }],
            delete: vec![String::from("2")],
            ..ConsolidationPlan::default()
        };
        assert_eq!(plan.total_changes(), 2);
    }

    /// With no previous run recorded, a dream is due.
    #[test]
    fn test_should_dream_never_ran() {
        let space = tempfile::tempdir().unwrap();
        let dream = DreamProcess::new(
            empty_manager(),
            DreamConfig::default(),
            space.path().to_path_buf(),
        );

        assert!(dream.should_dream(None, 0));
    }

    /// Right after a run, a dream is not due yet.
    #[test]
    fn test_should_dream_too_recent() {
        let space = tempfile::tempdir().unwrap();
        let dream = DreamProcess::new(
            empty_manager(),
            DreamConfig::default(),
            space.path().to_path_buf(),
        );

        assert!(!dream.should_dream(Some(Utc::now()), 1));
    }
}