Skip to main content

oxios_memory/memory/
dream.rs

1#![allow(missing_docs)]
2//! Dream process — 4-phase background memory consolidation.
3//!
4//! Phase 1: Orient — scan current state, build map
5//! Phase 2: Gather Signal — find patterns, auto-protect, auto-classify
6//! Phase 3: Consolidate — compress, dedupe, resolve conflicts
7//! Phase 4: Prune & Index — update ROOT, remove stale entries
8//!
9//! Supports checkpointing for crash recovery.
10
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
14use anyhow::Result;
15use chrono::{DateTime, Utc};
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use tokio::fs;
19use uuid::Uuid;
20
21use crate::memory::auto_classify::AutoClassifier;
22use crate::memory::auto_protect::AutoProtector;
23use crate::memory::compaction::CompactionTree;
24use crate::memory::decay::DecayEngine;
25use crate::memory::manager::MemoryManager;
26use crate::memory::root_index::{RootEntry, RootIndex, TopicEntry};
27use crate::memory::types::{MemoryEntry, MemoryTier, MemoryType, ProtectionLevel};
28
29// ---------------------------------------------------------------------------
30// DreamCheckpoint
31// ---------------------------------------------------------------------------
32
33/// Dream execution state (checkpoint for crash recovery).
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct DreamCheckpoint {
36    /// Unique dream ID.
37    pub dream_id: String,
38    /// When the dream started.
39    pub started_at: DateTime<Utc>,
40    /// Last completed phase (0 = not started).
41    pub completed_phase: u8,
42    /// Cached signals from Phase 2.
43    pub cached_signals: Option<Vec<MemorySignal>>,
44    /// Cached plan from Phase 3.
45    pub cached_plan: Option<ConsolidationPlan>,
46}
47
48impl DreamCheckpoint {
49    /// Path for the checkpoint file within a space's memory directory.
50    pub fn path(space_dir: &Path) -> PathBuf {
51        space_dir.join("memory/.dream_checkpoint.json")
52    }
53
54    /// Path for the dream lock file.
55    pub fn lock_path(space_dir: &Path) -> PathBuf {
56        space_dir.join("memory/.dream.lock")
57    }
58
59    /// Check if a checkpoint is stale (older than 1 hour).
60    pub fn is_stale(&self) -> bool {
61        let age = Utc::now() - self.started_at;
62        age.num_hours() >= 1
63    }
64}
65
66// ---------------------------------------------------------------------------
67// DreamReport
68// ---------------------------------------------------------------------------
69
70/// Report from a dream (consolidation) run.
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct DreamReport {
73    /// Unique dream ID.
74    pub dream_id: String,
75    /// When the dream started.
76    pub started_at: DateTime<Utc>,
77    /// When the dream completed.
78    pub completed_at: DateTime<Utc>,
79    /// Whether this was resumed from a checkpoint.
80    pub resumed_from_checkpoint: bool,
81    /// Entry count before dream.
82    pub entries_before: usize,
83    /// Entry count after dream.
84    pub entries_after: usize,
85    /// Number of entries compacted.
86    pub compacted: usize,
87    /// Number of entries tier-promoted (Cold→Warm, Warm→Hot).
88    pub promoted: usize,
89    /// Number of entries tier-demoted (Hot→Warm, Warm→Cold).
90    pub demoted: usize,
91    /// Number of protection level promotions.
92    pub protection_promoted: usize,
93    /// Number of protection level demotions.
94    pub protection_demoted: usize,
95    /// Number of entries deleted.
96    pub deleted: usize,
97    /// Number of contradictions resolved.
98    pub contradictions_resolved: usize,
99    /// Number of duplicates merged.
100    pub duplicates_merged: usize,
101    /// Number of auto-protected entries.
102    pub auto_protected: usize,
103    /// Number of auto-classified entries.
104    pub auto_classified: usize,
105    /// Number of type promotions (e.g., Fact → Skill).
106    pub type_promotions: usize,
107    /// Whether ROOT index was updated.
108    pub root_updated: bool,
109    /// Whether LLM was used for compaction.
110    pub used_llm: bool,
111    /// Number of PageRank importance updates (Phase 2).
112    pub pagerank_updates: usize,
113    /// Number of learning patterns persisted (Phase 4).
114    pub patterns_persisted: usize,
115    /// Whether hyperbolic embeddings were rebuilt (Phase 5).
116    pub hyperbolic_rebuilt: bool,
117    /// Number of memories re-ranked by Flash Attention (Phase 6).
118    pub flash_reranked: usize,
119    /// Duration in milliseconds.
120    pub duration_ms: u64,
121    /// Error if Dream failed (None = success).
122    #[serde(skip_serializing_if = "Option::is_none")]
123    pub error: Option<String>,
124}
125
126impl DreamReport {
127    /// Path for saving dream reports.
128    pub fn report_path(space_dir: &Path, dream_id: &str) -> PathBuf {
129        space_dir
130            .join("memory/dream_reports")
131            .join(format!("{dream_id}.json"))
132    }
133}
134
135// ---------------------------------------------------------------------------
136// Internal types
137// ---------------------------------------------------------------------------
138
139/// A signal detected during Dream Phase 2.
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub enum MemorySignal {
142    /// A protection level change.
143    ProtectionChanged(ProtectionChange),
144    /// Auto-classify an entry.
145    AutoClassify { id: String, new_type: MemoryType },
146    /// Type promotion (e.g., Fact → Skill).
147    TypePromotion(TypePromotion),
148    /// Tier promotion candidate.
149    PromotionCandidate(TierChange),
150    /// Decay/deletion candidate.
151    DecayCandidate(DecayCandidate),
152    /// Duplicate detected.
153    Duplicate {
154        id_a: String,
155        id_b: String,
156        similarity: f64,
157    },
158    /// Contradiction detected.
159    Contradiction { newer_id: String, older_id: String },
160    /// PageRank-based importance boost (Phase 2).
161    PageRankBoost {
162        rowid: u64,
163        old_importance: f32,
164        new_importance: f32,
165        pagerank_score: f64,
166    },
167}
168
169/// A protection level change.
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct ProtectionChange {
172    pub id: String,
173    pub from: ProtectionLevel,
174    pub to: ProtectionLevel,
175    pub reason: String,
176}
177
178/// A type promotion suggestion.
179#[derive(Debug, Clone, Serialize, Deserialize)]
180pub struct TypePromotion {
181    pub id: String,
182    pub current_type: MemoryType,
183    pub suggested_type: MemoryType,
184    pub repetitions: u32,
185}
186
187/// A tier change suggestion.
188#[derive(Debug, Clone, Serialize, Deserialize)]
189pub struct TierChange {
190    pub id: String,
191    pub from_tier: MemoryTier,
192    pub to_tier: MemoryTier,
193    pub reason: String,
194}
195
196/// A decay/deletion candidate.
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct DecayCandidate {
199    pub id: String,
200    pub decay_score: f32,
201    pub protection: ProtectionLevel,
202    pub memory_type: MemoryType,
203}
204
205/// Consolidation plan from Phase 3.
206#[derive(Debug, Clone, Default, Serialize, Deserialize)]
207pub struct ConsolidationPlan {
208    /// Protection level updates.
209    pub protection_updates: Vec<ProtectionChange>,
210    /// Type reclassification.
211    pub reclassify: Vec<ReclassifyPlan>,
212    /// Tier promotions.
213    pub promote: Vec<TierChange>,
214    /// Tier demotions.
215    pub demote: Vec<TierChange>,
216    /// Entries to delete.
217    pub delete: Vec<String>,
218    /// Entries to merge.
219    pub merge: Vec<MergePlan>,
220    /// PageRank-based importance updates (Phase 2).
221    pub pagerank_updates: Vec<PageRankUpdate>,
222}
223
224/// A PageRank-based importance update.
225#[derive(Debug, Clone, Serialize, Deserialize)]
226pub struct PageRankUpdate {
227    /// Memory row ID.
228    pub rowid: u64,
229    /// Previous importance.
230    pub old_importance: f32,
231    /// New importance after PageRank boost.
232    pub new_importance: f32,
233    /// PageRank score (0.0–1.0).
234    pub pagerank_score: f64,
235}
236
237impl ConsolidationPlan {
238    /// Total number of changes in this plan.
239    pub fn total_changes(&self) -> usize {
240        self.protection_updates.len()
241            + self.reclassify.len()
242            + self.promote.len()
243            + self.demote.len()
244            + self.delete.len()
245            + self.merge.len()
246            + self.pagerank_updates.len()
247    }
248}
249
250/// A type reclassification plan.
251#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct ReclassifyPlan {
253    pub id: String,
254    pub new_type: MemoryType,
255}
256
257/// A merge plan for duplicate entries.
258#[derive(Debug, Clone, Serialize, Deserialize)]
259pub struct MergePlan {
260    pub keep_id: String,
261    pub remove_id: String,
262    pub merged_content: String,
263}
264
265// ---------------------------------------------------------------------------
266// DreamState (Phase 1 output)
267// ---------------------------------------------------------------------------
268
269/// State snapshot from Phase 1 (Orient).
270#[derive(Debug, Clone)]
271pub struct DreamState {
272    pub total_entries: usize,
273    pub hot_count: usize,
274    pub warm_count: usize,
275    pub cold_count: usize,
276    pub root_version: u64,
277    pub type_distribution: Vec<(MemoryType, usize)>,
278    pub protection_distribution: Vec<(ProtectionLevel, usize)>,
279    pub avg_decay: f32,
280}
281
282// ---------------------------------------------------------------------------
283// DreamConfig
284// ---------------------------------------------------------------------------
285
/// Settings consumed by the Dream process.
///
/// Several fields are not read in the part of the file reviewed here; their
/// notes are marked as assumptions to confirm against the rest of the crate.
#[derive(Clone, Debug)]
pub struct DreamConfig {
    /// Master switch: when `false`, `should_dream` always answers `false`.
    pub dream_enabled: bool,
    /// Hours since the last dream after which `should_dream` answers `true`.
    pub dream_interval_hours: u64,
    /// Session count since the last dream at which `should_dream` answers `true`.
    pub dream_min_sessions: u32,
    /// Hot-tier size above which overflow entries are signalled for a move to Warm.
    pub hot_max_entries: usize,
    /// Presumably the Warm-tier size limit — TODO confirm.
    pub warm_max_entries: usize,
    /// Presumably the Cold-tier size limit — TODO confirm.
    pub cold_max_entries: usize,
    /// Presumably a token budget for the Hot tier — TODO confirm.
    pub hot_token_budget: usize,
    /// Threshold handed to the decay engine's prunable check.
    pub decay_threshold: f32,
    /// Presumably a retention period in days — TODO confirm.
    pub retention_days: u32,
    /// Multiplier handed to the decay engine at construction.
    pub decay_multiplier: f32,
    /// Enables protection re-evaluation during signal gathering.
    pub auto_protection: bool,
    /// Low-protection access threshold, forwarded to the auto-protector.
    pub protection_low_access: u32,
    /// Medium-protection access threshold, forwarded to the auto-protector.
    pub protection_medium_access: u32,
    /// High-protection access threshold, forwarded to the auto-protector.
    pub protection_high_access: u32,
    /// Medium-protection session threshold, forwarded to the auto-protector.
    pub protection_medium_sessions: u32,
    /// High-protection session threshold, forwarded to the auto-protector.
    pub protection_high_sessions: u32,
    /// Enables the protection demotion check during signal gathering.
    pub protection_demotion_enabled: bool,
    /// Staleness window in days for demotion, forwarded to the auto-protector.
    pub protection_demotion_stale_days: u32,
    /// Enables automatic type classification during signal gathering.
    pub auto_classification: bool,
    /// Presumably the repetition count that triggers a type promotion — TODO confirm.
    pub type_promotion_repetitions: u32,
    /// Line threshold handed to the compaction tree at construction.
    pub compaction_line_threshold: usize,
    /// Presumably the maximum number of proactive recalls — TODO confirm.
    pub proactive_recall_limit: usize,
    /// Presumably the minimum score for a proactive recall — TODO confirm.
    pub proactive_recall_threshold: f32,
    /// Turns on the PageRank-based importance boost (Phase 2).
    pub pagerank_enabled: bool,
    /// Damping factor for PageRank (typically 0.85).
    pub pagerank_damping: f64,
    /// Number of PageRank iterations (typically 20-50).
    pub pagerank_iterations: usize,
    /// Weight of the PageRank score in the importance boost (0.0–1.0).
    pub pagerank_boost_factor: f32,
}
321
322impl Default for DreamConfig {
323    fn default() -> Self {
324        Self {
325            dream_enabled: true,
326            dream_interval_hours: 24,
327            dream_min_sessions: 5,
328            hot_max_entries: 100,
329            warm_max_entries: 1000,
330            cold_max_entries: 10000,
331            hot_token_budget: 2000,
332            decay_threshold: 0.1,
333            retention_days: 90,
334            decay_multiplier: 0.95,
335            auto_protection: true,
336            protection_low_access: 3,
337            protection_medium_access: 10,
338            protection_high_access: 50,
339            protection_medium_sessions: 3,
340            protection_high_sessions: 10,
341            protection_demotion_enabled: true,
342            protection_demotion_stale_days: 30,
343            auto_classification: true,
344            type_promotion_repetitions: 3,
345            compaction_line_threshold: 10,
346            proactive_recall_limit: 5,
347            proactive_recall_threshold: 0.6,
348            pagerank_enabled: true,
349            pagerank_damping: 0.85,
350            pagerank_iterations: 30,
351            pagerank_boost_factor: 0.3,
352        }
353    }
354}
355
356// ---------------------------------------------------------------------------
357// DreamProcess
358// ---------------------------------------------------------------------------
359
360/// The Dream process — 4-phase background memory consolidation.
361///
362/// Runs automatically on a timer (default: every 24 hours) or can be
363/// triggered manually. Uses checkpointing for crash recovery.
364pub struct DreamProcess {
365    /// Reference to the memory manager.
366    memory_manager: Arc<MemoryManager>,
367    /// Decay engine.
368    decay_engine: DecayEngine,
369    /// Auto-protector.
370    auto_protector: AutoProtector,
371    /// Compaction tree.
372    #[allow(dead_code)] // Reserved for future dream-compaction integration
373    compaction_tree: CompactionTree,
374    /// Configuration.
375    config: DreamConfig,
376    /// Root index (read/write).
377    root_index: RwLock<RootIndex>,
378    /// Space directory for file storage.
379    space_dir: PathBuf,
380}
381
382impl std::fmt::Debug for DreamProcess {
383    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
384        f.debug_struct("DreamProcess")
385            .field("space_dir", &self.space_dir.display())
386            .finish()
387    }
388}
389
/// Counters handed back by Phase 4 (Prune & Index) for inclusion in the report.
struct Phase4Result {
    /// How many contradictions were resolved.
    contradictions_resolved: usize,
    /// How many memories were re-ranked by Flash Attention (Phase 6).
    flash_reranked: usize,
    /// How many learning patterns were persisted (Phase 4: SONA).
    patterns_persisted: usize,
}
398
399impl DreamProcess {
400    /// Create a new DreamProcess.
401    pub fn new(
402        memory_manager: Arc<MemoryManager>,
403        config: DreamConfig,
404        space_dir: PathBuf,
405    ) -> Self {
406        let auto_protector = AutoProtector::new(
407            config.protection_low_access,
408            config.protection_medium_access,
409            config.protection_high_access,
410            config.protection_medium_sessions,
411            config.protection_high_sessions,
412            config.protection_demotion_stale_days,
413        );
414
415        Self {
416            memory_manager,
417            decay_engine: DecayEngine::new(config.decay_multiplier),
418            auto_protector,
419            compaction_tree: CompactionTree::new(config.compaction_line_threshold),
420            config,
421            root_index: RwLock::new(RootIndex::new()),
422            space_dir,
423        }
424    }
425
426    /// Check if a dream should run now based on configuration.
427    pub fn should_dream(&self, last_dream: Option<DateTime<Utc>>, sessions_since: u32) -> bool {
428        if !self.config.dream_enabled {
429            return false;
430        }
431
432        match last_dream {
433            None => true, // Never ran before
434            Some(last) => {
435                let hours = (Utc::now() - last).num_hours() as u64;
436                hours >= self.config.dream_interval_hours
437                    || sessions_since >= self.config.dream_min_sessions
438            }
439        }
440    }
441
442    /// Run the full 4-phase dream process.
443    ///
444    /// Returns a DreamReport with statistics about what was done.
445    pub async fn dream(&self) -> DreamReport {
446        let dream_id = Uuid::new_v4().to_string();
447        let started_at = Utc::now();
448        let entries_before = self.memory_manager.total_entries().await;
449
450        // Check for existing checkpoint
451        let resumed = self.load_checkpoint().await.ok().flatten();
452        let resumed_from_checkpoint = resumed.is_some();
453
454        let start_phase = resumed.as_ref().map(|cp| cp.completed_phase).unwrap_or(0);
455
456        // Run phases
457        let mut report = DreamReport {
458            dream_id: dream_id.clone(),
459
460            started_at,
461            completed_at: started_at,
462            resumed_from_checkpoint,
463            entries_before,
464            entries_after: entries_before,
465            compacted: 0,
466            promoted: 0,
467            demoted: 0,
468            protection_promoted: 0,
469            protection_demoted: 0,
470            deleted: 0,
471            contradictions_resolved: 0,
472            duplicates_merged: 0,
473            auto_protected: 0,
474            auto_classified: 0,
475            type_promotions: 0,
476            root_updated: false,
477            used_llm: false,
478            pagerank_updates: 0,
479            patterns_persisted: 0,
480            hyperbolic_rebuilt: false,
481            flash_reranked: 0,
482            duration_ms: 0,
483            error: None,
484        };
485
486        let result = async {
487            // Phase 1: Orient
488            let _state = if start_phase < 1 {
489                self.dream_orient().await?
490            } else {
491                // Skip — use cached state (in a full impl, we'd cache this)
492                self.dream_orient().await?
493            };
494            self.save_checkpoint(&dream_id, 1, None, None).await?;
495
496            // Phase 2: Gather Signal
497            let signals = if start_phase < 2 {
498                self.dream_gather_signal().await?
499            } else {
500                resumed
501                    .as_ref()
502                    .and_then(|cp| cp.cached_signals.clone())
503                    .unwrap_or_default()
504            };
505            self.save_checkpoint(&dream_id, 2, Some(&signals), None)
506                .await?;
507
508            // Phase 3: Consolidate
509            let plan = if start_phase < 3 {
510                self.dream_consolidate(&signals).await?
511            } else {
512                resumed
513                    .as_ref()
514                    .and_then(|cp| cp.cached_plan.clone())
515                    .unwrap_or_default()
516            };
517            self.save_checkpoint(&dream_id, 3, Some(&signals), Some(&plan))
518                .await?;
519
520            // Phase 4: Prune & Index
521            let phase4_result = self.dream_prune_and_index(&plan).await?;
522
523            // Update report counters
524            report.protection_promoted = plan
525                .protection_updates
526                .iter()
527                .filter(|c| c.to > c.from)
528                .count();
529            report.protection_demoted = plan
530                .protection_updates
531                .iter()
532                .filter(|c| c.to < c.from)
533                .count();
534            report.promoted = plan.promote.len();
535            report.demoted = plan.demote.len();
536            report.deleted = plan.delete.len();
537            report.duplicates_merged = plan.merge.len();
538            report.type_promotions = plan.reclassify.len();
539            report.auto_protected = report.protection_promoted;
540            report.auto_classified = signals
541                .iter()
542                .filter(|s| matches!(s, MemorySignal::AutoClassify { .. }))
543                .count();
544            report.contradictions_resolved = phase4_result.contradictions_resolved;
545            report.root_updated = true;
546            report.pagerank_updates = plan.pagerank_updates.len();
547            report.hyperbolic_rebuilt = true;
548            report.flash_reranked = phase4_result.flash_reranked;
549            report.patterns_persisted = phase4_result.patterns_persisted;
550
551            // Clear checkpoint on success
552            self.clear_checkpoint().await.ok();
553
554            Ok::<(), anyhow::Error>(())
555        }
556        .await;
557
558        if let Err(e) = result {
559            report.error = Some(e.to_string());
560        }
561
562        report.completed_at = Utc::now();
563        report.duration_ms = (report.completed_at - report.started_at)
564            .num_milliseconds()
565            .max(0) as u64;
566        report.entries_after = self.memory_manager.total_entries().await;
567
568        // Save report
569        let report_path = DreamReport::report_path(&self.space_dir, &dream_id);
570        if let Some(parent) = report_path.parent() {
571            let _ = fs::create_dir_all(parent).await;
572        }
573        if let Ok(data) = serde_json::to_string_pretty(&report) {
574            let _ = fs::write(&report_path, data).await;
575        }
576
577        report
578    }
579
580    /// Spawn the dream as a background task.
581    pub fn spawn_dream_task(self: &Arc<Self>) {
582        let dream = Arc::clone(self);
583        tokio::spawn(async move {
584            let report = dream.dream().await;
585            if report.error.is_some() {
586                tracing::warn!(
587                    dream_id = %report.dream_id,
588                    error = ?report.error,
589                    "Dream process completed with error"
590                );
591            } else {
592                tracing::info!(
593                    dream_id = %report.dream_id,
594                    promoted = report.promoted,
595                    demoted = report.demoted,
596                    deleted = report.deleted,
597                    auto_protected = report.auto_protected,
598                    duration_ms = report.duration_ms,
599                    "Dream process completed"
600                );
601            }
602        });
603    }
604
605    // ── Phase implementations ──────────────────────────
606
607    async fn dream_orient(&self) -> Result<DreamState> {
608        let hot = self
609            .memory_manager
610            .list_by_tier(MemoryTier::Hot, 10_000)
611            .await
612            .unwrap_or_default();
613        let warm = self
614            .memory_manager
615            .list_by_tier(MemoryTier::Warm, 10_000)
616            .await
617            .unwrap_or_default();
618        let cold = self
619            .memory_manager
620            .list_by_tier(MemoryTier::Cold, 10_000)
621            .await
622            .unwrap_or_default();
623
624        let hot_count = hot.len();
625        let warm_count = warm.len();
626        let cold_count = cold.len();
627        let total = hot_count + warm_count + cold_count;
628
629        let root = self.root_index.read().clone();
630
631        let mut type_dist: Vec<(MemoryType, usize)> = Vec::new();
632        for mt in MemoryType::all() {
633            if let Ok(entries) = self.memory_manager.list(*mt, 1_000_000).await {
634                let count = entries.len();
635                if count > 0 {
636                    type_dist.push((*mt, count));
637                }
638            }
639        }
640
641        let mut prot_dist: Vec<(ProtectionLevel, usize)> = Vec::new();
642        let all_entries: Vec<&MemoryEntry> =
643            hot.iter().chain(warm.iter()).chain(cold.iter()).collect();
644        for level in &[
645            ProtectionLevel::None,
646            ProtectionLevel::Low,
647            ProtectionLevel::Medium,
648            ProtectionLevel::High,
649            ProtectionLevel::Permanent,
650        ] {
651            let count = all_entries
652                .iter()
653                .filter(|e| e.protection == *level)
654                .count();
655            if count > 0 {
656                prot_dist.push((*level, count));
657            }
658        }
659
660        let avg_decay = if all_entries.is_empty() {
661            1.0
662        } else {
663            all_entries.iter().map(|e| e.decay_score).sum::<f32>() / all_entries.len() as f32
664        };
665
666        Ok(DreamState {
667            total_entries: total,
668            hot_count,
669            warm_count,
670            cold_count,
671            root_version: root.version,
672            type_distribution: type_dist,
673            protection_distribution: prot_dist,
674            avg_decay,
675        })
676    }
677
678    async fn dream_gather_signal(&self) -> Result<Vec<MemorySignal>> {
679        let mut signals = Vec::new();
680
681        // Gather all entries across all types
682        let mut all_entries = Vec::new();
683        for mt in MemoryType::all() {
684            if let Ok(entries) = self.memory_manager.list(*mt, 1_000_000).await {
685                all_entries.extend(entries);
686            }
687        }
688
689        let now = Utc::now();
690
691        // 1. Protection re-evaluation
692        if self.config.auto_protection {
693            for entry in &all_entries {
694                let old_protection = entry.protection;
695                let new_protection = self.auto_protector.compute_protection(entry);
696
697                // Check demotion
698                let final_protection = if self.config.protection_demotion_enabled {
699                    self.auto_protector
700                        .should_demote_protection(entry, new_protection)
701                        .unwrap_or(new_protection)
702                } else {
703                    new_protection
704                };
705
706                if old_protection != final_protection {
707                    signals.push(MemorySignal::ProtectionChanged(ProtectionChange {
708                        id: entry.id.clone(),
709                        from: old_protection,
710                        to: final_protection,
711                        reason: format!(
712                            "access_count={}, sessions={}, corrected={}",
713                            entry.access_count, entry.session_appearances, entry.user_corrected
714                        ),
715                    }));
716                }
717            }
718        }
719
720        // 2. Auto-classification for entries that haven't been classified
721        if self.config.auto_classification {
722            for entry in &all_entries {
723                if entry.auto_classified || entry.memory_type == MemoryType::Knowledge {
724                    continue; // Skip already classified or knowledge-base entries
725                }
726                let inferred = AutoClassifier::infer_memory_type(&entry.content, "");
727                if inferred != entry.memory_type {
728                    signals.push(MemorySignal::AutoClassify {
729                        id: entry.id.clone(),
730                        new_type: inferred,
731                    });
732                }
733            }
734        }
735
736        // 3. Decay computation and deletion candidates
737        for entry in &all_entries {
738            let decay = self.decay_engine.compute_decay(entry, now);
739            if self
740                .decay_engine
741                .is_prunable(entry, self.config.decay_threshold)
742            {
743                signals.push(MemorySignal::DecayCandidate(DecayCandidate {
744                    id: entry.id.clone(),
745                    decay_score: decay,
746                    protection: entry.protection,
747                    memory_type: entry.memory_type,
748                }));
749            }
750        }
751
752        // 4. Tier overflow checks
753        let hot_count = all_entries
754            .iter()
755            .filter(|e| e.tier == MemoryTier::Hot)
756            .count();
757        if hot_count > self.config.hot_max_entries {
758            let overflow = hot_count - self.config.hot_max_entries;
759            let mut candidates: Vec<&MemoryEntry> = all_entries
760                .iter()
761                .filter(|e| {
762                    e.tier == MemoryTier::Hot && e.protection < ProtectionLevel::High && !e.pinned
763                })
764                .collect();
765            candidates.sort_by(|a, b| {
766                a.protection.cmp(&b.protection).then(
767                    a.decay_score
768                        .partial_cmp(&b.decay_score)
769                        .unwrap_or(std::cmp::Ordering::Equal),
770                )
771            });
772            for entry in candidates.into_iter().take(overflow) {
773                signals.push(MemorySignal::PromotionCandidate(TierChange {
774                    id: entry.id.clone(),
775                    from_tier: MemoryTier::Hot,
776                    to_tier: MemoryTier::Warm,
777                    reason: "Hot tier overflow".to_string(),
778                }));
779            }
780        }
781
782        // 5. PageRank-based importance boost (Phase 2)
783        if self.config.pagerank_enabled {
784            #[cfg(feature = "sqlite-memory")]
785            if let Some(ref sqlite) = self.memory_manager.sqlite_store() {
786                let scores = sqlite.compute_pagerank(
787                    self.config.pagerank_damping,
788                    self.config.pagerank_iterations,
789                    None,
790                );
791
792                if !scores.is_empty() {
793                    // Get current importance for each scored memory
794                    let conn = sqlite.db().conn();
795                    for (&rowid, &pr_score) in &scores {
796                        if let Ok(old_importance) = conn.query_row(
797                            "SELECT importance FROM memories WHERE rowid = ?1",
798                            rusqlite::params![rowid as i64],
799                            |row| row.get::<_, f32>(0),
800                        ) {
801                            let new_importance = (old_importance
802                                * (1.0 + self.config.pagerank_boost_factor * pr_score as f32))
803                                .clamp(0.0, 1.0);
804
805                            if (new_importance - old_importance).abs() > 0.001 {
806                                signals.push(MemorySignal::PageRankBoost {
807                                    rowid,
808                                    old_importance,
809                                    new_importance,
810                                    pagerank_score: pr_score,
811                                });
812                            }
813                        }
814                    }
815                }
816            }
817        }
818
819        Ok(signals)
820    }
821
822    async fn dream_consolidate(&self, signals: &[MemorySignal]) -> Result<ConsolidationPlan> {
823        let mut plan = ConsolidationPlan::default();
824
825        for signal in signals {
826            match signal {
827                MemorySignal::ProtectionChanged(change) => {
828                    plan.protection_updates.push(change.clone());
829                }
830                MemorySignal::AutoClassify { id, new_type } => {
831                    plan.reclassify.push(ReclassifyPlan {
832                        id: id.clone(),
833                        new_type: *new_type,
834                    });
835                }
836                MemorySignal::TypePromotion(promo) => {
837                    plan.reclassify.push(ReclassifyPlan {
838                        id: promo.id.clone(),
839                        new_type: promo.suggested_type,
840                    });
841                }
842                MemorySignal::PromotionCandidate(tc) => {
843                    plan.demote.push(tc.clone());
844                }
845                MemorySignal::DecayCandidate(dc) => {
846                    if dc.protection <= ProtectionLevel::Low {
847                        plan.delete.push(dc.id.clone());
848                    }
849                }
850                MemorySignal::Duplicate { id_a, id_b, .. } => {
851                    plan.merge.push(MergePlan {
852                        keep_id: id_a.clone(),
853                        remove_id: id_b.clone(),
854                        merged_content: String::new(), // Would be computed in full impl
855                    });
856                }
857                MemorySignal::Contradiction { newer_id, older_id } => {
858                    // Mark older as contradicted
859                    plan.merge.push(MergePlan {
860                        keep_id: newer_id.clone(),
861                        remove_id: older_id.clone(),
862                        merged_content: String::new(),
863                    });
864                }
865                MemorySignal::PageRankBoost {
866                    rowid,
867                    old_importance,
868                    new_importance,
869                    pagerank_score,
870                } => {
871                    plan.pagerank_updates.push(PageRankUpdate {
872                        rowid: *rowid,
873                        old_importance: *old_importance,
874                        new_importance: *new_importance,
875                        pagerank_score: *pagerank_score,
876                    });
877                }
878            }
879        }
880
881        Ok(plan)
882    }
883
884    async fn dream_prune_and_index(&self, plan: &ConsolidationPlan) -> Result<Phase4Result> {
885        let mut contradictions_resolved = 0;
886
887        // 1. Apply protection updates
888        for change in &plan.protection_updates {
889            if let Ok(Some(mut entry)) = self.memory_manager.get_by_id(&change.id).await {
890                entry.protection = change.to;
891                let _ = self.memory_manager.remember(entry).await;
892            }
893        }
894
895        // 2. Apply type reclassification
896        for reclassify in &plan.reclassify {
897            if let Ok(Some(mut entry)) = self.memory_manager.get_by_id(&reclassify.id).await {
898                entry.memory_type = reclassify.new_type;
899                entry.auto_classified = true;
900                let _ = self.memory_manager.remember(entry).await;
901            }
902        }
903
904        // 3. Apply tier changes
905        for tc in &plan.demote {
906            if let Ok(Some(mut entry)) = self.memory_manager.get_by_id(&tc.id).await {
907                entry.tier = tc.to_tier;
908                let _ = self.memory_manager.remember(entry).await;
909            }
910        }
911
912        // 4. Apply merges
913        for merge in &plan.merge {
914            contradictions_resolved += 1;
915            // Remove the older/duplicate entry
916            let _ = self
917                .memory_manager
918                .get_by_id(&merge.remove_id)
919                .await
920                .ok()
921                .flatten()
922                .map(
923                    |e| async move { self.memory_manager.forget(&e.id, e.memory_type).await.ok() },
924                );
925        }
926
927        // 5. Apply PageRank importance updates (Phase 2)
928        #[cfg(feature = "sqlite-memory")]
929        if let Some(ref sqlite) = self.memory_manager.sqlite_store() {
930            for update in &plan.pagerank_updates {
931                let conn = sqlite.db().conn();
932                let _ = conn.execute(
933                    "UPDATE memories SET importance = ?1 WHERE rowid = ?2",
934                    rusqlite::params![update.new_importance, update.rowid as i64],
935                );
936            }
937        }
938
939        // 6. Apply deletions (with safety checks)
940        for id in &plan.delete {
941            if let Ok(Some(entry)) = self.memory_manager.get_by_id(id).await {
942                // Safety check: never delete protected entries
943                if entry.protection <= ProtectionLevel::Low
944                    && !entry.pinned
945                    && !entry.memory_type.is_auto_protected()
946                {
947                    let _ = self.memory_manager.forget(id, entry.memory_type).await;
948                }
949            }
950        }
951
952        // 7. Rebuild ROOT index
953        self.rebuild_root_index().await?;
954
955        // 8. Rebuild Hyperbolic Embeddings (Phase 5)
956        // RFC-018 b.1: hyperbolic core moved to oxios-memory; SQLite persistence
957        // is a kernel-side adapter (`hyperbolic_persist`).
958        #[cfg(feature = "sqlite-memory")]
959        if let Some(ref sqlite) = self.memory_manager.sqlite_store() {
960            let config = crate::memory::hyperbolic::HyperbolicConfig::default();
961            match crate::memory::sqlite::hyperbolic_persist::restore_from_sqlite(sqlite, config) {
962                Ok(he) => {
963                    let count = he.len();
964                    if count < 10 {
965                        tracing::debug!("Hyperbolic embeddings need rebuild (count < 10)");
966                    }
967                    tracing::debug!(count, "Hyperbolic embeddings loaded");
968                }
969                Err(e) => {
970                    tracing::debug!(error = %e, "Failed to restore hyperbolic embeddings (non-fatal)");
971                }
972            }
973        }
974
975        // 9. Persist & auto-promote learning patterns (Phase 4: SONA)
976        let patterns_persisted = {
977            #[cfg(feature = "sqlite-memory")]
978            if let Some(ref sqlite) = self.memory_manager.sqlite_store() {
979                // Auto-promote high-quality patterns to long-term storage
980                let _ = sqlite.auto_promote_patterns(0.8, 3);
981                // Count total patterns in store as the persistence metric
982                let conn = sqlite.db().conn();
983                let total: usize = conn
984                    .query_row("SELECT COUNT(*) FROM patterns", [], |row| row.get(0))
985                    .unwrap_or(0);
986                total
987            } else {
988                0
989            }
990            #[cfg(not(feature = "sqlite-memory"))]
991            {
992                0
993            }
994        };
995
996        // 10. Flash Attention reranking (Phase 6)
997        let flash_reranked = {
998            #[cfg(feature = "sqlite-memory")]
999            if let Some(ref sqlite) = self.memory_manager.sqlite_store() {
1000                let hot = self
1001                    .memory_manager
1002                    .list_by_tier(MemoryTier::Hot, 50)
1003                    .await
1004                    .unwrap_or_default();
1005                if !hot.is_empty() {
1006                    let query: String = hot
1007                        .iter()
1008                        .take(3)
1009                        .map(|e| e.content.as_str())
1010                        .collect::<Vec<_>>()
1011                        .join(" ");
1012                    if !query.is_empty() {
1013                        match sqlite.recall_with_rerank(&query, hot.len()).await {
1014                            Ok(reranked) => reranked.len(),
1015                            Err(_) => 0,
1016                        }
1017                    } else {
1018                        0
1019                    }
1020                } else {
1021                    0
1022                }
1023            } else {
1024                0
1025            }
1026            #[cfg(not(feature = "sqlite-memory"))]
1027            {
1028                0
1029            }
1030        };
1031
1032        Ok(Phase4Result {
1033            contradictions_resolved,
1034            flash_reranked,
1035            patterns_persisted,
1036        })
1037    }
1038
1039    // ── Helper methods ──────────────────────────────────
1040
1041    /// Rebuild the ROOT index from current memory state.
1042    async fn rebuild_root_index(&self) -> Result<()> {
1043        let mut root = RootIndex::new();
1044        root.version += 1;
1045        root.updated_at = Utc::now();
1046
1047        let now = Utc::now();
1048        let mut all_entries = Vec::new();
1049        for mt in MemoryType::all() {
1050            if let Ok(entries) = self.memory_manager.list(*mt, 1_000).await {
1051                all_entries.extend(entries);
1052            }
1053        }
1054
1055        // Build active context (recent, important)
1056        let mut recent: Vec<&MemoryEntry> = all_entries
1057            .iter()
1058            .filter(|e| (now - e.accessed_at).num_days() <= 7 && e.importance >= 0.5)
1059            .collect();
1060        recent.sort_by(|a, b| {
1061            b.importance
1062                .partial_cmp(&a.importance)
1063                .unwrap_or(std::cmp::Ordering::Equal)
1064        });
1065        recent.truncate(20);
1066
1067        for entry in &recent {
1068            root.active_context.push(RootEntry {
1069                topic: entry.content.split('.').next().unwrap_or("").to_string(),
1070                memory_type: entry.memory_type,
1071                protection: entry.protection,
1072                age_days: (now - entry.created_at).num_days() as u32,
1073                reference: entry.id.clone(),
1074            });
1075        }
1076
1077        // Build topic index
1078        for entry in &all_entries {
1079            let first_sentence = entry
1080                .content
1081                .split('.')
1082                .next()
1083                .unwrap_or(&entry.content)
1084                .to_string();
1085            root.topics.push(TopicEntry {
1086                name: first_sentence.clone(),
1087                category: entry.memory_type.label().to_string(),
1088                age_days: (now - entry.created_at).num_days() as u32,
1089                description: entry.content.chars().take(100).collect(),
1090                reference: entry.id.clone(),
1091            });
1092        }
1093
1094        *self.root_index.write() = root;
1095        Ok(())
1096    }
1097
1098    /// Load a checkpoint if one exists.
1099    async fn load_checkpoint(&self) -> Result<Option<DreamCheckpoint>> {
1100        let path = DreamCheckpoint::path(&self.space_dir);
1101        if !path.exists() {
1102            return Ok(None);
1103        }
1104        let data = fs::read_to_string(&path).await?;
1105        let checkpoint: DreamCheckpoint = serde_json::from_str(&data)?;
1106        if checkpoint.is_stale() {
1107            tracing::info!("Stale checkpoint found, ignoring");
1108            return Ok(None);
1109        }
1110        Ok(Some(checkpoint))
1111    }
1112
1113    /// Save a checkpoint after completing a phase.
1114    async fn save_checkpoint(
1115        &self,
1116        dream_id: &str,
1117        completed_phase: u8,
1118        signals: Option<&[MemorySignal]>,
1119        plan: Option<&ConsolidationPlan>,
1120    ) -> Result<()> {
1121        let checkpoint = DreamCheckpoint {
1122            dream_id: dream_id.to_string(),
1123
1124            started_at: Utc::now(),
1125            completed_phase,
1126            cached_signals: signals.map(|s| s.to_vec()),
1127            cached_plan: plan.cloned(),
1128        };
1129        let path = DreamCheckpoint::path(&self.space_dir);
1130        if let Some(parent) = path.parent() {
1131            let _ = fs::create_dir_all(parent).await;
1132        }
1133        let data = serde_json::to_string_pretty(&checkpoint)?;
1134        fs::write(&path, data).await?;
1135        Ok(())
1136    }
1137
1138    /// Clear the checkpoint after successful dream.
1139    async fn clear_checkpoint(&self) -> Result<()> {
1140        let path = DreamCheckpoint::path(&self.space_dir);
1141        if path.exists() {
1142            let _ = fs::remove_file(&path).await;
1143        }
1144        let lock_path = DreamCheckpoint::lock_path(&self.space_dir);
1145        if lock_path.exists() {
1146            let _ = fs::remove_file(&lock_path).await;
1147        }
1148        Ok(())
1149    }
1150}
1151
1152// ---------------------------------------------------------------------------
1153// Tests
1154// ---------------------------------------------------------------------------
1155
#[cfg(test)]
mod tests {
    use super::*;

    /// Phase-2 checkpoint with no cached data and the given start time.
    fn checkpoint_started_at(started_at: DateTime<Utc>) -> DreamCheckpoint {
        DreamCheckpoint {
            dream_id: String::from("test"),
            started_at,
            completed_phase: 2,
            cached_signals: None,
            cached_plan: None,
        }
    }

    /// Memory manager backed by the in-memory test storage.
    fn in_memory_manager() -> Arc<MemoryManager> {
        let storage: Arc<dyn crate::memory::storage::MemoryStorage> =
            Arc::new(crate::memory::test_support::InMemoryStorage::default());
        Arc::new(MemoryManager::new(storage))
    }

    /// A checkpoint started two hours ago is reported as stale.
    #[test]
    fn test_dream_checkpoint_stale() {
        let two_hours_ago = Utc::now() - chrono::Duration::hours(2);
        assert!(checkpoint_started_at(two_hours_ago).is_stale());
    }

    /// A checkpoint started just now is not stale.
    #[test]
    fn test_dream_checkpoint_fresh() {
        assert!(!checkpoint_started_at(Utc::now()).is_stale());
    }

    /// One protection update plus one deletion count as two changes.
    #[test]
    fn test_consolidation_plan_total_changes() {
        let plan = ConsolidationPlan {
            protection_updates: vec![ProtectionChange {
                id: "1".to_string(),
                from: ProtectionLevel::None,
                to: ProtectionLevel::Low,
                reason: "test".to_string(),
            }],
            delete: vec!["2".to_string()],
            ..ConsolidationPlan::default()
        };
        assert_eq!(plan.total_changes(), 2);
    }

    /// With no previous run recorded, a dream is due.
    #[test]
    fn test_should_dream_never_ran() {
        let workspace = tempfile::tempdir().unwrap();
        let dream = DreamProcess::new(
            in_memory_manager(),
            DreamConfig::default(),
            workspace.path().to_path_buf(),
        );

        assert!(dream.should_dream(None, 0));
    }

    /// A dream that ran a moment ago is not due again.
    #[test]
    fn test_should_dream_too_recent() {
        let workspace = tempfile::tempdir().unwrap();
        let dream = DreamProcess::new(
            in_memory_manager(),
            DreamConfig::default(),
            workspace.path().to_path_buf(),
        );

        assert!(!dream.should_dream(Some(Utc::now()), 1));
    }
}