1#![allow(missing_docs)]
2use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
14use anyhow::Result;
15use chrono::{DateTime, Utc};
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use tokio::fs;
19use uuid::Uuid;
20
21use crate::memory::auto_classify::AutoClassifier;
22use crate::memory::auto_protect::AutoProtector;
23use crate::memory::compaction::CompactionTree;
24use crate::memory::decay::DecayEngine;
25use crate::memory::manager::MemoryManager;
26use crate::memory::root_index::{RootEntry, RootIndex, TopicEntry};
27use crate::memory::types::{MemoryEntry, MemoryTier, MemoryType, ProtectionLevel};
28
/// On-disk snapshot of an in-progress dream run, used to resume after an
/// interruption.
///
/// Serialised as JSON to the path returned by [`DreamCheckpoint::path`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DreamCheckpoint {
    /// Identifier of the dream run that wrote this checkpoint.
    pub dream_id: String,
    /// Timestamp recorded when the checkpoint was written; `is_stale`
    /// measures age from this value.
    pub started_at: DateTime<Utc>,
    /// Number of the last phase that finished before the checkpoint was saved
    /// (`dream` writes 1, 2 and 3).
    pub completed_phase: u8,
    /// Signals gathered in phase 2, if that phase had completed.
    pub cached_signals: Option<Vec<MemorySignal>>,
    /// Consolidation plan built in phase 3, if that phase had completed.
    pub cached_plan: Option<ConsolidationPlan>,
    /// State snapshot produced in phase 1, if that phase had completed.
    pub cached_state: Option<DreamState>,
}
49
50impl DreamCheckpoint {
51 pub fn path(space_dir: &Path) -> PathBuf {
53 space_dir.join("memory/.dream_checkpoint.json")
54 }
55
56 pub fn lock_path(space_dir: &Path) -> PathBuf {
58 space_dir.join("memory/.dream.lock")
59 }
60
61 pub fn is_stale(&self) -> bool {
63 let age = Utc::now() - self.started_at;
64 age.num_hours() >= 1
65 }
66}
67
/// Summary of a single dream run, returned by `DreamProcess::dream` and
/// written as JSON to [`DreamReport::report_path`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DreamReport {
    /// Identifier generated for this run.
    pub dream_id: String,
    /// When the run started.
    pub started_at: DateTime<Utc>,
    /// When the run finished (successfully or not).
    pub completed_at: DateTime<Utc>,
    /// Whether a non-stale checkpoint was loaded at the start of the run.
    pub resumed_from_checkpoint: bool,
    /// Total entry count reported by the memory manager before the run.
    pub entries_before: usize,
    /// Total entry count reported by the memory manager after the run.
    pub entries_after: usize,
    /// Initialised to 0; no code visible in this file updates it.
    pub compacted: usize,
    /// Number of tier promotions in the consolidation plan.
    pub promoted: usize,
    /// Number of tier demotions in the consolidation plan.
    pub demoted: usize,
    /// Planned protection changes whose target level is above the source level.
    pub protection_promoted: usize,
    /// Planned protection changes whose target level is below the source level.
    pub protection_demoted: usize,
    /// Number of ids in the plan's delete list.
    pub deleted: usize,
    /// Count reported by phase 4; as written it counts every merge plan entry.
    pub contradictions_resolved: usize,
    /// Number of merge entries in the consolidation plan.
    pub duplicates_merged: usize,
    /// Set equal to `protection_promoted` by `dream`.
    pub auto_protected: usize,
    /// Number of `MemorySignal::AutoClassify` signals seen by the run.
    pub auto_classified: usize,
    /// Number of reclassification entries in the consolidation plan.
    pub type_promotions: usize,
    /// Set to `true` when all phases completed without error.
    pub root_updated: bool,
    /// Initialised to `false`; no code visible in this file sets it.
    pub used_llm: bool,
    /// Number of PageRank importance updates in the consolidation plan.
    pub pagerank_updates: usize,
    /// Pattern count reported by phase 4.
    pub patterns_persisted: usize,
    /// Set to `true` when all phases completed without error.
    pub hyperbolic_rebuilt: bool,
    /// Number of re-ranked results reported by phase 4.
    pub flash_reranked: usize,
    /// Wall-clock duration of the run in milliseconds.
    pub duration_ms: u64,
    /// Message of the error that aborted the run, if any; omitted from the
    /// serialised output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
127
128impl DreamReport {
129 pub fn report_path(space_dir: &Path, dream_id: &str) -> PathBuf {
131 space_dir
132 .join("memory/dream_reports")
133 .join(format!("{dream_id}.json"))
134 }
135}
136
/// An observation produced by the signal-gathering phase and turned into a
/// [`ConsolidationPlan`] entry by the consolidation phase.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MemorySignal {
    /// An entry's computed protection level differs from its stored one.
    ProtectionChanged(ProtectionChange),
    /// An entry's inferred memory type differs from its stored one.
    AutoClassify { id: String, new_type: MemoryType },
    /// Suggested type change; consolidated as a reclassification. Not emitted
    /// by the gathering code visible in this file.
    TypePromotion(TypePromotion),
    /// A tier change candidate. The gathering code in this file emits it for
    /// Hot -> Warm overflow and consolidation files it under `demote`.
    PromotionCandidate(TierChange),
    /// An entry the decay engine considers prunable.
    DecayCandidate(DecayCandidate),
    /// Two entries considered duplicates; consolidated into a merge keeping
    /// `id_a`. Not emitted by the gathering code visible in this file.
    Duplicate {
        id_a: String,
        id_b: String,
        similarity: f64,
    },
    /// Two conflicting entries; consolidated into a merge keeping `newer_id`.
    /// Not emitted by the gathering code visible in this file.
    Contradiction { newer_id: String, older_id: String },
    /// A PageRank-derived importance adjustment for the row `rowid`.
    PageRankBoost {
        rowid: u64,
        old_importance: f32,
        new_importance: f32,
        pagerank_score: f64,
    },
}
170
/// A proposed change of an entry's protection level.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProtectionChange {
    /// Id of the affected memory entry.
    pub id: String,
    /// Protection level currently stored on the entry.
    pub from: ProtectionLevel,
    /// Protection level to apply.
    pub to: ProtectionLevel,
    /// Human-readable justification for the change.
    pub reason: String,
}
179
/// A suggestion to change an entry's memory type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypePromotion {
    /// Id of the affected memory entry.
    pub id: String,
    /// Type the entry currently has.
    pub current_type: MemoryType,
    /// Type the entry should be reclassified to.
    pub suggested_type: MemoryType,
    /// NOTE(review): presumably the repetition count that triggered the
    /// suggestion; nothing in this file reads or writes it — confirm.
    pub repetitions: u32,
}
188
/// A proposed move of an entry between memory tiers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TierChange {
    /// Id of the affected memory entry.
    pub id: String,
    /// Tier the entry is moving out of.
    pub from_tier: MemoryTier,
    /// Tier the entry is moving into.
    pub to_tier: MemoryTier,
    /// Human-readable justification for the move.
    pub reason: String,
}
197
/// An entry flagged as prunable by the decay engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DecayCandidate {
    /// Id of the affected memory entry.
    pub id: String,
    /// Decay value computed for the entry when the signal was gathered.
    pub decay_score: f32,
    /// Protection level of the entry when the signal was gathered.
    pub protection: ProtectionLevel,
    /// Memory type of the entry when the signal was gathered.
    pub memory_type: MemoryType,
}
206
/// The set of changes a dream run intends to apply, grouped by kind.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ConsolidationPlan {
    /// Protection level changes to apply.
    pub protection_updates: Vec<ProtectionChange>,
    /// Memory type changes to apply.
    pub reclassify: Vec<ReclassifyPlan>,
    /// Tier promotions.
    pub promote: Vec<TierChange>,
    /// Tier demotions.
    pub demote: Vec<TierChange>,
    /// Ids of entries to delete.
    pub delete: Vec<String>,
    /// Pairs of entries to merge.
    pub merge: Vec<MergePlan>,
    /// Importance updates derived from PageRank.
    pub pagerank_updates: Vec<PageRankUpdate>,
}
225
/// A planned importance update for one row, derived from its PageRank score.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PageRankUpdate {
    /// Row id of the memory in the `memories` table.
    pub rowid: u64,
    /// Importance value read before the update.
    pub old_importance: f32,
    /// Importance value to write.
    pub new_importance: f32,
    /// PageRank score the new importance was derived from.
    pub pagerank_score: f64,
}
238
239impl ConsolidationPlan {
240 pub fn total_changes(&self) -> usize {
242 self.protection_updates.len()
243 + self.reclassify.len()
244 + self.promote.len()
245 + self.demote.len()
246 + self.delete.len()
247 + self.merge.len()
248 + self.pagerank_updates.len()
249 }
250}
251
/// A planned change of an entry's memory type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReclassifyPlan {
    /// Id of the affected memory entry.
    pub id: String,
    /// Type to assign to the entry.
    pub new_type: MemoryType,
}
258
/// A planned merge of two entries, keeping one and removing the other.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MergePlan {
    /// Id of the entry that survives the merge.
    pub keep_id: String,
    /// Id of the entry that is removed.
    pub remove_id: String,
    /// Merged text; the consolidation code in this file always leaves it empty.
    pub merged_content: String,
}
266
/// Snapshot of the memory store taken at the start of a dream run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DreamState {
    /// Sum of `hot_count`, `warm_count` and `cold_count`.
    pub total_entries: usize,
    /// Number of entries listed from the hot tier.
    pub hot_count: usize,
    /// Number of entries listed from the warm tier.
    pub warm_count: usize,
    /// Number of entries listed from the cold tier.
    pub cold_count: usize,
    /// Version of the root index at snapshot time.
    pub root_version: u64,
    /// Entry count per memory type; types with no entries are omitted.
    pub type_distribution: Vec<(MemoryType, usize)>,
    /// Entry count per protection level; levels with no entries are omitted.
    pub protection_distribution: Vec<(ProtectionLevel, usize)>,
    /// Mean `decay_score` over the listed entries, or 1.0 when there are none.
    pub avg_decay: f32,
}
281
/// Tunable settings for the dream (memory consolidation) process.
///
/// Fields described as "not read in this file" are carried by the config but
/// are not consumed by any code visible alongside this definition.
#[derive(Debug, Clone)]
pub struct DreamConfig {
    /// Master switch; when `false`, `should_dream` always returns `false`.
    pub dream_enabled: bool,
    /// Hours since the last dream after which a new one is due.
    pub dream_interval_hours: u64,
    /// Session count since the last dream after which a new one is due.
    pub dream_min_sessions: u32,
    /// Hot-tier size above which overflow entries are proposed for the warm tier.
    pub hot_max_entries: usize,
    /// Warm-tier capacity (not read in this file).
    pub warm_max_entries: usize,
    /// Cold-tier capacity (not read in this file).
    pub cold_max_entries: usize,
    /// Token budget for the hot tier (not read in this file).
    pub hot_token_budget: usize,
    /// Threshold handed to the decay engine's prunability check.
    pub decay_threshold: f32,
    /// Retention period in days (not read in this file).
    pub retention_days: u32,
    /// Multiplier handed to the decay engine on construction.
    pub decay_multiplier: f32,
    /// Enables protection-change signals.
    pub auto_protection: bool,
    /// Access threshold handed to the auto-protector (low level).
    pub protection_low_access: u32,
    /// Access threshold handed to the auto-protector (medium level).
    pub protection_medium_access: u32,
    /// Access threshold handed to the auto-protector (high level).
    pub protection_high_access: u32,
    /// Session threshold handed to the auto-protector (medium level).
    pub protection_medium_sessions: u32,
    /// Session threshold handed to the auto-protector (high level).
    pub protection_high_sessions: u32,
    /// Enables the auto-protector's demotion check.
    pub protection_demotion_enabled: bool,
    /// Staleness window in days handed to the auto-protector.
    pub protection_demotion_stale_days: u32,
    /// Enables auto-classification signals.
    pub auto_classification: bool,
    /// Repetition count for type promotion (not read in this file).
    pub type_promotion_repetitions: u32,
    /// Line threshold handed to the compaction tree on construction.
    pub compaction_line_threshold: usize,
    /// Proactive recall limit (not read in this file).
    pub proactive_recall_limit: usize,
    /// Proactive recall threshold (not read in this file).
    pub proactive_recall_threshold: f32,
    /// Enables PageRank-based importance signals.
    pub pagerank_enabled: bool,
    /// Damping factor passed to the PageRank computation.
    pub pagerank_damping: f64,
    /// Iteration count passed to the PageRank computation.
    pub pagerank_iterations: usize,
    /// Scale applied to the PageRank score when boosting importance.
    pub pagerank_boost_factor: f32,
}

impl Default for DreamConfig {
    /// Returns the stock configuration.
    fn default() -> Self {
        Self {
            // Scheduling.
            dream_enabled: true,
            dream_interval_hours: 24,
            dream_min_sessions: 5,

            // Tier capacities.
            hot_max_entries: 100,
            warm_max_entries: 1_000,
            cold_max_entries: 10_000,
            hot_token_budget: 2_000,

            // Decay and retention.
            decay_threshold: 0.1,
            retention_days: 90,
            decay_multiplier: 0.95,

            // Automatic protection.
            auto_protection: true,
            protection_low_access: 3,
            protection_medium_access: 10,
            protection_high_access: 50,
            protection_medium_sessions: 3,
            protection_high_sessions: 10,
            protection_demotion_enabled: true,
            protection_demotion_stale_days: 30,

            // Classification and compaction.
            auto_classification: true,
            type_promotion_repetitions: 3,
            compaction_line_threshold: 10,

            // Proactive recall.
            proactive_recall_limit: 5,
            proactive_recall_threshold: 0.6,

            // PageRank.
            pagerank_enabled: true,
            pagerank_damping: 0.85,
            pagerank_iterations: 30,
            pagerank_boost_factor: 0.3,
        }
    }
}
355
/// Drives the multi-phase dream (memory consolidation) run over a memory
/// manager, checkpointing progress under `space_dir`.
pub struct DreamProcess {
    /// Store the dream reads entries from and writes changes to.
    memory_manager: Arc<MemoryManager>,
    /// Computes decay values and prunability for entries.
    decay_engine: DecayEngine,
    /// Computes the protection level an entry should have.
    auto_protector: AutoProtector,
    /// Constructed from the config but not otherwise used in this file.
    #[allow(dead_code)] compaction_tree: CompactionTree,
    /// Settings this process was created with.
    config: DreamConfig,
    /// In-memory root index, replaced wholesale on each rebuild.
    root_index: RwLock<RootIndex>,
    /// Base directory for checkpoint and report files.
    space_dir: PathBuf,
}
381
382impl std::fmt::Debug for DreamProcess {
383 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
384 f.debug_struct("DreamProcess")
385 .field("space_dir", &self.space_dir.display())
386 .finish()
387 }
388}
389
/// Counters returned by the prune-and-index phase for inclusion in the report.
struct Phase4Result {
    /// Number of merge plan entries processed.
    contradictions_resolved: usize,
    /// Number of results returned by the hot-tier re-rank query.
    flash_reranked: usize,
    /// Row count of the `patterns` table after pattern promotion.
    patterns_persisted: usize,
}
398
399impl DreamProcess {
400 pub fn new(
402 memory_manager: Arc<MemoryManager>,
403 config: DreamConfig,
404 space_dir: PathBuf,
405 ) -> Self {
406 let auto_protector = AutoProtector::new(
407 config.protection_low_access,
408 config.protection_medium_access,
409 config.protection_high_access,
410 config.protection_medium_sessions,
411 config.protection_high_sessions,
412 config.protection_demotion_stale_days,
413 );
414
415 Self {
416 memory_manager,
417 decay_engine: DecayEngine::new(config.decay_multiplier),
418 auto_protector,
419 compaction_tree: CompactionTree::new(config.compaction_line_threshold),
420 config,
421 root_index: RwLock::new(RootIndex::new()),
422 space_dir,
423 }
424 }
425
426 pub fn should_dream(&self, last_dream: Option<DateTime<Utc>>, sessions_since: u32) -> bool {
428 if !self.config.dream_enabled {
429 return false;
430 }
431
432 match last_dream {
433 None => true, Some(last) => {
435 let hours = (Utc::now() - last).num_hours() as u64;
436 hours >= self.config.dream_interval_hours
437 || sessions_since >= self.config.dream_min_sessions
438 }
439 }
440 }
441
442 pub async fn dream(&self) -> DreamReport {
446 let dream_id = Uuid::new_v4().to_string();
447 let started_at = Utc::now();
448 let entries_before = self.memory_manager.total_entries().await;
449
450 let resumed = self.load_checkpoint().await.ok().flatten();
452 let resumed_from_checkpoint = resumed.is_some();
453
454 let start_phase = resumed.as_ref().map(|cp| cp.completed_phase).unwrap_or(0);
455
456 let mut report = DreamReport {
458 dream_id: dream_id.clone(),
459
460 started_at,
461 completed_at: started_at,
462 resumed_from_checkpoint,
463 entries_before,
464 entries_after: entries_before,
465 compacted: 0,
466 promoted: 0,
467 demoted: 0,
468 protection_promoted: 0,
469 protection_demoted: 0,
470 deleted: 0,
471 contradictions_resolved: 0,
472 duplicates_merged: 0,
473 auto_protected: 0,
474 auto_classified: 0,
475 type_promotions: 0,
476 root_updated: false,
477 used_llm: false,
478 pagerank_updates: 0,
479 patterns_persisted: 0,
480 hyperbolic_rebuilt: false,
481 flash_reranked: 0,
482 duration_ms: 0,
483 error: None,
484 };
485
486 let result = async {
487 let state = if start_phase < 1 {
493 self.dream_orient().await?
494 } else {
495 match resumed.as_ref().and_then(|cp| cp.cached_state.clone()) {
496 Some(s) => s,
497 None => self.dream_orient().await?,
498 }
499 };
500 self.save_checkpoint(&dream_id, 1, None, None, Some(&state))
501 .await?;
502
503 let signals = if start_phase < 2 {
505 self.dream_gather_signal().await?
506 } else {
507 resumed
508 .as_ref()
509 .and_then(|cp| cp.cached_signals.clone())
510 .unwrap_or_default()
511 };
512 self.save_checkpoint(&dream_id, 2, Some(&signals), None, Some(&state))
513 .await?;
514
515 let plan = if start_phase < 3 {
517 self.dream_consolidate(&signals).await?
518 } else {
519 resumed
520 .as_ref()
521 .and_then(|cp| cp.cached_plan.clone())
522 .unwrap_or_default()
523 };
524 self.save_checkpoint(&dream_id, 3, Some(&signals), Some(&plan), Some(&state))
525 .await?;
526
527 let phase4_result = self.dream_prune_and_index(&plan).await?;
529
530 report.protection_promoted = plan
532 .protection_updates
533 .iter()
534 .filter(|c| c.to > c.from)
535 .count();
536 report.protection_demoted = plan
537 .protection_updates
538 .iter()
539 .filter(|c| c.to < c.from)
540 .count();
541 report.promoted = plan.promote.len();
542 report.demoted = plan.demote.len();
543 report.deleted = plan.delete.len();
544 report.duplicates_merged = plan.merge.len();
545 report.type_promotions = plan.reclassify.len();
546 report.auto_protected = report.protection_promoted;
547 report.auto_classified = signals
548 .iter()
549 .filter(|s| matches!(s, MemorySignal::AutoClassify { .. }))
550 .count();
551 report.contradictions_resolved = phase4_result.contradictions_resolved;
552 report.root_updated = true;
553 report.pagerank_updates = plan.pagerank_updates.len();
554 report.hyperbolic_rebuilt = true;
555 report.flash_reranked = phase4_result.flash_reranked;
556 report.patterns_persisted = phase4_result.patterns_persisted;
557
558 self.clear_checkpoint().await.ok();
560
561 Ok::<(), anyhow::Error>(())
562 }
563 .await;
564
565 if let Err(e) = result {
566 report.error = Some(e.to_string());
567 }
568
569 report.completed_at = Utc::now();
570 report.duration_ms = (report.completed_at - report.started_at)
571 .num_milliseconds()
572 .max(0) as u64;
573 report.entries_after = self.memory_manager.total_entries().await;
574
575 let report_path = DreamReport::report_path(&self.space_dir, &dream_id);
577 if let Some(parent) = report_path.parent() {
578 let _ = fs::create_dir_all(parent).await;
579 }
580 if let Ok(data) = serde_json::to_string_pretty(&report) {
581 let _ = fs::write(&report_path, data).await;
582 }
583
584 report
585 }
586
587 pub fn spawn_dream_task(self: &Arc<Self>) {
589 let dream = Arc::clone(self);
590 tokio::spawn(async move {
591 let report = dream.dream().await;
592 if report.error.is_some() {
593 tracing::warn!(
594 dream_id = %report.dream_id,
595 error = ?report.error,
596 "Dream process completed with error"
597 );
598 } else {
599 tracing::info!(
600 dream_id = %report.dream_id,
601 promoted = report.promoted,
602 demoted = report.demoted,
603 deleted = report.deleted,
604 auto_protected = report.auto_protected,
605 duration_ms = report.duration_ms,
606 "Dream process completed"
607 );
608 }
609 });
610 }
611
612 async fn dream_orient(&self) -> Result<DreamState> {
615 let hot = self
616 .memory_manager
617 .list_by_tier(MemoryTier::Hot, 10_000)
618 .await
619 .unwrap_or_default();
620 let warm = self
621 .memory_manager
622 .list_by_tier(MemoryTier::Warm, 10_000)
623 .await
624 .unwrap_or_default();
625 let cold = self
626 .memory_manager
627 .list_by_tier(MemoryTier::Cold, 10_000)
628 .await
629 .unwrap_or_default();
630
631 let hot_count = hot.len();
632 let warm_count = warm.len();
633 let cold_count = cold.len();
634 let total = hot_count + warm_count + cold_count;
635
636 let root = self.root_index.read().clone();
637
638 let mut type_dist: Vec<(MemoryType, usize)> = Vec::new();
639 for mt in MemoryType::all() {
640 if let Ok(entries) = self.memory_manager.list(*mt, 1_000_000).await {
641 let count = entries.len();
642 if count > 0 {
643 type_dist.push((*mt, count));
644 }
645 }
646 }
647
648 let mut prot_dist: Vec<(ProtectionLevel, usize)> = Vec::new();
649 let all_entries: Vec<&MemoryEntry> =
650 hot.iter().chain(warm.iter()).chain(cold.iter()).collect();
651 for level in &[
652 ProtectionLevel::None,
653 ProtectionLevel::Low,
654 ProtectionLevel::Medium,
655 ProtectionLevel::High,
656 ProtectionLevel::Permanent,
657 ] {
658 let count = all_entries
659 .iter()
660 .filter(|e| e.protection == *level)
661 .count();
662 if count > 0 {
663 prot_dist.push((*level, count));
664 }
665 }
666
667 let avg_decay = if all_entries.is_empty() {
668 1.0
669 } else {
670 all_entries.iter().map(|e| e.decay_score).sum::<f32>() / all_entries.len() as f32
671 };
672
673 Ok(DreamState {
674 total_entries: total,
675 hot_count,
676 warm_count,
677 cold_count,
678 root_version: root.version,
679 type_distribution: type_dist,
680 protection_distribution: prot_dist,
681 avg_decay,
682 })
683 }
684
685 async fn dream_gather_signal(&self) -> Result<Vec<MemorySignal>> {
686 let mut signals = Vec::new();
687
688 let mut all_entries = Vec::new();
690 for mt in MemoryType::all() {
691 if let Ok(entries) = self.memory_manager.list(*mt, 1_000_000).await {
692 all_entries.extend(entries);
693 }
694 }
695
696 let now = Utc::now();
697
698 if self.config.auto_protection {
700 for entry in &all_entries {
701 let old_protection = entry.protection;
702 let new_protection = self.auto_protector.compute_protection(entry);
703
704 let final_protection = if self.config.protection_demotion_enabled {
706 self.auto_protector
707 .should_demote_protection(entry, new_protection)
708 .unwrap_or(new_protection)
709 } else {
710 new_protection
711 };
712
713 if old_protection != final_protection {
714 signals.push(MemorySignal::ProtectionChanged(ProtectionChange {
715 id: entry.id.clone(),
716 from: old_protection,
717 to: final_protection,
718 reason: format!(
719 "access_count={}, sessions={}, corrected={}",
720 entry.access_count, entry.session_appearances, entry.user_corrected
721 ),
722 }));
723 }
724 }
725 }
726
727 if self.config.auto_classification {
729 for entry in &all_entries {
730 if entry.auto_classified || entry.memory_type == MemoryType::Knowledge {
731 continue; }
733 let inferred = AutoClassifier::infer_memory_type(&entry.content, "");
734 if inferred != entry.memory_type {
735 signals.push(MemorySignal::AutoClassify {
736 id: entry.id.clone(),
737 new_type: inferred,
738 });
739 }
740 }
741 }
742
743 for entry in &all_entries {
745 let decay = self.decay_engine.compute_decay(entry, now);
746 if self
747 .decay_engine
748 .is_prunable(entry, self.config.decay_threshold, now)
749 {
750 signals.push(MemorySignal::DecayCandidate(DecayCandidate {
751 id: entry.id.clone(),
752 decay_score: decay,
753 protection: entry.protection,
754 memory_type: entry.memory_type,
755 }));
756 }
757 }
758
759 let hot_count = all_entries
761 .iter()
762 .filter(|e| e.tier == MemoryTier::Hot)
763 .count();
764 if hot_count > self.config.hot_max_entries {
765 let overflow = hot_count - self.config.hot_max_entries;
766 let mut candidates: Vec<&MemoryEntry> = all_entries
767 .iter()
768 .filter(|e| {
769 e.tier == MemoryTier::Hot && e.protection < ProtectionLevel::High && !e.pinned
770 })
771 .collect();
772 candidates.sort_by(|a, b| {
773 a.protection.cmp(&b.protection).then(
774 a.decay_score
775 .partial_cmp(&b.decay_score)
776 .unwrap_or(std::cmp::Ordering::Equal),
777 )
778 });
779 for entry in candidates.into_iter().take(overflow) {
780 signals.push(MemorySignal::PromotionCandidate(TierChange {
781 id: entry.id.clone(),
782 from_tier: MemoryTier::Hot,
783 to_tier: MemoryTier::Warm,
784 reason: "Hot tier overflow".to_string(),
785 }));
786 }
787 }
788
789 if self.config.pagerank_enabled {
791 #[cfg(feature = "sqlite-memory")]
792 if let Some(sqlite) = self.memory_manager.sqlite_store() {
793 let scores = sqlite.compute_pagerank(
794 self.config.pagerank_damping,
795 self.config.pagerank_iterations,
796 None,
797 );
798
799 if !scores.is_empty() {
800 let conn = sqlite.db().conn();
802 for (&rowid, &pr_score) in &scores {
803 if let Ok(old_importance) = conn.query_row(
804 "SELECT importance FROM memories WHERE rowid = ?1",
805 rusqlite::params![rowid as i64],
806 |row| row.get::<_, f32>(0),
807 ) {
808 let new_importance = (old_importance
809 * (1.0 + self.config.pagerank_boost_factor * pr_score as f32))
810 .clamp(0.0, 1.0);
811
812 if (new_importance - old_importance).abs() > 0.001 {
813 signals.push(MemorySignal::PageRankBoost {
814 rowid,
815 old_importance,
816 new_importance,
817 pagerank_score: pr_score,
818 });
819 }
820 }
821 }
822 }
823 }
824 }
825
826 Ok(signals)
827 }
828
829 async fn dream_consolidate(&self, signals: &[MemorySignal]) -> Result<ConsolidationPlan> {
830 let mut plan = ConsolidationPlan::default();
831
832 for signal in signals {
833 match signal {
834 MemorySignal::ProtectionChanged(change) => {
835 plan.protection_updates.push(change.clone());
836 }
837 MemorySignal::AutoClassify { id, new_type } => {
838 plan.reclassify.push(ReclassifyPlan {
839 id: id.clone(),
840 new_type: *new_type,
841 });
842 }
843 MemorySignal::TypePromotion(promo) => {
844 plan.reclassify.push(ReclassifyPlan {
845 id: promo.id.clone(),
846 new_type: promo.suggested_type,
847 });
848 }
849 MemorySignal::PromotionCandidate(tc) => {
850 plan.demote.push(tc.clone());
851 }
852 MemorySignal::DecayCandidate(dc) => {
853 if dc.protection <= ProtectionLevel::Low {
854 plan.delete.push(dc.id.clone());
855 }
856 }
857 MemorySignal::Duplicate { id_a, id_b, .. } => {
858 plan.merge.push(MergePlan {
859 keep_id: id_a.clone(),
860 remove_id: id_b.clone(),
861 merged_content: String::new(), });
863 }
864 MemorySignal::Contradiction { newer_id, older_id } => {
865 plan.merge.push(MergePlan {
867 keep_id: newer_id.clone(),
868 remove_id: older_id.clone(),
869 merged_content: String::new(),
870 });
871 }
872 MemorySignal::PageRankBoost {
873 rowid,
874 old_importance,
875 new_importance,
876 pagerank_score,
877 } => {
878 plan.pagerank_updates.push(PageRankUpdate {
879 rowid: *rowid,
880 old_importance: *old_importance,
881 new_importance: *new_importance,
882 pagerank_score: *pagerank_score,
883 });
884 }
885 }
886 }
887
888 Ok(plan)
889 }
890
891 async fn dream_prune_and_index(&self, plan: &ConsolidationPlan) -> Result<Phase4Result> {
892 let mut contradictions_resolved = 0;
893
894 for change in &plan.protection_updates {
896 if let Ok(Some(mut entry)) = self.memory_manager.get_by_id(&change.id).await {
897 entry.protection = change.to;
898 let _ = self.memory_manager.remember(entry).await;
899 }
900 }
901
902 for reclassify in &plan.reclassify {
904 if let Ok(Some(mut entry)) = self.memory_manager.get_by_id(&reclassify.id).await {
905 entry.memory_type = reclassify.new_type;
906 entry.auto_classified = true;
907 let _ = self.memory_manager.remember(entry).await;
908 }
909 }
910
911 for tc in &plan.demote {
913 if let Ok(Some(mut entry)) = self.memory_manager.get_by_id(&tc.id).await {
914 entry.tier = tc.to_tier;
915 let _ = self.memory_manager.remember(entry).await;
916 }
917 }
918
919 for merge in &plan.merge {
921 contradictions_resolved += 1;
922 if let Ok(Some(e)) = self.memory_manager.get_by_id(&merge.remove_id).await {
926 let _ = self.memory_manager.forget(&e.id, e.memory_type).await;
927 }
928 }
929
930 #[cfg(feature = "sqlite-memory")]
932 if let Some(sqlite) = self.memory_manager.sqlite_store() {
933 for update in &plan.pagerank_updates {
934 let conn = sqlite.db().conn();
935 let _ = conn.execute(
936 "UPDATE memories SET importance = ?1 WHERE rowid = ?2",
937 rusqlite::params![update.new_importance, update.rowid as i64],
938 );
939 }
940 }
941
942 for id in &plan.delete {
944 if let Ok(Some(entry)) = self.memory_manager.get_by_id(id).await {
945 if entry.protection <= ProtectionLevel::Low
947 && !entry.pinned
948 && !entry.memory_type.is_auto_protected()
949 {
950 let _ = self.memory_manager.forget(id, entry.memory_type).await;
951 }
952 }
953 }
954
955 self.rebuild_root_index().await?;
957
958 #[cfg(feature = "sqlite-memory")]
962 if let Some(sqlite) = self.memory_manager.sqlite_store() {
963 let config = crate::memory::hyperbolic::HyperbolicConfig::default();
964 match crate::memory::sqlite::hyperbolic_persist::restore_from_sqlite(sqlite, config) {
965 Ok(he) => {
966 let count = he.len();
967 if count < 10 {
968 tracing::debug!("Hyperbolic embeddings need rebuild (count < 10)");
969 }
970 tracing::debug!(count, "Hyperbolic embeddings loaded");
971 }
972 Err(e) => {
973 tracing::debug!(error = %e, "Failed to restore hyperbolic embeddings (non-fatal)");
974 }
975 }
976 }
977
978 let patterns_persisted = {
980 #[cfg(feature = "sqlite-memory")]
981 if let Some(sqlite) = self.memory_manager.sqlite_store() {
982 let _ = sqlite.auto_promote_patterns(0.8, 3);
984 let conn = sqlite.db().conn();
986 let total: usize = conn
987 .query_row("SELECT COUNT(*) FROM patterns", [], |row| row.get(0))
988 .unwrap_or(0);
989 total
990 } else {
991 0
992 }
993 #[cfg(not(feature = "sqlite-memory"))]
994 {
995 0
996 }
997 };
998
999 let flash_reranked = {
1001 #[cfg(feature = "sqlite-memory")]
1002 if let Some(sqlite) = self.memory_manager.sqlite_store() {
1003 let hot = self
1004 .memory_manager
1005 .list_by_tier(MemoryTier::Hot, 50)
1006 .await
1007 .unwrap_or_default();
1008 if !hot.is_empty() {
1009 let query: String = hot
1010 .iter()
1011 .take(3)
1012 .map(|e| e.content.as_str())
1013 .collect::<Vec<_>>()
1014 .join(" ");
1015 if !query.is_empty() {
1016 match sqlite.recall_with_rerank(&query, hot.len()).await {
1017 Ok(reranked) => reranked.len(),
1018 Err(_) => 0,
1019 }
1020 } else {
1021 0
1022 }
1023 } else {
1024 0
1025 }
1026 } else {
1027 0
1028 }
1029 #[cfg(not(feature = "sqlite-memory"))]
1030 {
1031 0
1032 }
1033 };
1034
1035 Ok(Phase4Result {
1036 contradictions_resolved,
1037 flash_reranked,
1038 patterns_persisted,
1039 })
1040 }
1041
1042 async fn rebuild_root_index(&self) -> Result<()> {
1046 let mut root = RootIndex::new();
1047 root.version += 1;
1048 root.updated_at = Utc::now();
1049
1050 let now = Utc::now();
1051 let mut all_entries = Vec::new();
1052 for mt in MemoryType::all() {
1053 if let Ok(entries) = self.memory_manager.list(*mt, 1_000).await {
1054 all_entries.extend(entries);
1055 }
1056 }
1057
1058 let mut recent: Vec<&MemoryEntry> = all_entries
1060 .iter()
1061 .filter(|e| (now - e.accessed_at).num_days() <= 7 && e.importance >= 0.5)
1062 .collect();
1063 recent.sort_by(|a, b| {
1064 b.importance
1065 .partial_cmp(&a.importance)
1066 .unwrap_or(std::cmp::Ordering::Equal)
1067 });
1068 recent.truncate(20);
1069
1070 for entry in &recent {
1071 root.active_context.push(RootEntry {
1072 topic: entry.content.split('.').next().unwrap_or("").to_string(),
1073 memory_type: entry.memory_type,
1074 protection: entry.protection,
1075 age_days: (now - entry.created_at).num_days() as u32,
1076 reference: entry.id.clone(),
1077 });
1078 }
1079
1080 for entry in &all_entries {
1082 let first_sentence = entry
1083 .content
1084 .split('.')
1085 .next()
1086 .unwrap_or(&entry.content)
1087 .to_string();
1088 root.topics.push(TopicEntry {
1089 name: first_sentence.clone(),
1090 category: entry.memory_type.label().to_string(),
1091 age_days: (now - entry.created_at).num_days() as u32,
1092 description: entry.content.chars().take(100).collect(),
1093 reference: entry.id.clone(),
1094 });
1095 }
1096
1097 *self.root_index.write() = root;
1098 Ok(())
1099 }
1100
1101 async fn load_checkpoint(&self) -> Result<Option<DreamCheckpoint>> {
1103 let path = DreamCheckpoint::path(&self.space_dir);
1104 if !path.exists() {
1105 return Ok(None);
1106 }
1107 let data = fs::read_to_string(&path).await?;
1108 let checkpoint: DreamCheckpoint = serde_json::from_str(&data)?;
1109 if checkpoint.is_stale() {
1110 tracing::info!("Stale checkpoint found, ignoring");
1111 return Ok(None);
1112 }
1113 Ok(Some(checkpoint))
1114 }
1115
1116 async fn save_checkpoint(
1118 &self,
1119 dream_id: &str,
1120 completed_phase: u8,
1121 signals: Option<&[MemorySignal]>,
1122 plan: Option<&ConsolidationPlan>,
1123 state: Option<&DreamState>,
1124 ) -> Result<()> {
1125 let checkpoint = DreamCheckpoint {
1126 dream_id: dream_id.to_string(),
1127 started_at: Utc::now(),
1128 completed_phase,
1129 cached_signals: signals.map(|s| s.to_vec()),
1130 cached_plan: plan.cloned(),
1131 cached_state: state.cloned(),
1132 };
1133 let path = DreamCheckpoint::path(&self.space_dir);
1134 if let Some(parent) = path.parent() {
1135 let _ = fs::create_dir_all(parent).await;
1136 }
1137 let data = serde_json::to_string_pretty(&checkpoint)?;
1138 fs::write(&path, data).await?;
1139 Ok(())
1140 }
1141
1142 async fn clear_checkpoint(&self) -> Result<()> {
1144 let path = DreamCheckpoint::path(&self.space_dir);
1145 if path.exists() {
1146 let _ = fs::remove_file(&path).await;
1147 }
1148 let lock_path = DreamCheckpoint::lock_path(&self.space_dir);
1149 if lock_path.exists() {
1150 let _ = fs::remove_file(&lock_path).await;
1151 }
1152 Ok(())
1153 }
1154}
1155
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a phase-2 checkpoint with no cached data and the given timestamp.
    fn checkpoint_started_at(started_at: DateTime<Utc>) -> DreamCheckpoint {
        DreamCheckpoint {
            dream_id: "test".to_string(),
            started_at,
            completed_phase: 2,
            cached_signals: None,
            cached_plan: None,
            cached_state: None,
        }
    }

    /// Builds a dream process with default config over in-memory storage.
    /// The temp dir is returned so it outlives the process in each test.
    fn default_dream_process() -> (tempfile::TempDir, DreamProcess) {
        let config = DreamConfig::default();
        let temp = tempfile::tempdir().unwrap();
        let storage: Arc<dyn crate::memory::storage::MemoryStorage> =
            Arc::new(crate::memory::test_support::InMemoryStorage::default());
        let manager = Arc::new(MemoryManager::new(storage));
        let dream = DreamProcess::new(manager, config, temp.path().to_path_buf());
        (temp, dream)
    }

    #[test]
    fn test_dream_checkpoint_stale() {
        let two_hours_ago = Utc::now() - chrono::Duration::hours(2);
        assert!(checkpoint_started_at(two_hours_ago).is_stale());
    }

    #[test]
    fn test_dream_checkpoint_fresh() {
        assert!(!checkpoint_started_at(Utc::now()).is_stale());
    }

    #[test]
    fn test_consolidation_plan_total_changes() {
        let plan = ConsolidationPlan {
            protection_updates: vec![ProtectionChange {
                id: "1".to_string(),
                from: ProtectionLevel::None,
                to: ProtectionLevel::Low,
                reason: "test".to_string(),
            }],
            delete: vec!["2".to_string()],
            ..ConsolidationPlan::default()
        };
        assert_eq!(plan.total_changes(), 2);
    }

    #[test]
    fn test_should_dream_never_ran() {
        let (_temp, dream) = default_dream_process();
        assert!(dream.should_dream(None, 0));
    }

    #[test]
    fn test_should_dream_too_recent() {
        let (_temp, dream) = default_dream_process();
        assert!(!dream.should_dream(Some(Utc::now()), 1));
    }
}