1#![allow(missing_docs)]
2use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
14use anyhow::Result;
15use chrono::{DateTime, Utc};
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use tokio::fs;
19use uuid::Uuid;
20
21use crate::memory::auto_classify::AutoClassifier;
22use crate::memory::auto_protect::AutoProtector;
23use crate::memory::compaction::CompactionTree;
24use crate::memory::decay::DecayEngine;
25use crate::memory::manager::MemoryManager;
26use crate::memory::root_index::{RootEntry, RootIndex, TopicEntry};
27use crate::memory::types::{MemoryEntry, MemoryTier, MemoryType, ProtectionLevel};
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct DreamCheckpoint {
36 pub dream_id: String,
38 pub started_at: DateTime<Utc>,
40 pub completed_phase: u8,
42 pub cached_signals: Option<Vec<MemorySignal>>,
44 pub cached_plan: Option<ConsolidationPlan>,
46}
47
48impl DreamCheckpoint {
49 pub fn path(space_dir: &Path) -> PathBuf {
51 space_dir.join("memory/.dream_checkpoint.json")
52 }
53
54 pub fn lock_path(space_dir: &Path) -> PathBuf {
56 space_dir.join("memory/.dream.lock")
57 }
58
59 pub fn is_stale(&self) -> bool {
61 let age = Utc::now() - self.started_at;
62 age.num_hours() >= 1
63 }
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct DreamReport {
73 pub dream_id: String,
75 pub started_at: DateTime<Utc>,
77 pub completed_at: DateTime<Utc>,
79 pub resumed_from_checkpoint: bool,
81 pub entries_before: usize,
83 pub entries_after: usize,
85 pub compacted: usize,
87 pub promoted: usize,
89 pub demoted: usize,
91 pub protection_promoted: usize,
93 pub protection_demoted: usize,
95 pub deleted: usize,
97 pub contradictions_resolved: usize,
99 pub duplicates_merged: usize,
101 pub auto_protected: usize,
103 pub auto_classified: usize,
105 pub type_promotions: usize,
107 pub root_updated: bool,
109 pub used_llm: bool,
111 pub pagerank_updates: usize,
113 pub patterns_persisted: usize,
115 pub hyperbolic_rebuilt: bool,
117 pub flash_reranked: usize,
119 pub duration_ms: u64,
121 #[serde(skip_serializing_if = "Option::is_none")]
123 pub error: Option<String>,
124}
125
126impl DreamReport {
127 pub fn report_path(space_dir: &Path, dream_id: &str) -> PathBuf {
129 space_dir
130 .join("memory/dream_reports")
131 .join(format!("{dream_id}.json"))
132 }
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize)]
141pub enum MemorySignal {
142 ProtectionChanged(ProtectionChange),
144 AutoClassify { id: String, new_type: MemoryType },
146 TypePromotion(TypePromotion),
148 PromotionCandidate(TierChange),
150 DecayCandidate(DecayCandidate),
152 Duplicate {
154 id_a: String,
155 id_b: String,
156 similarity: f64,
157 },
158 Contradiction { newer_id: String, older_id: String },
160 PageRankBoost {
162 rowid: u64,
163 old_importance: f32,
164 new_importance: f32,
165 pagerank_score: f64,
166 },
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct ProtectionChange {
172 pub id: String,
173 pub from: ProtectionLevel,
174 pub to: ProtectionLevel,
175 pub reason: String,
176}
177
178#[derive(Debug, Clone, Serialize, Deserialize)]
180pub struct TypePromotion {
181 pub id: String,
182 pub current_type: MemoryType,
183 pub suggested_type: MemoryType,
184 pub repetitions: u32,
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize)]
189pub struct TierChange {
190 pub id: String,
191 pub from_tier: MemoryTier,
192 pub to_tier: MemoryTier,
193 pub reason: String,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct DecayCandidate {
199 pub id: String,
200 pub decay_score: f32,
201 pub protection: ProtectionLevel,
202 pub memory_type: MemoryType,
203}
204
205#[derive(Debug, Clone, Default, Serialize, Deserialize)]
207pub struct ConsolidationPlan {
208 pub protection_updates: Vec<ProtectionChange>,
210 pub reclassify: Vec<ReclassifyPlan>,
212 pub promote: Vec<TierChange>,
214 pub demote: Vec<TierChange>,
216 pub delete: Vec<String>,
218 pub merge: Vec<MergePlan>,
220 pub pagerank_updates: Vec<PageRankUpdate>,
222}
223
224#[derive(Debug, Clone, Serialize, Deserialize)]
226pub struct PageRankUpdate {
227 pub rowid: u64,
229 pub old_importance: f32,
231 pub new_importance: f32,
233 pub pagerank_score: f64,
235}
236
237impl ConsolidationPlan {
238 pub fn total_changes(&self) -> usize {
240 self.protection_updates.len()
241 + self.reclassify.len()
242 + self.promote.len()
243 + self.demote.len()
244 + self.delete.len()
245 + self.merge.len()
246 + self.pagerank_updates.len()
247 }
248}
249
250#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct ReclassifyPlan {
253 pub id: String,
254 pub new_type: MemoryType,
255}
256
257#[derive(Debug, Clone, Serialize, Deserialize)]
259pub struct MergePlan {
260 pub keep_id: String,
261 pub remove_id: String,
262 pub merged_content: String,
263}
264
265#[derive(Debug, Clone)]
271pub struct DreamState {
272 pub total_entries: usize,
273 pub hot_count: usize,
274 pub warm_count: usize,
275 pub cold_count: usize,
276 pub root_version: u64,
277 pub type_distribution: Vec<(MemoryType, usize)>,
278 pub protection_distribution: Vec<(ProtectionLevel, usize)>,
279 pub avg_decay: f32,
280}
281
282#[derive(Debug, Clone)]
288pub struct DreamConfig {
289 pub dream_enabled: bool,
290 pub dream_interval_hours: u64,
291 pub dream_min_sessions: u32,
292 pub hot_max_entries: usize,
293 pub warm_max_entries: usize,
294 pub cold_max_entries: usize,
295 pub hot_token_budget: usize,
296 pub decay_threshold: f32,
297 pub retention_days: u32,
298 pub decay_multiplier: f32,
299 pub auto_protection: bool,
300 pub protection_low_access: u32,
301 pub protection_medium_access: u32,
302 pub protection_high_access: u32,
303 pub protection_medium_sessions: u32,
304 pub protection_high_sessions: u32,
305 pub protection_demotion_enabled: bool,
306 pub protection_demotion_stale_days: u32,
307 pub auto_classification: bool,
308 pub type_promotion_repetitions: u32,
309 pub compaction_line_threshold: usize,
310 pub proactive_recall_limit: usize,
311 pub proactive_recall_threshold: f32,
312 pub pagerank_enabled: bool,
314 pub pagerank_damping: f64,
316 pub pagerank_iterations: usize,
318 pub pagerank_boost_factor: f32,
320}
321
322impl Default for DreamConfig {
323 fn default() -> Self {
324 Self {
325 dream_enabled: true,
326 dream_interval_hours: 24,
327 dream_min_sessions: 5,
328 hot_max_entries: 100,
329 warm_max_entries: 1000,
330 cold_max_entries: 10000,
331 hot_token_budget: 2000,
332 decay_threshold: 0.1,
333 retention_days: 90,
334 decay_multiplier: 0.95,
335 auto_protection: true,
336 protection_low_access: 3,
337 protection_medium_access: 10,
338 protection_high_access: 50,
339 protection_medium_sessions: 3,
340 protection_high_sessions: 10,
341 protection_demotion_enabled: true,
342 protection_demotion_stale_days: 30,
343 auto_classification: true,
344 type_promotion_repetitions: 3,
345 compaction_line_threshold: 10,
346 proactive_recall_limit: 5,
347 proactive_recall_threshold: 0.6,
348 pagerank_enabled: true,
349 pagerank_damping: 0.85,
350 pagerank_iterations: 30,
351 pagerank_boost_factor: 0.3,
352 }
353 }
354}
355
356pub struct DreamProcess {
365 memory_manager: Arc<MemoryManager>,
367 decay_engine: DecayEngine,
369 auto_protector: AutoProtector,
371 #[allow(dead_code)] compaction_tree: CompactionTree,
374 config: DreamConfig,
376 root_index: RwLock<RootIndex>,
378 space_dir: PathBuf,
380}
381
382impl std::fmt::Debug for DreamProcess {
383 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
384 f.debug_struct("DreamProcess")
385 .field("space_dir", &self.space_dir.display())
386 .finish()
387 }
388}
389
390struct Phase4Result {
392 contradictions_resolved: usize,
393 flash_reranked: usize,
395 patterns_persisted: usize,
397}
398
399impl DreamProcess {
400 pub fn new(
402 memory_manager: Arc<MemoryManager>,
403 config: DreamConfig,
404 space_dir: PathBuf,
405 ) -> Self {
406 let auto_protector = AutoProtector::new(
407 config.protection_low_access,
408 config.protection_medium_access,
409 config.protection_high_access,
410 config.protection_medium_sessions,
411 config.protection_high_sessions,
412 config.protection_demotion_stale_days,
413 );
414
415 Self {
416 memory_manager,
417 decay_engine: DecayEngine::new(config.decay_multiplier),
418 auto_protector,
419 compaction_tree: CompactionTree::new(config.compaction_line_threshold),
420 config,
421 root_index: RwLock::new(RootIndex::new()),
422 space_dir,
423 }
424 }
425
426 pub fn should_dream(&self, last_dream: Option<DateTime<Utc>>, sessions_since: u32) -> bool {
428 if !self.config.dream_enabled {
429 return false;
430 }
431
432 match last_dream {
433 None => true, Some(last) => {
435 let hours = (Utc::now() - last).num_hours() as u64;
436 hours >= self.config.dream_interval_hours
437 || sessions_since >= self.config.dream_min_sessions
438 }
439 }
440 }
441
442 pub async fn dream(&self) -> DreamReport {
446 let dream_id = Uuid::new_v4().to_string();
447 let started_at = Utc::now();
448 let entries_before = self.memory_manager.total_entries().await;
449
450 let resumed = self.load_checkpoint().await.ok().flatten();
452 let resumed_from_checkpoint = resumed.is_some();
453
454 let start_phase = resumed.as_ref().map(|cp| cp.completed_phase).unwrap_or(0);
455
456 let mut report = DreamReport {
458 dream_id: dream_id.clone(),
459
460 started_at,
461 completed_at: started_at,
462 resumed_from_checkpoint,
463 entries_before,
464 entries_after: entries_before,
465 compacted: 0,
466 promoted: 0,
467 demoted: 0,
468 protection_promoted: 0,
469 protection_demoted: 0,
470 deleted: 0,
471 contradictions_resolved: 0,
472 duplicates_merged: 0,
473 auto_protected: 0,
474 auto_classified: 0,
475 type_promotions: 0,
476 root_updated: false,
477 used_llm: false,
478 pagerank_updates: 0,
479 patterns_persisted: 0,
480 hyperbolic_rebuilt: false,
481 flash_reranked: 0,
482 duration_ms: 0,
483 error: None,
484 };
485
486 let result = async {
487 let _state = if start_phase < 1 {
489 self.dream_orient().await?
490 } else {
491 self.dream_orient().await?
493 };
494 self.save_checkpoint(&dream_id, 1, None, None).await?;
495
496 let signals = if start_phase < 2 {
498 self.dream_gather_signal().await?
499 } else {
500 resumed
501 .as_ref()
502 .and_then(|cp| cp.cached_signals.clone())
503 .unwrap_or_default()
504 };
505 self.save_checkpoint(&dream_id, 2, Some(&signals), None)
506 .await?;
507
508 let plan = if start_phase < 3 {
510 self.dream_consolidate(&signals).await?
511 } else {
512 resumed
513 .as_ref()
514 .and_then(|cp| cp.cached_plan.clone())
515 .unwrap_or_default()
516 };
517 self.save_checkpoint(&dream_id, 3, Some(&signals), Some(&plan))
518 .await?;
519
520 let phase4_result = self.dream_prune_and_index(&plan).await?;
522
523 report.protection_promoted = plan
525 .protection_updates
526 .iter()
527 .filter(|c| c.to > c.from)
528 .count();
529 report.protection_demoted = plan
530 .protection_updates
531 .iter()
532 .filter(|c| c.to < c.from)
533 .count();
534 report.promoted = plan.promote.len();
535 report.demoted = plan.demote.len();
536 report.deleted = plan.delete.len();
537 report.duplicates_merged = plan.merge.len();
538 report.type_promotions = plan.reclassify.len();
539 report.auto_protected = report.protection_promoted;
540 report.auto_classified = signals
541 .iter()
542 .filter(|s| matches!(s, MemorySignal::AutoClassify { .. }))
543 .count();
544 report.contradictions_resolved = phase4_result.contradictions_resolved;
545 report.root_updated = true;
546 report.pagerank_updates = plan.pagerank_updates.len();
547 report.hyperbolic_rebuilt = true;
548 report.flash_reranked = phase4_result.flash_reranked;
549 report.patterns_persisted = phase4_result.patterns_persisted;
550
551 self.clear_checkpoint().await.ok();
553
554 Ok::<(), anyhow::Error>(())
555 }
556 .await;
557
558 if let Err(e) = result {
559 report.error = Some(e.to_string());
560 }
561
562 report.completed_at = Utc::now();
563 report.duration_ms = (report.completed_at - report.started_at)
564 .num_milliseconds()
565 .max(0) as u64;
566 report.entries_after = self.memory_manager.total_entries().await;
567
568 let report_path = DreamReport::report_path(&self.space_dir, &dream_id);
570 if let Some(parent) = report_path.parent() {
571 let _ = fs::create_dir_all(parent).await;
572 }
573 if let Ok(data) = serde_json::to_string_pretty(&report) {
574 let _ = fs::write(&report_path, data).await;
575 }
576
577 report
578 }
579
580 pub fn spawn_dream_task(self: &Arc<Self>) {
582 let dream = Arc::clone(self);
583 tokio::spawn(async move {
584 let report = dream.dream().await;
585 if report.error.is_some() {
586 tracing::warn!(
587 dream_id = %report.dream_id,
588 error = ?report.error,
589 "Dream process completed with error"
590 );
591 } else {
592 tracing::info!(
593 dream_id = %report.dream_id,
594 promoted = report.promoted,
595 demoted = report.demoted,
596 deleted = report.deleted,
597 auto_protected = report.auto_protected,
598 duration_ms = report.duration_ms,
599 "Dream process completed"
600 );
601 }
602 });
603 }
604
605 async fn dream_orient(&self) -> Result<DreamState> {
608 let hot = self
609 .memory_manager
610 .list_by_tier(MemoryTier::Hot, 10_000)
611 .await
612 .unwrap_or_default();
613 let warm = self
614 .memory_manager
615 .list_by_tier(MemoryTier::Warm, 10_000)
616 .await
617 .unwrap_or_default();
618 let cold = self
619 .memory_manager
620 .list_by_tier(MemoryTier::Cold, 10_000)
621 .await
622 .unwrap_or_default();
623
624 let hot_count = hot.len();
625 let warm_count = warm.len();
626 let cold_count = cold.len();
627 let total = hot_count + warm_count + cold_count;
628
629 let root = self.root_index.read().clone();
630
631 let mut type_dist: Vec<(MemoryType, usize)> = Vec::new();
632 for mt in MemoryType::all() {
633 if let Ok(entries) = self.memory_manager.list(*mt, 1_000_000).await {
634 let count = entries.len();
635 if count > 0 {
636 type_dist.push((*mt, count));
637 }
638 }
639 }
640
641 let mut prot_dist: Vec<(ProtectionLevel, usize)> = Vec::new();
642 let all_entries: Vec<&MemoryEntry> =
643 hot.iter().chain(warm.iter()).chain(cold.iter()).collect();
644 for level in &[
645 ProtectionLevel::None,
646 ProtectionLevel::Low,
647 ProtectionLevel::Medium,
648 ProtectionLevel::High,
649 ProtectionLevel::Permanent,
650 ] {
651 let count = all_entries
652 .iter()
653 .filter(|e| e.protection == *level)
654 .count();
655 if count > 0 {
656 prot_dist.push((*level, count));
657 }
658 }
659
660 let avg_decay = if all_entries.is_empty() {
661 1.0
662 } else {
663 all_entries.iter().map(|e| e.decay_score).sum::<f32>() / all_entries.len() as f32
664 };
665
666 Ok(DreamState {
667 total_entries: total,
668 hot_count,
669 warm_count,
670 cold_count,
671 root_version: root.version,
672 type_distribution: type_dist,
673 protection_distribution: prot_dist,
674 avg_decay,
675 })
676 }
677
678 async fn dream_gather_signal(&self) -> Result<Vec<MemorySignal>> {
679 let mut signals = Vec::new();
680
681 let mut all_entries = Vec::new();
683 for mt in MemoryType::all() {
684 if let Ok(entries) = self.memory_manager.list(*mt, 1_000_000).await {
685 all_entries.extend(entries);
686 }
687 }
688
689 let now = Utc::now();
690
691 if self.config.auto_protection {
693 for entry in &all_entries {
694 let old_protection = entry.protection;
695 let new_protection = self.auto_protector.compute_protection(entry);
696
697 let final_protection = if self.config.protection_demotion_enabled {
699 self.auto_protector
700 .should_demote_protection(entry, new_protection)
701 .unwrap_or(new_protection)
702 } else {
703 new_protection
704 };
705
706 if old_protection != final_protection {
707 signals.push(MemorySignal::ProtectionChanged(ProtectionChange {
708 id: entry.id.clone(),
709 from: old_protection,
710 to: final_protection,
711 reason: format!(
712 "access_count={}, sessions={}, corrected={}",
713 entry.access_count, entry.session_appearances, entry.user_corrected
714 ),
715 }));
716 }
717 }
718 }
719
720 if self.config.auto_classification {
722 for entry in &all_entries {
723 if entry.auto_classified || entry.memory_type == MemoryType::Knowledge {
724 continue; }
726 let inferred = AutoClassifier::infer_memory_type(&entry.content, "");
727 if inferred != entry.memory_type {
728 signals.push(MemorySignal::AutoClassify {
729 id: entry.id.clone(),
730 new_type: inferred,
731 });
732 }
733 }
734 }
735
736 for entry in &all_entries {
738 let decay = self.decay_engine.compute_decay(entry, now);
739 if self
740 .decay_engine
741 .is_prunable(entry, self.config.decay_threshold)
742 {
743 signals.push(MemorySignal::DecayCandidate(DecayCandidate {
744 id: entry.id.clone(),
745 decay_score: decay,
746 protection: entry.protection,
747 memory_type: entry.memory_type,
748 }));
749 }
750 }
751
752 let hot_count = all_entries
754 .iter()
755 .filter(|e| e.tier == MemoryTier::Hot)
756 .count();
757 if hot_count > self.config.hot_max_entries {
758 let overflow = hot_count - self.config.hot_max_entries;
759 let mut candidates: Vec<&MemoryEntry> = all_entries
760 .iter()
761 .filter(|e| {
762 e.tier == MemoryTier::Hot && e.protection < ProtectionLevel::High && !e.pinned
763 })
764 .collect();
765 candidates.sort_by(|a, b| {
766 a.protection.cmp(&b.protection).then(
767 a.decay_score
768 .partial_cmp(&b.decay_score)
769 .unwrap_or(std::cmp::Ordering::Equal),
770 )
771 });
772 for entry in candidates.into_iter().take(overflow) {
773 signals.push(MemorySignal::PromotionCandidate(TierChange {
774 id: entry.id.clone(),
775 from_tier: MemoryTier::Hot,
776 to_tier: MemoryTier::Warm,
777 reason: "Hot tier overflow".to_string(),
778 }));
779 }
780 }
781
782 if self.config.pagerank_enabled {
784 #[cfg(feature = "sqlite-memory")]
785 if let Some(ref sqlite) = self.memory_manager.sqlite_store() {
786 let scores = sqlite.compute_pagerank(
787 self.config.pagerank_damping,
788 self.config.pagerank_iterations,
789 None,
790 );
791
792 if !scores.is_empty() {
793 let conn = sqlite.db().conn();
795 for (&rowid, &pr_score) in &scores {
796 if let Ok(old_importance) = conn.query_row(
797 "SELECT importance FROM memories WHERE rowid = ?1",
798 rusqlite::params![rowid as i64],
799 |row| row.get::<_, f32>(0),
800 ) {
801 let new_importance = (old_importance
802 * (1.0 + self.config.pagerank_boost_factor * pr_score as f32))
803 .clamp(0.0, 1.0);
804
805 if (new_importance - old_importance).abs() > 0.001 {
806 signals.push(MemorySignal::PageRankBoost {
807 rowid,
808 old_importance,
809 new_importance,
810 pagerank_score: pr_score,
811 });
812 }
813 }
814 }
815 }
816 }
817 }
818
819 Ok(signals)
820 }
821
822 async fn dream_consolidate(&self, signals: &[MemorySignal]) -> Result<ConsolidationPlan> {
823 let mut plan = ConsolidationPlan::default();
824
825 for signal in signals {
826 match signal {
827 MemorySignal::ProtectionChanged(change) => {
828 plan.protection_updates.push(change.clone());
829 }
830 MemorySignal::AutoClassify { id, new_type } => {
831 plan.reclassify.push(ReclassifyPlan {
832 id: id.clone(),
833 new_type: *new_type,
834 });
835 }
836 MemorySignal::TypePromotion(promo) => {
837 plan.reclassify.push(ReclassifyPlan {
838 id: promo.id.clone(),
839 new_type: promo.suggested_type,
840 });
841 }
842 MemorySignal::PromotionCandidate(tc) => {
843 plan.demote.push(tc.clone());
844 }
845 MemorySignal::DecayCandidate(dc) => {
846 if dc.protection <= ProtectionLevel::Low {
847 plan.delete.push(dc.id.clone());
848 }
849 }
850 MemorySignal::Duplicate { id_a, id_b, .. } => {
851 plan.merge.push(MergePlan {
852 keep_id: id_a.clone(),
853 remove_id: id_b.clone(),
854 merged_content: String::new(), });
856 }
857 MemorySignal::Contradiction { newer_id, older_id } => {
858 plan.merge.push(MergePlan {
860 keep_id: newer_id.clone(),
861 remove_id: older_id.clone(),
862 merged_content: String::new(),
863 });
864 }
865 MemorySignal::PageRankBoost {
866 rowid,
867 old_importance,
868 new_importance,
869 pagerank_score,
870 } => {
871 plan.pagerank_updates.push(PageRankUpdate {
872 rowid: *rowid,
873 old_importance: *old_importance,
874 new_importance: *new_importance,
875 pagerank_score: *pagerank_score,
876 });
877 }
878 }
879 }
880
881 Ok(plan)
882 }
883
884 async fn dream_prune_and_index(&self, plan: &ConsolidationPlan) -> Result<Phase4Result> {
885 let mut contradictions_resolved = 0;
886
887 for change in &plan.protection_updates {
889 if let Ok(Some(mut entry)) = self.memory_manager.get_by_id(&change.id).await {
890 entry.protection = change.to;
891 let _ = self.memory_manager.remember(entry).await;
892 }
893 }
894
895 for reclassify in &plan.reclassify {
897 if let Ok(Some(mut entry)) = self.memory_manager.get_by_id(&reclassify.id).await {
898 entry.memory_type = reclassify.new_type;
899 entry.auto_classified = true;
900 let _ = self.memory_manager.remember(entry).await;
901 }
902 }
903
904 for tc in &plan.demote {
906 if let Ok(Some(mut entry)) = self.memory_manager.get_by_id(&tc.id).await {
907 entry.tier = tc.to_tier;
908 let _ = self.memory_manager.remember(entry).await;
909 }
910 }
911
912 for merge in &plan.merge {
914 contradictions_resolved += 1;
915 let _ = self
917 .memory_manager
918 .get_by_id(&merge.remove_id)
919 .await
920 .ok()
921 .flatten()
922 .map(
923 |e| async move { self.memory_manager.forget(&e.id, e.memory_type).await.ok() },
924 );
925 }
926
927 #[cfg(feature = "sqlite-memory")]
929 if let Some(ref sqlite) = self.memory_manager.sqlite_store() {
930 for update in &plan.pagerank_updates {
931 let conn = sqlite.db().conn();
932 let _ = conn.execute(
933 "UPDATE memories SET importance = ?1 WHERE rowid = ?2",
934 rusqlite::params![update.new_importance, update.rowid as i64],
935 );
936 }
937 }
938
939 for id in &plan.delete {
941 if let Ok(Some(entry)) = self.memory_manager.get_by_id(id).await {
942 if entry.protection <= ProtectionLevel::Low
944 && !entry.pinned
945 && !entry.memory_type.is_auto_protected()
946 {
947 let _ = self.memory_manager.forget(id, entry.memory_type).await;
948 }
949 }
950 }
951
952 self.rebuild_root_index().await?;
954
955 #[cfg(feature = "sqlite-memory")]
959 if let Some(ref sqlite) = self.memory_manager.sqlite_store() {
960 let config = crate::memory::hyperbolic::HyperbolicConfig::default();
961 match crate::memory::sqlite::hyperbolic_persist::restore_from_sqlite(sqlite, config) {
962 Ok(he) => {
963 let count = he.len();
964 if count < 10 {
965 tracing::debug!("Hyperbolic embeddings need rebuild (count < 10)");
966 }
967 tracing::debug!(count, "Hyperbolic embeddings loaded");
968 }
969 Err(e) => {
970 tracing::debug!(error = %e, "Failed to restore hyperbolic embeddings (non-fatal)");
971 }
972 }
973 }
974
975 let patterns_persisted = {
977 #[cfg(feature = "sqlite-memory")]
978 if let Some(ref sqlite) = self.memory_manager.sqlite_store() {
979 let _ = sqlite.auto_promote_patterns(0.8, 3);
981 let conn = sqlite.db().conn();
983 let total: usize = conn
984 .query_row("SELECT COUNT(*) FROM patterns", [], |row| row.get(0))
985 .unwrap_or(0);
986 total
987 } else {
988 0
989 }
990 #[cfg(not(feature = "sqlite-memory"))]
991 {
992 0
993 }
994 };
995
996 let flash_reranked = {
998 #[cfg(feature = "sqlite-memory")]
999 if let Some(ref sqlite) = self.memory_manager.sqlite_store() {
1000 let hot = self
1001 .memory_manager
1002 .list_by_tier(MemoryTier::Hot, 50)
1003 .await
1004 .unwrap_or_default();
1005 if !hot.is_empty() {
1006 let query: String = hot
1007 .iter()
1008 .take(3)
1009 .map(|e| e.content.as_str())
1010 .collect::<Vec<_>>()
1011 .join(" ");
1012 if !query.is_empty() {
1013 match sqlite.recall_with_rerank(&query, hot.len()).await {
1014 Ok(reranked) => reranked.len(),
1015 Err(_) => 0,
1016 }
1017 } else {
1018 0
1019 }
1020 } else {
1021 0
1022 }
1023 } else {
1024 0
1025 }
1026 #[cfg(not(feature = "sqlite-memory"))]
1027 {
1028 0
1029 }
1030 };
1031
1032 Ok(Phase4Result {
1033 contradictions_resolved,
1034 flash_reranked,
1035 patterns_persisted,
1036 })
1037 }
1038
1039 async fn rebuild_root_index(&self) -> Result<()> {
1043 let mut root = RootIndex::new();
1044 root.version += 1;
1045 root.updated_at = Utc::now();
1046
1047 let now = Utc::now();
1048 let mut all_entries = Vec::new();
1049 for mt in MemoryType::all() {
1050 if let Ok(entries) = self.memory_manager.list(*mt, 1_000).await {
1051 all_entries.extend(entries);
1052 }
1053 }
1054
1055 let mut recent: Vec<&MemoryEntry> = all_entries
1057 .iter()
1058 .filter(|e| (now - e.accessed_at).num_days() <= 7 && e.importance >= 0.5)
1059 .collect();
1060 recent.sort_by(|a, b| {
1061 b.importance
1062 .partial_cmp(&a.importance)
1063 .unwrap_or(std::cmp::Ordering::Equal)
1064 });
1065 recent.truncate(20);
1066
1067 for entry in &recent {
1068 root.active_context.push(RootEntry {
1069 topic: entry.content.split('.').next().unwrap_or("").to_string(),
1070 memory_type: entry.memory_type,
1071 protection: entry.protection,
1072 age_days: (now - entry.created_at).num_days() as u32,
1073 reference: entry.id.clone(),
1074 });
1075 }
1076
1077 for entry in &all_entries {
1079 let first_sentence = entry
1080 .content
1081 .split('.')
1082 .next()
1083 .unwrap_or(&entry.content)
1084 .to_string();
1085 root.topics.push(TopicEntry {
1086 name: first_sentence.clone(),
1087 category: entry.memory_type.label().to_string(),
1088 age_days: (now - entry.created_at).num_days() as u32,
1089 description: entry.content.chars().take(100).collect(),
1090 reference: entry.id.clone(),
1091 });
1092 }
1093
1094 *self.root_index.write() = root;
1095 Ok(())
1096 }
1097
1098 async fn load_checkpoint(&self) -> Result<Option<DreamCheckpoint>> {
1100 let path = DreamCheckpoint::path(&self.space_dir);
1101 if !path.exists() {
1102 return Ok(None);
1103 }
1104 let data = fs::read_to_string(&path).await?;
1105 let checkpoint: DreamCheckpoint = serde_json::from_str(&data)?;
1106 if checkpoint.is_stale() {
1107 tracing::info!("Stale checkpoint found, ignoring");
1108 return Ok(None);
1109 }
1110 Ok(Some(checkpoint))
1111 }
1112
1113 async fn save_checkpoint(
1115 &self,
1116 dream_id: &str,
1117 completed_phase: u8,
1118 signals: Option<&[MemorySignal]>,
1119 plan: Option<&ConsolidationPlan>,
1120 ) -> Result<()> {
1121 let checkpoint = DreamCheckpoint {
1122 dream_id: dream_id.to_string(),
1123
1124 started_at: Utc::now(),
1125 completed_phase,
1126 cached_signals: signals.map(|s| s.to_vec()),
1127 cached_plan: plan.cloned(),
1128 };
1129 let path = DreamCheckpoint::path(&self.space_dir);
1130 if let Some(parent) = path.parent() {
1131 let _ = fs::create_dir_all(parent).await;
1132 }
1133 let data = serde_json::to_string_pretty(&checkpoint)?;
1134 fs::write(&path, data).await?;
1135 Ok(())
1136 }
1137
1138 async fn clear_checkpoint(&self) -> Result<()> {
1140 let path = DreamCheckpoint::path(&self.space_dir);
1141 if path.exists() {
1142 let _ = fs::remove_file(&path).await;
1143 }
1144 let lock_path = DreamCheckpoint::lock_path(&self.space_dir);
1145 if lock_path.exists() {
1146 let _ = fs::remove_file(&lock_path).await;
1147 }
1148 Ok(())
1149 }
1150}
1151
1152#[cfg(test)]
1157mod tests {
1158 use super::*;
1159
1160 #[test]
1161 fn test_dream_checkpoint_stale() {
1162 let cp = DreamCheckpoint {
1163 dream_id: "test".to_string(),
1164
1165 started_at: Utc::now() - chrono::Duration::hours(2),
1166 completed_phase: 2,
1167 cached_signals: None,
1168 cached_plan: None,
1169 };
1170 assert!(cp.is_stale());
1171 }
1172
1173 #[test]
1174 fn test_dream_checkpoint_fresh() {
1175 let cp = DreamCheckpoint {
1176 dream_id: "test".to_string(),
1177
1178 started_at: Utc::now(),
1179 completed_phase: 2,
1180 cached_signals: None,
1181 cached_plan: None,
1182 };
1183 assert!(!cp.is_stale());
1184 }
1185
1186 #[test]
1187 fn test_consolidation_plan_total_changes() {
1188 let mut plan = ConsolidationPlan::default();
1189 plan.protection_updates.push(ProtectionChange {
1190 id: "1".to_string(),
1191 from: ProtectionLevel::None,
1192 to: ProtectionLevel::Low,
1193 reason: "test".to_string(),
1194 });
1195 plan.delete.push("2".to_string());
1196 assert_eq!(plan.total_changes(), 2);
1197 }
1198
1199 #[test]
1200 fn test_should_dream_never_ran() {
1201 let config = DreamConfig::default();
1202 let temp = tempfile::tempdir().unwrap();
1203 let storage: Arc<dyn crate::memory::storage::MemoryStorage> =
1204 Arc::new(crate::memory::test_support::InMemoryStorage::default());
1205 let mgr = Arc::new(MemoryManager::new(storage));
1206 let dream = DreamProcess::new(mgr, config, temp.path().to_path_buf());
1207
1208 assert!(dream.should_dream(None, 0));
1209 }
1210
1211 #[test]
1212 fn test_should_dream_too_recent() {
1213 let config = DreamConfig::default();
1214 let temp = tempfile::tempdir().unwrap();
1215 let storage: Arc<dyn crate::memory::storage::MemoryStorage> =
1216 Arc::new(crate::memory::test_support::InMemoryStorage::default());
1217 let mgr = Arc::new(MemoryManager::new(storage));
1218 let dream = DreamProcess::new(mgr, config, temp.path().to_path_buf());
1219
1220 assert!(!dream.should_dream(Some(Utc::now()), 1));
1221 }
1222}