1pub mod compression;
10pub mod context;
11pub mod facts;
12pub mod feedback;
13pub mod files;
14pub mod graph_retrieval;
15pub mod hybrid_search;
16pub mod injection;
17pub mod introspection;
18pub mod learning_history;
19pub mod lineage;
20pub mod pattern_detection;
21pub mod prospective;
22pub mod query_parser;
23pub mod replay;
24pub mod retrieval;
25pub mod segmentation;
26pub mod sessions;
27pub mod storage;
28pub mod temporal_facts;
29pub mod todo_formatter;
30pub mod todos;
31pub mod types;
32pub mod visualization;
33
34use anyhow::{Context, Result};
35use parking_lot::RwLock;
36use serde::{Deserialize, Serialize};
37use sha2::{Digest, Sha256};
38use std::collections::HashSet;
39use std::path::{Path, PathBuf};
40use std::sync::Arc;
41use tracing::debug;
42use uuid::Uuid;
43
44use crate::metrics::{
45 EMBEDDING_CACHE_CONTENT, EMBEDDING_CACHE_CONTENT_SIZE, EMBEDDING_CACHE_QUERY,
46 EMBEDDING_CACHE_QUERY_SIZE,
47};
48
49use crate::constants::{
50 DEFAULT_COMPRESSION_AGE_DAYS, DEFAULT_IMPORTANCE_THRESHOLD, DEFAULT_MAX_HEAP_PER_USER_MB,
51 DEFAULT_SESSION_MEMORY_SIZE_MB, DEFAULT_WORKING_MEMORY_SIZE, EDGE_SEMANTIC_WEIGHT_FLOOR,
52 HEBBIAN_BOOST_HELPFUL, HEBBIAN_DECAY_MISLEADING, POTENTIATION_ACCESS_THRESHOLD,
53 POTENTIATION_MAINTENANCE_BOOST, TIER_PROMOTION_SESSION_AGE_SECS,
54 TIER_PROMOTION_SESSION_IMPORTANCE, TIER_PROMOTION_WORKING_AGE_SECS,
55 TIER_PROMOTION_WORKING_IMPORTANCE,
56};
57
58use crate::memory::storage::{MemoryStorage, SearchCriteria};
59pub use crate::memory::types::*;
60use crate::embeddings::Embedder;
62use crate::memory::compression::CompressionPipeline;
63pub use crate::memory::compression::{
64 ConsolidationResult, FactType, SemanticConsolidator, SemanticFact,
65};
66pub use crate::memory::facts::{FactQueryResponse, FactStats, SemanticFactStore};
67pub use crate::memory::feedback::{
68 apply_context_pattern_signals, calculate_entity_flow, calculate_entity_overlap,
69 detect_negative_keywords, extract_entities_simple, process_implicit_feedback,
70 process_implicit_feedback_with_semantics, signal_from_entity_flow, ContextFingerprint,
71 FeedbackMomentum, FeedbackStore, FeedbackStoreStats, PendingFeedback, PreviousContext,
72 SignalRecord, SignalTrigger, SurfacedMemoryInfo, Trend,
73};
74pub use crate::memory::files::{FileMemoryStats, FileMemoryStore, IndexingResult};
75pub use crate::memory::graph_retrieval::{
76 calculate_density_weights, spreading_activation_retrieve, ActivatedMemory,
77};
78pub use crate::memory::hybrid_search::{
79 BM25Index, CrossEncoderReranker, HybridSearchConfig, HybridSearchEngine, HybridSearchResult,
80 RRFusion,
81};
82pub use crate::memory::introspection::{
83 AssociationChange, ConsolidationEvent, ConsolidationEventBuffer, ConsolidationReport,
84 ConsolidationStats, EdgeFormationReason, FactChange, InterferenceEvent, InterferenceType,
85 MemoryChange, PruningReason, ReplayEvent, ReportPeriod, StrengtheningReason,
86};
87pub use crate::memory::learning_history::{
88 LearningEventType, LearningHistoryStore, LearningStats, LearningVelocity, StoredLearningEvent,
89};
90pub use crate::memory::lineage::{
91 CausalRelation, InferenceConfig, LineageBranch, LineageEdge, LineageGraph, LineageSource,
92 LineageStats, LineageTrace, PostMortem, TraceDirection,
93};
94pub use crate::memory::prospective::ProspectiveStore;
95pub use crate::memory::replay::{
96 InterferenceCheckResult, InterferenceDetector, InterferenceRecord, ReplayCandidate,
97 ReplayCycleResult, ReplayManager,
98};
99use crate::memory::retrieval::RetrievalEngine;
100pub use crate::memory::retrieval::{
101 AnticipatoryPrefetch, IndexHealth, MemoryGraphStats, PrefetchContext, PrefetchReason,
102 PrefetchResult, ReinforcementStats, RetrievalFeedback, RetrievalOutcome, TrackedRetrieval,
103};
104pub use crate::memory::segmentation::{
105 AtomicMemory, DeduplicationEngine, DeduplicationResult, InputSource, SegmentationEngine,
106};
107pub use crate::memory::sessions::{
108 Session, SessionEvent, SessionId, SessionStats, SessionStatus, SessionStore, SessionStoreStats,
109 SessionSummary, TemporalContext, TimeOfDay,
110};
111pub use crate::memory::temporal_facts::{EventType, ResolvedTime, TemporalFact, TemporalFactStore};
112pub use crate::memory::todos::{ProjectStats, TodoStore, UserTodoStats};
113pub use crate::memory::visualization::{GraphStats, MemoryLogger};
114
/// Tunable configuration for a [`MemorySystem`] instance.
///
/// Serialized with serde, so it can be loaded from a config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryConfig {
    /// Root directory for the on-disk store (RocksDB plus the BM25 index
    /// under `<storage_path>/bm25_index`).
    pub storage_path: PathBuf,

    /// Capacity of the working-memory tier (number of entries, passed to
    /// `WorkingMemory::new`).
    pub working_memory_size: usize,

    /// Size budget of the session-memory tier, in megabytes.
    pub session_memory_size_mb: usize,

    /// Per-user heap budget, in megabytes.
    pub max_heap_per_user_mb: usize,

    /// When true, old memories are compressed automatically.
    pub auto_compress: bool,

    /// Age (in days) after which memories become eligible for compression.
    pub compression_age_days: u32,

    /// Importance score a memory must exceed to be promoted into session
    /// memory (see `remember`: `importance > importance_threshold`).
    pub importance_threshold: f32,
}
139
140impl Default for MemoryConfig {
141 fn default() -> Self {
142 Self {
143 storage_path: PathBuf::from("./memory_store"),
144 working_memory_size: DEFAULT_WORKING_MEMORY_SIZE,
145 session_memory_size_mb: DEFAULT_SESSION_MEMORY_SIZE_MB,
146 max_heap_per_user_mb: DEFAULT_MAX_HEAP_PER_USER_MB,
147 auto_compress: true,
148 compression_age_days: DEFAULT_COMPRESSION_AGE_DAYS,
149 importance_threshold: DEFAULT_IMPORTANCE_THRESHOLD,
150 }
151 }
152}
153
/// Central memory engine: three tiers (working / session / long-term) plus
/// retrieval, compression, introspection, and optional graph/feedback wiring.
pub struct MemorySystem {
    // Configuration this instance was built with.
    config: MemoryConfig,

    // Tier 1: small, hot working memory.
    working_memory: Arc<RwLock<WorkingMemory>>,
    // Tier 2: session-scoped memory (importance-gated, see `remember`).
    session_memory: Arc<RwLock<SessionMemory>>,
    // Tier 3: durable RocksDB-backed storage.
    long_term_memory: Arc<MemoryStorage>,

    // Compresses old memories (used by the auto-compress path).
    compressor: CompressionPipeline,

    // Vector-index retrieval engine over long-term storage.
    retriever: RetrievalEngine,

    // MiniLM (ONNX) sentence embedder, shared with the retriever.
    embedder: Arc<crate::embeddings::minilm::MiniLMEmbedder>,

    // SHA-256(query text) -> embedding cache (capacity 2_000, see `new`).
    query_cache: moka::sync::Cache<[u8; 32], Vec<f32>>,

    // SHA-256(content) -> embedding cache (capacity 2_000, see `new`).
    content_cache: moka::sync::Cache<[u8; 32], Vec<f32>>,

    // Aggregate counters, seeded from storage stats at startup.
    stats: Arc<RwLock<MemoryStats>>,

    // Structured event logger for memory lifecycle events.
    logger: Arc<RwLock<MemoryLogger>>,

    // Ring buffer of consolidation events for introspection reports.
    consolidation_events: Arc<RwLock<ConsolidationEventBuffer>>,

    // Drives replay cycles (memory re-activation).
    replay_manager: Arc<RwLock<replay::ReplayManager>>,

    // Detects proactive/retroactive interference between similar memories;
    // its history is persisted via long_term_memory (see `new` / `remember`).
    interference_detector: Arc<RwLock<replay::InterferenceDetector>>,

    // Detects salience spikes / recurring patterns on newly stored memories.
    pattern_detector: Arc<RwLock<pattern_detection::PatternDetector>>,

    // Semantic fact store, backed by the same RocksDB handle.
    fact_store: Arc<facts::SemanticFactStore>,

    // Memory lineage/causality graph, backed by the same RocksDB handle.
    lineage_graph: Arc<lineage::LineageGraph>,

    // BM25 + vector hybrid search engine (BM25 index on disk).
    hybrid_search: Arc<hybrid_search::HybridSearchEngine>,

    // Optional entity/relationship graph; None until `set_graph_memory`.
    graph_memory: Option<Arc<parking_lot::RwLock<crate::graph_memory::GraphMemory>>>,

    // Optional implicit-feedback store; None until `set_feedback_store`.
    feedback_store: Option<Arc<parking_lot::RwLock<FeedbackStore>>>,

    // Append-only learning-event history, backed by the same RocksDB handle.
    learning_history: Arc<learning_history::LearningHistoryStore>,

    // Temporal (time-anchored) fact store, backed by the same RocksDB handle.
    temporal_fact_store: Arc<temporal_facts::TemporalFactStore>,

    // Set true after each write; presumably drained by a background
    // fact-extraction pass — TODO confirm against the extraction scheduler.
    fact_extraction_needed: std::sync::atomic::AtomicBool,

    // High-water mark for incremental fact extraction (units not visible
    // here; likely a timestamp or sequence number — verify at the consumer).
    fact_extraction_watermark: std::sync::atomic::AtomicI64,
}
250
251fn resolve_entity_label(
255 entity_name: &str,
256 ner_lookup: &std::collections::HashMap<String, (String, f32)>,
257) -> (crate::graph_memory::EntityLabel, f32) {
258 if let Some((ner_type, confidence)) = ner_lookup.get(&entity_name.to_lowercase()) {
259 let label = match ner_type.as_str() {
260 "PER" => crate::graph_memory::EntityLabel::Person,
261 "ORG" => crate::graph_memory::EntityLabel::Organization,
262 "LOC" => crate::graph_memory::EntityLabel::Location,
263 _ => crate::graph_memory::EntityLabel::Concept,
264 };
265 (label, *confidence)
266 } else {
267 (crate::graph_memory::EntityLabel::Concept, 0.5)
268 }
269}
270
271fn build_ner_lookup(
273 ner_entities: &[NerEntityRecord],
274) -> std::collections::HashMap<String, (String, f32)> {
275 ner_entities
276 .iter()
277 .map(|r| (r.text.to_lowercase(), (r.entity_type.clone(), r.confidence)))
278 .collect()
279}
280
281impl MemorySystem {
    /// Builds a fully wired `MemorySystem` on top of a RocksDB-backed store.
    ///
    /// Startup sequence: open storage, load the MiniLM (ONNX) embedder,
    /// create the retrieval engine, auto-repair memories that exist in
    /// storage but are missing from the vector index, seed stats from
    /// storage, open the fact / lineage / hybrid-search / learning-history /
    /// temporal-fact stores, backfill the BM25 index if it is empty, and
    /// restore persisted interference history.
    ///
    /// `shared_cache` optionally provides a shared RocksDB block cache.
    ///
    /// # Errors
    /// Fails if storage cannot be opened, the ONNX embedder cannot be
    /// initialized, the retrieval or hybrid-search engine cannot be created,
    /// or reading all memories for BM25 backfill fails.
    pub fn new(config: MemoryConfig, shared_cache: Option<&rocksdb::Cache>) -> Result<Self> {
        let storage_path = config.storage_path.clone();
        let storage = Arc::new(
            MemoryStorage::new(&storage_path, shared_cache)
                .with_context(|| format!("Failed to open storage at {:?}", storage_path))?,
        );

        // The embedder is mandatory: construction fails hard if the ONNX
        // model cannot be loaded.
        let embedding_config = crate::embeddings::minilm::EmbeddingConfig::default();
        let embedder = Arc::new(
            crate::embeddings::minilm::MiniLMEmbedder::new(embedding_config)
                .context("Failed to initialize MiniLM embedder (ONNX model)")?,
        );

        // Shared buffer through which the retriever reports consolidation
        // events back to the introspection layer.
        let consolidation_events = Arc::new(RwLock::new(ConsolidationEventBuffer::new()));

        let retriever = RetrievalEngine::with_event_buffer(
            storage.clone(),
            embedder.clone(),
            Some(consolidation_events.clone()),
        )
        .context("Failed to initialize retrieval engine")?;

        // Auto-repair: memories present in storage but absent from the
        // vector index (e.g. after a crash between store() and indexing).
        let storage_count = storage.get_stats().map(|s| s.total_count).unwrap_or(0);
        let indexed_count = retriever.len();
        let orphaned_count = storage_count.saturating_sub(indexed_count);

        if orphaned_count > 0 {
            tracing::warn!(
                storage_count = storage_count,
                indexed_count = indexed_count,
                orphaned_count = orphaned_count,
                "Detected orphaned memories at startup - initiating auto-repair"
            );

            // Best-effort: if get_all fails we simply skip repair.
            if let Ok(all_memories) = storage.get_all() {
                let indexed_ids = retriever.get_indexed_memory_ids();
                let mut repaired = 0;
                let mut failed = 0;

                for memory in all_memories {
                    // Already indexed - nothing to repair for this one.
                    if indexed_ids.contains(&memory.id) {
                        continue;
                    }

                    // Guard against pathological payloads: skip anything
                    // over 1 MB rather than stall startup embedding it.
                    const MAX_REPAIR_CONTENT_LEN: usize = 1_000_000;
                    if memory.experience.content.len() > MAX_REPAIR_CONTENT_LEN {
                        tracing::warn!(
                            memory_id = %memory.id.0,
                            content_len = memory.experience.content.len(),
                            "Skipping oversized memory during auto-repair (>1MB)"
                        );
                        failed += 1;
                        continue;
                    }

                    tracing::info!(memory_id = %memory.id.0, content_len = memory.experience.content.len(), "Attempting to repair orphaned memory...");
                    match retriever.index_memory(&memory) {
                        Ok(_) => {
                            repaired += 1;
                            // Throttle per-memory logging: first 10, then
                            // every 100th.
                            if repaired <= 10 || repaired % 100 == 0 {
                                tracing::info!(
                                    memory_id = %memory.id.0,
                                    progress = format!("{}/{}", repaired, orphaned_count),
                                    "Repaired orphaned memory"
                                );
                            }
                        }
                        Err(e) => {
                            failed += 1;
                            tracing::error!(
                                memory_id = %memory.id.0,
                                error = %e,
                                "Failed to repair orphaned memory"
                            );
                        }
                    }
                }

                // Persist the rebuilt index only if something changed.
                if repaired > 0 {
                    if let Err(e) = retriever.save() {
                        tracing::error!("Failed to persist repaired index: {}", e);
                    } else {
                        tracing::info!(
                            repaired = repaired,
                            failed = failed,
                            final_indexed = retriever.len(),
                            "Startup repair complete - index persisted"
                        );
                    }
                }
            }
        } else if storage_count > 0 {
            tracing::info!(
                storage_count = storage_count,
                indexed_count = indexed_count,
                "All memories indexed - no repair needed"
            );
        }

        // Logger starts in non-verbose mode.
        let logger = Arc::new(RwLock::new(MemoryLogger::new(false)));

        // Seed the stats snapshot from persisted storage stats; tier counts
        // start at zero since working/session memory begin empty.
        let initial_stats = {
            let storage_stats = storage.get_stats().unwrap_or_default();
            let vector_count = retriever.len();
            MemoryStats {
                total_memories: storage_stats.total_count,
                working_memory_count: 0,
                session_memory_count: 0,
                long_term_memory_count: storage_stats.total_count,
                vector_index_count: vector_count,
                compressed_count: storage_stats.compressed_count,
                promotions_to_session: 0,
                promotions_to_longterm: 0,
                total_retrievals: storage_stats.total_retrievals,
                average_importance: storage_stats.average_importance,
                graph_nodes: 0,
                graph_edges: 0,
            }
        };

        // These stores share the same RocksDB handle as long-term storage.
        let fact_store = Arc::new(facts::SemanticFactStore::new(storage.db()));

        let lineage_graph = Arc::new(lineage::LineageGraph::new(storage.db()));

        let bm25_path = storage_path.join("bm25_index");
        let hybrid_search_config = hybrid_search::HybridSearchConfig::default();
        let hybrid_search_engine = hybrid_search::HybridSearchEngine::new(
            &bm25_path,
            embedder.clone(),
            hybrid_search_config,
        )
        .context("Failed to initialize hybrid search engine")?;

        // One-time BM25 backfill when the lexical index is empty but
        // long-term storage is not (e.g. first run after adding BM25).
        if hybrid_search_engine.needs_backfill() {
            let existing_memories = storage.get_all()?;
            let memory_count = existing_memories.len();

            if memory_count > 0 {
                tracing::info!(
                    "BM25 index empty, backfilling {} existing memories...",
                    memory_count
                );

                let memories_iter = existing_memories.into_iter().map(|mem| {
                    (
                        mem.id,
                        mem.experience.content,
                        mem.experience.tags,
                        mem.experience.entities,
                    )
                });

                // Backfill failure is non-fatal: vector search still works.
                match hybrid_search_engine.backfill(memories_iter) {
                    Ok(indexed) => {
                        tracing::info!("BM25 backfill complete: {} memories indexed", indexed);
                    }
                    Err(e) => {
                        tracing::warn!("BM25 backfill failed (non-fatal): {}", e);
                    }
                }
            }
        }

        let learning_history = Arc::new(learning_history::LearningHistoryStore::new(storage.db()));

        let temporal_fact_store = Arc::new(temporal_facts::TemporalFactStore::new(storage.db()));

        // Restore interference history from storage; a load failure only
        // means we start with an empty detector.
        let interference_detector = {
            let mut detector = replay::InterferenceDetector::new();
            match storage.load_all_interference_records() {
                Ok((history, total_events)) => {
                    if !history.is_empty() {
                        detector.load_history(history, total_events);
                    }
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "Failed to load interference history, starting fresh"
                    );
                }
            }
            Arc::new(RwLock::new(detector))
        };

        Ok(Self {
            config: config.clone(),
            working_memory: Arc::new(RwLock::new(WorkingMemory::new(config.working_memory_size))),
            session_memory: Arc::new(RwLock::new(SessionMemory::new(
                config.session_memory_size_mb,
            ))),
            long_term_memory: storage,
            compressor: CompressionPipeline::new(),
            retriever,
            embedder,
            // Both embedding caches are capped at 2,000 entries.
            query_cache: moka::sync::Cache::builder().max_capacity(2_000).build(),
            content_cache: moka::sync::Cache::builder().max_capacity(2_000).build(),
            stats: Arc::new(RwLock::new(initial_stats)),
            logger,
            consolidation_events,
            replay_manager: Arc::new(RwLock::new(replay::ReplayManager::new())),
            interference_detector,
            pattern_detector: Arc::new(RwLock::new(pattern_detection::PatternDetector::new())),
            fact_store,
            lineage_graph,
            hybrid_search: Arc::new(hybrid_search_engine),
            // Graph/feedback wiring is opt-in via setters.
            graph_memory: None,
            feedback_store: None,
            learning_history,
            temporal_fact_store,
            // Force one extraction pass on startup.
            fact_extraction_needed: std::sync::atomic::AtomicBool::new(true),
            fact_extraction_watermark: std::sync::atomic::AtomicI64::new(0),
        })
    }
540
541 pub fn set_graph_memory(
550 &mut self,
551 graph: Arc<parking_lot::RwLock<crate::graph_memory::GraphMemory>>,
552 ) {
553 self.graph_memory = Some(graph);
554 }
555
556 pub fn graph_memory(
558 &self,
559 ) -> Option<&Arc<parking_lot::RwLock<crate::graph_memory::GraphMemory>>> {
560 self.graph_memory.as_ref()
561 }
562
563 pub fn set_feedback_store(&mut self, feedback: Arc<parking_lot::RwLock<FeedbackStore>>) {
571 self.feedback_store = Some(feedback);
572 }
573
574 pub fn feedback_store(&self) -> Option<&Arc<parking_lot::RwLock<FeedbackStore>>> {
576 self.feedback_store.as_ref()
577 }
578
579 pub fn remember_with_id(
585 &self,
586 memory_id: MemoryId,
587 mut experience: Experience,
588 created_at: Option<chrono::DateTime<chrono::Utc>>,
589 ) -> Result<MemoryId> {
590 let importance = self.calculate_importance(&experience);
591
592 if experience.embeddings.is_none() {
594 let content_hash = Self::sha256_hash(&experience.content);
595 if let Some(cached) = self.content_cache.get(&content_hash) {
596 experience.embeddings = Some(cached.clone());
597 } else if let Ok(embedding) = self.embedder.encode(&experience.content) {
598 self.content_cache.insert(content_hash, embedding.clone());
599 experience.embeddings = Some(embedding);
600 }
601 }
602
603 let memory = Arc::new(Memory::new(
604 memory_id.clone(),
605 experience,
606 importance,
607 None,
608 None,
609 None,
610 created_at,
611 ));
612
613 self.long_term_memory.store(&memory)?;
614 self.logger.write().log_created(&memory, "import");
615
616 self.working_memory
617 .write()
618 .add_shared(Arc::clone(&memory))?;
619
620 if let Err(e) = self.retriever.index_memory(&memory) {
621 tracing::warn!("Failed to index imported memory {}: {}", memory.id.0, e);
622 }
623
624 Ok(memory_id)
625 }
626
    /// Stores a new experience end-to-end.
    ///
    /// Pipeline: content-hash dedup → importance scoring → embedding (via
    /// the content cache) → temporal-reference extraction → persist to
    /// long-term storage → working-memory insert → vector + BM25 indexing →
    /// pattern (salience-spike) detection → interference detection and
    /// importance decay → optional session-memory promotion → stats update →
    /// consolidation check → BM25 commit → flag fact extraction.
    ///
    /// Returns the new memory's id, or the id of an existing memory when
    /// identical content is already stored.
    ///
    /// # Errors
    /// Propagates failures from long-term storage, working/session-memory
    /// inserts, and `consolidate_if_needed`; indexing failures are logged
    /// and tolerated.
    pub fn remember(
        &self,
        mut experience: Experience,
        created_at: Option<chrono::DateTime<chrono::Utc>>,
    ) -> Result<MemoryId> {
        // Exact-content dedup: identical content returns the existing id.
        if let Some(existing_id) = self
            .long_term_memory
            .get_by_content_hash(&experience.content)
        {
            tracing::debug!(
                existing_id = %existing_id.0,
                "Content dedup: returning existing memory (identical content already stored)"
            );
            return Ok(existing_id);
        }

        let memory_id = MemoryId(Uuid::new_v4());

        let importance = self.calculate_importance(&experience);

        // Fill in embeddings unless supplied, via the SHA-256-keyed cache.
        if experience.embeddings.is_none() {
            let content_hash = Self::sha256_hash(&experience.content);

            if let Some(cached_embedding) = self.content_cache.get(&content_hash) {
                experience.embeddings = Some(cached_embedding.clone());
                EMBEDDING_CACHE_CONTENT.with_label_values(&["hit"]).inc();
                tracing::debug!("Content embedding cache HIT");
            } else {
                EMBEDDING_CACHE_CONTENT.with_label_values(&["miss"]).inc();
                match self.embedder.encode(&experience.content) {
                    Ok(embedding) => {
                        self.content_cache.insert(content_hash, embedding.clone());
                        EMBEDDING_CACHE_CONTENT_SIZE.set(self.content_cache.entry_count() as i64);
                        experience.embeddings = Some(embedding);
                        tracing::debug!("Content embedding cache MISS - generated and cached");
                    }
                    Err(e) => {
                        // Non-fatal: memory is stored without embeddings,
                        // so interference detection below is skipped.
                        tracing::warn!("Failed to generate embedding: {}", e);
                    }
                }
            }
        }

        // Extract date references from the text unless the caller set them.
        if experience.temporal_refs.is_empty() {
            let temporal = crate::memory::query_parser::extract_temporal_refs(&experience.content);
            for temp_ref in temporal.refs {
                experience.temporal_refs.push(temp_ref.date.to_string());
            }
        }

        // No agent/run/session attribution on this path (cf.
        // `remember_with_agent`).
        let memory = Arc::new(Memory::new(
            memory_id.clone(),
            experience,
            importance,
            None,
            None,
            None,
            created_at,
        ));

        // Persist first; everything after this is enrichment.
        self.long_term_memory.store(&memory)?;

        self.logger.write().log_created(&memory, "working");

        self.working_memory
            .write()
            .add_shared(Arc::clone(&memory))?;

        // Track whether vector indexing succeeded for the stats update below.
        let indexed = if let Err(e) = self.retriever.index_memory(&memory) {
            tracing::warn!("Failed to index memory {} in vector DB: {}", memory.id.0, e);
            false
        } else {
            true
        };

        // Lexical (BM25) indexing is best-effort.
        if let Err(e) = self.hybrid_search.index_memory(
            &memory.id,
            &memory.experience.content,
            &memory.experience.tags,
            &memory.experience.entities,
        ) {
            tracing::warn!("Failed to index memory {} in BM25: {}", memory.id.0, e);
        }

        // Register with the pattern detector and check for salience spikes.
        {
            // Emotional arousal defaults to 0.3 when no context is attached.
            let arousal = memory
                .experience
                .context
                .as_ref()
                .map(|c| c.emotional.arousal)
                .unwrap_or(0.3);

            let pattern_memory = pattern_detection::PatternMemory {
                id: memory.id.0.to_string(),
                content_preview: memory.experience.content.chars().take(100).collect(),
                entities: memory.experience.entities.clone(),
                importance,
                arousal,
                created_at: memory.created_at,
                // Cheap order-insensitive fingerprint of the embedding.
                embedding_hash: memory.experience.embeddings.as_ref().map(|e| {
                    e.iter()
                        .fold(0u64, |acc, &x| acc.wrapping_add(x.to_bits() as u64))
                }),
                session_id: memory
                    .experience
                    .context
                    .as_ref()
                    .and_then(|c| c.episode.episode_id.clone()),
                memory_type: format!("{:?}", memory.experience.experience_type),
            };

            let mut detector = self.pattern_detector.write();
            detector.register_memory(pattern_memory.clone());

            // A salience spike becomes a consolidation event for reports.
            if let Some(trigger) = detector.check_salience_spike(&pattern_memory) {
                tracing::debug!(
                    "Salience spike detected for memory {}: {}",
                    memory.id.0,
                    trigger.description()
                );
                self.record_consolidation_event(
                    introspection::ConsolidationEvent::PatternDetected {
                        trigger_type: trigger.trigger_type_name().to_string(),
                        description: trigger.description(),
                        memory_ids: trigger.memory_ids(),
                        timestamp: chrono::Utc::now(),
                    },
                );
            }
        }

        // NOTE(review): temporal facts are extracted but only logged here —
        // nothing stores `facts`. Presumably the fact_extraction_needed flag
        // set at the end drives the real persistence pass; confirm.
        if !memory.experience.entities.is_empty() {
            let facts = temporal_facts::extract_temporal_facts(
                &memory.experience.content,
                &memory.id,
                memory.created_at,
                &memory.experience.entities,
            );
            if !facts.is_empty() {
                tracing::debug!(
                    "Extracted {} temporal facts from memory {}",
                    facts.len(),
                    memory.id.0
                );
            }
        }

        // Interference detection against the 5 nearest neighbours (only
        // possible when we have an embedding).
        if let Some(embedding) = &memory.experience.embeddings {
            if let Ok(similar_ids) =
                self.retriever
                    .search_by_embedding(embedding, 5, Some(&memory.id))
            {
                if !similar_ids.is_empty() {
                    // (id, similarity, importance, created_at, 50-char preview)
                    let similar_memories: Vec<_> = similar_ids
                        .iter()
                        .filter_map(|(id, similarity)| {
                            self.retriever.get_from_storage(id).ok().map(|m| {
                                (
                                    id.0.to_string(),
                                    *similarity,
                                    m.importance(),
                                    m.created_at,
                                    m.experience.content.chars().take(50).collect::<String>(),
                                )
                            })
                        })
                        .collect();

                    if !similar_memories.is_empty() {
                        let interference_result =
                            self.interference_detector.write().check_interference(
                                &memory.id.0.to_string(),
                                importance,
                                memory.created_at,
                                &similar_memories,
                            );

                        // Retroactive interference: the new memory weakens
                        // similar older ones.
                        for (old_id, _similarity, decay_amount) in
                            &interference_result.retroactive_targets
                        {
                            if let Ok(old_uuid) = uuid::Uuid::parse_str(old_id) {
                                if let Ok(old_memory) =
                                    self.long_term_memory.get(&MemoryId(old_uuid))
                                {
                                    old_memory.decay_importance(*decay_amount);
                                    if let Err(e) = self.long_term_memory.update(&old_memory) {
                                        tracing::debug!("Failed to persist retroactive decay: {e}");
                                    }
                                }
                            } else {
                                tracing::warn!(
                                    "Skipping retroactive decay: malformed UUID '{old_id}'"
                                );
                            }
                        }

                        // Proactive interference: older memories weaken the
                        // new one.
                        if interference_result.proactive_decay > 0.0 {
                            memory.decay_importance(interference_result.proactive_decay);
                            if let Err(e) = self.long_term_memory.update(&memory) {
                                tracing::debug!("Failed to persist proactive decay: {e}");
                            }
                        }

                        for event in &interference_result.events {
                            self.record_consolidation_event(event.clone());
                        }

                        // Near-duplicates keep their content but have their
                        // importance suppressed almost entirely.
                        if interference_result.is_duplicate {
                            tracing::info!(
                                memory_id = %memory.id.0,
                                "Near-duplicate detected (≥0.95 cosine), suppressing importance"
                            );
                            memory.decay_importance(0.99);
                            if let Err(e) = self.long_term_memory.update(&memory) {
                                tracing::debug!("Failed to suppress duplicate importance: {e}");
                            }
                        }

                        // Persist the detector's updated records (read lock:
                        // the write above has been released).
                        {
                            let detector = self.interference_detector.read();
                            let affected_ids = detector.get_affected_ids_from_check(
                                &memory.id.0.to_string(),
                                &interference_result,
                            );
                            for (id, records) in detector.get_records_for_ids(&affected_ids) {
                                if let Err(e) =
                                    self.long_term_memory.save_interference_records(id, records)
                                {
                                    tracing::debug!("Failed to persist interference records: {e}");
                                }
                            }
                            let (total_events, _) = detector.stats();
                            if let Err(e) = self
                                .long_term_memory
                                .save_interference_event_count(total_events)
                            {
                                tracing::debug!("Failed to persist interference event count: {e}");
                            }
                        }
                    }
                }
            }
        }

        // Importance-gated promotion into session memory.
        let added_to_session = if importance > self.config.importance_threshold {
            self.session_memory
                .write()
                .add_shared(Arc::clone(&memory))?;
            self.logger.write().log_created(&memory, "session");
            true
        } else {
            false
        };

        {
            let mut stats = self.stats.write();
            stats.total_memories += 1;
            stats.long_term_memory_count += 1;
            // NOTE(review): incremented unconditionally even though working
            // memory is bounded; presumably reconciled elsewhere — confirm.
            stats.working_memory_count += 1;
            if added_to_session {
                stats.session_memory_count += 1;
            }
            if indexed {
                stats.vector_index_count += 1;
            }
        }

        self.consolidate_if_needed()?;

        // Make the BM25 write visible to searches.
        if let Err(e) = self.hybrid_search.commit_and_reload() {
            tracing::warn!("Failed to commit/reload BM25 index: {}", e);
        }

        // Signal the (asynchronous) fact-extraction pass that new content
        // exists.
        self.fact_extraction_needed
            .store(true, std::sync::atomic::Ordering::Relaxed);

        Ok(memory_id)
    }
969
    /// Stores a new experience attributed to an agent/run, additionally
    /// writing entities and co-occurrence edges into the graph memory (when
    /// attached via [`Self::set_graph_memory`]).
    ///
    /// Same front half as [`Self::remember`] (dedup, embedding, temporal
    /// refs, persist, working memory, vector index), but with graph writes
    /// instead of pattern/interference detection.
    ///
    /// # Errors
    /// Propagates failures from long-term storage, working/session-memory
    /// inserts, and `consolidate_if_needed`; indexing and graph failures are
    /// logged and tolerated.
    pub fn remember_with_agent(
        &self,
        mut experience: Experience,
        created_at: Option<chrono::DateTime<chrono::Utc>>,
        agent_id: Option<String>,
        run_id: Option<String>,
    ) -> Result<MemoryId> {
        // Exact-content dedup, as in `remember`.
        if let Some(existing_id) = self
            .long_term_memory
            .get_by_content_hash(&experience.content)
        {
            tracing::debug!(
                existing_id = %existing_id.0,
                "Content dedup: returning existing memory (identical content already stored)"
            );
            return Ok(existing_id);
        }

        let memory_id = MemoryId(Uuid::new_v4());

        let importance = self.calculate_importance(&experience);

        // Embedding via the SHA-256-keyed content cache; failure here is
        // silent apart from the miss metric (memory stored unembedded).
        if experience.embeddings.is_none() {
            let content_hash = Self::sha256_hash(&experience.content);
            if let Some(cached_embedding) = self.content_cache.get(&content_hash) {
                experience.embeddings = Some(cached_embedding.clone());
                EMBEDDING_CACHE_CONTENT.with_label_values(&["hit"]).inc();
            } else {
                EMBEDDING_CACHE_CONTENT.with_label_values(&["miss"]).inc();
                if let Ok(embedding) = self.embedder.encode(&experience.content) {
                    self.content_cache.insert(content_hash, embedding.clone());
                    EMBEDDING_CACHE_CONTENT_SIZE.set(self.content_cache.entry_count() as i64);
                    experience.embeddings = Some(embedding);
                }
            }
        }

        // Extract date references from the text unless the caller set them.
        if experience.temporal_refs.is_empty() {
            let temporal = crate::memory::query_parser::extract_temporal_refs(&experience.content);
            for temp_ref in temporal.refs {
                experience.temporal_refs.push(temp_ref.date.to_string());
            }
        }

        let memory = Arc::new(Memory::new(
            memory_id.clone(),
            experience,
            importance,
            agent_id,
            run_id,
            None,
            created_at,
        ));

        self.long_term_memory.store(&memory)?;
        self.logger.write().log_created(&memory, "working");

        self.working_memory
            .write()
            .add_shared(Arc::clone(&memory))?;

        if let Err(e) = self.retriever.index_memory(&memory) {
            tracing::warn!("Failed to index memory {} in vector DB: {}", memory.id.0, e);
        }

        // Graph enrichment: add entity nodes and co-occurrence edges.
        if let Some(graph) = &self.graph_memory {
            let now = chrono::Utc::now();

            // NER results keyed by lowercase surface form.
            let ner_lookup = build_ner_lookup(&memory.experience.ner_entities);

            // Batch-embed entity names so similar concepts can be merged;
            // on failure, fall back to no embeddings for this batch.
            let entity_names: Vec<&str> = memory
                .experience
                .entities
                .iter()
                .map(|s| s.as_str())
                .collect();
            let entity_embeddings: Vec<Option<Vec<f32>>> = if entity_names.is_empty() {
                Vec::new()
            } else {
                match self.embedder.encode_batch(&entity_names) {
                    Ok(embs) => embs.into_iter().map(Some).collect(),
                    Err(e) => {
                        tracing::debug!(
                            error = %e,
                            "Entity name embedding failed, concept merge disabled for this batch"
                        );
                        vec![None; entity_names.len()]
                    }
                }
            };

            let entities_to_add: Vec<crate::graph_memory::EntityNode> = memory
                .experience
                .entities
                .iter()
                .zip(entity_embeddings.into_iter())
                .map(|(entity_name, embedding)| {
                    let (label, salience) = resolve_entity_label(entity_name, &ner_lookup);
                    crate::graph_memory::EntityNode {
                        uuid: Uuid::new_v4(),
                        name: entity_name.clone(),
                        labels: vec![label],
                        created_at: now,
                        last_seen_at: now,
                        mention_count: 1,
                        summary: String::new(),
                        attributes: std::collections::HashMap::new(),
                        name_embedding: embedding,
                        salience,
                        // Heuristic: uppercase first char => proper noun.
                        is_proper_noun: entity_name
                            .chars()
                            .next()
                            .map(|c| c.is_uppercase())
                            .unwrap_or(false),
                    }
                })
                .collect();

            // Prefer caller-supplied co-occurrence pairs; otherwise extract
            // them from the content.
            let cooccurrence_pairs = if !memory.experience.cooccurrence_pairs.is_empty() {
                memory.experience.cooccurrence_pairs.clone()
            } else {
                let entity_extractor = crate::graph_memory::EntityExtractor::new();
                entity_extractor.extract_cooccurrence_pairs(&memory.experience.content)
            };

            let edge_context = format!("Co-occurred in memory {}", memory.id.0);

            // NOTE(review): a read guard is used even though entities/edges
            // are added below — GraphMemory presumably uses interior
            // mutability; confirm.
            let graph_guard = graph.read();

            for entity in entities_to_add {
                if let Err(e) = graph_guard.add_entity(entity.clone()) {
                    tracing::debug!("Failed to add entity '{}' to graph: {}", entity.name, e);
                }
            }

            // Edge strength = L1 base weight scaled by name-embedding
            // similarity (floored by EDGE_SEMANTIC_WEIGHT_FLOOR); when either
            // embedding is missing the scale is 1.0.
            let l1_base_weight = crate::graph_memory::EdgeTier::L1Working.initial_weight();
            for (entity1, entity2) in cooccurrence_pairs {
                if let (Ok(Some(e1)), Ok(Some(e2))) = (
                    graph_guard.find_entity_by_name(&entity1),
                    graph_guard.find_entity_by_name(&entity2),
                ) {
                    let entity_confidence = Some((e1.salience + e2.salience) / 2.0);

                    let semantic_weight = match (&e1.name_embedding, &e2.name_embedding) {
                        (Some(emb1), Some(emb2)) => {
                            let sim = crate::similarity::cosine_similarity(emb1, emb2).max(0.0);
                            EDGE_SEMANTIC_WEIGHT_FLOOR + (1.0 - EDGE_SEMANTIC_WEIGHT_FLOOR) * sim
                        }
                        _ => 1.0,
                    };

                    let edge = crate::graph_memory::RelationshipEdge {
                        uuid: Uuid::new_v4(),
                        from_entity: e1.uuid,
                        to_entity: e2.uuid,
                        relation_type: crate::graph_memory::RelationType::CoOccurs,
                        strength: l1_base_weight * semantic_weight,
                        created_at: now,
                        valid_at: now,
                        invalidated_at: None,
                        source_episode_id: Some(memory.id.0),
                        context: edge_context.clone(),
                        last_activated: now,
                        activation_count: 1,
                        ltp_status: crate::graph_memory::LtpStatus::None,
                        tier: crate::graph_memory::EdgeTier::L1Working,
                        activation_timestamps: None,
                        entity_confidence,
                    };

                    if let Err(e) = graph_guard.add_relationship(edge) {
                        tracing::trace!(
                            "Failed to add co-occurrence edge {}<->{}: {}",
                            entity1,
                            entity2,
                            e
                        );
                    }
                }
            }
        }

        // Lexical (BM25) indexing is best-effort.
        if let Err(e) = self.hybrid_search.index_memory(
            &memory.id,
            &memory.experience.content,
            &memory.experience.tags,
            &memory.experience.entities,
        ) {
            tracing::warn!("Failed to index memory {} in BM25: {}", memory.id.0, e);
        }

        // Importance-gated promotion into session memory (no session log
        // line here, unlike `remember` — presumably intentional; confirm).
        if importance > self.config.importance_threshold {
            self.session_memory
                .write()
                .add_shared(Arc::clone(&memory))?;
        }

        {
            let mut stats = self.stats.write();
            stats.total_memories += 1;
            stats.long_term_memory_count += 1;
            stats.working_memory_count += 1;
        }

        self.consolidate_if_needed()?;

        // Make the BM25 write visible to searches.
        if let Err(e) = self.hybrid_search.commit_and_reload() {
            tracing::warn!("Failed to commit/reload BM25 index: {}", e);
        }

        // Signal the fact-extraction pass that new content exists.
        self.fact_extraction_needed
            .store(true, std::sync::atomic::Ordering::Relaxed);

        Ok(memory_id)
    }
1210
    /// Retrieves memories matching `query`.
    ///
    /// When `query.query_text` is set, delegates entirely to
    /// `semantic_retrieve`. Otherwise merges deduplicated results from the
    /// working, session, and long-term tiers (in that order), expands via
    /// the memory hierarchy, ranks by importance x temporal recency, and
    /// truncates to `query.max_results`.
    ///
    /// Side effects: logs the retrieval, strengthens access counts on every
    /// returned memory, records graph coactivation when ≥2 results and a
    /// graph is attached, and bumps the persistent retrieval counter.
    pub fn recall(&self, query: &Query) -> Result<Vec<SharedMemory>> {
        // Text queries take the semantic path exclusively.
        if let Some(query_text) = &query.query_text {
            return self.semantic_retrieve(query_text, query);
        }

        let mut memories = Vec::new();
        // First tier to yield a given id wins; later duplicates are dropped.
        let mut seen_ids: HashSet<MemoryId> = HashSet::new();
        let mut sources = Vec::new();

        // Tier 1: working memory (lock scoped to this block).
        {
            let working = self.working_memory.read();
            let working_results = working.search(query, query.max_results)?;
            if !working_results.is_empty() {
                sources.push("working");
            }
            for memory in working_results {
                if seen_ids.insert(memory.id.clone()) {
                    memories.push(memory);
                }
            }
        }

        // Tier 2: session memory.
        {
            let session = self.session_memory.read();
            let session_results = session.search(query, query.max_results)?;
            if !session_results.is_empty() {
                sources.push("session");
            }
            for memory in session_results {
                if seen_ids.insert(memory.id.clone()) {
                    memories.push(memory);
                }
            }
        }

        // Tier 3: long-term storage via the retrieval engine.
        {
            let long_term_results = self.retriever.search(query, query.max_results)?;
            if !long_term_results.is_empty() {
                sources.push("longterm");
            }
            for memory in long_term_results {
                if seen_ids.insert(memory.id.clone()) {
                    memories.push(memory);
                }
            }
        }

        // Pull in hierarchically related memories (dedup via seen_ids).
        self.expand_with_hierarchy(&mut memories, &mut seen_ids);

        // Rank: importance scaled by a recency factor, newest-favoring.
        let now = chrono::Utc::now();
        memories.sort_by(|a, b| {
            let age_days_a = (now - a.created_at).num_days();
            let temporal_a = Self::calculate_temporal_relevance(age_days_a);
            let score_a = a.importance() * temporal_a;

            let age_days_b = (now - b.created_at).num_days();
            let temporal_b = Self::calculate_temporal_relevance(age_days_b);
            let score_b = b.importance() * temporal_b;

            // Descending by score.
            score_b.total_cmp(&score_a)
        });

        memories.truncate(query.max_results);

        self.logger
            .read()
            .log_retrieved("", memories.len(), &sources);

        // Retrieval strengthens each returned memory.
        for memory in &memories {
            self.update_access_count_instrumented(memory, StrengtheningReason::Recalled);
        }

        // Co-retrieved memories co-activate in the graph (non-critical).
        if memories.len() >= 2 {
            if let Some(graph) = &self.graph_memory {
                let memory_uuids: Vec<uuid::Uuid> = memories.iter().map(|m| m.id.0).collect();
                if let Err(e) = graph.read().record_memory_coactivation(&memory_uuids) {
                    tracing::trace!("Coactivation recording failed (non-critical): {e}");
                }
            }
        }

        // Persist and mirror the global retrieval counter.
        if let Ok(count) = self.long_term_memory.increment_retrieval_count() {
            self.stats.write().total_retrievals = count;
        }

        Ok(memories)
    }
1315
1316 pub fn paginated_recall(&self, query: &Query) -> Result<PaginatedResults<SharedMemory>> {
1326 let extra_limit = query.offset + query.max_results + 1;
1330 let mut modified_query = query.clone();
1331 modified_query.max_results = extra_limit;
1332 modified_query.offset = 0; let all_results = self.recall(&modified_query)?;
1336
1337 let offset = query.offset;
1339 let limit = query.max_results;
1340
1341 let results_after_offset: Vec<_> = all_results.into_iter().skip(offset).collect();
1342 let has_more = results_after_offset.len() > limit;
1343
1344 let final_results: Vec<_> = results_after_offset.into_iter().take(limit).collect();
1345
1346 Ok(PaginatedResults {
1347 results: final_results,
1348 has_more,
1349 total_count: None, offset,
1351 limit,
1352 })
1353 }
1354
1355 pub fn recall_by_tags(&self, tags: &[String], limit: usize) -> Result<Vec<Memory>> {
1359 let criteria = storage::SearchCriteria::ByTags(tags.to_vec());
1360 let mut memories = self.advanced_search(criteria)?;
1361 memories.truncate(limit);
1362 if let Ok(count) = self.long_term_memory.increment_retrieval_count() {
1363 self.stats.write().total_retrievals = count;
1364 }
1365 Ok(memories)
1366 }
1367
1368 pub fn recall_by_date(
1372 &self,
1373 start: chrono::DateTime<chrono::Utc>,
1374 end: chrono::DateTime<chrono::Utc>,
1375 limit: usize,
1376 ) -> Result<Vec<Memory>> {
1377 let criteria = storage::SearchCriteria::ByDate { start, end };
1378 let mut memories = self.advanced_search(criteria)?;
1379 memories.truncate(limit);
1380 if let Ok(count) = self.long_term_memory.increment_retrieval_count() {
1381 self.stats.write().total_retrievals = count;
1382 }
1383 Ok(memories)
1384 }
1385
    /// Full semantic retrieval pipeline for free-text queries.
    ///
    /// Runs a layered pipeline, with per-layer timing logged throughout:
    /// - Layer 0.5: attribute-query detection → BM25-expanded entity/attribute boost set
    /// - Layer 0.6: temporal fact lookup → boost set for temporally anchored queries
    /// - Layer 0.7: semantic-fact source boosts (confidence-weighted, capped)
    /// - embedding: precomputed / cached / freshly encoded query embedding
    /// - Layer 1:   optional episode-scoped candidate filter
    /// - Layer 2:   knowledge-graph expansion (bidirectional + weighted traversal)
    /// - Layer 3:   vector (Vamana) search
    /// - Layer 4:   BM25 hybrid + RRF fusion + boosts + interference adjustment
    /// - Layer 5:   tiered memory fetch + unified scoring (recency/arousal/
    ///              credibility/temporal/feedback)
    /// - post:      linguistic re-rank, retrieval competition, co-activation,
    ///              hierarchy expansion
    ///
    /// Side effects: strengthens returned memories, records co-activation and
    /// consolidation events, persists interference records, bumps retrieval
    /// counters, and cleans up stale vector-index entries.
    fn semantic_retrieve(&self, query_text: &str, query: &Query) -> Result<Vec<SharedMemory>> {
        let recall_start = std::time::Instant::now();

        // Parse temporal references ("last week", explicit dates) up front;
        // they feed both Layer 0.6 and the per-memory temporal boost later.
        let query_temporal = query_parser::extract_temporal_refs(query_text);
        let has_temporal_query = query_parser::requires_temporal_filtering(query_text);

        if has_temporal_query {
            let temporal_intent = query_parser::detect_temporal_intent(query_text);
            tracing::debug!(
                "Temporal query detected: intent={:?}, refs={:?}",
                temporal_intent,
                query_temporal
                    .refs
                    .iter()
                    .map(|r| r.date.to_string())
                    .collect::<Vec<_>>()
            );
        }

        let query_type = query_parser::classify_query(query_text);
        let query_analysis = query_parser::analyze_query(query_text);

        // Layer 0.5: for "what is X's <attribute>" queries, find memories that
        // mention both the entity and an attribute synonym via an expanded
        // BM25 search; these ids get a flat boost during fusion.
        let attribute_boost_ids: HashSet<MemoryId> = match &query_type {
            query_parser::QueryType::Attribute(attr_query) => {
                tracing::debug!(
                    "Layer 0.5: Attribute query detected - entity='{}', attribute='{}', synonyms={:?}",
                    attr_query.entity,
                    attr_query.attribute,
                    attr_query.attribute_synonyms
                );

                // Search with entity + all attribute synonyms as one query.
                let mut expanded_terms: Vec<String> = vec![attr_query.entity.clone()];
                expanded_terms.extend(attr_query.attribute_synonyms.clone());

                let expanded_query = expanded_terms.join(" ");
                // Over-fetch (5x) since matches are filtered below.
                let bm25_matches = self
                    .hybrid_search
                    .bm25_index()
                    .search(&expanded_query, query.max_results * 5)
                    .unwrap_or_default();

                let entity_lower = attr_query.entity.to_lowercase();
                let mut boosted_ids = HashSet::new();

                for (mem_id, _score) in bm25_matches {
                    // Resolve content from the fastest tier that has it.
                    let content = self
                        .working_memory
                        .read()
                        .get(&mem_id)
                        .map(|m| m.experience.content.to_lowercase())
                        .or_else(|| {
                            self.session_memory
                                .read()
                                .get(&mem_id)
                                .map(|m| m.experience.content.to_lowercase())
                        })
                        .or_else(|| {
                            self.long_term_memory
                                .get(&mem_id)
                                .ok()
                                .map(|m| m.experience.content.to_lowercase())
                        });

                    if let Some(content) = content {
                        // Require the entity itself to appear verbatim.
                        if !content.contains(&entity_lower) {
                            continue;
                        }
                        // ... and at least one attribute synonym.
                        let has_synonym = attr_query
                            .attribute_synonyms
                            .iter()
                            .any(|syn| content.contains(&syn.to_lowercase()));
                        if has_synonym {
                            boosted_ids.insert(mem_id);
                        }
                    }
                }

                if !boosted_ids.is_empty() {
                    tracing::info!(
                        "Layer 0.5: Found {} memories with entity '{}' + attribute values",
                        boosted_ids.len(),
                        attr_query.entity
                    );
                }

                boosted_ids
            }
            _ => HashSet::new(),
        };

        // Layer 0.6: for temporal queries with a known user, look up stored
        // temporal facts (entity + event keywords) and boost their source
        // memories. Any missing precondition yields an empty set.
        let temporal_fact_boost_ids: HashSet<MemoryId> = if has_temporal_query {
            if let Some(user_id) = &query.user_id {
                // First focal entity is treated as the subject.
                let entity = query_analysis
                    .focal_entities
                    .first()
                    .map(|e| e.text.clone())
                    .unwrap_or_default();

                // Remaining entities + relation stems + modifiers become
                // event keywords.
                let event_keywords: Vec<&str> = query_analysis
                    .focal_entities
                    .iter()
                    .skip(1)
                    .map(|e| e.text.as_str())
                    .chain(
                        query_analysis
                            .relational_context
                            .iter()
                            .map(|r| r.stem.as_str()),
                    )
                    .chain(
                        query_analysis
                            .discriminative_modifiers
                            .iter()
                            .map(|m| m.text.as_str()),
                    )
                    .collect();

                if !entity.is_empty() && !event_keywords.is_empty() {
                    let query_lower = query_text.to_lowercase();
                    // Heuristic future/past cue detection; past-tense cues
                    // currently also resolve to None (match any event type).
                    let event_type = if query_lower.contains("planning")
                        || query_lower.contains("going to")
                        || query_lower.contains("will")
                    {
                        Some(temporal_facts::EventType::Planned)
                    } else if query_lower.contains(" did ")
                        || query_lower.contains("when did")
                        || query_lower.contains(" ran ")
                        || query_lower.contains(" went ")
                    {
                        None
                    } else {
                        None
                    };

                    match self.find_temporal_facts(user_id, &entity, &event_keywords, event_type) {
                        Ok(facts) if !facts.is_empty() => {
                            tracing::info!(
                                "Layer 0.6: Found {} temporal facts for entity='{}', events={:?}",
                                facts.len(),
                                entity,
                                event_keywords
                            );

                            let boosted: HashSet<MemoryId> =
                                facts.iter().map(|f| f.source_memory_id.clone()).collect();
                            boosted
                        }
                        Ok(_) => {
                            tracing::debug!(
                                "Layer 0.6: No temporal facts found for entity='{}', events={:?}",
                                entity,
                                event_keywords
                            );
                            HashSet::new()
                        }
                        Err(e) => {
                            // Lookup failure is non-fatal: fall through with no boost.
                            tracing::debug!("Layer 0.6: Temporal fact lookup failed: {}", e);
                            HashSet::new()
                        }
                    }
                } else {
                    HashSet::new()
                }
            } else {
                HashSet::new()
            }
        } else {
            HashSet::new()
        };

        // Layer 0.7: confidence-weighted boosts for memories that are sources
        // of well-supported semantic facts about the query's focal entities.
        let fact_source_boosts: std::collections::HashMap<MemoryId, f32> = {
            let mut boosts: std::collections::HashMap<MemoryId, f32> =
                std::collections::HashMap::new();

            if let Some(user_id) = &query.user_id {
                let entity_names: Vec<String> = query_analysis
                    .focal_entities
                    .iter()
                    .map(|e| e.text.to_lowercase())
                    .collect();

                if !entity_names.is_empty() {
                    if let Ok(facts) = self.get_facts_for_graph_entities(user_id, &entity_names, 5)
                    {
                        for fact in &facts {
                            // Only trust facts with decent confidence and support.
                            if fact.confidence < 0.5 || fact.support_count < 3 {
                                continue;
                            }
                            let per_fact_boost = fact.confidence * 0.08;
                            for src_id in &fact.source_memories {
                                // Accumulate per source memory, capped at 0.3.
                                let entry = boosts.entry(src_id.clone()).or_insert(0.0);
                                *entry = (*entry + per_fact_boost).min(0.3);
                            }
                        }
                        if !boosts.is_empty() {
                            tracing::debug!(
                                "Layer 0.7: Pre-fetched {} fact-source boosts from {} facts",
                                boosts.len(),
                                facts.len()
                            );
                        }
                    }
                }
            }
            boosts
        };

        let t_query_analysis = recall_start.elapsed();
        tracing::info!(
            query_analysis_ms = format!("{:.2}", t_query_analysis.as_secs_f64() * 1000.0),
            "recall [layer:0.5-0.7] query analysis + attribute + temporal fact + fact source lookup"
        );

        // Query embedding: precomputed by caller > hash-keyed cache > fresh
        // encode (which populates the cache and size metric).
        let query_embedding =
            if let Some(pre) = query.query_embedding.as_ref().filter(|e| !e.is_empty()) {
                EMBEDDING_CACHE_QUERY
                    .with_label_values(&["precomputed"])
                    .inc();
                tracing::debug!("Query embedding PRECOMPUTED by caller — skipping encode");
                pre.clone()
            } else {
                let query_hash = Self::sha256_hash(query_text);
                if let Some(cached_embedding) = self.query_cache.get(&query_hash) {
                    EMBEDDING_CACHE_QUERY.with_label_values(&["hit"]).inc();
                    tracing::debug!("Query embedding cache HIT for: {}", query_text);
                    cached_embedding.clone()
                } else {
                    EMBEDDING_CACHE_QUERY.with_label_values(&["miss"]).inc();
                    tracing::debug!(
                        "Query embedding cache MISS - generating for: {}",
                        query_text
                    );
                    let embedding = self
                        .embedder
                        .as_ref()
                        .encode(query_text)
                        .context("Failed to generate query embedding")?;

                    self.query_cache.insert(query_hash, embedding.clone());
                    EMBEDDING_CACHE_QUERY_SIZE.set(self.query_cache.entry_count() as i64);
                    embedding
                }
            };

        let t_embedding = recall_start.elapsed();
        tracing::info!(
            embedding_ms = format!(
                "{:.2}",
                (t_embedding - t_query_analysis).as_secs_f64() * 1000.0
            ),
            cumulative_ms = format!("{:.2}", t_embedding.as_secs_f64() * 1000.0),
            "recall [layer:embedding] query embedding (cache miss logged above if any)"
        );

        // Layer 1: if the query names an episode, restrict candidates to its
        // members; an empty/failed episode lookup degrades to a global search.
        let episode_candidates: Option<HashSet<MemoryId>> = if let Some(episode_id) =
            &query.episode_id
        {
            match self
                .long_term_memory
                .search(SearchCriteria::ByEpisode(episode_id.clone()))
            {
                Ok(ep) if !ep.is_empty() => {
                    tracing::debug!("Layer 1: {} candidates in episode {}", ep.len(), episode_id);
                    Some(ep.into_iter().map(|m| m.id).collect())
                }
                _ => {
                    tracing::debug!("Layer 1: global search");
                    None
                }
            }
        } else {
            None
        };

        // Layer 2: knowledge-graph expansion, only for graph-aware retrieval
        // modes. Produces (memory id, activation, hebbian/decay factor)
        // triples plus linguistic weights reused by Layer 4.
        let use_graph = matches!(
            query.retrieval_mode,
            RetrievalMode::Hybrid | RetrievalMode::Associative | RetrievalMode::Causal
        );
        let (
            graph_results,
            graph_density,
            query_entity_count,
            ic_weights,
            phrase_boosts,
            keyword_disc,
        ): (
            Vec<(MemoryId, f32, f32)>,
            Option<f32>,
            usize,
            std::collections::HashMap<String, f32>,
            Vec<(String, f32)>,
            f32,
        ) = {
            if let Some(graph) = self.graph_memory.as_ref().filter(|_| use_graph) {
                let g = graph.read();
                let weights = query_analysis.to_ic_weights();
                let phrases = query_analysis.to_phrase_boosts();
                let (disc, disc_keywords) = query_analysis.keyword_discriminativeness();
                if disc > 0.5 && !disc_keywords.is_empty() {
                    tracing::debug!(
                        "Layer 2: YAKE discriminative keywords: {:?} (disc={:.2})",
                        disc_keywords,
                        disc
                    );
                }
                let entity_count = query_analysis.focal_entities.len()
                    + query_analysis.discriminative_modifiers.len();

                // Resolve every query term (entities, modifiers, relation
                // text + stems) to graph entity uuids.
                let mut query_entities: Vec<uuid::Uuid> = Vec::new();
                for e in query_analysis
                    .focal_entities
                    .iter()
                    .map(|e| e.text.as_str())
                    .chain(
                        query_analysis
                            .discriminative_modifiers
                            .iter()
                            .map(|m| m.text.as_str()),
                    )
                    .chain(
                        query_analysis
                            .relational_context
                            .iter()
                            .map(|r| r.text.as_str()),
                    )
                    .chain(
                        query_analysis
                            .relational_context
                            .iter()
                            .map(|r| r.stem.as_str()),
                    )
                {
                    if let Ok(Some(ent)) = g.find_entity_by_name(e) {
                        query_entities.push(ent.uuid);
                    }
                }

                // Average graph density around the query entities drives how
                // aggressive the traversals below are.
                let d = if !query_entities.is_empty() {
                    g.entities_average_density(&query_entities).ok().flatten()
                } else {
                    None
                };

                let mut ids = Vec::new();

                // Denser neighborhoods → shallower traversal with higher
                // edge-strength thresholds to keep fan-out bounded.
                let density_val = d.unwrap_or(0.0);
                let (bidir_depth, bidir_min_str, weighted_depth, weighted_min_str) =
                    if density_val > 15.0 {
                        (3usize, 0.12f32, 3usize, 0.15f32)
                    } else if density_val > 8.0 {
                        (4, 0.08, 4, 0.12)
                    } else {
                        (6, 0.05, 5, 0.10)
                    };

                if density_val > 0.0 {
                    tracing::debug!(
                        "Layer 2: density={:.1}, bidir_depth={}, bidir_min_str={:.2}, weighted_depth={}, weighted_min_str={:.2}",
                        density_val, bidir_depth, bidir_min_str, weighted_depth, weighted_min_str
                    );
                }

                // Bidirectional path search between entity pairs (bounded to
                // 3 pairs over at most 4 entities). Entities on a connecting
                // path get a 1.5x activation boost.
                if query_entities.len() >= 2 {
                    let max_pairs = 3usize;
                    let max_ents = query_entities.len().min(4);
                    let mut pair_count = 0usize;
                    'bidir: for i in 0..max_ents {
                        for j in (i + 1)..max_ents {
                            if pair_count >= max_pairs {
                                break 'bidir;
                            }
                            if let Ok(path) = g.traverse_bidirectional(
                                &query_entities[i],
                                &query_entities[j],
                                bidir_depth,
                                bidir_min_str,
                            ) {
                                for tr in &path.entities {
                                    if let Ok(mut eps) = g.get_episodes_by_entity(&tr.entity.uuid) {
                                        // Newest 50 episodes per entity only.
                                        eps.sort_by(|a, b| b.created_at.cmp(&a.created_at));
                                        eps.truncate(50);
                                        for ep in eps {
                                            let mid = MemoryId(ep.uuid);
                                            if episode_candidates
                                                .as_ref()
                                                .map_or(true, |c| c.contains(&mid))
                                            {
                                                let path_boost = 1.5;
                                                ids.push((
                                                    mid,
                                                    tr.entity.salience
                                                        * tr.decay_factor
                                                        * path_boost,
                                                    tr.decay_factor,
                                                ));
                                            }
                                        }
                                    }
                                }
                            }
                            pair_count += 1;
                        }
                    }
                }

                // Single-source weighted spreading activation from each
                // query entity.
                for entity_uuid in &query_entities {
                    if let Ok(t) =
                        g.traverse_weighted(entity_uuid, weighted_depth, None, weighted_min_str)
                    {
                        for tr in &t.entities {
                            if let Ok(mut eps) = g.get_episodes_by_entity(&tr.entity.uuid) {
                                eps.sort_by(|a, b| b.created_at.cmp(&a.created_at));
                                eps.truncate(50);
                                for ep in eps {
                                    let mid = MemoryId(ep.uuid);
                                    if episode_candidates
                                        .as_ref()
                                        .map_or(true, |c| c.contains(&mid))
                                    {
                                        ids.push((
                                            mid,
                                            tr.entity.salience * tr.decay_factor,
                                            tr.decay_factor,
                                        ));
                                    }
                                }
                            }
                        }
                    }
                }

                // Deduplicate: keep the max activation and max hebbian factor
                // seen for each memory id, then rank and cap at 200.
                let mut seen: std::collections::HashMap<MemoryId, (f32, f32)> =
                    std::collections::HashMap::new();
                for (id, act, heb) in ids {
                    seen.entry(id)
                        .and_modify(|(a, h)| {
                            *a = a.max(act);
                            *h = h.max(heb);
                        })
                        .or_insert((act, heb));
                }
                let mut r: Vec<_> = seen.into_iter().map(|(id, (a, h))| (id, a, h)).collect();
                r.sort_by(|a, b| b.1.total_cmp(&a.1));
                let pre_cap = r.len();
                r.truncate(200);
                if !r.is_empty() {
                    tracing::debug!("Layer 2: {} graph results (capped from {}), {} query entities, bidirectional={}, top_activation={:.3}",
                        r.len(), pre_cap, entity_count, query_entities.len() >= 2, r.first().map(|x| x.1).unwrap_or(0.0));
                }
                (r, d, entity_count, weights, phrases, disc)
            } else {
                if !use_graph && self.graph_memory.is_some() {
                    tracing::debug!(
                        "Layer 2: SKIPPED (retrieval_mode={:?})",
                        query.retrieval_mode
                    );
                }
                // No graph: still compute the linguistic weights Layer 4 needs.
                let (disc, _) = query_analysis.keyword_discriminativeness();
                (
                    Vec::new(),
                    None,
                    0,
                    query_analysis.to_ic_weights(),
                    query_analysis.to_phrase_boosts(),
                    disc,
                )
            }
        };

        let t_graph = recall_start.elapsed();
        tracing::info!(
            graph_ms = format!("{:.2}", (t_graph - t_embedding).as_secs_f64() * 1000.0),
            cumulative_ms = format!("{:.2}", t_graph.as_secs_f64() * 1000.0),
            graph_results = graph_results.len(),
            "recall [layer:1-2] episode filter + graph expansion"
        );

        // Vector-search variant of the query: text removed, embedding
        // attached, every filter field copied through so that
        // matches_filters() in Layer 5 applies identical constraints.
        let vector_query = Query {
            user_id: query.user_id.clone(),
            query_text: None,
            query_embedding: Some(query_embedding),
            time_range: query.time_range,
            experience_types: query.experience_types.clone(),
            importance_threshold: query.importance_threshold,
            max_results: query.max_results,
            retrieval_mode: query.retrieval_mode.clone(),
            robot_id: query.robot_id.clone(),
            mission_id: query.mission_id.clone(),
            geo_filter: query.geo_filter.clone(),
            action_type: query.action_type.clone(),
            reward_range: query.reward_range,
            outcome_type: query.outcome_type.clone(),
            failures_only: query.failures_only,
            anomalies_only: query.anomalies_only,
            severity: query.severity.clone(),
            tags: query.tags.clone(),
            pattern_id: query.pattern_id.clone(),
            terrain_type: query.terrain_type.clone(),
            confidence_range: query.confidence_range,
            offset: query.offset,
            episode_id: query.episode_id.clone(),
            prospective_signals: query.prospective_signals.clone(),
            recency_weight: query.recency_weight,
        };

        // Layer 3: ANN vector search, over-fetching 3x for the fusion stage;
        // episode scoping (if any) is applied to the raw hits.
        let vr = self
            .retriever
            .search_ids(&vector_query, query.max_results * 3)?;
        let vector_results: Vec<(MemoryId, f32)> = if let Some(ref c) = episode_candidates {
            vr.into_iter().filter(|(id, _)| c.contains(id)).collect()
        } else {
            vr
        };
        let t_vector = recall_start.elapsed();
        tracing::info!(
            vector_ms = format!("{:.2}", (t_vector - t_graph).as_secs_f64() * 1000.0),
            cumulative_ms = format!("{:.2}", t_vector.as_secs_f64() * 1000.0),
            vector_results = vector_results.len(),
            "recall [layer:3] Vamana vector search"
        );

        // Layer 4: fuse graph + hybrid (BM25/vector) rankings via reciprocal
        // rank fusion, then apply the pre-computed boost sets and
        // interference adjustments. Returns ranked ids plus hebbian factors.
        let (memory_ids, hebbian_scores): (
            Vec<(MemoryId, f32)>,
            std::collections::HashMap<MemoryId, f32>,
        ) = {
            // Tier-cascading content lookup used by BM25 rescoring and the
            // prospective-signal boost below.
            let get_content = |id: &MemoryId| -> Option<String> {
                self.working_memory
                    .read()
                    .get(id)
                    .map(|m| m.experience.content.clone())
                    .or_else(|| {
                        self.session_memory
                            .read()
                            .get(id)
                            .map(|m| m.experience.content.clone())
                    })
                    .or_else(|| {
                        self.long_term_memory
                            .get(id)
                            .ok()
                            .map(|m| m.experience.content.clone())
                    })
            };
            let term_weights = if ic_weights.is_empty() {
                None
            } else {
                Some(&ic_weights)
            };
            let phrases = if phrase_boosts.is_empty() {
                None
            } else {
                Some(phrase_boosts.as_slice())
            };
            // Only pass discriminativeness through when meaningful (> 0.3).
            let disc_opt = if keyword_disc > 0.3 {
                Some(keyword_disc)
            } else {
                None
            };
            // Hybrid BM25 + vector re-ranking; on failure fall back to the
            // raw vector ranking.
            let hybrid_ids = self
                .hybrid_search
                .search_with_dynamic_weights(
                    query_text,
                    vector_results.clone(),
                    get_content,
                    term_weights,
                    phrases,
                    disc_opt,
                )
                .map(|r| {
                    r.into_iter()
                        .map(|x| (x.memory_id, x.score))
                        .collect::<Vec<_>>()
                })
                .unwrap_or(vector_results);

            // RRF smoothing constant.
            const K: f32 = 30.0;
            let mut fused: std::collections::HashMap<MemoryId, f32> =
                std::collections::HashMap::new();
            let mut heb: std::collections::HashMap<MemoryId, f32> =
                std::collections::HashMap::new();

            // Split ranking weight between graph and hybrid channels based
            // on graph density (defaults used when density is unknown).
            let (semantic_w, graph_w, linguistic_w) = graph_density
                .map(calculate_density_weights)
                .unwrap_or((0.6, 0.3, 0.1));

            let hybrid_w = semantic_w + linguistic_w;

            tracing::debug!(
                "Layer 4 RRF: density={:?}, graph_w={:.2}, hybrid_w={:.2}, query_entities={}",
                graph_density,
                graph_w,
                hybrid_w,
                query_entity_count
            );

            for (r, (id, activation, h)) in graph_results.iter().enumerate() {
                let rrf_score = graph_w / (K + (r + 1) as f32);
                *fused.entry(id.clone()).or_insert(0.0) += rrf_score;
                heb.insert(id.clone(), *h);

                // Small extra credit proportional to raw graph activation.
                let activation_bonus = graph_w * 0.2 * activation.clamp(0.0, 1.0);
                *fused.get_mut(id).unwrap() += activation_bonus;
            }

            for (r, (id, _)) in hybrid_ids.iter().enumerate() {
                *fused.entry(id.clone()).or_insert(0.0) += hybrid_w / (K + (r + 1) as f32);
            }

            // Layer 4.5: flat boost for attribute-query matches; ids not yet
            // in the fused map are inserted so they can still surface.
            if !attribute_boost_ids.is_empty() {
                const ATTRIBUTE_BOOST: f32 = 0.5;
                let mut boosted_count = 0;
                for id in &attribute_boost_ids {
                    if fused.contains_key(id) {
                        *fused.get_mut(id).unwrap() += ATTRIBUTE_BOOST;
                        boosted_count += 1;
                    } else {
                        fused.insert(id.clone(), ATTRIBUTE_BOOST);
                        boosted_count += 1;
                    }
                }
                if boosted_count > 0 {
                    tracing::info!(
                        "Layer 4.5: Boosted {} memories for attribute query",
                        boosted_count
                    );
                }
            }

            // Layer 4.55: flat boost for temporal-fact source memories
            // (same insert-if-absent behavior as 4.5).
            if !temporal_fact_boost_ids.is_empty() {
                const TEMPORAL_FACT_BOOST: f32 = 0.4;
                let mut boosted_count = 0;
                for id in &temporal_fact_boost_ids {
                    if fused.contains_key(id) {
                        *fused.get_mut(id).unwrap() += TEMPORAL_FACT_BOOST;
                        boosted_count += 1;
                    } else {
                        fused.insert(id.clone(), TEMPORAL_FACT_BOOST);
                        boosted_count += 1;
                    }
                }
                if boosted_count > 0 {
                    tracing::info!(
                        "Layer 4.55: Boosted {} memories from temporal fact matches",
                        boosted_count
                    );
                }
            }

            // Layer 4.6 (PIPE-3): multiplicative interference adjustments.
            // Scores are normalized to the current max before asking the
            // detector; only adjustments that differ from 1.0 are applied.
            {
                let detector = self.interference_detector.read();

                let max_score = fused
                    .values()
                    .copied()
                    .fold(0.0_f32, |a, b| a.max(b))
                    .max(0.01);

                let adjustments: Vec<_> = fused
                    .iter()
                    .map(|(id, &score)| {
                        let current_activation = (score / max_score).clamp(0.0, 1.0);
                        let adjustment = detector
                            .calculate_retrieval_adjustment(&id.0.to_string(), current_activation);
                        (id.clone(), adjustment)
                    })
                    .filter(|(_, adj)| (*adj - 1.0).abs() > 0.01)
                    .collect();

                let adjusted_count = adjustments.len();
                for (id, adjustment) in adjustments {
                    if let Some(score) = fused.get_mut(&id) {
                        *score *= adjustment;
                    }
                }

                if adjusted_count > 0 {
                    tracing::debug!(
                        "Layer 4.6 (PIPE-3): Applied interference adjustments to {} memories",
                        adjusted_count
                    );
                }
            }

            // Layer 4.7: boost memories whose content overlaps the query's
            // prospective signals (sqrt-damped per match, capped).
            if let Some(ref signals) = query.prospective_signals {
                if !signals.is_empty() {
                    const PROSPECTIVE_BOOST_PER_MATCH: f32 = 0.15;
                    const MAX_PROSPECTIVE_BOOST: f32 = 0.5;

                    // Split signals into lowercase terms of length >= 3.
                    let signal_terms: std::collections::HashSet<String> = signals
                        .iter()
                        .flat_map(|s| {
                            s.to_lowercase()
                                .split_whitespace()
                                .filter(|w| w.len() >= 3)
                                .map(|w| w.to_string())
                                .collect::<Vec<_>>()
                        })
                        .collect();

                    if !signal_terms.is_empty() {
                        let mut boosted_count = 0;
                        // Snapshot keys: fused is mutated inside the loop.
                        let ids: Vec<MemoryId> = fused.keys().cloned().collect();

                        for id in &ids {
                            if let Some(content) = get_content(id) {
                                let content_lower = content.to_lowercase();
                                let match_count = signal_terms
                                    .iter()
                                    .filter(|term| content_lower.contains(term.as_str()))
                                    .count();

                                if match_count > 0 {
                                    let boost = (PROSPECTIVE_BOOST_PER_MATCH
                                        * (match_count as f32).sqrt())
                                    .min(MAX_PROSPECTIVE_BOOST);
                                    *fused.get_mut(id).unwrap() += boost;
                                    boosted_count += 1;
                                }
                            }
                        }

                        if boosted_count > 0 {
                            tracing::info!(
                                "Layer 4.7: Boosted {} memories from {} prospective signal terms",
                                boosted_count,
                                signal_terms.len()
                            );
                        }
                    }
                }
            }

            // Layer 4.8: apply the pre-fetched fact-source boosts. Unlike
            // 4.5/4.55 these only amplify ids already in the fused set.
            if !fact_source_boosts.is_empty() {
                let mut boosted_count = 0;
                for (id, boost) in &fact_source_boosts {
                    if let Some(score) = fused.get_mut(id) {
                        *score += boost;
                        boosted_count += 1;
                    }
                }
                if boosted_count > 0 {
                    tracing::info!(
                        "Layer 4.8: Boosted {} memories from semantic fact sources",
                        boosted_count
                    );
                }
            }

            let mut res: Vec<_> = fused.into_iter().collect();
            res.sort_by(|a, b| b.1.total_cmp(&a.1));
            res.truncate(query.max_results);
            tracing::debug!("Layer 4: {} fused results", res.len());
            (res, heb)
        };

        let t_fusion = recall_start.elapsed();
        tracing::info!(
            fusion_ms = format!("{:.2}", (t_fusion - t_vector).as_secs_f64() * 1000.0),
            cumulative_ms = format!("{:.2}", t_fusion.as_secs_f64() * 1000.0),
            fused_results = memory_ids.len(),
            "recall [layer:4] BM25 + RRF fusion + boosts + interference"
        );

        // Layer 5: materialize memories (working → session → storage), apply
        // the query's filters, and rescore each with unified boosts.
        let mut memories = Vec::new();
        let mut sources = Vec::new();
        let mut cache_hits = 0;
        let mut storage_fetches = 0;
        let mut filtered_out = 0;

        // Exponential recency decay rate (per hour).
        const RECENCY_DECAY_RATE: f32 = 0.01;
        let now = chrono::Utc::now();

        // One read guard over the feedback store for the whole loop.
        let feedback_guard = self.feedback_store.as_ref().map(|fs| fs.read());

        for (memory_id, score) in memory_ids {
            // Hebbian factor from Layer 2 contributes a small additive bump.
            let hebbian_boost = hebbian_scores.get(&memory_id).copied().unwrap_or(0.0);
            let base_score = score + hebbian_boost * 0.1;

            let recency_scale = query.recency_weight.unwrap_or(0.1);
            // Combines base score with recency, emotional arousal, source
            // credibility, temporal proximity, and feedback momentum, then
            // stamps the final score onto a cloned memory.
            let with_unified_score = |mem: &SharedMemory, base: f32| -> SharedMemory {
                let hours_old = (now - mem.created_at).num_hours().max(0) as f32;
                let recency_boost = (-RECENCY_DECAY_RATE * hours_old).exp() * recency_scale;

                let arousal_boost = mem
                    .experience
                    .context
                    .as_ref()
                    .map(|c| c.emotional.arousal * 0.05)
                    .unwrap_or(0.0);

                // Only above-neutral (> 0.5) credibility earns a boost.
                let credibility_boost = mem
                    .experience
                    .context
                    .as_ref()
                    .map(|c| (c.source.credibility - 0.5).max(0.0) * 0.1)
                    .unwrap_or(0.0);

                // Temporal proximity: exact date match > within 7 days >
                // within 30 days; best match across all ref pairs wins.
                let temporal_boost = if has_temporal_query
                    && !mem.experience.temporal_refs.is_empty()
                {
                    let mut best_match = 0.0_f32;
                    for mem_ref in &mem.experience.temporal_refs {
                        for query_ref in &query_temporal.refs {
                            if mem_ref == &query_ref.date.to_string() {
                                best_match = best_match.max(0.25);
                            } else if let Ok(mem_date) =
                                chrono::NaiveDate::parse_from_str(mem_ref, "%Y-%m-%d")
                            {
                                let days_diff = (mem_date - query_ref.date).num_days().abs();
                                if days_diff <= 7 {
                                    let proximity_boost = 0.15 * (1.0 - days_diff as f32 / 7.0);
                                    best_match = best_match.max(proximity_boost);
                                } else if days_diff <= 30 {
                                    let proximity_boost = 0.05 * (1.0 - days_diff as f32 / 30.0);
                                    best_match = best_match.max(proximity_boost);
                                }
                            }
                        }
                    }
                    best_match
                } else {
                    0.0
                };

                // Feedback momentum: negative feedback penalizes up to -20%,
                // positive rewards up to +10% (asymmetric on purpose).
                let feedback_multiplier = if let Some(ref guard) = feedback_guard {
                    if let Some(fm) = guard.get_momentum(&mem.id) {
                        let momentum = fm.ema_with_decay();
                        if momentum < 0.0 {
                            1.0 + (momentum * 0.2).max(-0.2)
                        } else {
                            1.0 + (momentum * 0.1).min(0.1)
                        }
                    } else {
                        1.0
                    }
                } else {
                    1.0
                };

                let final_score =
                    (base + recency_boost + arousal_boost + credibility_boost + temporal_boost)
                        * feedback_multiplier;

                let mut cloned: Memory = mem.as_ref().clone();
                cloned.set_score(final_score);
                Arc::new(cloned)
            };

            // Tier cascade: working memory first (cheapest).
            if let Some(memory) = self.working_memory.read().get(&memory_id) {
                if self.retriever.matches_filters(&memory, &vector_query) {
                    memories.push(with_unified_score(&memory, base_score));
                    if !sources.contains(&"working") {
                        sources.push("working");
                    }
                    cache_hits += 1;
                } else {
                    filtered_out += 1;
                }
                continue;
            }

            // Then session memory.
            if let Some(memory) = self.session_memory.read().get(&memory_id) {
                if self.retriever.matches_filters(&memory, &vector_query) {
                    memories.push(with_unified_score(&memory, base_score));
                    if !sources.contains(&"session") {
                        sources.push("session");
                    }
                    cache_hits += 1;
                } else {
                    filtered_out += 1;
                }
                continue;
            }

            // Finally durable storage; a fetch failure means the vector
            // index references a deleted memory, so the entry is removed.
            match self.retriever.get_from_storage(&memory_id) {
                Ok(memory) => {
                    if self.retriever.matches_filters(&memory, &vector_query) {
                        let shared = Arc::new(memory);
                        memories.push(with_unified_score(&shared, base_score));
                        if !sources.contains(&"longterm") {
                            sources.push("longterm");
                        }
                        storage_fetches += 1;
                    } else {
                        filtered_out += 1;
                    }
                }
                Err(e) => {
                    tracing::warn!(
                        memory_id = %memory_id.0,
                        error = %e,
                        "Stale vector reference — cleaning up orphaned index entry"
                    );
                    self.retriever.remove_memory(&memory_id);
                }
            }

            if memories.len() >= query.max_results {
                break;
            }
        }

        tracing::debug!(filtered_out = filtered_out, "Filter pass completed");

        tracing::debug!(
            cache_hits = cache_hits,
            storage_fetches = storage_fetches,
            hit_rate = if cache_hits + storage_fetches > 0 {
                (cache_hits as f32 / (cache_hits + storage_fetches) as f32) * 100.0
            } else {
                0.0
            },
            "Cache-aware retrieval completed"
        );

        let t_fetch = recall_start.elapsed();
        tracing::info!(
            fetch_ms = format!("{:.2}", (t_fetch - t_fusion).as_secs_f64() * 1000.0),
            cumulative_ms = format!("{:.2}", t_fetch.as_secs_f64() * 1000.0),
            memories = memories.len(),
            cache_hits,
            storage_fetches,
            filtered_out,
            "recall [layer:5] memory fetch + unified scoring"
        );

        // Post: small linguistic re-rank when the query had focal entities.
        if !query_analysis.focal_entities.is_empty() {
            memories.sort_by(|a, b| {
                let score_a = a.score.unwrap_or(0.0)
                    + Self::linguistic_boost(&a.experience.content, &query_analysis) * 0.05;
                let score_b = b.score.unwrap_or(0.0)
                    + Self::linguistic_boost(&b.experience.content, &query_analysis) * 0.05;
                score_b.total_cmp(&score_a)
            });
        }

        self.logger
            .read()
            .log_retrieved(query_text, memories.len(), &sources);

        // Retrieval competition: similar co-retrieved memories compete;
        // suppressed losers are dropped from the result set.
        if memories.len() >= 2 {
            // (id, rank-derived relevance in [0.7, 1.0], similarity proxy).
            let candidates: Vec<(String, f32, f32)> = memories
                .iter()
                .enumerate()
                .map(|(i, m)| {
                    let relevance = 1.0 - (i as f32 / memories.len() as f32) * 0.3;
                    let similarity = m.score.unwrap_or(0.5);
                    (m.id.0.to_string(), relevance, similarity)
                })
                .collect();

            let competition_result = self
                .interference_detector
                .write()
                .apply_retrieval_competition(&candidates, query_text);

            if let Some(ref event) = competition_result.event {
                self.record_consolidation_event(event.clone());
            }

            if !competition_result.suppressed.is_empty() {
                let winner_set: std::collections::HashSet<_> = competition_result
                    .winners
                    .iter()
                    .map(|(id, _)| id.clone())
                    .collect();

                memories.retain(|m| winner_set.contains(&m.id.0.to_string()));

                tracing::debug!(
                    "Retrieval competition: {} memories suppressed",
                    competition_result.suppressed.len()
                );
            }

            // Persist interference records produced by the competition;
            // all persistence failures here are logged and ignored.
            {
                let detector = self.interference_detector.read();
                let affected_ids = detector.get_affected_ids_from_competition(&competition_result);
                if !affected_ids.is_empty() {
                    for (id, records) in detector.get_records_for_ids(&affected_ids) {
                        if let Err(e) = self.long_term_memory.save_interference_records(id, records)
                        {
                            tracing::debug!("Failed to persist competition interference: {e}");
                        }
                    }
                    let (total_events, _) = detector.stats();
                    if let Err(e) = self
                        .long_term_memory
                        .save_interference_event_count(total_events)
                    {
                        tracing::debug!("Failed to persist interference event count: {e}");
                    }
                }
            }
        }

        // Strengthen everything we are about to return.
        for memory in &memories {
            self.update_access_count_instrumented(memory, StrengtheningReason::Recalled);
        }

        // Co-activation: strengthen graph edges between co-retrieved
        // memories; emit EdgeStrengthened events for the top-5 pairs when
        // any edge was actually updated. Failures are non-fatal.
        if memories.len() >= 2 {
            if let Some(graph) = &self.graph_memory {
                let memory_uuids: Vec<uuid::Uuid> = memories.iter().map(|m| m.id.0).collect();
                match graph.read().record_memory_coactivation(&memory_uuids) {
                    Ok(edges_updated) if edges_updated > 0 => {
                        for i in 0..memories.len().min(5) {
                            for j in (i + 1)..memories.len().min(5) {
                                self.record_consolidation_event(
                                    introspection::ConsolidationEvent::EdgeStrengthened {
                                        from_memory_id: memories[i].id.0.to_string(),
                                        to_memory_id: memories[j].id.0.to_string(),
                                        strength_before: 0.0,
                                        strength_after: 0.025,
                                        co_activations: 1,
                                        timestamp: chrono::Utc::now(),
                                    },
                                );
                            }
                        }
                    }
                    Err(e) => {
                        tracing::trace!("Coactivation recording failed (non-critical): {e}");
                    }
                    _ => {}
                }
            }
        }

        // Best-effort persistent retrieval counter.
        if let Ok(count) = self.long_term_memory.increment_retrieval_count() {
            self.stats.write().total_retrievals = count;
        }

        // Expand with hierarchy relatives (parents/summaries) last, so they
        // don't participate in competition or scoring above.
        let mut seen_ids: HashSet<MemoryId> = memories.iter().map(|m| m.id.clone()).collect();
        self.expand_with_hierarchy(&mut memories, &mut seen_ids);

        let t_total = recall_start.elapsed();
        tracing::info!(
            post_ms = format!("{:.2}", (t_total - t_fetch).as_secs_f64() * 1000.0),
            total_ms = format!("{:.2}", t_total.as_secs_f64() * 1000.0),
            final_count = memories.len(),
            "recall [layer:post] linguistic + competition + coactivation + hierarchy === RECALL COMPLETE ==="
        );

        Ok(memories)
    }
2659
2660 pub fn apply_learning_boost(
2674 &self,
2675 user_id: &str,
2676 mut memories: Vec<SharedMemory>,
2677 ) -> Vec<SharedMemory> {
2678 if memories.is_empty() {
2679 return memories;
2680 }
2681
2682 let mut boosted: Vec<(SharedMemory, f32)> = memories
2684 .drain(..)
2685 .map(|mem| {
2686 let base_score = mem.score.unwrap_or(0.5);
2687 let boost = self
2688 .learning_history
2689 .recency_boost(user_id, &mem.id.0.to_string())
2690 .unwrap_or(1.0);
2691 let adjusted_score = base_score * boost;
2692 (mem, adjusted_score)
2693 })
2694 .collect();
2695
2696 let boosted_count = boosted.iter().filter(|(_, s)| *s > 0.5).count();
2698 if boosted_count > 0 {
2699 tracing::debug!(
2700 user_id = %user_id,
2701 boosted_count = boosted_count,
2702 "Applied learning velocity boost to retrieved memories"
2703 );
2704 }
2705
2706 boosted.sort_by(|a, b| b.1.total_cmp(&a.1));
2708
2709 boosted
2711 .into_iter()
2712 .map(|(mem, score)| {
2713 let mut cloned: Memory = mem.as_ref().clone();
2714 cloned.set_score(score);
2715 Arc::new(cloned)
2716 })
2717 .collect()
2718 }
2719
2720 pub fn recall_for_user(&self, user_id: &str, query: &Query) -> Result<Vec<SharedMemory>> {
2725 let memories = self.recall(query)?;
2726 Ok(self.apply_learning_boost(user_id, memories))
2727 }
2728
2729 pub fn get_learning_velocity(
2734 &self,
2735 user_id: &str,
2736 memory_id: &str,
2737 hours: i64,
2738 ) -> Result<learning_history::LearningVelocity> {
2739 self.learning_history
2740 .memory_learning_velocity(user_id, memory_id, hours)
2741 }
2742
2743 pub fn get_learning_stats(&self, user_id: &str) -> Result<learning_history::LearningStats> {
2745 self.learning_history.stats(user_id)
2746 }
2747
2748 pub fn get_learning_events(
2750 &self,
2751 user_id: &str,
2752 since: chrono::DateTime<chrono::Utc>,
2753 limit: usize,
2754 ) -> Result<Vec<learning_history::StoredLearningEvent>> {
2755 let mut events = self.learning_history.events_since(user_id, since)?;
2756 events.truncate(limit);
2757 Ok(events)
2758 }
2759
2760 pub fn store_temporal_facts_for_memory(
2770 &self,
2771 user_id: &str,
2772 memory_id: &MemoryId,
2773 content: &str,
2774 entities: &[String],
2775 created_at: chrono::DateTime<chrono::Utc>,
2776 ) -> Result<usize> {
2777 let facts =
2778 temporal_facts::extract_temporal_facts(content, memory_id, created_at, entities);
2779 if facts.is_empty() {
2780 return Ok(0);
2781 }
2782
2783 let stored = self.temporal_fact_store.store_batch(user_id, &facts)?;
2784 if stored > 0 {
2785 tracing::debug!(
2786 user_id = user_id,
2787 memory_id = %memory_id.0,
2788 facts_stored = stored,
2789 "Stored temporal facts for memory"
2790 );
2791 }
2792 Ok(stored)
2793 }
2794
2795 pub fn find_temporal_facts(
2800 &self,
2801 user_id: &str,
2802 entity: &str,
2803 event_keywords: &[&str],
2804 event_type: Option<temporal_facts::EventType>,
2805 ) -> Result<Vec<temporal_facts::TemporalFact>> {
2806 self.temporal_fact_store.find_by_entity_and_event(
2807 user_id,
2808 entity,
2809 event_keywords,
2810 event_type,
2811 )
2812 }
2813
2814 pub fn list_temporal_facts(
2816 &self,
2817 user_id: &str,
2818 limit: usize,
2819 ) -> Result<Vec<temporal_facts::TemporalFact>> {
2820 self.temporal_fact_store.list(user_id, limit)
2821 }
2822
2823 fn linguistic_boost(content: &str, analysis: &query_parser::QueryAnalysis) -> f32 {
2825 let content_lower = content.to_lowercase();
2826 let mut boost = 0.0;
2827
2828 for entity in &analysis.focal_entities {
2829 if content_lower.contains(&entity.text) {
2830 boost += entity.ic_weight;
2831 }
2832 }
2833
2834 for modifier in &analysis.discriminative_modifiers {
2835 if content_lower.contains(&modifier.text) {
2836 boost += 1.7; }
2838 }
2839
2840 boost
2841 }
2842
2843 #[inline]
2851 fn sha256_hash(text: &str) -> [u8; 32] {
2852 let mut hasher = Sha256::new();
2853 hasher.update(text.as_bytes());
2854 hasher.finalize().into()
2855 }
2856
    /// Delete memories matching `criteria` from every tier and every index.
    ///
    /// Returns the number of memories forgotten. For the age and importance
    /// variants the return value is only the long-term flagged count;
    /// working/session removals adjust stats but are not included in the
    /// return — NOTE(review): confirm that asymmetry is intended.
    pub fn forget(&self, criteria: ForgetCriteria) -> Result<usize> {
        let forgotten_count = match criteria {
            ForgetCriteria::ById(memory_id) => {
                // Track which tiers held the memory so stats can be adjusted
                // precisely afterwards.
                let mut deleted_from_any = false;
                let mut was_in_working = false;
                let mut was_in_session = false;
                let mut was_in_longterm = false;

                if self.working_memory.write().remove(&memory_id).is_ok() {
                    deleted_from_any = true;
                    was_in_working = true;
                }

                if self.session_memory.write().remove(&memory_id).is_ok() {
                    deleted_from_any = true;
                    was_in_session = true;
                }

                if self.long_term_memory.delete(&memory_id).is_ok() {
                    deleted_from_any = true;
                    was_in_longterm = true;
                }

                // Vector-index removal; remembers whether it was indexed so
                // vector_index_count is only decremented when applicable.
                let was_indexed = self.retriever.remove_memory(&memory_id);

                // Graph and BM25 cleanup are best-effort: failures are
                // logged, never fatal to the delete.
                if let Some(graph) = &self.graph_memory {
                    if let Err(e) = graph.read().delete_episode(&memory_id.0) {
                        tracing::warn!(
                            memory_id = %memory_id.0,
                            error = %e,
                            "Failed to clean up graph episode for deleted memory"
                        );
                    }
                }

                if let Err(e) = self.hybrid_search.remove_memory(&memory_id) {
                    tracing::warn!(
                        memory_id = %memory_id.0,
                        error = %e,
                        "Failed to clean BM25 index for deleted memory"
                    );
                }

                self.cleanup_interference_for_ids(&[memory_id.clone()]);

                // Adjust counters only for tiers the memory actually occupied.
                if deleted_from_any {
                    let mut stats = self.stats.write();
                    stats.total_memories = stats.total_memories.saturating_sub(1);
                    if was_in_working {
                        stats.working_memory_count = stats.working_memory_count.saturating_sub(1);
                    }
                    if was_in_session {
                        stats.session_memory_count = stats.session_memory_count.saturating_sub(1);
                    }
                    if was_in_longterm {
                        stats.long_term_memory_count =
                            stats.long_term_memory_count.saturating_sub(1);
                    }
                    if was_indexed {
                        stats.vector_index_count = stats.vector_index_count.saturating_sub(1);
                    }
                    1
                } else {
                    0
                }
            }
            ForgetCriteria::OlderThan(days) => {
                let cutoff = chrono::Utc::now() - chrono::Duration::days(days as i64);

                // In-memory tiers are removed outright ...
                let working_removed = self.working_memory.write().remove_older_than(cutoff)?;

                let session_removed = self.session_memory.write().remove_older_than(cutoff)?;

                // ... while long-term entries are soft-deleted (flagged).
                let flagged_ids = self.long_term_memory.mark_forgotten_by_age(cutoff)?;
                let lt_flagged = flagged_ids.len();

                for id in &flagged_ids {
                    self.retriever.remove_memory(id);
                    let _ = self.hybrid_search.remove_memory(id);
                }
                self.cleanup_graph_for_ids(&flagged_ids);
                self.cleanup_interference_for_ids(&flagged_ids);

                {
                    let removed = working_removed + session_removed + lt_flagged;
                    if removed > 0 {
                        let mut stats = self.stats.write();
                        stats.working_memory_count =
                            stats.working_memory_count.saturating_sub(working_removed);
                        stats.session_memory_count =
                            stats.session_memory_count.saturating_sub(session_removed);
                        stats.long_term_memory_count =
                            stats.long_term_memory_count.saturating_sub(lt_flagged);
                        stats.total_memories = stats.total_memories.saturating_sub(removed);
                    }
                }

                // Only the long-term flag count is reported to the caller.
                lt_flagged
            }
            ForgetCriteria::LowImportance(threshold) => {
                // Same three-tier pattern as OlderThan, keyed on importance.
                let working_removed = self
                    .working_memory
                    .write()
                    .remove_below_importance(threshold)?;
                let session_removed = self
                    .session_memory
                    .write()
                    .remove_below_importance(threshold)?;
                let flagged_ids = self
                    .long_term_memory
                    .mark_forgotten_by_importance(threshold)?;
                let lt_flagged = flagged_ids.len();

                for id in &flagged_ids {
                    self.retriever.remove_memory(id);
                    let _ = self.hybrid_search.remove_memory(id);
                }
                self.cleanup_graph_for_ids(&flagged_ids);
                self.cleanup_interference_for_ids(&flagged_ids);

                {
                    let removed = working_removed + session_removed + lt_flagged;
                    if removed > 0 {
                        let mut stats = self.stats.write();
                        stats.working_memory_count =
                            stats.working_memory_count.saturating_sub(working_removed);
                        stats.session_memory_count =
                            stats.session_memory_count.saturating_sub(session_removed);
                        stats.long_term_memory_count =
                            stats.long_term_memory_count.saturating_sub(lt_flagged);
                        stats.total_memories = stats.total_memories.saturating_sub(removed);
                    }
                }

                lt_flagged
            }
            // The remaining variants delegate to dedicated helpers.
            ForgetCriteria::Pattern(pattern) => {
                self.forget_by_pattern(&pattern)?
            }
            ForgetCriteria::ByTags(tags) => {
                self.forget_by_tags(&tags)?
            }
            ForgetCriteria::ByDateRange { start, end } => {
                self.forget_by_date_range(start, end)?
            }
            ForgetCriteria::ByType(exp_type) => {
                self.forget_by_type(exp_type)?
            }
            ForgetCriteria::All => {
                self.forget_all()?
            }
        };

        // Commit the BM25 index once per forget call, not per deletion.
        if forgotten_count > 0 {
            if let Err(e) = self.hybrid_search.commit_and_reload() {
                tracing::warn!(error = %e, "Failed to commit BM25 after forget");
            }
        }

        Ok(forgotten_count)
    }
3043
3044 pub fn stats(&self) -> MemoryStats {
3050 let mut stats = self.stats.read().clone();
3051
3052 if let Ok(storage_stats) = self.long_term_memory.get_stats() {
3055 stats.average_importance = storage_stats.average_importance;
3056 }
3057
3058 stats
3059 }
3060
3061 pub fn export_visualization_dot(&self) -> String {
3063 self.logger.read().graph.to_dot()
3064 }
3065
3066 pub fn build_visualization_graph(&self) -> Result<visualization::GraphStats> {
3069 let mut logger = self.logger.write();
3070
3071 for memory in self.working_memory.read().all_memories() {
3073 logger.graph.add_memory(&memory, "working");
3074 }
3075
3076 for memory in self.session_memory.read().all_memories() {
3078 logger.graph.add_memory(&memory, "session");
3079 }
3080
3081 for memory in self.long_term_memory.get_all()? {
3083 logger.graph.add_memory(&memory, "longterm");
3084 }
3085
3086 Ok(logger.get_stats())
3087 }
3088
3089 pub fn get_embedder(&self) -> &dyn Embedder {
3091 self.embedder.as_ref()
3092 }
3093
3094 pub fn compute_embedding(&self, text: &str) -> Result<Vec<f32>> {
3096 self.embedder.encode(text)
3097 }
3098
3099 pub fn get_all_memories(&self) -> Result<Vec<SharedMemory>> {
3102 use std::collections::HashSet;
3103 let mut seen_ids: HashSet<MemoryId> = HashSet::new();
3104 let mut all_memories = Vec::new();
3105
3106 {
3108 let working = self.working_memory.read();
3109 for mem in working.all_memories() {
3110 if seen_ids.insert(mem.id.clone()) {
3111 all_memories.push(mem);
3112 }
3113 }
3114 }
3115
3116 {
3118 let session = self.session_memory.read();
3119 for mem in session.all_memories() {
3120 if seen_ids.insert(mem.id.clone()) {
3121 all_memories.push(mem);
3122 }
3123 }
3124 }
3125
3126 {
3128 let longterm_mems = self.long_term_memory.get_all()?;
3129 for mem in longterm_mems {
3130 if seen_ids.insert(mem.id.clone()) {
3131 all_memories.push(Arc::new(mem));
3132 }
3133 }
3134 }
3135
3136 Ok(all_memories)
3137 }
3138
3139 pub fn find_memory_by_prefix(&self, id_prefix: &str) -> Result<Option<SharedMemory>> {
3145 if let Ok(uuid) = uuid::Uuid::parse_str(id_prefix) {
3147 let target_id = MemoryId(uuid);
3148 let all = self.get_all_memories()?;
3149 return Ok(all.into_iter().find(|m| m.id == target_id));
3150 }
3151
3152 let prefix_lower = id_prefix.to_lowercase();
3154 let all_memories = self.get_all_memories()?;
3155 let matches: Vec<SharedMemory> = all_memories
3156 .into_iter()
3157 .filter(|m| {
3158 m.id.0
3159 .to_string()
3160 .replace('-', "")
3161 .to_lowercase()
3162 .starts_with(&prefix_lower)
3163 })
3164 .collect();
3165
3166 match matches.len() {
3167 0 => Ok(None),
3168 1 => Ok(Some(matches.into_iter().next().unwrap())),
3169 n => Err(anyhow::anyhow!(
3170 "Ambiguous memory ID prefix '{}': matches {} memories",
3171 id_prefix,
3172 n
3173 )),
3174 }
3175 }
3176
3177 pub fn get_working_memories(&self) -> Vec<SharedMemory> {
3179 let working = self.working_memory.read();
3180 working.all_memories()
3181 }
3182
3183 pub fn get_session_memories(&self) -> Vec<SharedMemory> {
3185 let session = self.session_memory.read();
3186 session.all_memories()
3187 }
3188
3189 pub fn get_longterm_memories(&self, limit: usize) -> Result<Vec<Memory>> {
3192 let all = self.long_term_memory.get_all()?;
3193 Ok(all.into_iter().take(limit).collect())
3194 }
3195
3196 fn calculate_temporal_relevance(age_days: i64) -> f32 {
3207 match age_days {
3208 0..=7 => 1.0, 8..=30 => 0.7, 31..=90 => 0.4, _ => 0.2, }
3213 }
3214
3215 fn expand_with_hierarchy(
3225 &self,
3226 memories: &mut Vec<SharedMemory>,
3227 seen_ids: &mut HashSet<MemoryId>,
3228 ) {
3229 if memories.is_empty() {
3231 return;
3232 }
3233
3234 let ids_to_expand: Vec<MemoryId> = memories.iter().map(|m| m.id.clone()).collect();
3236
3237 for memory_id in ids_to_expand {
3239 if let Ok(ancestors) = self.long_term_memory.get_ancestors(&memory_id) {
3241 for ancestor in ancestors {
3242 if seen_ids.insert(ancestor.id.clone()) {
3243 let new_importance = (ancestor.importance() * 1.1).min(1.0);
3245 let mut shared = Arc::new(ancestor);
3246 Arc::make_mut(&mut shared).set_importance(new_importance);
3247 memories.push(shared);
3248 }
3249 }
3250 }
3251
3252 if let Ok(children) = self.long_term_memory.get_children(&memory_id) {
3254 for child in children {
3255 if seen_ids.insert(child.id.clone()) {
3256 let new_importance = (child.importance() * 1.05).min(1.0);
3258 let mut shared = Arc::new(child);
3259 Arc::make_mut(&mut shared).set_importance(new_importance);
3260 memories.push(shared);
3261 }
3262 }
3263 }
3264 }
3265 }
3266
3267 fn calculate_importance(&self, experience: &Experience) -> f32 {
3269 let mut factors = Vec::new();
3270
3271 let type_score = match experience.experience_type {
3273 ExperienceType::Decision => 0.3,
3274 ExperienceType::Error => 0.25,
3275 ExperienceType::Learning => 0.25,
3276 ExperienceType::Discovery => 0.2,
3277 ExperienceType::Pattern => 0.2,
3278 ExperienceType::Task => 0.15,
3279 ExperienceType::Conversation => 0.1,
3280 ExperienceType::Context => 0.1,
3281 _ => 0.05,
3282 };
3283 factors.push(("type", type_score));
3284
3285 let _content_length = experience.content.len();
3287 let word_count = experience.content.split_whitespace().count();
3288 let richness_score = if word_count > 50 {
3289 0.25
3290 } else if word_count > 20 {
3291 0.15
3292 } else if word_count > 5 {
3293 0.08
3294 } else {
3295 0.02
3296 };
3297 factors.push(("richness", richness_score));
3298
3299 let entity_score = if experience.entities.len() > 10 {
3301 0.2
3302 } else if experience.entities.len() > 5 {
3303 0.15
3304 } else if experience.entities.len() > 2 {
3305 0.1
3306 } else if !experience.entities.is_empty() {
3307 0.05
3308 } else {
3309 0.0
3310 };
3311 factors.push(("entities", entity_score));
3312
3313 let context_score = if let Some(ctx) = &experience.context {
3315 let mut score: f32 = 0.0;
3316
3317 if !ctx.semantic.concepts.is_empty() {
3319 score += 0.05;
3320 }
3321 if !ctx.semantic.tags.is_empty() {
3322 score += 0.03;
3323 }
3324 if !ctx.semantic.related_concepts.is_empty() {
3325 score += 0.04;
3326 }
3327
3328 if ctx.project.project_id.is_some() {
3330 score += 0.03;
3331 }
3332
3333 if ctx.code.current_file.is_some() {
3335 score += 0.03;
3336 }
3337
3338 if !ctx.document.citations.is_empty() {
3340 score += 0.02;
3341 }
3342
3343 score.min(0.2)
3344 } else {
3345 0.0
3346 };
3347 factors.push(("context", context_score));
3348
3349 let mut metadata_score: f32 = 0.0;
3351
3352 if experience.metadata.contains_key("priority") {
3353 if let Some(priority) = experience.metadata.get("priority") {
3354 metadata_score += match priority.as_str() {
3355 "critical" => 0.15,
3356 "high" => 0.10,
3357 "medium" => 0.05,
3358 _ => 0.0,
3359 };
3360 }
3361 }
3362
3363 if experience.metadata.contains_key("unexpected") {
3364 metadata_score += 0.08;
3365 }
3366
3367 if experience.metadata.contains_key("breakthrough") {
3368 metadata_score += 0.12;
3369 }
3370
3371 if experience.metadata.get("role") == Some(&"user".to_string()) {
3372 metadata_score += 0.02; }
3374
3375 factors.push(("metadata", metadata_score.min(0.15)));
3376
3377 let embedding_score = if let Some(emb) = &experience.embeddings {
3379 if emb.len() >= 384 {
3380 0.1
3382 } else {
3383 0.05
3384 }
3385 } else {
3386 0.0
3387 };
3388 factors.push(("embeddings", embedding_score));
3389
3390 let content_lower = experience.content.to_lowercase();
3392 let mut quality_score: f32 = 0.0;
3393
3394 let technical_terms = [
3396 "algorithm",
3397 "architecture",
3398 "implementation",
3399 "optimization",
3400 "performance",
3401 "security",
3402 "database",
3403 "api",
3404 "framework",
3405 ];
3406 for term in &technical_terms {
3407 if content_lower.contains(term) {
3408 quality_score += 0.015;
3409 }
3410 }
3411
3412 if content_lower.contains('?') {
3414 quality_score += 0.02;
3415 }
3416
3417 if experience.content.contains("```")
3419 || experience.content.contains("fn ")
3420 || experience.content.contains("function ")
3421 || experience.content.contains("class ")
3422 {
3423 quality_score += 0.03;
3424 }
3425
3426 factors.push(("quality", quality_score.min(0.1)));
3427
3428 let importance: f32 = factors.iter().map(|(_, score)| score).sum();
3430
3431 let importance = importance.clamp(0.0, 1.0);
3433
3434 if importance > 0.7 {
3436 debug!("High importance memory: {:.2} (type={:.2}, richness={:.2}, entities={:.2}, context={:.2})",
3437 importance,
3438 factors.iter().find(|(k, _)| *k == "type").map(|(_, v)| v).unwrap_or(&0.0),
3439 factors.iter().find(|(k, _)| *k == "richness").map(|(_, v)| v).unwrap_or(&0.0),
3440 factors.iter().find(|(k, _)| *k == "entities").map(|(_, v)| v).unwrap_or(&0.0),
3441 factors.iter().find(|(k, _)| *k == "context").map(|(_, v)| v).unwrap_or(&0.0)
3442 );
3443 }
3444
3445 importance
3446 }
3447
3448 fn consolidate_if_needed(&self) -> Result<()> {
3454 self.promote_working_to_session()?;
3456
3457 self.promote_session_to_longterm()?;
3459
3460 if self.config.auto_compress {
3462 self.compress_old_memories()?;
3463 }
3464
3465 Ok(())
3466 }
3467
    /// Promote working-tier memories that are both old enough and important
    /// enough into the session tier.
    ///
    /// Thresholds come from the TIER_PROMOTION_WORKING_* constants; the
    /// importance bar is adjusted per memory by graph connectivity via
    /// `graph_adjusted_threshold`.
    fn promote_working_to_session(&self) -> Result<()> {
        let now = chrono::Utc::now();
        let min_age = chrono::Duration::seconds(TIER_PROMOTION_WORKING_AGE_SECS);

        // Candidate selection under a read lock only; the write locks below
        // are taken after this scope ends.
        let to_promote: Vec<SharedMemory> = {
            let working = self.working_memory.read();
            working
                .all_memories()
                .into_iter()
                .filter(|m| {
                    let age = now - m.created_at;
                    let importance = m.importance();
                    let threshold =
                        self.graph_adjusted_threshold(m, TIER_PROMOTION_WORKING_IMPORTANCE);
                    importance >= threshold && age >= min_age
                })
                .collect()
        };

        if to_promote.is_empty() {
            return Ok(());
        }

        let count = to_promote.len();
        // NOTE(review): working and session write locks are held together for
        // the whole move; any other code path locking these in the opposite
        // order would deadlock — verify lock ordering is consistent file-wide.
        let mut working = self.working_memory.write();
        let mut session = self.session_memory.write();

        for memory in &to_promote {
            self.logger
                .write()
                .log_promoted(&memory.id, "working", "session", count);

            // Clone out of the Arc, bump its tier, then move across tiers.
            let mut promoted_memory = (**memory).clone();
            promoted_memory.promote();
            session.add(promoted_memory)?;
            working.remove(&memory.id)?;
        }

        if count > 0 {
            let mut stats = self.stats.write();
            stats.promotions_to_session += count;
            stats.working_memory_count = stats.working_memory_count.saturating_sub(count);
            stats.session_memory_count += count;
            tracing::debug!(
                "Promoted {} memories from working to session (importance >= {}, age >= {}s)",
                count,
                TIER_PROMOTION_WORKING_IMPORTANCE,
                TIER_PROMOTION_WORKING_AGE_SECS
            );
        }
        Ok(())
    }
3527
    /// Promote session-tier memories into persistent long-term storage.
    ///
    /// Candidates must meet the session-tier age and (graph-adjusted)
    /// importance thresholds. Promoted memories are optionally compressed,
    /// stored, vector-indexed (best effort), then removed from the session
    /// tier.
    fn promote_session_to_longterm(&self) -> Result<()> {
        let now = chrono::Utc::now();
        let min_age = chrono::Duration::seconds(TIER_PROMOTION_SESSION_AGE_SECS);

        // Candidate selection under a read lock; mutation happens below.
        let to_promote: Vec<SharedMemory> = {
            let session = self.session_memory.read();
            session
                .all_memories()
                .into_iter()
                .filter(|m| {
                    let age = now - m.created_at;
                    let importance = m.importance();
                    let threshold =
                        self.graph_adjusted_threshold(m, TIER_PROMOTION_SESSION_IMPORTANCE);
                    importance >= threshold && age >= min_age
                })
                .collect()
        };

        if to_promote.is_empty() {
            return Ok(());
        }

        let count = to_promote.len();
        let mut session = self.session_memory.write();

        for memory in &to_promote {
            self.logger
                .write()
                .log_promoted(&memory.id, "session", "longterm", count);

            let mut owned_memory = (**memory).clone();
            owned_memory.promote();
            // Compress before storing when the memory is old enough.
            let compressed_memory = if self.should_compress(&owned_memory) {
                self.compressor.compress(&owned_memory)?
            } else {
                owned_memory
            };

            // NOTE(review): a compress/store failure mid-loop returns early,
            // leaving earlier promotions applied but the stats block below
            // never executed — confirm this partial-failure behavior is
            // acceptable.
            self.long_term_memory.store(&compressed_memory)?;

            // Vector indexing is best-effort; a failure only logs a warning.
            if let Err(e) = self.retriever.index_memory(&compressed_memory) {
                tracing::warn!(
                    "Failed to index memory {} in vector DB: {}",
                    compressed_memory.id.0,
                    e
                );
            }

            session.remove(&memory.id)?;
        }

        if count > 0 {
            let mut stats = self.stats.write();
            stats.promotions_to_longterm += count;
            stats.session_memory_count = stats.session_memory_count.saturating_sub(count);
            stats.long_term_memory_count += count;
            tracing::debug!(
                "Promoted {} memories from session to long-term (importance >= {}, age >= {}s)",
                count,
                TIER_PROMOTION_SESSION_IMPORTANCE,
                TIER_PROMOTION_SESSION_AGE_SECS
            );
        }
        Ok(())
    }
3607
3608 fn graph_adjusted_threshold(&self, memory: &Memory, base_threshold: f32) -> f32 {
3618 use crate::constants::*;
3619
3620 let graph = match &self.graph_memory {
3621 Some(g) => g,
3622 None => return base_threshold,
3623 };
3624
3625 if memory.entity_refs.is_empty() {
3626 return base_threshold;
3627 }
3628
3629 let graph_guard = graph.read();
3630 let mut l2_plus_count = 0usize;
3631
3632 for entity_ref in &memory.entity_refs {
3633 if let Ok(edges) = graph_guard.get_entity_relationships(&entity_ref.entity_id) {
3634 for edge in &edges {
3635 if matches!(
3636 edge.tier,
3637 crate::graph_memory::EdgeTier::L2Episodic
3638 | crate::graph_memory::EdgeTier::L3Semantic
3639 ) {
3640 l2_plus_count += 1;
3641 }
3642 }
3643 }
3644 }
3645
3646 if l2_plus_count == 0 {
3647 base_threshold * (1.0 + GRAPH_HEALTH_NO_EDGES_PENALTY as f32)
3649 } else {
3650 let ratio = (l2_plus_count as f64 / GRAPH_HEALTH_EDGE_SATURATION).min(1.0);
3652 base_threshold * (1.0 - (GRAPH_HEALTH_PROMOTION_DISCOUNT * ratio) as f32)
3653 }
3654 }
3655
3656 pub fn apply_edge_promotion_boosts(
3662 &self,
3663 boosts: &[crate::memory::types::EdgePromotionBoost],
3664 ) -> Result<usize> {
3665 let mut applied = 0;
3666
3667 for boost in boosts {
3668 let memory_id = match uuid::Uuid::parse_str(&boost.memory_id) {
3669 Ok(uuid) => MemoryId(uuid),
3670 Err(_) => continue,
3671 };
3672
3673 let found = self
3675 .working_memory
3676 .read()
3677 .get(&memory_id)
3678 .or_else(|| self.session_memory.read().get(&memory_id));
3679
3680 if let Some(memory) = found {
3681 let new_importance = (memory.importance() + boost.boost as f32).min(1.0);
3682 memory.set_importance(new_importance);
3683 self.record_consolidation_event(ConsolidationEvent::EdgePromotionBoostApplied {
3684 memory_id: boost.memory_id.clone(),
3685 entity_name: boost.entity_name.clone(),
3686 old_tier: boost.old_tier.clone(),
3687 new_tier: boost.new_tier.clone(),
3688 importance_boost: boost.boost,
3689 new_importance: new_importance as f64,
3690 timestamp: chrono::Utc::now(),
3691 });
3692 applied += 1;
3693 } else if let Ok(memory) = self.long_term_memory.get(&memory_id) {
3694 let new_importance = (memory.importance() + boost.boost as f32).min(1.0);
3695 memory.set_importance(new_importance);
3696 if let Err(e) = self.long_term_memory.store(&memory) {
3697 tracing::debug!(
3698 "Failed to persist edge promotion boost for {}: {}",
3699 boost.memory_id,
3700 e
3701 );
3702 continue;
3703 }
3704 self.record_consolidation_event(ConsolidationEvent::EdgePromotionBoostApplied {
3705 memory_id: boost.memory_id.clone(),
3706 entity_name: boost.entity_name.clone(),
3707 old_tier: boost.old_tier.clone(),
3708 new_tier: boost.new_tier.clone(),
3709 importance_boost: boost.boost,
3710 new_importance: new_importance as f64,
3711 timestamp: chrono::Utc::now(),
3712 });
3713 applied += 1;
3714 }
3715 }
3716
3717 if applied > 0 {
3718 tracing::debug!(
3719 "Applied {} edge promotion boosts to memory importance",
3720 applied
3721 );
3722 }
3723
3724 Ok(applied)
3725 }
3726
3727 pub fn compensate_orphaned_memories(&self, orphaned_entity_ids: &[String]) -> Result<usize> {
3733 use crate::constants::ORPHAN_COMPENSATORY_BOOST;
3734
3735 if orphaned_entity_ids.is_empty() {
3736 return Ok(0);
3737 }
3738
3739 let orphaned_set: std::collections::HashSet<&str> =
3740 orphaned_entity_ids.iter().map(|s| s.as_str()).collect();
3741
3742 let mut compensated = 0;
3743
3744 let tiers: Vec<Vec<SharedMemory>> = vec![
3746 self.working_memory.read().all_memories(),
3747 self.session_memory.read().all_memories(),
3748 ];
3749
3750 for memories in &tiers {
3751 for memory in memories {
3752 let entity_count = memory
3753 .entity_refs
3754 .iter()
3755 .filter(|e| orphaned_set.contains(e.entity_id.to_string().as_str()))
3756 .count();
3757 if entity_count > 0 {
3758 let new_importance =
3759 (memory.importance() + ORPHAN_COMPENSATORY_BOOST as f32).min(1.0);
3760 memory.set_importance(new_importance);
3761 self.record_consolidation_event(ConsolidationEvent::GraphOrphanDetected {
3762 memory_id: memory.id.0.to_string(),
3763 entity_count,
3764 compensatory_boost: ORPHAN_COMPENSATORY_BOOST,
3765 timestamp: chrono::Utc::now(),
3766 });
3767 compensated += 1;
3768 }
3769 }
3770 }
3771
3772 if compensated > 0 {
3773 tracing::debug!(
3774 "Compensated {} orphaned memories (from {} orphaned entities)",
3775 compensated,
3776 orphaned_entity_ids.len()
3777 );
3778 }
3779
3780 Ok(compensated)
3781 }
3782
3783 fn compress_old_memories(&self) -> Result<()> {
3785 let cutoff =
3786 chrono::Utc::now() - chrono::Duration::days(self.config.compression_age_days as i64);
3787
3788 let to_compress = self.long_term_memory.get_uncompressed_older_than(cutoff)?;
3790
3791 for memory in to_compress {
3792 let compressed = self.compressor.compress(&memory)?;
3793 self.long_term_memory.update(&compressed)?;
3794 self.stats.write().compressed_count += 1;
3795 }
3796
3797 Ok(())
3798 }
3799
3800 fn should_compress(&self, memory: &Memory) -> bool {
3802 let age = chrono::Utc::now() - memory.created_at;
3803 age.num_days() > self.config.compression_age_days as i64 && !memory.compressed
3804 }
3805
    /// Bump the access counter for a memory in whichever tier holds it:
    /// working first, then session, falling through to long-term.
    ///
    /// Each tier's write lock is scoped so it is released before the next
    /// tier is checked.
    #[allow(dead_code)]
    fn update_access_count(&self, memory_id: &MemoryId) -> Result<()> {
        {
            let mut wm = self.working_memory.write();

            if wm.contains(memory_id) {
                return wm
                    .update_access(memory_id)
                    .map_err(|e| anyhow::anyhow!("Failed to update working memory access: {e}"));
            }
        }

        {
            let mut sm = self.session_memory.write();

            if sm.contains(memory_id) {
                return sm
                    .update_access(memory_id)
                    .map_err(|e| anyhow::anyhow!("Failed to update session memory access: {e}"));
            }
        }

        // Not in either in-memory tier: long-term storage is the last resort.
        self.long_term_memory
            .update_access(memory_id)
            .map_err(|e| anyhow::anyhow!("Failed to update long-term memory access: {e}"))
    }
3839
3840 fn update_access_count_instrumented(&self, memory: &SharedMemory, reason: StrengtheningReason) {
3845 let activation_before = memory.importance();
3847
3848 memory.update_access();
3850
3851 let activation_after = memory.importance();
3853
3854 if (activation_after - activation_before).abs() > f32::EPSILON {
3856 let content_preview = if memory.experience.content.chars().count() > 50 {
3857 let truncated: String = memory.experience.content.chars().take(50).collect();
3858 format!("{}...", truncated)
3859 } else {
3860 memory.experience.content.clone()
3861 };
3862
3863 let event = ConsolidationEvent::MemoryStrengthened {
3864 memory_id: memory.id.0.to_string(),
3865 content_preview,
3866 activation_before,
3867 activation_after,
3868 reason,
3869 timestamp: chrono::Utc::now(),
3870 };
3871
3872 self.consolidation_events.write().push(event);
3873 }
3874 }
3875
3876 fn cleanup_graph_for_ids(&self, ids: &[MemoryId]) {
3878 if ids.is_empty() {
3879 return;
3880 }
3881 if let Some(graph) = &self.graph_memory {
3882 let graph_guard = graph.read();
3883 for id in ids {
3884 if let Err(e) = graph_guard.delete_episode(&id.0) {
3885 tracing::debug!("Graph cleanup failed for {}: {}", &id.0.to_string()[..8], e);
3886 }
3887 }
3888 }
3889 }
3890
3891 fn cleanup_interference_for_ids(&self, ids: &[MemoryId]) {
3893 if ids.is_empty() {
3894 return;
3895 }
3896 let mut detector = self.interference_detector.write();
3897 for id in ids {
3898 let id_str = id.0.to_string();
3899 detector.clear_memory(&id_str);
3900 if let Err(e) = self.long_term_memory.delete_interference_records(&id_str) {
3901 tracing::debug!(
3902 "Interference cleanup failed for {}: {e}",
3903 &id_str[..8.min(id_str.len())]
3904 );
3905 }
3906 }
3907 }
3908
    /// Forget every memory (all tiers) whose content matches `pattern`.
    ///
    /// The pattern is validated and compiled through the project's validation
    /// helper, so malformed or dangerous patterns are rejected up front.
    /// Returns the total number of memories removed across all tiers.
    fn forget_by_pattern(&self, pattern: &str) -> Result<usize> {
        let regex = crate::validation::validate_and_compile_pattern(pattern)
            .map_err(|e| anyhow::anyhow!("Invalid pattern: {e}"))?;
        let mut count = 0;
        let mut working_removed = 0;
        let mut session_removed = 0;
        let mut long_term_removed = 0;

        // Working tier: snapshot matching ids under a read lock, then remove
        // under a write lock so we never mutate while iterating.
        let working_ids: Vec<MemoryId> = {
            let working = self.working_memory.read();
            working
                .all_memories()
                .iter()
                .filter(|m| regex.is_match(&m.experience.content))
                .map(|m| m.id.clone())
                .collect()
        };
        {
            let mut working = self.working_memory.write();
            for id in &working_ids {
                if working.remove(id).is_ok() {
                    self.retriever.remove_memory(id);
                    let _ = self.hybrid_search.remove_memory(id);
                    working_removed += 1;
                    count += 1;
                }
            }
        }

        // Session tier: same snapshot-then-remove pattern.
        let session_ids: Vec<MemoryId> = {
            let session = self.session_memory.read();
            session
                .all_memories()
                .iter()
                .filter(|m| regex.is_match(&m.experience.content))
                .map(|m| m.id.clone())
                .collect()
        };
        {
            let mut session = self.session_memory.write();
            for id in &session_ids {
                if session.remove(id).is_ok() {
                    self.retriever.remove_memory(id);
                    let _ = self.hybrid_search.remove_memory(id);
                    session_removed += 1;
                    count += 1;
                }
            }
        }

        // Long-term tier: full scan, deleting matches as we go.
        let all_lt = self.long_term_memory.get_all()?;
        let mut lt_ids = Vec::new();
        for memory in all_lt {
            if regex.is_match(&memory.experience.content) {
                lt_ids.push(memory.id.clone());
                self.retriever.remove_memory(&memory.id);
                let _ = self.hybrid_search.remove_memory(&memory.id);
                self.long_term_memory.delete(&memory.id)?;
                long_term_removed += 1;
                count += 1;
            }
        }

        // Graph and interference cleanup over everything we touched.
        let all_ids: Vec<MemoryId> = working_ids
            .into_iter()
            .chain(session_ids)
            .chain(lt_ids)
            .collect();
        self.cleanup_graph_for_ids(&all_ids);
        self.cleanup_interference_for_ids(&all_ids);

        {
            let mut stats = self.stats.write();
            stats.total_memories = stats.total_memories.saturating_sub(count);
            stats.working_memory_count = stats.working_memory_count.saturating_sub(working_removed);
            stats.session_memory_count = stats.session_memory_count.saturating_sub(session_removed);
            stats.long_term_memory_count = stats
                .long_term_memory_count
                .saturating_sub(long_term_removed);
            // NOTE(review): this subtracts the full `count` from the vector
            // index even though working/session memories may not all be
            // vector-indexed — confirm against how vector_index_count is
            // incremented elsewhere.
            stats.vector_index_count = stats.vector_index_count.saturating_sub(count);
        }

        Ok(count)
    }
4004
    /// Deletes every memory, across all three tiers, that carries at least one
    /// of the given `tags`.
    ///
    /// Order: working -> session -> long-term. Each removed id is also purged
    /// from the vector retriever and the BM25 hybrid index; graph edges and
    /// interference records for all deleted ids are cleaned up once at the end.
    ///
    /// Returns the total number of memories deleted.
    ///
    /// # Errors
    /// Fails only on long-term storage enumeration/deletion; in-memory tier
    /// removals and index removals are best-effort.
    fn forget_by_tags(&self, tags: &[String]) -> Result<usize> {
        let mut count = 0;
        let mut working_removed = 0;
        let mut session_removed = 0;
        let mut long_term_removed = 0;
        // Accumulates ids from every tier for the final graph/interference cleanup.
        let mut all_deleted_ids = Vec::new();

        {
            // Working tier: collect matches and remove them under one write lock
            // (no read-then-write window for this tier).
            let mut working = self.working_memory.write();
            let ids_to_remove: Vec<MemoryId> = working
                .all_memories()
                .iter()
                .filter(|m| m.experience.tags.iter().any(|t| tags.contains(t)))
                .map(|m| m.id.clone())
                .collect();
            for id in &ids_to_remove {
                if working.remove(id).is_ok() {
                    self.retriever.remove_memory(id);
                    // BM25 removal is best-effort; failure only leaves a stale entry.
                    let _ = self.hybrid_search.remove_memory(id);
                    working_removed += 1;
                    count += 1;
                }
            }
            all_deleted_ids.extend(ids_to_remove);
        }

        {
            // Session tier: same single-write-lock pattern as the working tier.
            let mut session = self.session_memory.write();
            let ids_to_remove: Vec<MemoryId> = session
                .all_memories()
                .iter()
                .filter(|m| m.experience.tags.iter().any(|t| tags.contains(t)))
                .map(|m| m.id.clone())
                .collect();
            for id in &ids_to_remove {
                if session.remove(id).is_ok() {
                    self.retriever.remove_memory(id);
                    let _ = self.hybrid_search.remove_memory(id);
                    session_removed += 1;
                    count += 1;
                }
            }
            all_deleted_ids.extend(ids_to_remove);
        }

        // Long-term tier: full scan, deleting matches as they are found.
        let all_lt = self.long_term_memory.get_all()?;
        for memory in all_lt {
            if memory.experience.tags.iter().any(|t| tags.contains(t)) {
                all_deleted_ids.push(memory.id.clone());
                self.retriever.remove_memory(&memory.id);
                let _ = self.hybrid_search.remove_memory(&memory.id);
                self.long_term_memory.delete(&memory.id)?;
                long_term_removed += 1;
                count += 1;
            }
        }

        self.cleanup_graph_for_ids(&all_deleted_ids);
        self.cleanup_interference_for_ids(&all_deleted_ids);

        {
            // Decrement cached counters; saturating_sub guards against drift
            // between the cached stats and the actual removals.
            let mut stats = self.stats.write();
            stats.total_memories = stats.total_memories.saturating_sub(count);
            stats.working_memory_count = stats.working_memory_count.saturating_sub(working_removed);
            stats.session_memory_count = stats.session_memory_count.saturating_sub(session_removed);
            stats.long_term_memory_count = stats
                .long_term_memory_count
                .saturating_sub(long_term_removed);
            stats.vector_index_count = stats.vector_index_count.saturating_sub(count);
        }

        Ok(count)
    }
4084
    /// Deletes every memory whose `created_at` falls within `[start, end]`
    /// (inclusive on both ends), across all three tiers.
    ///
    /// In-memory tiers are scanned under a read lock and then emptied under a
    /// write lock; the long-term tier is queried via `SearchCriteria::ByDate`.
    /// Graph edges and interference records for all deleted ids are cleaned up
    /// at the end.
    ///
    /// Returns the total number of memories deleted.
    fn forget_by_date_range(
        &self,
        start: chrono::DateTime<chrono::Utc>,
        end: chrono::DateTime<chrono::Utc>,
    ) -> Result<usize> {
        let mut count = 0;
        let mut working_removed = 0;
        let mut session_removed = 0;
        let mut long_term_removed = 0;

        // Working tier: collect matching ids under a read lock first.
        let working_ids: Vec<MemoryId> = {
            let working = self.working_memory.read();
            working
                .all_memories()
                .iter()
                .filter(|m| m.created_at >= start && m.created_at <= end)
                .map(|m| m.id.clone())
                .collect()
        };
        {
            // Re-acquire as write to remove; `is_ok()` tolerates ids that
            // disappeared between the read and write lock.
            let mut working = self.working_memory.write();
            for id in &working_ids {
                if working.remove(id).is_ok() {
                    self.retriever.remove_memory(id);
                    let _ = self.hybrid_search.remove_memory(id);
                    working_removed += 1;
                    count += 1;
                }
            }
        }

        // Session tier: same read-then-write pattern.
        let session_ids: Vec<MemoryId> = {
            let session = self.session_memory.read();
            session
                .all_memories()
                .iter()
                .filter(|m| m.created_at >= start && m.created_at <= end)
                .map(|m| m.id.clone())
                .collect()
        };
        {
            let mut session = self.session_memory.write();
            for id in &session_ids {
                if session.remove(id).is_ok() {
                    self.retriever.remove_memory(id);
                    let _ = self.hybrid_search.remove_memory(id);
                    session_removed += 1;
                    count += 1;
                }
            }
        }

        // Long-term tier: delegate the date filtering to storage.
        let memories = self
            .long_term_memory
            .search(storage::SearchCriteria::ByDate { start, end })?;
        let mut lt_ids = Vec::new();
        for memory in memories {
            lt_ids.push(memory.id.clone());
            self.retriever.remove_memory(&memory.id);
            let _ = self.hybrid_search.remove_memory(&memory.id);
            self.long_term_memory.delete(&memory.id)?;
            long_term_removed += 1;
            count += 1;
        }

        // One combined cleanup pass over every deleted id.
        let all_ids: Vec<MemoryId> = working_ids
            .into_iter()
            .chain(session_ids)
            .chain(lt_ids)
            .collect();
        self.cleanup_graph_for_ids(&all_ids);
        self.cleanup_interference_for_ids(&all_ids);

        {
            let mut stats = self.stats.write();
            stats.total_memories = stats.total_memories.saturating_sub(count);
            stats.working_memory_count = stats.working_memory_count.saturating_sub(working_removed);
            stats.session_memory_count = stats.session_memory_count.saturating_sub(session_removed);
            stats.long_term_memory_count = stats
                .long_term_memory_count
                .saturating_sub(long_term_removed);
            stats.vector_index_count = stats.vector_index_count.saturating_sub(count);
        }

        Ok(count)
    }
4179
    /// Deletes every memory whose experience type equals `exp_type`, across
    /// all three tiers.
    ///
    /// Mirrors `forget_by_date_range`: read-lock scan then write-lock removal
    /// for the in-memory tiers, `SearchCriteria::ByType` for long-term, and a
    /// combined graph/interference cleanup at the end.
    ///
    /// Returns the total number of memories deleted.
    fn forget_by_type(&self, exp_type: ExperienceType) -> Result<usize> {
        let mut count = 0;
        let mut working_removed = 0;
        let mut session_removed = 0;
        let mut long_term_removed = 0;

        // Working tier: collect matching ids under a read lock.
        let working_ids: Vec<MemoryId> = {
            let working = self.working_memory.read();
            working
                .all_memories()
                .iter()
                .filter(|m| m.experience.experience_type == exp_type)
                .map(|m| m.id.clone())
                .collect()
        };
        {
            // `is_ok()` tolerates ids removed between the read and write lock.
            let mut working = self.working_memory.write();
            for id in &working_ids {
                if working.remove(id).is_ok() {
                    self.retriever.remove_memory(id);
                    let _ = self.hybrid_search.remove_memory(id);
                    working_removed += 1;
                    count += 1;
                }
            }
        }

        // Session tier: same pattern.
        let session_ids: Vec<MemoryId> = {
            let session = self.session_memory.read();
            session
                .all_memories()
                .iter()
                .filter(|m| m.experience.experience_type == exp_type)
                .map(|m| m.id.clone())
                .collect()
        };
        {
            let mut session = self.session_memory.write();
            for id in &session_ids {
                if session.remove(id).is_ok() {
                    self.retriever.remove_memory(id);
                    let _ = self.hybrid_search.remove_memory(id);
                    session_removed += 1;
                    count += 1;
                }
            }
        }

        // Long-term tier: delegate the type filter to storage.
        let memories = self
            .long_term_memory
            .search(storage::SearchCriteria::ByType(exp_type))?;
        let mut lt_ids = Vec::new();
        for memory in memories {
            lt_ids.push(memory.id.clone());
            self.retriever.remove_memory(&memory.id);
            let _ = self.hybrid_search.remove_memory(&memory.id);
            self.long_term_memory.delete(&memory.id)?;
            long_term_removed += 1;
            count += 1;
        }

        // One combined cleanup pass over every deleted id.
        let all_ids: Vec<MemoryId> = working_ids
            .into_iter()
            .chain(session_ids)
            .chain(lt_ids)
            .collect();
        self.cleanup_graph_for_ids(&all_ids);
        self.cleanup_interference_for_ids(&all_ids);

        {
            let mut stats = self.stats.write();
            stats.total_memories = stats.total_memories.saturating_sub(count);
            stats.working_memory_count = stats.working_memory_count.saturating_sub(working_removed);
            stats.session_memory_count = stats.session_memory_count.saturating_sub(session_removed);
            stats.long_term_memory_count = stats
                .long_term_memory_count
                .saturating_sub(long_term_removed);
            stats.vector_index_count = stats.vector_index_count.saturating_sub(count);
        }

        Ok(count)
    }
4270
    /// Wipes the entire memory system: knowledge graph, all three memory
    /// tiers, both search indexes, semantic/temporal facts, interference
    /// records, and the cached stats.
    ///
    /// Returns the number of memories removed, summed across tiers.
    /// NOTE(review): a memory cached in working/session AND present in
    /// long-term appears to be counted once per tier it lives in — confirm
    /// whether the returned count may exceed the number of distinct memories.
    ///
    /// Most sub-steps are best-effort (logged, not propagated); only
    /// long-term enumeration/deletion can fail the call.
    fn forget_all(&self) -> Result<usize> {
        let mut count = 0;

        // Clear the knowledge graph first; failure is logged but does not
        // abort the wipe.
        if let Some(graph) = &self.graph_memory {
            match graph.read().clear_all() {
                Ok((entities, relationships, episodes)) => {
                    tracing::info!(
                        entities,
                        relationships,
                        episodes,
                        "Graph cleared during forget_all"
                    );
                }
                Err(e) => {
                    tracing::warn!(error = %e, "Failed to clear knowledge graph during forget_all");
                }
            }
        }

        // Long-term tier: delete every stored memory and its index entries.
        let all_lt = self.long_term_memory.get_all()?;
        let long_term_count = all_lt.len();
        for memory in all_lt {
            self.retriever.remove_memory(&memory.id);
            let _ = self.hybrid_search.remove_memory(&memory.id);
            self.long_term_memory.delete(&memory.id)?;
        }
        count += long_term_count;

        // Session tier: snapshot ids under a read lock, deindex, then clear
        // under a write lock.
        let session_ids: Vec<MemoryId> = {
            let session = self.session_memory.read();
            session
                .all_memories()
                .iter()
                .map(|m| m.id.clone())
                .collect()
        };
        let session_count = session_ids.len();
        for id in &session_ids {
            self.retriever.remove_memory(id);
            let _ = self.hybrid_search.remove_memory(id);
        }
        {
            let mut session = self.session_memory.write();
            session.clear();
        }
        count += session_count;

        // Working tier: same pattern as the session tier.
        let working_ids: Vec<MemoryId> = {
            let working = self.working_memory.read();
            working
                .all_memories()
                .iter()
                .map(|m| m.id.clone())
                .collect()
        };
        let working_count = working_ids.len();
        for id in &working_ids {
            self.retriever.remove_memory(id);
            let _ = self.hybrid_search.remove_memory(id);
        }
        {
            let mut working = self.working_memory.write();
            working.clear();
        }
        count += working_count;

        // Flush the BM25 removals; best-effort.
        if let Err(e) = self.hybrid_search.commit_and_reload() {
            tracing::warn!(error = %e, "BM25 commit failed during forget_all");
        }

        {
            // Purge semantic/temporal fact keyspaces directly from RocksDB in
            // one write batch. The explicit starts_with check terminates
            // iteration once keys leave the requested prefix.
            let db = self.long_term_memory.db();
            let mut batch = rocksdb::WriteBatch::default();
            let mut facts_deleted = 0usize;
            for prefix in &[
                "facts:",
                "facts_by_entity:",
                "facts_by_type:",
                "facts_embedding:",
            ] {
                let iter = db.prefix_iterator(prefix.as_bytes());
                for item in iter {
                    if let Ok((key, _)) = item {
                        if !key.starts_with(prefix.as_bytes()) {
                            break;
                        }
                        batch.delete(&key);
                        // Only primary fact records count toward the log total.
                        if *prefix == "facts:" {
                            facts_deleted += 1;
                        }
                    }
                }
            }
            let iter = db.prefix_iterator(b"temporal_facts:");
            for item in iter {
                if let Ok((key, _)) = item {
                    if !key.starts_with(b"temporal_facts:") {
                        break;
                    }
                    batch.delete(&key);
                }
            }
            if facts_deleted > 0 || !batch.is_empty() {
                if let Err(e) = db.write(batch) {
                    tracing::warn!(error = %e, "Failed to clear facts during forget_all");
                } else {
                    tracing::info!(facts_deleted, "Semantic facts cleared during forget_all");
                }
            }
        }

        // Reset in-memory interference state and its persisted records.
        {
            let mut detector = self.interference_detector.write();
            *detector = replay::InterferenceDetector::new();
        }
        if let Err(e) = self.long_term_memory.clear_all_interference_records() {
            tracing::warn!(error = %e, "Failed to clear interference records during forget_all");
        }

        // Zero out all cached counters.
        {
            let mut stats = self.stats.write();
            stats.total_memories = 0;
            stats.working_memory_count = 0;
            stats.session_memory_count = 0;
            stats.long_term_memory_count = 0;
            stats.vector_index_count = 0;
        }

        Ok(count)
    }
4421
    /// Displays the memory-event graph visualization via the logger.
    pub fn show_visualization(&self) {
        self.logger.read().show_visualization();
    }
4426
    /// Exports the logger's memory-event graph in Graphviz DOT format to `path`.
    ///
    /// # Errors
    /// Propagates any failure from the underlying export.
    pub fn export_graph(&self, path: &std::path::Path) -> Result<()> {
        self.logger.read().export_dot(path)
    }
4431
    /// Returns summary statistics for the logger's memory-event graph.
    pub fn get_visualization_stats(&self) -> GraphStats {
        self.logger.read().get_stats()
    }
4436
4437 pub fn flush_storage(&self) -> Result<()> {
4439 self.long_term_memory.flush()?;
4441
4442 self.retriever.save()?;
4444
4445 Ok(())
4446 }
4447
    /// Returns a shared handle to the RocksDB instance backing long-term storage.
    pub fn get_db(&self) -> std::sync::Arc<rocksdb::DB> {
        self.long_term_memory.db()
    }
4456
    /// Runs a criteria-based search directly against long-term storage
    /// (bypasses the vector and BM25 indexes).
    ///
    /// # Errors
    /// Propagates storage errors from the underlying search.
    pub fn advanced_search(&self, criteria: storage::SearchCriteria) -> Result<Vec<Memory>> {
        self.long_term_memory.search(criteria)
    }
4461
    /// Fetches a single memory from long-term storage by id.
    ///
    /// # Errors
    /// Fails if the memory does not exist or cannot be read.
    pub fn get_memory(&self, id: &MemoryId) -> Result<Memory> {
        self.long_term_memory.get(id)
    }
4466
    /// Persists an edited memory and propagates the change to every index and
    /// cache tier.
    ///
    /// Order: long-term storage first (authoritative — its failure fails the
    /// call), then best-effort reindexing in the vector DB and BM25 (failures
    /// are logged, not returned), and finally a refresh of any stale copies in
    /// the working/session caches.
    ///
    /// # Errors
    /// Only the long-term storage update can fail this call.
    pub fn update_memory(&self, memory: &Memory) -> Result<()> {
        let memory_id = memory.id.clone();

        // Authoritative write; every step after this is best-effort.
        self.long_term_memory.update(memory)?;

        if let Err(e) = self.retriever.reindex_memory(memory) {
            tracing::warn!(
                "Failed to reindex memory {} in vector DB: {}",
                memory_id.0,
                e
            );
        }

        // BM25: re-index content, tags and entities, then commit so the
        // change becomes visible to searchers.
        if let Err(e) = self.hybrid_search.index_memory(
            &memory_id,
            &memory.experience.content,
            &memory.experience.tags,
            &memory.experience.entities,
        ) {
            tracing::warn!("Failed to reindex memory {} in BM25: {}", memory_id.0, e);
        }
        if let Err(e) = self.hybrid_search.commit_and_reload() {
            tracing::warn!("Failed to commit/reload BM25 index: {}", e);
        }

        // Replace stale cached copies (remove + re-add) in the in-memory tiers.
        {
            let mut working = self.working_memory.write();
            if working.contains(&memory_id) {
                let _ = working.remove(&memory_id);
                let _ = working.add_shared(std::sync::Arc::new(memory.clone()));
            }
        }
        {
            let mut session = self.session_memory.write();
            if session.contains(&memory_id) {
                let _ = session.remove(&memory_id);
                let _ = session.add_shared(std::sync::Arc::new(memory.clone()));
            }
        }

        Ok(())
    }
4520
    /// Sets (or clears, when `parent_id` is `None`) the parent link of a
    /// memory, persists it, and refreshes any cached copies in the
    /// working/session tiers.
    ///
    /// # Errors
    /// Fails if the memory is missing from long-term storage or the update
    /// cannot be persisted.
    pub fn set_memory_parent(
        &self,
        memory_id: &MemoryId,
        parent_id: Option<MemoryId>,
    ) -> Result<()> {
        let mut memory = self.long_term_memory.get(memory_id)?;
        memory.set_parent(parent_id.clone());
        self.long_term_memory.update(&memory)?;

        // One Arc shared by both cache tiers — avoids cloning the memory twice.
        let updated = Arc::new(memory);
        {
            let mut wm = self.working_memory.write();
            if wm.contains(memory_id) {
                let _ = wm.remove(memory_id);
                let _ = wm.add_shared(Arc::clone(&updated));
            }
        }
        {
            let mut sm = self.session_memory.write();
            if sm.contains(memory_id) {
                let _ = sm.remove(memory_id);
                let _ = sm.add_shared(Arc::clone(&updated));
            }
        }

        Ok(())
    }
4557
    /// Returns all memories whose parent is `parent_id`.
    ///
    /// # Errors
    /// Propagates storage errors from the lookup.
    pub fn get_memory_children(&self, parent_id: &MemoryId) -> Result<Vec<Memory>> {
        self.long_term_memory.get_children(parent_id)
    }
4562
    /// Returns the chain of ancestors (via parent links) for `memory_id`.
    ///
    /// # Errors
    /// Propagates storage errors from the lookup.
    pub fn get_memory_ancestors(&self, memory_id: &MemoryId) -> Result<Vec<Memory>> {
        self.long_term_memory.get_ancestors(memory_id)
    }
4567
    /// Returns the full hierarchy context for a memory as
    /// `(ancestors, the memory itself, children)`.
    ///
    /// # Errors
    /// Propagates storage errors from the lookup.
    pub fn get_memory_hierarchy(
        &self,
        memory_id: &MemoryId,
    ) -> Result<(Vec<Memory>, Memory, Vec<Memory>)> {
        self.long_term_memory.get_hierarchy_context(memory_id)
    }
4575
    /// Returns a decompressed copy of `memory` via the compression pipeline.
    ///
    /// # Errors
    /// Fails if decompression of the stored content fails.
    pub fn decompress_memory(&self, memory: &Memory) -> Result<Memory> {
        self.compressor.decompress(memory)
    }
4580
    /// Returns statistics about the long-term storage backend.
    ///
    /// # Errors
    /// Propagates storage errors from the stats collection.
    pub fn get_storage_stats(&self) -> Result<storage::StorageStats> {
        self.long_term_memory.get_stats()
    }
4585
    /// Returns all uncompressed long-term memories created before `cutoff`
    /// (candidates for background compression).
    ///
    /// # Errors
    /// Propagates storage errors from the scan.
    pub fn get_uncompressed_older_than(
        &self,
        cutoff: chrono::DateTime<chrono::Utc>,
    ) -> Result<Vec<Memory>> {
        self.long_term_memory.get_uncompressed_older_than(cutoff)
    }
4593
    /// Rebuilds the vector index from scratch. Unlike `rebuild_index`, this
    /// does not refresh the cached `vector_index_count` stat.
    ///
    /// # Errors
    /// Propagates any failure from the retriever's rebuild.
    pub fn rebuild_vector_index(&self) -> Result<()> {
        self.retriever.rebuild_index()
    }
4598
    /// Re-indexes every long-term memory that is missing from the vector
    /// index ("orphaned" memories), typically after a crash or failed
    /// embedding generation.
    ///
    /// Returns `(total_in_storage, indexed_after, repaired, failed)`.
    /// Individual indexing failures are logged and counted but do not abort
    /// the repair.
    ///
    /// # Errors
    /// Fails only if long-term storage cannot be enumerated.
    pub fn repair_vector_index(&self) -> Result<(usize, usize, usize, usize)> {
        let all_memories = self.long_term_memory.get_all()?;
        let total_storage = all_memories.len();
        let indexed_before = self.retriever.len();

        let mut repaired = 0;
        let mut failed = 0;

        // Snapshot of ids currently in the index; anything in storage but not
        // here is an orphan.
        let indexed_ids = self.retriever.get_indexed_memory_ids();

        for memory in all_memories {
            if indexed_ids.contains(&memory.id) {
                continue;
            }

            tracing::info!(
                memory_id = %memory.id.0,
                content_preview = %memory.experience.content.chars().take(50).collect::<String>(),
                "Repairing orphaned memory"
            );

            // index_memory regenerates the embedding; failure is recorded but
            // does not stop the remaining repairs.
            match self.retriever.index_memory(&memory) {
                Ok(_) => {
                    repaired += 1;
                    tracing::info!(memory_id = %memory.id.0, "Successfully repaired orphaned memory");
                }
                Err(e) => {
                    failed += 1;
                    tracing::error!(
                        memory_id = %memory.id.0,
                        error = %e,
                        "Failed to repair orphaned memory - embedding generation failed"
                    );
                }
            }
        }

        let indexed_after = self.retriever.len();

        // Keep the cached stat in sync with the actual index size.
        {
            let mut stats = self.stats.write();
            stats.vector_index_count = indexed_after;
        }

        tracing::info!(
            total_storage = total_storage,
            indexed_before = indexed_before,
            indexed_after = indexed_after,
            repaired = repaired,
            failed = failed,
            "Vector index repair completed"
        );

        Ok((total_storage, indexed_after, repaired, failed))
    }
4664
4665 pub fn verify_index_integrity(&self) -> Result<IndexIntegrityReport> {
4673 let all_memories = self.long_term_memory.get_all()?;
4674 let total_storage = all_memories.len();
4675 let indexed_ids = self.retriever.get_indexed_memory_ids();
4676 let total_indexed = indexed_ids.len();
4677
4678 let mut orphaned_ids = Vec::new();
4679 for memory in &all_memories {
4680 if !indexed_ids.contains(&memory.id) {
4681 if orphaned_ids.len() < 100 {
4682 orphaned_ids.push(memory.id.clone());
4683 }
4684 }
4685 }
4686
4687 let orphaned_count = total_storage.saturating_sub(total_indexed);
4688
4689 let is_healthy = orphaned_count == 0;
4690 Ok(IndexIntegrityReport {
4691 total_storage,
4692 total_indexed,
4693 orphaned_count,
4694 orphaned_ids,
4695 is_healthy,
4696 healthy: is_healthy,
4697 })
4698 }
4699
    /// Removes corrupted records from long-term storage; returns how many were
    /// cleaned up.
    ///
    /// # Errors
    /// Propagates storage errors from the cleanup.
    pub fn cleanup_corrupted(&self) -> Result<usize> {
        self.long_term_memory.cleanup_corrupted()
    }
4705
    /// Migrates legacy-format records in long-term storage.
    ///
    /// Returns three counts from the migration — presumably
    /// (migrated, skipped, failed); confirm against `MemoryStorage::migrate_legacy`.
    ///
    /// # Errors
    /// Propagates storage errors from the migration.
    pub fn migrate_legacy(&self) -> Result<(usize, usize, usize)> {
        self.long_term_memory.migrate_legacy()
    }
4711
4712 pub fn rebuild_index(&self) -> Result<(usize, usize)> {
4716 tracing::info!("Starting full index rebuild from storage");
4717 self.retriever.rebuild_index()?;
4718 let indexed = self.retriever.len();
4719 let storage_count = self.long_term_memory.get_stats()?.total_count;
4720
4721 {
4723 let mut stats = self.stats.write();
4724 stats.vector_index_count = indexed;
4725 }
4726
4727 tracing::info!(
4728 storage_count = storage_count,
4729 indexed = indexed,
4730 "Index rebuild complete"
4731 );
4732
4733 Ok((storage_count, indexed))
4734 }
4735
    /// Persists the vector index. The `_path` argument is currently ignored;
    /// the retriever saves to its own configured location. The parameter is
    /// kept for interface compatibility.
    ///
    /// # Errors
    /// Propagates any failure from the retriever's save.
    pub fn save_vector_index(&self, _path: &Path) -> Result<()> {
        self.retriever.save()
    }
    /// Returns health metrics for the vector index (total/deleted vector
    /// counts, deletion ratio, compaction flag).
    pub fn index_health(&self) -> retrieval::IndexHealth {
        self.retriever.index_health()
    }
4748
    /// Rebuilds the vector index if the retriever decides it is needed;
    /// returns whether a rebuild was performed.
    ///
    /// # Errors
    /// Propagates any failure from the rebuild decision or execution.
    pub fn auto_rebuild_index_if_needed(&self) -> Result<bool> {
        self.retriever.auto_rebuild_index_if_needed()
    }
4756
4757 fn auto_repair_and_compact(&self) {
4766 let health = self.index_health();
4768
4769 if health.needs_compaction {
4771 tracing::info!(
4772 "Index compaction triggered: {:.1}% deleted ({} of {} vectors)",
4773 health.deletion_ratio * 100.0,
4774 health.deleted_count,
4775 health.total_vectors
4776 );
4777 if let Err(e) = self.auto_rebuild_index_if_needed() {
4778 tracing::warn!("Index compaction failed: {}", e);
4779 }
4780 }
4781
4782 let storage_count = self
4785 .long_term_memory
4786 .get_stats()
4787 .map(|s| s.total_count)
4788 .unwrap_or(0);
4789 let index_count = health.total_vectors.saturating_sub(health.deleted_count);
4790
4791 if storage_count > index_count {
4792 let orphan_estimate = storage_count - index_count;
4794 if orphan_estimate > 0 {
4795 tracing::info!(
4796 "Potential orphaned memories detected: ~{} (storage={}, indexed={})",
4797 orphan_estimate,
4798 storage_count,
4799 index_count
4800 );
4801 match self.repair_vector_index() {
4802 Ok((_, _, repaired, failed)) => {
4803 if repaired > 0 || failed > 0 {
4804 tracing::info!(
4805 "Index repair complete: {} repaired, {} failed",
4806 repaired,
4807 failed
4808 );
4809 }
4810 }
4811 Err(e) => {
4812 tracing::warn!("Index repair failed: {}", e);
4813 }
4814 }
4815 }
4816 }
4817 }
4818
4819 pub fn recall_tracked(&self, query: &Query) -> Result<TrackedRetrieval> {
4836 let result = self.retriever.search_tracked(query, query.max_results)?;
4837 if let Ok(count) = self.long_term_memory.increment_retrieval_count() {
4838 self.stats.write().total_retrievals = count;
4839 }
4840 Ok(result)
4841 }
4842
    /// Applies Hebbian-style feedback to a batch of recalled memories.
    ///
    /// For non-misleading outcomes with two or more memories, pairwise
    /// co-activation is recorded in the knowledge graph. Each memory then has
    /// its access recorded and its importance boosted (`Helpful`), decayed
    /// (`Misleading`), or left unchanged (`Neutral`), and the result is
    /// persisted to long-term storage.
    ///
    /// Persistence failures are collected into `stats.persist_failures`
    /// rather than aborting the batch.
    ///
    /// # Errors
    /// Currently always returns `Ok`; the `Result` is kept for interface
    /// stability.
    pub fn reinforce_recall(
        &self,
        memory_ids: &[MemoryId],
        outcome: RetrievalOutcome,
    ) -> Result<ReinforcementStats> {
        if memory_ids.is_empty() {
            return Ok(ReinforcementStats::default());
        }

        let mut stats = ReinforcementStats {
            memories_processed: memory_ids.len(),
            outcome: outcome.clone(),
            ..Default::default()
        };

        // Strengthen pairwise associations unless the recall was misleading.
        if !matches!(outcome, RetrievalOutcome::Misleading) && memory_ids.len() >= 2 {
            if let Some(graph) = &self.graph_memory {
                let memory_uuids: Vec<uuid::Uuid> = memory_ids.iter().map(|id| id.0).collect();
                if memory_uuids.len() >= 2 {
                    match graph.read().record_memory_coactivation(&memory_uuids) {
                        Ok(count) => {
                            stats.associations_strengthened = count;
                        }
                        Err(e) => {
                            tracing::warn!(error = %e, "Failed to record memory coactivation");
                            // Fall back to the theoretical pair count n*(n-1)/2.
                            let n = memory_ids.len();
                            stats.associations_strengthened = n * (n - 1) / 2;
                        }
                    }
                }
            } else {
                // No graph configured: report the theoretical pair count.
                let n = memory_ids.len();
                stats.associations_strengthened = n * (n - 1) / 2;
            }
        }

        let mut persist_failures: Vec<(MemoryId, String)> = Vec::new();

        for id in memory_ids {
            // Prefer a cached copy (working, then session) to avoid a storage read.
            let cached_memory = {
                let working = self.working_memory.read();
                working.get(id)
            };

            let cached_memory = cached_memory.or_else(|| {
                let session = self.session_memory.read();
                session.get(id)
            });

            if let Some(memory) = cached_memory {
                // NOTE(review): this branch duplicates the long-term branch
                // below — consider extracting a helper. Mutation happens
                // through `&memory`, so record_access/boost/decay presumably
                // use interior mutability on Memory — confirm.
                memory.record_access();
                match &outcome {
                    RetrievalOutcome::Helpful => {
                        memory.boost_importance(HEBBIAN_BOOST_HELPFUL);
                        stats.importance_boosts += 1;
                    }
                    RetrievalOutcome::Misleading => {
                        memory.decay_importance(HEBBIAN_DECAY_MISLEADING);
                        stats.importance_decays += 1;
                    }
                    RetrievalOutcome::Neutral => {
                        // Neutral feedback: access recorded, importance untouched.
                    }
                }
                if let Err(e) = self.long_term_memory.update(&memory) {
                    persist_failures.push((id.clone(), e.to_string()));
                    tracing::warn!(
                        memory_id = %id.0,
                        error = %e,
                        "Failed to persist reinforcement update - Hebbian feedback may be lost on restart"
                    );
                }
            } else {
                // Not cached: load from long-term storage. A missing memory is
                // only a debug event — it may have been deleted since recall.
                match self.long_term_memory.get(id) {
                    Ok(memory) => {
                        memory.record_access();
                        match &outcome {
                            RetrievalOutcome::Helpful => {
                                memory.boost_importance(HEBBIAN_BOOST_HELPFUL);
                                stats.importance_boosts += 1;
                            }
                            RetrievalOutcome::Misleading => {
                                memory.decay_importance(HEBBIAN_DECAY_MISLEADING);
                                stats.importance_decays += 1;
                            }
                            RetrievalOutcome::Neutral => {
                                // Neutral feedback: importance untouched.
                            }
                        }
                        if let Err(e) = self.long_term_memory.update(&memory) {
                            persist_failures.push((id.clone(), e.to_string()));
                            tracing::warn!(
                                memory_id = %id.0,
                                error = %e,
                                "Failed to persist reinforcement update - Hebbian feedback may be lost on restart"
                            );
                        }
                    }
                    Err(e) => {
                        tracing::debug!(
                            memory_id = %id.0,
                            error = %e,
                            "Memory not found during reinforcement - may have been deleted"
                        );
                    }
                }
            }
        }

        if !persist_failures.is_empty() {
            stats.persist_failures = persist_failures.len();
            tracing::error!(
                failure_count = persist_failures.len(),
                "Hebbian reinforcement had persistence failures - learning feedback partially lost"
            );
        }

        Ok(stats)
    }
4997
    /// Applies reinforcement feedback to a previously tracked retrieval,
    /// delegating the bookkeeping to the retriever.
    ///
    /// # Errors
    /// Propagates any failure from the retriever's reinforcement.
    pub fn reinforce_recall_tracked(
        &self,
        tracked: &TrackedRetrieval,
        outcome: RetrievalOutcome,
    ) -> Result<ReinforcementStats> {
        self.retriever.reinforce_tracked(tracked, outcome)
    }
5006
5007 pub fn graph_maintenance(&self) {
5012 if let Some(graph) = &self.graph_memory {
5013 if let Err(e) = graph.read().apply_decay() {
5014 tracing::debug!("Graph decay maintenance failed: {e}");
5015 }
5016 }
5017 }
5018
    /// Projects consolidated semantic facts into the knowledge graph:
    /// each related entity becomes a Concept node, and every pair of entities
    /// co-occurring in a fact gets a `RelatedTo` edge in the L2 episodic tier.
    ///
    /// Edge strength is the L2 base weight scaled by the cosine similarity of
    /// the entity name embeddings (floored at `EDGE_SEMANTIC_WEIGHT_FLOOR`);
    /// when either embedding is unavailable the scale defaults to 1.0.
    /// No-op when no graph is configured; all failures are silently tolerated
    /// per entity/edge.
    fn connect_facts_to_graph(&self, facts: &[SemanticFact]) {
        let graph = match &self.graph_memory {
            Some(g) => g,
            None => return,
        };
        let graph_guard = graph.read();
        let now = chrono::Utc::now();
        let mut entities_added = 0;
        let mut edges_added = 0;

        // Deduplicated list of every entity name across all facts, so the
        // embedding batch below encodes each name exactly once.
        let mut all_entity_names: Vec<String> = Vec::new();
        for fact in facts {
            for name in &fact.related_entities {
                if !all_entity_names.contains(name) {
                    all_entity_names.push(name.clone());
                }
            }
        }

        // Batch-encode entity names; on failure fall back to an empty map,
        // which disables similarity-based weighting (and concept merge).
        let embedding_map: std::collections::HashMap<String, Vec<f32>> =
            if all_entity_names.is_empty() {
                std::collections::HashMap::new()
            } else {
                let name_refs: Vec<&str> = all_entity_names.iter().map(|s| s.as_str()).collect();
                match self.embedder.encode_batch(&name_refs) {
                    Ok(embs) => all_entity_names.into_iter().zip(embs).collect(),
                    Err(e) => {
                        tracing::debug!(
                            error = %e,
                            "Fact entity name embedding failed, concept merge disabled"
                        );
                        std::collections::HashMap::new()
                    }
                }
            };

        for fact in facts {
            // One Concept node per related entity; salience is derived from
            // the fact's confidence.
            for entity_name in &fact.related_entities {
                let entity = crate::graph_memory::EntityNode {
                    uuid: Uuid::new_v4(),
                    name: entity_name.clone(),
                    labels: vec![crate::graph_memory::EntityLabel::Concept],
                    created_at: now,
                    last_seen_at: now,
                    mention_count: 1,
                    summary: String::new(),
                    attributes: std::collections::HashMap::new(),
                    name_embedding: embedding_map.get(entity_name).cloned(),
                    salience: fact.confidence * 0.5,
                    // Capitalized first character is used as a proper-noun heuristic.
                    is_proper_noun: entity_name
                        .chars()
                        .next()
                        .map(|c| c.is_uppercase())
                        .unwrap_or(false),
                };
                if graph_guard.add_entity(entity).is_ok() {
                    entities_added += 1;
                }
            }

            // Connect every unordered pair of entities mentioned by this fact.
            let entities = &fact.related_entities;
            let l2_base_weight = crate::graph_memory::EdgeTier::L2Episodic.initial_weight();
            for i in 0..entities.len() {
                for j in (i + 1)..entities.len() {
                    // Look the nodes up by name (add_entity may have merged
                    // into pre-existing nodes).
                    if let (Ok(Some(e1)), Ok(Some(e2))) = (
                        graph_guard.find_entity_by_name(&entities[i]),
                        graph_guard.find_entity_by_name(&entities[j]),
                    ) {
                        // Similarity-scaled weight in
                        // [EDGE_SEMANTIC_WEIGHT_FLOOR, 1.0]; negative cosine
                        // is clamped to 0 before scaling.
                        let semantic_weight = match (&e1.name_embedding, &e2.name_embedding) {
                            (Some(emb1), Some(emb2)) => {
                                let sim = crate::similarity::cosine_similarity(emb1, emb2).max(0.0);
                                EDGE_SEMANTIC_WEIGHT_FLOOR
                                    + (1.0 - EDGE_SEMANTIC_WEIGHT_FLOOR) * sim
                            }
                            _ => 1.0,
                        };

                        let edge = crate::graph_memory::RelationshipEdge {
                            uuid: Uuid::new_v4(),
                            from_entity: e1.uuid,
                            to_entity: e2.uuid,
                            relation_type: crate::graph_memory::RelationType::RelatedTo,
                            strength: l2_base_weight * semantic_weight,
                            created_at: now,
                            valid_at: now,
                            invalidated_at: None,
                            source_episode_id: None,
                            // First 100 chars of the fact text serve as edge context.
                            context: fact.fact.chars().take(100).collect(),
                            last_activated: now,
                            activation_count: 1,
                            ltp_status: crate::graph_memory::LtpStatus::None,
                            tier: crate::graph_memory::EdgeTier::L2Episodic,
                            activation_timestamps: None,
                            entity_confidence: Some(fact.confidence),
                        };
                        if graph_guard.add_relationship(edge).is_ok() {
                            edges_added += 1;
                        }
                    }
                }
            }
        }

        if entities_added > 0 || edges_added > 0 {
            tracing::debug!(
                entities_added,
                edges_added,
                facts = facts.len(),
                "Connected facts to knowledge graph"
            );
        }
    }
5142
5143 pub fn graph_stats(&self) -> MemoryGraphStats {
5145 if let Some(graph) = &self.graph_memory {
5146 let g = graph.read();
5147 if let Ok(stats) = g.get_stats() {
5148 let (avg_strength, potentiated_count) = if let Ok(relationships) =
5150 g.get_all_relationships()
5151 {
5152 if relationships.is_empty() {
5153 (0.0, 0)
5154 } else {
5155 let total_strength: f32 = relationships.iter().map(|r| r.strength).sum();
5156 let potentiated =
5157 relationships.iter().filter(|r| r.is_potentiated()).count();
5158 (total_strength / relationships.len() as f32, potentiated)
5159 }
5160 } else {
5161 (0.0, 0)
5162 };
5163
5164 return MemoryGraphStats {
5165 node_count: stats.entity_count,
5166 edge_count: stats.relationship_count,
5167 avg_strength,
5168 potentiated_count,
5169 };
5170 }
5171 }
5172 MemoryGraphStats {
5174 node_count: 0,
5175 edge_count: 0,
5176 avg_strength: 0.0,
5177 potentiated_count: 0,
5178 }
5179 }
5180
5181 pub fn upsert(
5204 &self,
5205 external_id: String,
5206 mut experience: Experience,
5207 change_type: ChangeType,
5208 changed_by: Option<String>,
5209 change_reason: Option<String>,
5210 ) -> Result<(MemoryId, bool)> {
5211 if let Some(mut existing) = self.long_term_memory.find_by_external_id(&external_id)? {
5213 let memory_id = existing.id.clone();
5215
5216 existing.update_content(
5218 experience.content.clone(),
5219 change_type,
5220 changed_by,
5221 change_reason,
5222 );
5223
5224 if !experience.entities.is_empty() {
5226 existing.experience.entities = experience.entities;
5227 }
5228
5229 if !experience.tags.is_empty() {
5231 existing.experience.tags = experience.tags;
5232 }
5233
5234 let content_hash = Self::sha256_hash(&existing.experience.content);
5236 if let Some(cached_embedding) = self.content_cache.get(&content_hash) {
5237 existing.experience.embeddings = Some(cached_embedding.clone());
5238 } else {
5239 match self.embedder.encode(&existing.experience.content) {
5240 Ok(embedding) => {
5241 self.content_cache.insert(content_hash, embedding.clone());
5242 existing.experience.embeddings = Some(embedding);
5243 }
5244 Err(e) => {
5245 tracing::warn!("Failed to regenerate embedding during upsert: {}", e);
5246 }
5247 }
5248 }
5249
5250 let temporal =
5252 crate::memory::query_parser::extract_temporal_refs(&existing.experience.content);
5253 existing.experience.temporal_refs.clear();
5254 for temp_ref in temporal.refs {
5255 existing
5256 .experience
5257 .temporal_refs
5258 .push(temp_ref.date.to_string());
5259 }
5260
5261 self.long_term_memory.update(&existing)?;
5263
5264 if let Err(e) = self.retriever.reindex_memory(&existing) {
5266 tracing::warn!(
5267 "Failed to reindex memory {} in vector DB: {}",
5268 memory_id.0,
5269 e
5270 );
5271 }
5272
5273 if let Err(e) = self.hybrid_search.index_memory(
5275 &memory_id,
5276 &existing.experience.content,
5277 &existing.experience.tags,
5278 &existing.experience.entities,
5279 ) {
5280 tracing::warn!("Failed to reindex memory {} in BM25: {}", memory_id.0, e);
5281 }
5282 if let Err(e) = self.hybrid_search.commit_and_reload() {
5283 tracing::warn!("Failed to commit/reload BM25 index: {}", e);
5284 }
5285
5286 {
5288 let mut working = self.working_memory.write();
5289 if working.contains(&memory_id) {
5290 working.remove(&memory_id)?;
5291 working.add_shared(Arc::new(existing.clone()))?;
5292 }
5293 }
5294 {
5295 let mut session = self.session_memory.write();
5296 if session.contains(&memory_id) {
5297 session.remove(&memory_id)?;
5298 session.add_shared(Arc::new(existing.clone()))?;
5299 }
5300 }
5301
5302 tracing::info!(
5303 external_id = %external_id,
5304 memory_id = %memory_id.0,
5305 version = existing.version,
5306 "Memory upserted (update)"
5307 );
5308
5309 Ok((memory_id, true))
5310 } else {
5311 let memory_id = MemoryId(Uuid::new_v4());
5313 let importance = self.calculate_importance(&experience);
5314
5315 if experience.embeddings.is_none() {
5317 let content_hash = Self::sha256_hash(&experience.content);
5318 if let Some(cached_embedding) = self.content_cache.get(&content_hash) {
5319 experience.embeddings = Some(cached_embedding.clone());
5320 } else {
5321 match self.embedder.encode(&experience.content) {
5322 Ok(embedding) => {
5323 self.content_cache.insert(content_hash, embedding.clone());
5324 experience.embeddings = Some(embedding);
5325 }
5326 Err(e) => {
5327 tracing::warn!("Failed to generate embedding: {}", e);
5328 }
5329 }
5330 }
5331 }
5332
5333 if experience.temporal_refs.is_empty() {
5335 let temporal =
5336 crate::memory::query_parser::extract_temporal_refs(&experience.content);
5337 for temp_ref in temporal.refs {
5338 experience.temporal_refs.push(temp_ref.date.to_string());
5339 }
5340 }
5341
5342 let memory = Arc::new(Memory::new_with_external_id(
5344 memory_id.clone(),
5345 experience,
5346 importance,
5347 external_id.clone(),
5348 None, None, None, None, ));
5353
5354 self.long_term_memory.store(&memory)?;
5356
5357 self.logger.write().log_created(&memory, "working");
5359
5360 self.working_memory
5362 .write()
5363 .add_shared(Arc::clone(&memory))?;
5364
5365 if let Err(e) = self.retriever.index_memory(&memory) {
5367 tracing::warn!("Failed to index memory {} in vector DB: {}", memory.id.0, e);
5368 }
5369
5370 if let Some(graph) = &self.graph_memory {
5373 let now = chrono::Utc::now();
5374
5375 let ner_lookup = build_ner_lookup(&memory.experience.ner_entities);
5377
5378 let entity_names: Vec<&str> = memory
5380 .experience
5381 .entities
5382 .iter()
5383 .map(|s| s.as_str())
5384 .collect();
5385 let entity_embeddings: Vec<Option<Vec<f32>>> = if entity_names.is_empty() {
5386 Vec::new()
5387 } else {
5388 match self.embedder.encode_batch(&entity_names) {
5389 Ok(embs) => embs.into_iter().map(Some).collect(),
5390 Err(e) => {
5391 tracing::debug!(
5392 error = %e,
5393 "Entity name embedding failed, concept merge disabled for this batch"
5394 );
5395 vec![None; entity_names.len()]
5396 }
5397 }
5398 };
5399
5400 let entities_to_add: Vec<crate::graph_memory::EntityNode> = memory
5401 .experience
5402 .entities
5403 .iter()
5404 .zip(entity_embeddings.into_iter())
5405 .map(|(entity_name, embedding)| {
5406 let (label, salience) = resolve_entity_label(entity_name, &ner_lookup);
5407 crate::graph_memory::EntityNode {
5408 uuid: Uuid::new_v4(),
5409 name: entity_name.clone(),
5410 labels: vec![label],
5411 created_at: now,
5412 last_seen_at: now,
5413 mention_count: 1,
5414 summary: String::new(),
5415 attributes: std::collections::HashMap::new(),
5416 name_embedding: embedding,
5417 salience,
5418 is_proper_noun: entity_name
5419 .chars()
5420 .next()
5421 .map(|c| c.is_uppercase())
5422 .unwrap_or(false),
5423 }
5424 })
5425 .collect();
5426
5427 let cooccurrence_pairs = if !memory.experience.cooccurrence_pairs.is_empty() {
5429 memory.experience.cooccurrence_pairs.clone()
5430 } else {
5431 let entity_extractor = crate::graph_memory::EntityExtractor::new();
5432 entity_extractor.extract_cooccurrence_pairs(&memory.experience.content)
5433 };
5434
5435 let edge_context = format!("Co-occurred in memory {}", memory.id.0);
5436
5437 let graph_guard = graph.read();
5439
5440 for entity in entities_to_add {
5441 if let Err(e) = graph_guard.add_entity(entity.clone()) {
5442 tracing::debug!("Failed to add entity '{}' to graph: {}", entity.name, e);
5443 }
5444 }
5445
5446 let l1_base_weight = crate::graph_memory::EdgeTier::L1Working.initial_weight();
5448 for (entity1, entity2) in cooccurrence_pairs {
5449 if let (Ok(Some(e1)), Ok(Some(e2))) = (
5450 graph_guard.find_entity_by_name(&entity1),
5451 graph_guard.find_entity_by_name(&entity2),
5452 ) {
5453 let entity_confidence = Some((e1.salience + e2.salience) / 2.0);
5454
5455 let semantic_weight = match (&e1.name_embedding, &e2.name_embedding) {
5456 (Some(emb1), Some(emb2)) => {
5457 let sim = crate::similarity::cosine_similarity(emb1, emb2).max(0.0);
5458 EDGE_SEMANTIC_WEIGHT_FLOOR
5459 + (1.0 - EDGE_SEMANTIC_WEIGHT_FLOOR) * sim
5460 }
5461 _ => 1.0,
5462 };
5463
5464 let edge = crate::graph_memory::RelationshipEdge {
5465 uuid: Uuid::new_v4(),
5466 from_entity: e1.uuid,
5467 to_entity: e2.uuid,
5468 relation_type: crate::graph_memory::RelationType::CoOccurs,
5469 strength: l1_base_weight * semantic_weight,
5470 created_at: now,
5471 valid_at: now,
5472 invalidated_at: None,
5473 source_episode_id: Some(memory.id.0),
5474 context: edge_context.clone(),
5475 last_activated: now,
5476 activation_count: 1,
5477 ltp_status: crate::graph_memory::LtpStatus::None,
5478 tier: crate::graph_memory::EdgeTier::L1Working,
5479 activation_timestamps: None,
5480 entity_confidence,
5481 };
5482
5483 if let Err(e) = graph_guard.add_relationship(edge) {
5484 tracing::trace!(
5485 "Failed to add co-occurrence edge {}<->{}: {}",
5486 entity1,
5487 entity2,
5488 e
5489 );
5490 }
5491 }
5492 }
5493 }
5494
5495 if let Err(e) = self.hybrid_search.index_memory(
5497 &memory.id,
5498 &memory.experience.content,
5499 &memory.experience.tags,
5500 &memory.experience.entities,
5501 ) {
5502 tracing::warn!("Failed to index memory {} in BM25: {}", memory.id.0, e);
5503 }
5504 if let Err(e) = self.hybrid_search.commit_and_reload() {
5505 tracing::warn!("Failed to commit/reload BM25 index: {}", e);
5506 }
5507
5508 if importance > self.config.importance_threshold {
5510 self.session_memory
5511 .write()
5512 .add_shared(Arc::clone(&memory))?;
5513 }
5514
5515 self.stats.write().total_memories += 1;
5517
5518 tracing::info!(
5519 external_id = %external_id,
5520 memory_id = %memory_id.0,
5521 "Memory upserted (create)"
5522 );
5523
5524 Ok((memory_id, false))
5525 }
5526 }
5527
    /// Returns the recorded revision history for a memory.
    ///
    /// Fetches the memory from long-term storage and clones its `history`
    /// vector; errors if the memory is not present in long-term storage.
    pub fn get_memory_history(&self, memory_id: &MemoryId) -> Result<Vec<MemoryRevision>> {
        let memory = self.long_term_memory.get(memory_id)?;
        Ok(memory.history.clone())
    }
5536
    /// Looks up a memory by its caller-supplied external identifier.
    ///
    /// Delegates to long-term storage; returns `Ok(None)` when no memory
    /// carries this external id.
    pub fn find_by_external_id(&self, external_id: &str) -> Result<Option<Memory>> {
        self.long_term_memory.find_by_external_id(external_id)
    }
5543
    /// Runs one maintenance cycle over the memory system.
    ///
    /// Light cycles (`is_heavy == false`) decay activations in the working
    /// and session tiers and apply access-based potentiation. Heavy cycles
    /// additionally: decay semantic facts for all users, perform
    /// incremental fact extraction/consolidation for `user_id`, run a
    /// replay cycle over all memories, and trigger storage
    /// repair/compaction.
    ///
    /// # Arguments
    /// * `decay_factor` - multiplier passed to each memory's
    ///   `decay_activation`.
    /// * `user_id` - user whose facts are extracted/consolidated.
    /// * `is_heavy` - enables the expensive phases described above.
    ///
    /// # Errors
    /// Propagates failures from consolidation; most sub-phases are
    /// best-effort and only log their errors.
    pub fn run_maintenance(
        &self,
        decay_factor: f32,
        user_id: &str,
        is_heavy: bool,
    ) -> Result<MaintenanceResult> {
        let start_time = std::time::Instant::now();
        // One timestamp is reused for every event emitted by this cycle.
        let now = chrono::Utc::now();

        self.consolidate_if_needed()?;

        let mut decayed_count = 0;
        let mut at_risk_count = 0;
        // Memories whose post-decay activation drops below this are
        // flagged "at risk" in the emitted MemoryDecayed events.
        const AT_RISK_THRESHOLD: f32 = 0.2;

        // --- Phase 1a: decay activations in the working tier ------------
        // Scoped so the read lock is released before the next phase.
        {
            let working = self.working_memory.read();
            for memory in working.all_memories() {
                let activation_before = memory.activation();
                memory.decay_activation(decay_factor);
                let activation_after = memory.activation();
                decayed_count += 1;

                // Only emit an event when decay actually changed the value.
                if activation_before != activation_after {
                    let at_risk = activation_after < AT_RISK_THRESHOLD;
                    if at_risk {
                        at_risk_count += 1;
                    }

                    self.record_consolidation_event(ConsolidationEvent::MemoryDecayed {
                        memory_id: memory.id.0.to_string(),
                        content_preview: memory.experience.content.chars().take(50).collect(),
                        activation_before,
                        activation_after,
                        at_risk,
                        timestamp: now,
                    });
                }
            }
        }

        // --- Phase 1b: decay activations in the session tier ------------
        {
            let session = self.session_memory.read();
            for memory in session.all_memories() {
                let activation_before = memory.activation();
                memory.decay_activation(decay_factor);
                let activation_after = memory.activation();
                decayed_count += 1;

                if activation_before != activation_after {
                    let at_risk = activation_after < AT_RISK_THRESHOLD;
                    if at_risk {
                        at_risk_count += 1;
                    }

                    self.record_consolidation_event(ConsolidationEvent::MemoryDecayed {
                        memory_id: memory.id.0.to_string(),
                        content_preview: memory.experience.content.chars().take(50).collect(),
                        activation_before,
                        activation_after,
                        at_risk,
                        timestamp: now,
                    });
                }
            }
        }

        // --- Phase 2: access-based potentiation --------------------------
        // Frequently accessed memories (>= POTENTIATION_ACCESS_THRESHOLD)
        // get a small importance boost, capped by the 0.95 guard.
        let mut potentiated_count = 0;
        {
            let working = self.working_memory.read();
            for memory in working.all_memories() {
                if memory.access_count() >= POTENTIATION_ACCESS_THRESHOLD
                    && memory.importance() < 0.95
                {
                    let activation_before = memory.importance();
                    memory.boost_importance(POTENTIATION_MAINTENANCE_BOOST);
                    potentiated_count += 1;

                    self.record_consolidation_event(ConsolidationEvent::MemoryStrengthened {
                        memory_id: memory.id.0.to_string(),
                        content_preview: memory.experience.content.chars().take(50).collect(),
                        activation_before,
                        activation_after: memory.importance(),
                        reason: StrengtheningReason::MaintenancePotentiation,
                        timestamp: now,
                    });
                }
            }
        }
        {
            let session = self.session_memory.read();
            for memory in session.all_memories() {
                if memory.access_count() >= POTENTIATION_ACCESS_THRESHOLD
                    && memory.importance() < 0.95
                {
                    let activation_before = memory.importance();
                    memory.boost_importance(POTENTIATION_MAINTENANCE_BOOST);
                    potentiated_count += 1;

                    self.record_consolidation_event(ConsolidationEvent::MemoryStrengthened {
                        memory_id: memory.id.0.to_string(),
                        content_preview: memory.experience.content.chars().take(50).collect(),
                        activation_before,
                        activation_after: memory.importance(),
                        reason: StrengtheningReason::MaintenancePotentiation,
                        timestamp: now,
                    });
                }
            }
        }

        if potentiated_count > 0 {
            tracing::debug!(
                "Potentiated {} memories during maintenance (access >= {})",
                potentiated_count,
                POTENTIATION_ACCESS_THRESHOLD
            );
        }

        // --- Phase 3 (heavy only): decay semantic facts ------------------
        // Best-effort: a failure is swallowed into (0, 0).
        let (facts_decayed, facts_deleted) = if is_heavy {
            self.decay_facts_for_all_users().unwrap_or((0, 0))
        } else {
            (0, 0)
        };
        if facts_decayed > 0 || facts_deleted > 0 {
            tracing::debug!(
                "Temporal fact maintenance: {} decayed, {} deleted",
                facts_decayed,
                facts_deleted
            );
        }

        // Snapshot of all memories, shared by the fact-extraction and
        // replay phases below; empty on light cycles to skip both.
        let all_memories_for_heavy: Vec<SharedMemory> = if is_heavy {
            self.get_all_memories().unwrap_or_default()
        } else {
            Vec::new()
        };

        // --- Phase 4 (heavy only): incremental fact extraction -----------
        // The `fact_extraction_needed` flag is consumed (swap to false)
        // regardless of whether extraction then proceeds.
        let mut facts_extracted_count = 0;
        let mut facts_reinforced_count = 0;
        if is_heavy
            && self
                .fact_extraction_needed
                .swap(false, std::sync::atomic::Ordering::Relaxed)
        {
            let all_memories = &all_memories_for_heavy;
            if !all_memories.is_empty() {
                // Resolve the extraction watermark: in-process atomic first,
                // then the persisted value or the newest stored fact.
                let mut watermark_millis = self
                    .fact_extraction_watermark
                    .load(std::sync::atomic::Ordering::Relaxed);
                if watermark_millis == 0 {
                    watermark_millis = self
                        .long_term_memory
                        .get_fact_watermark(user_id)
                        .or_else(|| self.fact_store.latest_fact_created_at(user_id))
                        .unwrap_or(0);
                    if watermark_millis > 0 {
                        self.fact_extraction_watermark
                            .store(watermark_millis, std::sync::atomic::Ordering::Relaxed);
                    }
                }
                let watermark_dt = chrono::DateTime::from_timestamp_millis(watermark_millis)
                    .unwrap_or(chrono::DateTime::<chrono::Utc>::MIN_UTC);

                // Only memories newer than the watermark are processed.
                let memories: Vec<Memory> = all_memories
                    .iter()
                    .filter(|m| m.created_at > watermark_dt)
                    .map(|arc_mem| arc_mem.as_ref().clone())
                    .collect();

                tracing::info!(
                    total_memories = all_memories.len(),
                    new_since_watermark = memories.len(),
                    watermark = %watermark_dt.format("%Y-%m-%dT%H:%M:%S"),
                    "Incremental fact extraction"
                );

                let consolidator = compression::SemanticConsolidator::new();
                let consolidation_result = consolidator.consolidate(&memories);

                if !consolidation_result.new_facts.is_empty() {
                    // Embed candidate facts for similarity-based dedup;
                    // on failure fall back to None (Jaccard-only dedup).
                    let fact_texts: Vec<&str> = consolidation_result
                        .new_facts
                        .iter()
                        .map(|f| f.fact.as_str())
                        .collect();
                    let fact_embeddings: Vec<Option<Vec<f32>>> = match self
                        .embedder
                        .encode_batch(&fact_texts)
                    {
                        Ok(embs) => embs.into_iter().map(Some).collect(),
                        Err(e) => {
                            tracing::debug!(
                                error = %e,
                                "Fact embedding batch failed, falling back to Jaccard-only dedup"
                            );
                            vec![None; fact_texts.len()]
                        }
                    };

                    let mut truly_new: Vec<(SemanticFact, Option<Vec<f32>>)> = Vec::new();

                    // Reinforce facts that match an existing stored fact;
                    // everything else is collected as genuinely new.
                    for (fact, embedding) in consolidation_result
                        .new_facts
                        .iter()
                        .zip(fact_embeddings.into_iter())
                    {
                        match self.fact_store.find_similar(
                            user_id,
                            &fact.fact,
                            &fact.related_entities,
                            embedding.as_deref(),
                        ) {
                            Ok(Some(mut existing)) => {
                                existing.support_count += 1;
                                existing.last_reinforced = now;
                                let confidence_before = existing.confidence;
                                // Diminishing-returns boost toward 1.0.
                                let boost = 0.1 * (1.0 - existing.confidence);
                                existing.confidence = (existing.confidence + boost).min(1.0);

                                // Merge provenance and entity links without
                                // introducing duplicates.
                                for src in &fact.source_memories {
                                    if !existing.source_memories.contains(src) {
                                        existing.source_memories.push(src.clone());
                                    }
                                }
                                for entity in &fact.related_entities {
                                    if !existing.related_entities.contains(entity) {
                                        existing.related_entities.push(entity.clone());
                                    }
                                }

                                if let Err(e) = self.fact_store.update(user_id, &existing) {
                                    tracing::debug!("Failed to reinforce fact: {e}");
                                } else {
                                    if let Some(ref emb) = embedding {
                                        let _ = self.fact_store.store_embedding(
                                            user_id,
                                            &existing.id,
                                            emb,
                                        );
                                    }
                                    facts_reinforced_count += 1;
                                    self.record_consolidation_event_for_user(
                                        user_id,
                                        ConsolidationEvent::FactReinforced {
                                            fact_id: existing.id.clone(),
                                            fact_content: existing.fact.clone(),
                                            confidence_before,
                                            confidence_after: existing.confidence,
                                            new_support_count: existing.support_count,
                                            timestamp: now,
                                        },
                                    );
                                }
                            }
                            // Covers Ok(None) and lookup errors: treat the
                            // fact as new rather than dropping it.
                            _ => {
                                truly_new.push((fact.clone(), embedding));
                            }
                        }
                    }

                    if !truly_new.is_empty() {
                        let facts_only: Vec<SemanticFact> =
                            truly_new.iter().map(|(f, _)| f.clone()).collect();
                        match self.fact_store.store_batch(user_id, &facts_only) {
                            Ok(stored) => {
                                facts_extracted_count = stored;
                                for (fact, embedding) in &truly_new {
                                    if let Some(emb) = embedding {
                                        let _ =
                                            self.fact_store.store_embedding(user_id, &fact.id, emb);
                                    }
                                    self.record_consolidation_event_for_user(
                                        user_id,
                                        ConsolidationEvent::FactExtracted {
                                            fact_id: fact.id.clone(),
                                            fact_content: fact.fact.clone(),
                                            confidence: fact.confidence,
                                            fact_type: format!("{:?}", fact.fact_type),
                                            source_memory_count: fact.source_memories.len(),
                                            timestamp: now,
                                        },
                                    );
                                }
                            }
                            Err(e) => {
                                tracing::warn!("Failed to store extracted facts: {e}");
                            }
                        }

                        self.connect_facts_to_graph(&facts_only);
                    }

                    if facts_extracted_count > 0 || facts_reinforced_count > 0 {
                        tracing::debug!(
                            extracted = facts_extracted_count,
                            reinforced = facts_reinforced_count,
                            "Fact consolidation during maintenance"
                        );
                    }
                }

                // Advance the watermark to the newest processed memory so
                // the next cycle only looks at genuinely newer memories.
                if !memories.is_empty() {
                    let new_watermark = memories
                        .iter()
                        .map(|m| m.created_at.timestamp_millis())
                        .max()
                        .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
                    self.fact_extraction_watermark
                        .store(new_watermark, std::sync::atomic::Ordering::Relaxed);
                    self.long_term_memory
                        .set_fact_watermark(user_id, new_watermark);
                }
            }
        } else {
            tracing::debug!("Fact extraction skipped: no new memories since last cycle");
        }

        // --- Phase 5: pattern detection and (heavy only) replay -----------
        let mut replay_result = replay::ReplayCycleResult::default();
        {
            let pattern_result = self.pattern_detector.write().detect_patterns();
            let has_pattern_triggers = !pattern_result.triggers.is_empty();

            if has_pattern_triggers {
                tracing::debug!(
                    "Pattern detection: {} entity, {} semantic, {} temporal, {} salience, {} behavioral triggers",
                    pattern_result.entity_patterns_found,
                    pattern_result.semantic_clusters_found,
                    pattern_result.temporal_clusters_found,
                    pattern_result.salience_spikes_found,
                    pattern_result.behavioral_changes_found
                );

                for trigger in &pattern_result.triggers {
                    self.record_consolidation_event(
                        introspection::ConsolidationEvent::PatternTriggeredReplay {
                            trigger_type: trigger.trigger_type_name().to_string(),
                            memory_ids: trigger.memory_ids(),
                            // Map each trigger variant onto a confidence-like
                            // score; 0.7 is the default for other variants.
                            pattern_confidence: match trigger {
                                pattern_detection::ReplayTrigger::EntityCoOccurrence {
                                    confidence,
                                    ..
                                } => *confidence,
                                pattern_detection::ReplayTrigger::SemanticCluster {
                                    avg_similarity,
                                    ..
                                } => *avg_similarity,
                                pattern_detection::ReplayTrigger::SalienceSpike {
                                    importance,
                                    ..
                                } => *importance,
                                _ => 0.7,
                            },
                            trigger_description: trigger.description(),
                            timestamp: now,
                        },
                    );
                }
            }

            // Replay fires on heavy cycles when either a pattern trigger or
            // the replay manager's own timer says so.
            let timer_should_replay = self.replay_manager.read().should_replay();
            let should_replay = is_heavy && (has_pattern_triggers || timer_should_replay);

            if should_replay && !all_memories_for_heavy.is_empty() {
                let graph_ref = self.graph_memory.clone();
                // Project each memory into the tuple form the replay
                // manager scores: (id, importance, arousal, created_at,
                // graph associations, 50-char preview).
                let candidates_data: Vec<_> = all_memories_for_heavy
                    .iter()
                    .map(|m| {
                        let connections: Vec<String> = if let Some(ref graph) = graph_ref {
                            let graph_guard = graph.read();
                            graph_guard
                                .find_memory_associations(&m.id.0, 10)
                                .unwrap_or_default()
                                .into_iter()
                                .map(|(uuid, _)| uuid.to_string())
                                .collect()
                        } else {
                            Vec::new()
                        };
                        // Default arousal when no emotional context exists.
                        let arousal = m
                            .experience
                            .context
                            .as_ref()
                            .map(|c| c.emotional.arousal)
                            .unwrap_or(0.3);
                        (
                            m.id.0.to_string(),
                            m.importance(),
                            arousal,
                            m.created_at,
                            connections,
                            m.experience.content.chars().take(50).collect::<String>(),
                        )
                    })
                    .collect();

                let candidates = self
                    .replay_manager
                    .read()
                    .identify_replay_candidates(&candidates_data);

                if !candidates.is_empty() {
                    let (memory_boosts, edge_boosts, events) =
                        self.replay_manager.write().execute_replay(&candidates);

                    replay_result.memories_replayed = candidates.len();
                    replay_result.edges_strengthened = edge_boosts.len();
                    replay_result.total_priority_score =
                        candidates.iter().map(|c| c.priority_score).sum();

                    replay_result.replay_memory_ids =
                        candidates.iter().map(|c| c.memory_id.clone()).collect();

                    // Apply and persist importance boosts; unparseable ids
                    // and persistence failures are skipped/logged.
                    for (mem_id_str, boost) in &memory_boosts {
                        if let Ok(mem_id) = uuid::Uuid::parse_str(mem_id_str) {
                            if let Ok(memory) = self.long_term_memory.get(&MemoryId(mem_id)) {
                                memory.boost_importance(*boost);
                                if let Err(e) = self.long_term_memory.update(&memory) {
                                    tracing::debug!("Failed to persist replay boost: {e}");
                                }
                            }
                        }
                    }

                    // Edge boosts are returned to the caller (not applied
                    // here) — the caller routes them through GraphMemory.
                    replay_result.edge_boosts = edge_boosts;
                    if !replay_result.edge_boosts.is_empty() {
                        tracing::debug!(
                            "Replay produced {} edge boosts (to be applied via GraphMemory)",
                            replay_result.edge_boosts.len()
                        );
                    }

                    for event in events {
                        self.record_consolidation_event(event);
                    }

                    self.record_consolidation_event(ConsolidationEvent::ReplayCycleCompleted {
                        memories_replayed: replay_result.memories_replayed,
                        edges_strengthened: replay_result.edges_strengthened,
                        total_priority_score: replay_result.total_priority_score,
                        duration_ms: start_time.elapsed().as_millis() as u64,
                        timestamp: now,
                    });

                    tracing::debug!(
                        "Replay cycle complete: {} memories replayed, {} edges strengthened",
                        replay_result.memories_replayed,
                        replay_result.edges_strengthened
                    );
                }
            }
        }

        self.pattern_detector.write().cleanup();

        // --- Phase 6 (heavy only): storage repair / compaction ------------
        if is_heavy {
            self.auto_repair_and_compact();
        }

        let duration_ms = start_time.elapsed().as_millis() as u64;

        self.record_consolidation_event(ConsolidationEvent::MaintenanceCycleCompleted {
            memories_processed: decayed_count,
            memories_decayed: decayed_count,
            // Edge pruning is not performed in this routine.
            edges_pruned: 0,
            duration_ms,
            timestamp: now,
        });

        tracing::debug!(
            "Maintenance complete: {} memories decayed (factor={}), {} at risk, {} replayed, took {}ms",
            decayed_count,
            decay_factor,
            at_risk_count,
            replay_result.memories_replayed,
            duration_ms
        );

        Ok(MaintenanceResult {
            decayed_count,
            edge_boosts: replay_result.edge_boosts,
            replay_memory_ids: replay_result.replay_memory_ids,
            memories_replayed: replay_result.memories_replayed,
            total_priority_score: replay_result.total_priority_score,
            facts_extracted: facts_extracted_count,
            facts_reinforced: facts_reinforced_count,
        })
    }
6113
6114 pub fn get_consolidation_report(
6129 &self,
6130 since: chrono::DateTime<chrono::Utc>,
6131 until: Option<chrono::DateTime<chrono::Utc>>,
6132 ) -> ConsolidationReport {
6133 let until = until.unwrap_or_else(chrono::Utc::now);
6134 let events = self.consolidation_events.read();
6135 events.generate_report(since, until)
6136 }
6137
    /// Builds a consolidation report for one user over `[since, until]`,
    /// merging the durable learning history with the in-process buffer.
    ///
    /// `until` defaults to now. Events from both sources are de-duplicated
    /// (persisted events take precedence) and sorted chronologically
    /// before the report is generated.
    pub fn get_consolidation_report_for_user(
        &self,
        user_id: &str,
        since: chrono::DateTime<chrono::Utc>,
        until: Option<chrono::DateTime<chrono::Utc>>,
    ) -> Result<ConsolidationReport> {
        let until = until.unwrap_or_else(chrono::Utc::now);

        // Durable events previously persisted via the learning history.
        let persisted_events = self
            .learning_history
            .events_in_range(user_id, since, until)?;

        // Ephemeral events still held in the in-memory buffer; the read
        // guard is confined to this block.
        let ephemeral_events = {
            let events = self.consolidation_events.read();
            events.events_since(since)
        };

        let mut all_events: Vec<ConsolidationEvent> = Vec::new();
        // Dedup key: nanosecond timestamp + enum variant discriminant.
        // NOTE(review): two distinct events of the same variant with an
        // identical timestamp would collapse to one — presumed acceptable.
        let mut seen_keys: std::collections::HashSet<(
            i64,
            std::mem::Discriminant<ConsolidationEvent>,
        )> = std::collections::HashSet::new();

        for stored in &persisted_events {
            let ts = stored.event.timestamp().timestamp_nanos_opt().unwrap_or(0);
            let key = (ts, std::mem::discriminant(&stored.event));
            if seen_keys.insert(key) {
                all_events.push(stored.event.clone());
            }
        }

        // The buffer query has no upper bound, so clamp to `until` here.
        let until_nanos = until.timestamp_nanos_opt().unwrap_or(i64::MAX);
        for event in ephemeral_events {
            let ts = event.timestamp().timestamp_nanos_opt().unwrap_or(0);
            let key = (ts, std::mem::discriminant(&event));
            if ts <= until_nanos && seen_keys.insert(key) {
                all_events.push(event);
            }
        }

        all_events.sort_by(|a, b| a.timestamp().cmp(&b.timestamp()));

        let report =
            ConsolidationEventBuffer::generate_report_from_events(&all_events, since, until);

        Ok(report)
    }
6201
6202 pub fn get_consolidation_events_since(
6206 &self,
6207 since: chrono::DateTime<chrono::Utc>,
6208 ) -> Vec<ConsolidationEvent> {
6209 let events = self.consolidation_events.read();
6210 events.events_since(since)
6211 }
6212
6213 pub fn get_all_consolidation_events(&self) -> Vec<ConsolidationEvent> {
6215 let events = self.consolidation_events.read();
6216 events.all_events()
6217 }
6218
6219 pub fn record_consolidation_event(&self, event: ConsolidationEvent) {
6224 let mut events = self.consolidation_events.write();
6225 events.push(event);
6226 }
6227
6228 pub fn record_consolidation_event_for_user(&self, user_id: &str, event: ConsolidationEvent) {
6236 {
6238 let mut events = self.consolidation_events.write();
6239 events.push(event.clone());
6240 }
6241
6242 if event.is_significant() {
6244 if let Err(e) = self.learning_history.record(user_id, &event) {
6245 tracing::warn!(
6246 user_id = %user_id,
6247 event_type = ?std::mem::discriminant(&event),
6248 error = %e,
6249 "Failed to persist learning event"
6250 );
6251 }
6252 }
6253 }
6254
6255 pub fn clear_consolidation_events(&self) {
6257 let mut events = self.consolidation_events.write();
6258 events.clear();
6259 }
6260
6261 pub fn consolidation_event_count(&self) -> usize {
6263 let events = self.consolidation_events.read();
6264 events.len()
6265 }
6266
6267 pub fn distill_facts(
6287 &self,
6288 user_id: &str,
6289 min_support: usize,
6290 min_age_days: i64,
6291 ) -> Result<ConsolidationResult> {
6292 let all_memories = self.get_all_memories()?;
6294
6295 let mut watermark_millis = self
6297 .fact_extraction_watermark
6298 .load(std::sync::atomic::Ordering::Relaxed);
6299 if watermark_millis == 0 {
6300 watermark_millis = self
6301 .long_term_memory
6302 .get_fact_watermark(user_id)
6303 .or_else(|| self.fact_store.latest_fact_created_at(user_id))
6304 .unwrap_or(0);
6305 if watermark_millis > 0 {
6306 self.fact_extraction_watermark
6307 .store(watermark_millis, std::sync::atomic::Ordering::Relaxed);
6308 }
6309 }
6310 let watermark_dt = chrono::DateTime::from_timestamp_millis(watermark_millis)
6311 .unwrap_or(chrono::DateTime::<chrono::Utc>::MIN_UTC);
6312
6313 let memories: Vec<Memory> = all_memories
6314 .iter()
6315 .filter(|m| m.created_at > watermark_dt)
6316 .map(|arc_mem| arc_mem.as_ref().clone())
6317 .collect();
6318
6319 tracing::info!(
6320 total_memories = all_memories.len(),
6321 new_since_watermark = memories.len(),
6322 watermark = %watermark_dt.format("%Y-%m-%dT%H:%M:%S"),
6323 "Incremental fact extraction (on-demand)"
6324 );
6325
6326 let consolidator =
6328 compression::SemanticConsolidator::with_thresholds(min_support, min_age_days);
6329
6330 let result = consolidator.consolidate(&memories);
6332
6333 if !result.new_facts.is_empty() {
6335 let stored = self.fact_store.store_batch(user_id, &result.new_facts)?;
6336 tracing::info!(
6337 user_id = %user_id,
6338 facts_extracted = result.facts_extracted,
6339 facts_stored = stored,
6340 "Semantic distillation complete"
6341 );
6342
6343 let texts: Vec<&str> = result.new_facts.iter().map(|f| f.fact.as_str()).collect();
6345 if let Ok(batch_embs) = self.embedder.encode_batch(&texts) {
6346 for (fact, emb) in result.new_facts.iter().zip(batch_embs.iter()) {
6347 let _ = self.fact_store.store_embedding(user_id, &fact.id, emb);
6348 }
6349 }
6350
6351 for fact in &result.new_facts {
6353 self.record_consolidation_event_for_user(
6354 user_id,
6355 ConsolidationEvent::FactExtracted {
6356 fact_id: fact.id.clone(),
6357 fact_content: fact.fact.clone(),
6358 confidence: fact.confidence,
6359 fact_type: format!("{:?}", fact.fact_type),
6360 source_memory_count: fact.source_memories.len(),
6361 timestamp: chrono::Utc::now(),
6362 },
6363 );
6364 }
6365 }
6366
6367 if !memories.is_empty() {
6369 let new_watermark = chrono::Utc::now().timestamp_millis();
6370 self.fact_extraction_watermark
6371 .store(new_watermark, std::sync::atomic::Ordering::Relaxed);
6372 self.long_term_memory
6373 .set_fact_watermark(user_id, new_watermark);
6374 }
6375
6376 Ok(result)
6377 }
6378
    /// Lists up to `limit` semantic facts for `user_id` (delegates to the
    /// fact store).
    pub fn get_facts(&self, user_id: &str, limit: usize) -> Result<Vec<SemanticFact>> {
        self.fact_store.list(user_id, limit)
    }
6387
    /// Returns up to `limit` facts related to the given entity (delegates
    /// to the fact store).
    pub fn get_facts_by_entity(
        &self,
        user_id: &str,
        entity: &str,
        limit: usize,
    ) -> Result<Vec<SemanticFact>> {
        self.fact_store.find_by_entity(user_id, entity, limit)
    }
6402
    /// Returns up to `limit` facts of the given `fact_type` (delegates to
    /// the fact store).
    pub fn get_facts_by_type(
        &self,
        user_id: &str,
        fact_type: FactType,
        limit: usize,
    ) -> Result<Vec<SemanticFact>> {
        self.fact_store.find_by_type(user_id, fact_type, limit)
    }
6417
    /// Full-text search over a user's semantic facts, capped at `limit`
    /// results (delegates to the fact store).
    pub fn search_facts(
        &self,
        user_id: &str,
        query: &str,
        limit: usize,
    ) -> Result<Vec<SemanticFact>> {
        self.fact_store.search(user_id, query, limit)
    }
6432
    /// Aggregate statistics for a user's fact store.
    pub fn get_fact_stats(&self, user_id: &str) -> Result<facts::FactStats> {
        self.fact_store.stats(user_id)
    }
6437
6438 pub fn get_facts_for_graph_entities(
6444 &self,
6445 user_id: &str,
6446 entity_names: &[String],
6447 limit_per_entity: usize,
6448 ) -> Result<Vec<SemanticFact>> {
6449 let mut seen_ids = std::collections::HashSet::new();
6450 let mut results = Vec::new();
6451
6452 for name in entity_names {
6453 let facts = self
6454 .fact_store
6455 .find_by_entity(user_id, name, limit_per_entity)?;
6456 for fact in facts {
6457 if seen_ids.insert(fact.id.clone()) {
6458 results.push(fact);
6459 }
6460 }
6461 }
6462
6463 results.sort_by(|a, b| b.confidence.total_cmp(&a.confidence));
6464 Ok(results)
6465 }
6466
6467 pub fn reinforce_fact(
6472 &self,
6473 user_id: &str,
6474 fact_id: &str,
6475 memory_id: &MemoryId,
6476 ) -> Result<bool> {
6477 if let Some(mut fact) = self.fact_store.get(user_id, fact_id)? {
6478 let confidence_before = fact.confidence;
6480
6481 fact.support_count += 1;
6483 fact.last_reinforced = chrono::Utc::now();
6484
6485 let boost = 0.1 * (1.0 - fact.confidence);
6487 fact.confidence = (fact.confidence + boost).min(1.0);
6488
6489 if !fact.source_memories.contains(memory_id) {
6491 fact.source_memories.push(memory_id.clone());
6492 }
6493
6494 self.fact_store.update(user_id, &fact)?;
6496
6497 self.record_consolidation_event_for_user(
6499 user_id,
6500 ConsolidationEvent::FactReinforced {
6501 fact_id: fact.id.clone(),
6502 fact_content: fact.fact.clone(),
6503 confidence_before,
6504 confidence_after: fact.confidence,
6505 new_support_count: fact.support_count,
6506 timestamp: chrono::Utc::now(),
6507 },
6508 );
6509
6510 Ok(true)
6511 } else {
6512 Ok(false)
6513 }
6514 }
6515
    /// Deletes a fact from the user's store; returns whether a fact was
    /// actually removed (delegates to the fact store).
    pub fn delete_fact(&self, user_id: &str, fact_id: &str) -> Result<bool> {
        self.fact_store.delete(user_id, fact_id)
    }
6520
    /// Shared handle to the underlying semantic fact store.
    pub fn fact_store(&self) -> &Arc<facts::SemanticFactStore> {
        &self.fact_store
    }
6525
    /// Shared handle to the underlying lineage graph.
    pub fn lineage_graph(&self) -> &Arc<lineage::LineageGraph> {
        &self.lineage_graph
    }
6534
6535 pub fn infer_lineage_for_memory(
6540 &self,
6541 user_id: &str,
6542 new_memory: &Memory,
6543 candidate_memories: &[Memory],
6544 ) -> Result<Vec<LineageEdge>> {
6545 let mut inferred_edges = Vec::new();
6546
6547 for candidate in candidate_memories {
6548 if let Some((relation, confidence)) =
6550 self.lineage_graph.infer_relation(candidate, new_memory)
6551 {
6552 if !self
6554 .lineage_graph
6555 .edge_exists(user_id, &candidate.id, &new_memory.id)?
6556 {
6557 let edge = LineageEdge::inferred(
6558 candidate.id.clone(),
6559 new_memory.id.clone(),
6560 relation,
6561 confidence,
6562 );
6563 self.lineage_graph.store_edge(user_id, &edge)?;
6564 inferred_edges.push(edge);
6565 }
6566 }
6567 }
6568
6569 if lineage::LineageGraph::detect_branch_signal(&new_memory.experience.content) {
6571 self.lineage_graph.ensure_main_branch(user_id)?;
6573 }
6574
6575 Ok(inferred_edges)
6576 }
6577
    /// Traces lineage edges from `memory_id` in the given `direction`, up
    /// to `max_depth` hops (delegates to the lineage graph).
    pub fn trace_lineage(
        &self,
        user_id: &str,
        memory_id: &MemoryId,
        direction: TraceDirection,
        max_depth: usize,
    ) -> Result<LineageTrace> {
        self.lineage_graph
            .trace(user_id, memory_id, direction, max_depth)
    }
6589
    /// Walks the lineage graph backwards from `memory_id` to find its root
    /// cause, if one exists (delegates to the lineage graph).
    pub fn find_root_cause(&self, user_id: &str, memory_id: &MemoryId) -> Result<Option<MemoryId>> {
        self.lineage_graph.find_root_cause(user_id, memory_id)
    }
6594
    /// Aggregate statistics for the user's lineage graph.
    pub fn lineage_stats(&self, user_id: &str) -> Result<LineageStats> {
        self.lineage_graph.stats(user_id)
    }
6599
    /// Decays confidence of stored semantic facts for every known user and
    /// deletes facts whose confidence falls below a floor.
    ///
    /// Facts within the grace window (`FACT_DECAY_GRACE_DAYS` since last
    /// reinforcement) are untouched. Beyond it, confidence follows an
    /// exponential half-life that grows with the fact's support count, so
    /// well-supported facts decay more slowly.
    ///
    /// Returns `(decayed_count, deleted_count)` totals across all users.
    fn decay_facts_for_all_users(&self) -> Result<(usize, usize)> {
        use crate::constants::{
            FACT_DECAY_GRACE_DAYS, FACT_DECAY_HALF_LIFE_BASE_DAYS,
            FACT_DECAY_HALF_LIFE_PER_SUPPORT_DAYS,
        };
        // Facts that fall below this confidence are removed outright.
        const DELETE_CONFIDENCE: f32 = 0.1;

        let now = chrono::Utc::now();
        let mut total_decayed = 0;
        let mut total_deleted = 0;

        // NOTE(review): hard caps — at most 100 users and 10_000 facts
        // per user are visited per cycle; confirm these bounds are
        // intended for larger deployments.
        let user_ids = self.fact_store.list_users(100)?;

        for user_id in &user_ids {
            let facts = self.fact_store.list(user_id, 10000)?;

            for mut fact in facts {
                let days_since_reinforcement = (now - fact.last_reinforced).num_days();

                // Recently reinforced facts are exempt from decay.
                if days_since_reinforcement <= FACT_DECAY_GRACE_DAYS {
                    continue;
                }

                let confidence_before = fact.confidence;

                // Exponential decay past the grace period; each unit of
                // support extends the half-life.
                let elapsed = (days_since_reinforcement - FACT_DECAY_GRACE_DAYS) as f64;
                let half_life = FACT_DECAY_HALF_LIFE_BASE_DAYS
                    + (fact.support_count as f64 * FACT_DECAY_HALF_LIFE_PER_SUPPORT_DAYS);
                let decay_factor = (0.5_f64).powf(elapsed / half_life) as f32;
                fact.confidence = (confidence_before * decay_factor).max(0.0);

                if fact.confidence < DELETE_CONFIDENCE {
                    // Record the deletion (persisted per-user when
                    // significant) before actually removing the fact.
                    self.record_consolidation_event_for_user(
                        user_id,
                        ConsolidationEvent::FactDeleted {
                            fact_id: fact.id.clone(),
                            fact_content: fact.fact.clone(),
                            final_confidence: fact.confidence,
                            support_count: fact.support_count,
                            reason: format!("confidence_below_{}", DELETE_CONFIDENCE),
                            timestamp: now,
                        },
                    );

                    self.fact_store.delete(user_id, &fact.id)?;
                    total_deleted += 1;
                } else if (confidence_before - fact.confidence) > 0.001 {
                    // Only persist/report when the drop is non-trivial.
                    // NOTE(review): decay events go to the ephemeral buffer
                    // only (unlike deletions, which use the per-user path)
                    // — confirm this asymmetry is intentional.
                    self.record_consolidation_event(ConsolidationEvent::FactDecayed {
                        fact_id: fact.id.clone(),
                        fact_content: fact.fact.clone(),
                        confidence_before,
                        confidence_after: fact.confidence,
                        days_since_reinforcement,
                        timestamp: now,
                    });

                    self.fact_store.update(user_id, &fact)?;
                    total_decayed += 1;
                }
            }
        }

        if total_decayed > 0 || total_deleted > 0 {
            tracing::info!(
                facts_decayed = total_decayed,
                facts_deleted = total_deleted,
                "Fact maintenance complete"
            );
        }

        Ok((total_decayed, total_deleted))
    }
6686}
6687
/// Flushes long-term storage when the memory system is dropped so buffered
/// writes are not lost on shutdown. `drop` cannot return a `Result`, so a
/// flush failure can only be logged here.
impl Drop for MemorySystem {
    fn drop(&mut self) {
        if let Err(e) = self.long_term_memory.flush() {
            tracing::error!("Failed to flush storage on shutdown: {}", e);
        }
    }
}