1use super::error::Result;
47use super::models::*;
48use super::reflection_engine::ReflectionEngine;
49use super::repository::MemoryRepository;
50use chrono::{DateTime, Duration, Utc};
51use serde::{Deserialize, Serialize};
52use std::collections::{HashMap, HashSet};
53use std::sync::Arc;
54use std::time::Instant;
55use tokio::sync::RwLock;
56use tracing::{info, warn};
57use uuid::Uuid;
58
/// Tuning knobs for [`MemoryAwareRetrievalEngine`] and its [`QueryPatternCache`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnhancedRetrievalConfig {
    /// Scale and upper bound of the consolidation boost: the boost calculations
    /// in this file clamp their result to this value.
    pub consolidation_boost_multiplier: f64,

    /// Look-back window, in hours, within which a consolidation log entry makes
    /// a memory count as "recently consolidated".
    pub recent_consolidation_threshold_hours: i64,

    /// Lineage traversal depth used when a request does not set `lineage_depth`.
    pub max_lineage_depth: usize,

    /// Engine-wide switch for adding insight results to a search; the
    /// per-request `include_insights` flag must allow it as well.
    pub include_insights: bool,

    /// When true, the engine constructor creates a [`QueryPatternCache`].
    pub enable_query_caching: bool,

    /// Age, in seconds, after which a cache entry is treated as expired.
    pub cache_ttl_seconds: u64,

    /// Number of cache entries at which `QueryPatternCache::set` starts
    /// evicting the least recently accessed entry.
    pub max_cache_size: usize,

    /// Latency budget in milliseconds; `search` logs a warning when a request
    /// takes longer than this.
    pub p95_latency_target_ms: u64,

    /// NOTE(review): not read anywhere in the code visible here — presumably a
    /// minimum confidence for insights; confirm against callers.
    pub insight_confidence_threshold: f64,

    /// Score multiplier applied to insight results produced by
    /// `get_relevant_insights`.
    pub insight_importance_weight: f64,
}
92
93impl Default for EnhancedRetrievalConfig {
94 fn default() -> Self {
95 Self {
96 consolidation_boost_multiplier: 2.0,
97 recent_consolidation_threshold_hours: 24,
98 max_lineage_depth: 3,
99 include_insights: true,
100 enable_query_caching: true,
101 cache_ttl_seconds: 300, max_cache_size: 1000,
103 p95_latency_target_ms: 200,
104 insight_confidence_threshold: 0.6,
105 insight_importance_weight: 1.5,
106 }
107 }
108}
109
/// Lineage information assembled for a single memory by `get_memory_lineage`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryLineage {
    /// Id of the memory this lineage describes.
    pub memory_id: Uuid,
    /// Memories reached by following `parent_id` links upward.
    pub ancestors: Vec<MemoryAncestor>,
    /// Active memories reached by following `parent_id` links downward.
    pub descendants: Vec<MemoryDescendant>,
    /// Ids of active meta-memories whose `source_memory_ids` metadata contains
    /// this memory's id.
    pub related_insights: Vec<Uuid>,
    /// Consolidation log entries for this memory, newest first (at most 10).
    pub consolidation_chain: Vec<ConsolidationEvent>,
    /// Provenance summary produced by `generate_provenance_metadata`.
    pub provenance_metadata: ProvenanceMetadata,
}
120
/// One ancestor discovered while walking parent links from a memory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryAncestor {
    /// Id of the ancestor memory.
    pub memory_id: Uuid,
    /// How the ancestor relates to the start memory; the traversal in this
    /// file always records `ParentChild`.
    pub relationship_type: RelationshipType,
    /// Number of parent links between the start memory and this ancestor
    /// (the direct parent has depth 1).
    pub depth: usize,
    /// Copied from the ancestor's `importance_score`.
    pub strength: f64,
    /// Creation timestamp of the ancestor memory.
    pub created_at: DateTime<Utc>,
}
130
/// One descendant discovered while walking child links from a memory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryDescendant {
    /// Id of the descendant memory.
    pub memory_id: Uuid,
    /// How the descendant relates to the start memory; the traversal in this
    /// file always records `ParentChild`.
    pub relationship_type: RelationshipType,
    /// Number of child links between the start memory and this descendant
    /// (a direct child has depth 1).
    pub depth: usize,
    /// Copied from the descendant's `importance_score`.
    pub strength: f64,
    /// Creation timestamp of the descendant memory.
    pub created_at: DateTime<Utc>,
}
140
/// Kind of link between two memories in a lineage.
///
/// Only `ParentChild` is constructed by the code visible in this file; the
/// remaining variants are declared but not produced here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum RelationshipType {
    /// Link derived from a memory's `parent_id` column.
    ParentChild,
    Refinement,
    Consolidation,
    InsightSource,
    TemporalSequence,
    SemanticSimilarity,
    CausalLink,
}
152
/// A consolidation log entry in the shape exposed through [`MemoryLineage`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsolidationEvent {
    /// Id of the underlying consolidation log row.
    pub event_id: Uuid,
    /// Event type string copied from the log row.
    pub event_type: String,
    /// Consolidation strength before the event.
    pub previous_strength: f64,
    /// Consolidation strength after the event.
    pub new_strength: f64,
    /// When the log row was created.
    pub timestamp: DateTime<Utc>,
    /// Always `None` when built by `get_consolidation_chain` in this file.
    pub trigger_reason: Option<String>,
}
163
/// Provenance summary attached to a [`MemoryLineage`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProvenanceMetadata {
    /// Origin label; `generate_provenance_metadata` always writes
    /// `"memory_system"`.
    pub creation_source: String,
    /// Recorded modifications; left empty by `generate_provenance_metadata`.
    pub modification_history: Vec<ModificationRecord>,
    /// Quality sub-scores for the memory.
    pub quality_indicators: QualityIndicators,
    /// Computed in this file as the memory's `consolidation_strength / 10.0`.
    pub reliability_score: f64,
}
172
/// A single entry of `ProvenanceMetadata::modification_history`.
///
/// NOTE(review): never constructed in the code visible here, so the field
/// descriptions below are inferred from the names only — confirm with the
/// producer of these records.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModificationRecord {
    /// Presumably when the modification happened.
    pub timestamp: DateTime<Utc>,
    /// Presumably a label for the kind of modification.
    pub modification_type: String,
    /// Presumably who or what performed the modification.
    pub agent: String,
    /// Presumably a free-text description of the change.
    pub description: String,
}
181
/// Quality sub-scores reported inside [`ProvenanceMetadata`].
///
/// `generate_provenance_metadata` in this file fills these with fixed
/// constants (0.8, 0.7, 0.9, 0.8) rather than measured values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityIndicators {
    pub coherence_score: f64,
    pub completeness_score: f64,
    pub accuracy_score: f64,
    pub timeliness_score: f64,
}
190
/// Search request accepted by `MemoryAwareRetrievalEngine::search`.
///
/// Every optional flag falls back to the default noted on the field when left
/// as `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MemoryAwareSearchRequest {
    /// The underlying repository search to run.
    pub base_request: SearchRequest,
    /// Attach a [`MemoryLineage`] to each result; defaults to `false`.
    pub include_lineage: Option<bool>,
    /// Look up consolidation status and boosts; defaults to `true`.
    pub include_consolidation_boost: Option<bool>,
    /// Append insight results (also requires the engine-level
    /// `include_insights` setting); defaults to `true`.
    pub include_insights: Option<bool>,
    /// Lineage traversal depth; defaults to the engine's `max_lineage_depth`.
    pub lineage_depth: Option<usize>,
    /// Consult and populate the query cache; defaults to `true`.
    pub use_cache: Option<bool>,
    /// Attach a [`BoostExplanation`] to each result; defaults to `false`.
    pub explain_boosting: Option<bool>,
}
202
/// A single search hit enriched with consolidation, insight and lineage data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryAwareSearchResult {
    /// The matched memory.
    pub memory: Memory,
    /// `similarity_score` of the underlying repository result.
    pub base_similarity_score: f32,
    /// Multiplier applied to the base score (the insight weight for results
    /// produced by `get_relevant_insights`).
    pub consolidation_boost: f64,
    /// The repository's `combined_score` multiplied by the boost.
    pub final_score: f64,
    /// Whether the memory is classified as an insight by `is_insight_memory`.
    pub is_insight: bool,
    /// Whether the memory has a consolidation log entry inside the configured
    /// recent window.
    pub is_recently_consolidated: bool,
    /// Lineage, present only when the request asked for it.
    pub lineage: Option<MemoryLineage>,
    /// Boost breakdown, present only when the request asked for it.
    pub boost_explanation: Option<BoostExplanation>,
    /// Whether this result came from the query cache; freshly computed results
    /// are constructed with `false`.
    pub cache_hit: bool,
}
216
/// Breakdown of the score boost for one result, as filled in by `search`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BoostExplanation {
    /// The consolidation boost multiplier used for the result.
    pub consolidation_boost_applied: f64,
    /// The configured insight weight for insight memories, otherwise 1.0.
    /// NOTE(review): `search` reports this value but does not multiply it into
    /// `final_score` — confirm whether that is intended.
    pub insight_boost_applied: f64,
    /// Always 1.0 in the code visible here.
    pub lineage_boost_applied: f64,
    /// 1.0 when the memory was recently consolidated, otherwise 0.0.
    pub recent_consolidation_factor: f64,
    /// Set by `search` to the consolidation boost alone.
    pub total_boost_multiplier: f64,
    /// Human-readable reasons from `generate_boost_reasons`.
    pub boost_reasons: Vec<String>,
}
227
/// Response returned by `MemoryAwareRetrievalEngine::search`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryAwareSearchResponse {
    /// The enriched results.
    pub results: Vec<MemoryAwareSearchResult>,
    /// Total count reported by the repository search; `None` when the response
    /// was served from the cache.
    pub total_count: Option<i64>,
    /// Number of insight results counted by `search`.
    pub insights_included: i32,
    /// Number of recently consolidated results counted by `search`.
    pub recently_consolidated_count: i32,
    /// Lineage depth reported for the request (0 for cached responses).
    pub lineage_depth_analyzed: usize,
    /// 1.0 for a cached response, otherwise the cache's cumulative hit ratio.
    pub cache_hit_ratio: f64,
    /// Wall-clock duration of the `search` call in milliseconds.
    pub execution_time_ms: u64,
    /// Per-phase timings and cache counters.
    pub performance_metrics: PerformanceMetrics,
}
240
/// Per-phase timings collected during one `search` call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Time spent in the base repository search.
    pub database_query_time_ms: u64,
    /// Time spent building lineages (near zero when lineage is not requested).
    pub lineage_analysis_time_ms: u64,
    /// Time spent on the batch consolidation lookups.
    pub consolidation_analysis_time_ms: u64,
    /// Time spent reading from and writing to the query cache.
    pub cache_operation_time_ms: u64,
    /// Number of results returned by the base repository search.
    pub total_memories_analyzed: usize,
    /// Snapshot of the cache counters.
    pub cache_operations: CacheOperationMetrics,
}
251
/// Cumulative counters maintained by [`QueryPatternCache`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheOperationMetrics {
    /// Lookups that returned an unexpired entry.
    pub hits: u32,
    /// Lookups that found nothing or only an expired entry.
    pub misses: u32,
    /// Entries removed by least-recently-accessed eviction.
    pub evictions: u32,
    /// `hits / (hits + misses)`, recomputed on every lookup.
    pub hit_ratio: f64,
}
260
/// One cached result set stored by [`QueryPatternCache`].
#[derive(Debug, Clone)]
pub struct CacheEntry {
    /// The cached search results.
    pub results: Vec<MemoryAwareSearchResult>,
    /// Insertion time; compared against the TTL to decide expiry.
    pub created_at: DateTime<Utc>,
    /// Number of successful lookups of this entry.
    pub access_count: u32,
    /// Time of the last successful lookup; eviction removes the entry with the
    /// oldest value.
    pub last_accessed: DateTime<Utc>,
}
269
/// In-memory cache of search results keyed by a hash of the request, with TTL
/// expiry and least-recently-accessed eviction.
pub struct QueryPatternCache {
    /// Cached entries keyed by the string from `generate_cache_key`.
    cache: Arc<RwLock<HashMap<String, CacheEntry>>>,
    /// Supplies `cache_ttl_seconds` and `max_cache_size`.
    config: EnhancedRetrievalConfig,
    /// Hit/miss/eviction counters, guarded separately from the entries.
    metrics: Arc<RwLock<CacheOperationMetrics>>,
}
276
277impl QueryPatternCache {
278 pub fn new(config: EnhancedRetrievalConfig) -> Self {
279 Self {
280 cache: Arc::new(RwLock::new(HashMap::new())),
281 config,
282 metrics: Arc::new(RwLock::new(CacheOperationMetrics {
283 hits: 0,
284 misses: 0,
285 evictions: 0,
286 hit_ratio: 0.0,
287 })),
288 }
289 }
290
291 pub fn generate_cache_key(&self, request: &MemoryAwareSearchRequest) -> String {
293 use std::collections::hash_map::DefaultHasher;
294 use std::hash::{Hash, Hasher};
295
296 let mut hasher = DefaultHasher::new();
297
298 if let Some(query_text) = &request.base_request.query_text {
300 query_text.hash(&mut hasher);
301 }
302
303 if let Some(embedding) = &request.base_request.query_embedding {
305 embedding.iter().take(10).for_each(|f| {
306 ((*f * 1000.0) as i32).hash(&mut hasher);
307 });
308 }
309
310 request.base_request.tier.hash(&mut hasher);
312 request.base_request.search_type.hash(&mut hasher);
313 request.base_request.limit.hash(&mut hasher);
314 request.include_lineage.hash(&mut hasher);
315 request.include_insights.hash(&mut hasher);
316
317 format!("{:x}", hasher.finish())
318 }
319
320 pub async fn get(&self, cache_key: &str) -> Option<Vec<MemoryAwareSearchResult>> {
322 {
323 let cache = self.cache.read().await;
324
325 if let Some(entry) = cache.get(cache_key) {
326 let age = Utc::now().signed_duration_since(entry.created_at);
327
328 if age.num_seconds() < self.config.cache_ttl_seconds as i64 {
329 let results = entry.results.clone();
330 drop(cache);
331
332 {
334 let mut cache_write = self.cache.write().await;
335 if let Some(entry) = cache_write.get_mut(cache_key) {
336 entry.access_count += 1;
337 entry.last_accessed = Utc::now();
338 }
339 }
340
341 {
343 let mut metrics = self.metrics.write().await;
344 metrics.hits += 1;
345 metrics.hit_ratio =
346 metrics.hits as f64 / (metrics.hits + metrics.misses) as f64;
347 }
348
349 return Some(results);
350 }
351 }
352 }
353
354 let mut metrics = self.metrics.write().await;
356 metrics.misses += 1;
357 metrics.hit_ratio = metrics.hits as f64 / (metrics.hits + metrics.misses) as f64;
358
359 None
360 }
361
362 pub async fn set(&self, cache_key: String, results: Vec<MemoryAwareSearchResult>) {
364 let mut cache = self.cache.write().await;
365
366 if cache.len() >= self.config.max_cache_size {
368 self.evict_lru(&mut cache).await;
369 }
370
371 let entry = CacheEntry {
372 results,
373 created_at: Utc::now(),
374 access_count: 0,
375 last_accessed: Utc::now(),
376 };
377
378 cache.insert(cache_key, entry);
379 }
380
381 async fn evict_lru(&self, cache: &mut HashMap<String, CacheEntry>) {
383 if let Some((oldest_key, _)) = cache
384 .iter()
385 .min_by_key(|(_, entry)| entry.last_accessed)
386 .map(|(k, v)| (k.clone(), v.clone()))
387 {
388 cache.remove(&oldest_key);
389
390 let mut metrics = self.metrics.write().await;
392 metrics.evictions += 1;
393 }
394 }
395
396 pub async fn get_metrics(&self) -> CacheOperationMetrics {
398 self.metrics.read().await.clone()
399 }
400
401 pub async fn cleanup_expired(&self) -> usize {
403 let mut cache = self.cache.write().await;
404 let now = Utc::now();
405 let ttl_duration = Duration::seconds(self.config.cache_ttl_seconds as i64);
406
407 let expired_keys: Vec<String> = cache
408 .iter()
409 .filter_map(|(key, entry)| {
410 if now.signed_duration_since(entry.created_at) > ttl_duration {
411 Some(key.clone())
412 } else {
413 None
414 }
415 })
416 .collect();
417
418 let count = expired_keys.len();
419 for key in expired_keys {
420 cache.remove(&key);
421 }
422
423 count
424 }
425}
426
/// Search front-end that enriches repository search results with consolidation
/// boosts, insight detection, lineage and optional result caching.
pub struct MemoryAwareRetrievalEngine {
    /// Engine settings; see [`EnhancedRetrievalConfig`].
    config: EnhancedRetrievalConfig,
    /// Repository used for searches, memory lookups and raw SQL queries.
    repository: Arc<MemoryRepository>,
    /// Stored by the constructor; not read by the methods visible in this file.
    reflection_engine: Option<Arc<ReflectionEngine>>,
    /// Query cache; `Some` only when `enable_query_caching` was set at
    /// construction time.
    cache: Option<QueryPatternCache>,
}
434
435impl MemoryAwareRetrievalEngine {
436 pub fn new(
437 config: EnhancedRetrievalConfig,
438 repository: Arc<MemoryRepository>,
439 reflection_engine: Option<Arc<ReflectionEngine>>,
440 ) -> Self {
441 let cache = if config.enable_query_caching {
442 Some(QueryPatternCache::new(config.clone()))
443 } else {
444 None
445 };
446
447 Self {
448 config,
449 repository,
450 reflection_engine,
451 cache,
452 }
453 }
454
455 pub async fn search(
457 &self,
458 request: MemoryAwareSearchRequest,
459 ) -> Result<MemoryAwareSearchResponse> {
460 let start_time = Instant::now();
461 let mut performance_metrics = PerformanceMetrics {
462 database_query_time_ms: 0,
463 lineage_analysis_time_ms: 0,
464 consolidation_analysis_time_ms: 0,
465 cache_operation_time_ms: 0,
466 total_memories_analyzed: 0,
467 cache_operations: CacheOperationMetrics {
468 hits: 0,
469 misses: 0,
470 evictions: 0,
471 hit_ratio: 0.0,
472 },
473 };
474
475 let cache_key = if self.config.enable_query_caching && request.use_cache.unwrap_or(true) {
477 Some(self.cache.as_ref().unwrap().generate_cache_key(&request))
478 } else {
479 None
480 };
481
482 let cache_start = Instant::now();
483 let cached_results = if let Some(cache_key) = &cache_key {
484 self.cache.as_ref().unwrap().get(cache_key).await
485 } else {
486 None
487 };
488 performance_metrics.cache_operation_time_ms += cache_start.elapsed().as_millis() as u64;
489
490 if let Some(cached_results) = cached_results {
491 info!("Returning cached results for query");
492 return Ok(MemoryAwareSearchResponse {
493 results: cached_results,
494 total_count: None,
495 insights_included: 0,
496 recently_consolidated_count: 0,
497 lineage_depth_analyzed: 0,
498 cache_hit_ratio: 1.0,
499 execution_time_ms: start_time.elapsed().as_millis() as u64,
500 performance_metrics,
501 });
502 }
503
504 let db_start = Instant::now();
506 let base_response = self
507 .repository
508 .search_memories(request.base_request.clone())
509 .await?;
510 performance_metrics.database_query_time_ms = db_start.elapsed().as_millis() as u64;
511 performance_metrics.total_memories_analyzed = base_response.results.len();
512
513 let mut enhanced_results = Vec::new();
514 let mut insights_included = 0;
515 let mut recently_consolidated_count = 0;
516
517 let memory_ids: Vec<Uuid> = base_response.results.iter().map(|r| r.memory.id).collect();
519
520 let consolidation_start = Instant::now();
522 let consolidation_status_map = if request.include_consolidation_boost.unwrap_or(true) {
523 self.check_recently_consolidated_batch(&memory_ids).await?
524 } else {
525 HashMap::new()
526 };
527
528 let consolidation_boost_map = if request.include_consolidation_boost.unwrap_or(true) {
529 self.calculate_consolidation_boosts_batch(&memory_ids)
530 .await?
531 } else {
532 HashMap::new()
533 };
534 performance_metrics.consolidation_analysis_time_ms +=
535 consolidation_start.elapsed().as_millis() as u64;
536
537 let lineage_start = Instant::now();
539 let lineage_map = if request.include_lineage.unwrap_or(false) {
540 self.get_memory_lineages_batch(
541 &memory_ids,
542 request
543 .lineage_depth
544 .unwrap_or(self.config.max_lineage_depth),
545 )
546 .await?
547 } else {
548 HashMap::new()
549 };
550 performance_metrics.lineage_analysis_time_ms += lineage_start.elapsed().as_millis() as u64;
551
552 for base_result in base_response.results {
554 let is_recently_consolidated = consolidation_status_map
556 .get(&base_result.memory.id)
557 .unwrap_or(&false);
558
559 if *is_recently_consolidated {
560 recently_consolidated_count += 1;
561 }
562
563 let consolidation_boost = consolidation_boost_map
564 .get(&base_result.memory.id)
565 .unwrap_or(&1.0);
566
567 let is_insight = self.is_insight_memory(&base_result.memory);
569 if is_insight {
570 insights_included += 1;
571 }
572
573 let lineage = lineage_map.get(&base_result.memory.id).cloned();
575
576 let final_score = (base_result.combined_score as f64) * consolidation_boost;
578
579 let boost_explanation = if request.explain_boosting.unwrap_or(false) {
581 Some(BoostExplanation {
582 consolidation_boost_applied: *consolidation_boost,
583 insight_boost_applied: if is_insight {
584 self.config.insight_importance_weight
585 } else {
586 1.0
587 },
588 lineage_boost_applied: 1.0, recent_consolidation_factor: if *is_recently_consolidated { 1.0 } else { 0.0 },
590 total_boost_multiplier: *consolidation_boost,
591 boost_reasons: self
592 .generate_boost_reasons(*is_recently_consolidated, is_insight),
593 })
594 } else {
595 None
596 };
597
598 enhanced_results.push(MemoryAwareSearchResult {
599 memory: base_result.memory,
600 base_similarity_score: base_result.similarity_score,
601 consolidation_boost: *consolidation_boost,
602 final_score,
603 is_insight,
604 is_recently_consolidated: *is_recently_consolidated,
605 lineage,
606 boost_explanation,
607 cache_hit: false,
608 });
609 }
610
611 enhanced_results.sort_by(|a, b| {
613 b.final_score
614 .partial_cmp(&a.final_score)
615 .unwrap_or(std::cmp::Ordering::Equal)
616 });
617
618 if self.config.include_insights && request.include_insights.unwrap_or(true) {
620 let insight_results = self.get_relevant_insights(&request).await?;
621 let insight_count = insight_results.len() as i32;
622 enhanced_results.extend(insight_results);
623 insights_included += insight_count;
624 }
625
626 let cache_store_start = Instant::now();
628 if let Some(cache_key) = cache_key {
629 self.cache
630 .as_ref()
631 .unwrap()
632 .set(cache_key, enhanced_results.clone())
633 .await;
634 }
635 performance_metrics.cache_operation_time_ms +=
636 cache_store_start.elapsed().as_millis() as u64;
637
638 if let Some(cache) = &self.cache {
640 performance_metrics.cache_operations = cache.get_metrics().await;
641 }
642
643 let execution_time = start_time.elapsed().as_millis() as u64;
644
645 if execution_time > self.config.p95_latency_target_ms {
647 warn!(
648 "Search latency exceeded target: {}ms > {}ms",
649 execution_time, self.config.p95_latency_target_ms
650 );
651 }
652
653 Ok(MemoryAwareSearchResponse {
654 results: enhanced_results,
655 total_count: base_response.total_count,
656 insights_included,
657 recently_consolidated_count,
658 lineage_depth_analyzed: request
659 .lineage_depth
660 .unwrap_or(self.config.max_lineage_depth),
661 cache_hit_ratio: performance_metrics.cache_operations.hit_ratio,
662 execution_time_ms: execution_time,
663 performance_metrics,
664 })
665 }
666
667 async fn is_recently_consolidated(&self, memory: &Memory) -> Result<bool> {
669 let cutoff_time =
670 Utc::now() - Duration::hours(self.config.recent_consolidation_threshold_hours);
671
672 let recent_events = sqlx::query_as::<_, MemoryConsolidationLog>(
674 "SELECT * FROM memory_consolidation_log WHERE memory_id = $1 AND created_at > $2 ORDER BY created_at DESC LIMIT 1"
675 )
676 .bind(memory.id)
677 .bind(cutoff_time)
678 .fetch_optional(self.repository.pool())
679 .await?;
680
681 Ok(recent_events.is_some())
682 }
683
684 async fn calculate_consolidation_boost(&self, memory: &Memory) -> Result<f64> {
686 let cutoff_time =
687 Utc::now() - Duration::hours(self.config.recent_consolidation_threshold_hours);
688
689 let recent_event = sqlx::query_as::<_, MemoryConsolidationLog>(
691 "SELECT * FROM memory_consolidation_log WHERE memory_id = $1 AND created_at > $2 ORDER BY created_at DESC LIMIT 1"
692 )
693 .bind(memory.id)
694 .bind(cutoff_time)
695 .fetch_optional(self.repository.pool())
696 .await?;
697
698 if let Some(event) = recent_event {
699 let hours_since = Utc::now()
701 .signed_duration_since(event.created_at)
702 .num_hours() as f64;
703 let time_factor = (-hours_since / 24.0).exp(); let strength_factor =
705 (event.new_consolidation_strength - event.previous_consolidation_strength).max(0.0);
706
707 let boost = 1.0
708 + (self.config.consolidation_boost_multiplier - 1.0)
709 * time_factor
710 * (1.0 + strength_factor);
711 Ok(boost.min(self.config.consolidation_boost_multiplier))
712 } else {
713 Ok(1.0)
714 }
715 }
716
717 pub async fn calculate_consolidation_boosts_batch(
719 &self,
720 memory_ids: &[Uuid],
721 ) -> Result<HashMap<Uuid, f64>> {
722 if memory_ids.is_empty() {
723 return Ok(HashMap::new());
724 }
725
726 let cutoff_time =
727 Utc::now() - Duration::hours(self.config.recent_consolidation_threshold_hours);
728
729 let recent_events = sqlx::query_as::<_, MemoryConsolidationLog>(
731 "SELECT DISTINCT ON (memory_id) * FROM memory_consolidation_log
732 WHERE memory_id = ANY($1) AND created_at > $2
733 ORDER BY memory_id, created_at DESC",
734 )
735 .bind(memory_ids)
736 .bind(cutoff_time)
737 .fetch_all(self.repository.pool())
738 .await?;
739
740 let mut boost_map = HashMap::new();
741
742 for &memory_id in memory_ids {
744 boost_map.insert(memory_id, 1.0);
745 }
746
747 for event in recent_events {
749 let hours_since = Utc::now()
750 .signed_duration_since(event.created_at)
751 .num_hours() as f64;
752
753 let time_factor = (-hours_since / 24.0).exp();
754 let strength_factor =
755 (event.new_consolidation_strength - event.previous_consolidation_strength).max(0.0);
756
757 let boost = 1.0
758 + (self.config.consolidation_boost_multiplier - 1.0)
759 * time_factor
760 * (1.0 + strength_factor);
761
762 boost_map.insert(
763 event.memory_id,
764 boost.min(self.config.consolidation_boost_multiplier),
765 );
766 }
767
768 Ok(boost_map)
769 }
770
771 pub async fn check_recently_consolidated_batch(
773 &self,
774 memory_ids: &[Uuid],
775 ) -> Result<HashMap<Uuid, bool>> {
776 if memory_ids.is_empty() {
777 return Ok(HashMap::new());
778 }
779
780 let cutoff_time =
781 Utc::now() - Duration::hours(self.config.recent_consolidation_threshold_hours);
782
783 let recent_memory_ids: Vec<Uuid> = sqlx::query_scalar(
785 "SELECT DISTINCT memory_id FROM memory_consolidation_log
786 WHERE memory_id = ANY($1) AND created_at > $2",
787 )
788 .bind(memory_ids)
789 .bind(cutoff_time)
790 .fetch_all(self.repository.pool())
791 .await?;
792
793 let mut status_map = HashMap::new();
794
795 for &memory_id in memory_ids {
797 status_map.insert(memory_id, false);
798 }
799
800 for memory_id in recent_memory_ids {
802 status_map.insert(memory_id, true);
803 }
804
805 Ok(status_map)
806 }
807
808 async fn get_memory_lineages_batch(
810 &self,
811 memory_ids: &[Uuid],
812 max_depth: usize,
813 ) -> Result<HashMap<Uuid, MemoryLineage>> {
814 if memory_ids.is_empty() {
815 return Ok(HashMap::new());
816 }
817
818 let mut lineage_map = HashMap::new();
819
820 for &memory_id in memory_ids {
823 if let Ok(memory) = self.repository.get_memory(memory_id).await {
825 let lineage = self.get_memory_lineage(&memory, max_depth).await?;
826 lineage_map.insert(memory_id, lineage);
827 }
828 }
829
830 Ok(lineage_map)
831 }
832
833 fn is_insight_memory(&self, memory: &Memory) -> bool {
835 if let Some(metadata_obj) = memory.metadata.as_object() {
836 metadata_obj
837 .get("is_meta_memory")
838 .and_then(|v| v.as_bool())
839 .unwrap_or(false)
840 || metadata_obj.get("generated_by").and_then(|v| v.as_str())
841 == Some("reflection_engine")
842 } else {
843 false
844 }
845 }
846
847 async fn get_memory_lineage(&self, memory: &Memory, max_depth: usize) -> Result<MemoryLineage> {
849 let mut ancestors = Vec::new();
850 let mut descendants = Vec::new();
851 let mut visited = HashSet::new();
852 let _consolidation_chain: Vec<ConsolidationEvent> = Vec::new();
853
854 self.traverse_ancestors(memory.id, max_depth, 0, &mut ancestors, &mut visited)
856 .await?;
857
858 visited.clear();
860 self.traverse_descendants(memory.id, max_depth, 0, &mut descendants, &mut visited)
861 .await?;
862
863 let consolidation_chain = self.get_consolidation_chain(memory.id).await?;
865
866 let related_insights = self.get_related_insights(memory.id).await?;
868
869 let provenance_metadata = self.generate_provenance_metadata(memory).await?;
871
872 Ok(MemoryLineage {
873 memory_id: memory.id,
874 ancestors,
875 descendants,
876 related_insights,
877 consolidation_chain,
878 provenance_metadata,
879 })
880 }
881
882 async fn traverse_ancestors(
884 &self,
885 memory_id: Uuid,
886 max_depth: usize,
887 _current_depth: usize,
888 ancestors: &mut Vec<MemoryAncestor>,
889 visited: &mut HashSet<Uuid>,
890 ) -> Result<()> {
891 let mut stack = vec![(memory_id, 0)];
892
893 while let Some((current_id, depth)) = stack.pop() {
894 if depth >= max_depth || visited.contains(¤t_id) {
895 continue;
896 }
897
898 visited.insert(current_id);
899
900 let parent_memories = sqlx::query_as::<_, Memory>(
902 "SELECT * FROM memories WHERE id = (SELECT parent_id FROM memories WHERE id = $1) AND parent_id IS NOT NULL"
903 )
904 .bind(current_id)
905 .fetch_all(self.repository.pool())
906 .await?;
907
908 for parent in parent_memories {
909 ancestors.push(MemoryAncestor {
910 memory_id: parent.id,
911 relationship_type: RelationshipType::ParentChild,
912 depth: depth + 1,
913 strength: parent.importance_score,
914 created_at: parent.created_at,
915 });
916
917 stack.push((parent.id, depth + 1));
919 }
920 }
921
922 Ok(())
923 }
924
925 async fn traverse_descendants(
927 &self,
928 memory_id: Uuid,
929 max_depth: usize,
930 _current_depth: usize,
931 descendants: &mut Vec<MemoryDescendant>,
932 visited: &mut HashSet<Uuid>,
933 ) -> Result<()> {
934 let mut stack = vec![(memory_id, 0)];
935
936 while let Some((current_id, depth)) = stack.pop() {
937 if depth >= max_depth || visited.contains(¤t_id) {
938 continue;
939 }
940
941 visited.insert(current_id);
942
943 let child_memories = sqlx::query_as::<_, Memory>(
945 "SELECT * FROM memories WHERE parent_id = $1 AND status = 'active'",
946 )
947 .bind(current_id)
948 .fetch_all(self.repository.pool())
949 .await?;
950
951 for child in child_memories {
952 descendants.push(MemoryDescendant {
953 memory_id: child.id,
954 relationship_type: RelationshipType::ParentChild,
955 depth: depth + 1,
956 strength: child.importance_score,
957 created_at: child.created_at,
958 });
959
960 stack.push((child.id, depth + 1));
962 }
963 }
964
965 Ok(())
966 }
967
968 async fn get_consolidation_chain(&self, memory_id: Uuid) -> Result<Vec<ConsolidationEvent>> {
970 let events = sqlx::query_as::<_, MemoryConsolidationLog>(
971 "SELECT * FROM memory_consolidation_log WHERE memory_id = $1 ORDER BY created_at DESC LIMIT 10"
972 )
973 .bind(memory_id)
974 .fetch_all(self.repository.pool())
975 .await?;
976
977 Ok(events
978 .into_iter()
979 .map(|event| ConsolidationEvent {
980 event_id: event.id,
981 event_type: event.event_type,
982 previous_strength: event.previous_consolidation_strength,
983 new_strength: event.new_consolidation_strength,
984 timestamp: event.created_at,
985 trigger_reason: None, })
987 .collect())
988 }
989
990 async fn get_related_insights(&self, memory_id: Uuid) -> Result<Vec<Uuid>> {
992 let related = sqlx::query_scalar::<_, Uuid>(
994 r#"
995 SELECT id FROM memories
996 WHERE status = 'active'
997 AND metadata->>'is_meta_memory' = 'true'
998 AND metadata->'source_memory_ids' ? $1::text
999 "#,
1000 )
1001 .bind(memory_id.to_string())
1002 .fetch_all(self.repository.pool())
1003 .await?;
1004
1005 Ok(related)
1006 }
1007
1008 async fn generate_provenance_metadata(&self, memory: &Memory) -> Result<ProvenanceMetadata> {
1010 Ok(ProvenanceMetadata {
1011 creation_source: "memory_system".to_string(),
1012 modification_history: Vec::new(), quality_indicators: QualityIndicators {
1014 coherence_score: 0.8,
1015 completeness_score: 0.7,
1016 accuracy_score: 0.9,
1017 timeliness_score: 0.8,
1018 },
1019 reliability_score: memory.consolidation_strength / 10.0,
1020 })
1021 }
1022
1023 async fn get_relevant_insights(
1025 &self,
1026 request: &MemoryAwareSearchRequest,
1027 ) -> Result<Vec<MemoryAwareSearchResult>> {
1028 let insight_search = SearchRequest {
1030 query_text: request.base_request.query_text.clone(),
1031 query_embedding: request.base_request.query_embedding.clone(),
1032 search_type: Some(SearchType::Hybrid),
1033 limit: Some(5), ..Default::default()
1035 };
1036
1037 let insight_response = self.repository.search_memories(insight_search).await?;
1038
1039 let mut insight_results = Vec::new();
1040 for result in insight_response.results {
1041 if self.is_insight_memory(&result.memory) {
1042 let insight_boost = self.config.insight_importance_weight;
1044 let final_score = (result.combined_score as f64) * insight_boost;
1045
1046 insight_results.push(MemoryAwareSearchResult {
1047 memory: result.memory,
1048 base_similarity_score: result.similarity_score,
1049 consolidation_boost: insight_boost,
1050 final_score,
1051 is_insight: true,
1052 is_recently_consolidated: false,
1053 lineage: None,
1054 boost_explanation: None,
1055 cache_hit: false,
1056 });
1057 }
1058 }
1059
1060 Ok(insight_results)
1061 }
1062
1063 fn generate_boost_reasons(
1065 &self,
1066 is_recently_consolidated: bool,
1067 is_insight: bool,
1068 ) -> Vec<String> {
1069 let mut reasons = Vec::new();
1070
1071 if is_recently_consolidated {
1072 reasons.push("Recently consolidated memory - enhanced retrieval strength".to_string());
1073 }
1074
1075 if is_insight {
1076 reasons.push("Insight/reflection memory - meta-cognitive content".to_string());
1077 }
1078
1079 reasons
1080 }
1081
1082 pub async fn get_cache_stats(&self) -> Option<CacheOperationMetrics> {
1084 if let Some(cache) = &self.cache {
1085 Some(cache.get_metrics().await)
1086 } else {
1087 None
1088 }
1089 }
1090
1091 pub async fn clear_cache(&self) -> Result<()> {
1093 if let Some(cache) = &self.cache {
1094 let mut cache_map = cache.cache.write().await;
1095 cache_map.clear();
1096 }
1097 Ok(())
1098 }
1099
1100 pub async fn cleanup_cache(&self) -> Result<usize> {
1102 if let Some(cache) = &self.cache {
1103 Ok(cache.cleanup_expired().await)
1104 } else {
1105 Ok(0)
1106 }
1107 }
1108}
1109
#[cfg(test)]
mod tests {
    use super::*;

    /// The cache key must be stable for identical requests and must change
    /// when the query text changes.
    #[tokio::test]
    async fn test_cache_key_generation() {
        let config = EnhancedRetrievalConfig::default();
        let cache = QueryPatternCache::new(config);

        let request = MemoryAwareSearchRequest {
            base_request: SearchRequest {
                query_text: Some("test query".to_string()),
                ..Default::default()
            },
            include_lineage: Some(true),
            include_insights: Some(true),
            ..Default::default()
        };

        let key1 = cache.generate_cache_key(&request);
        let key2 = cache.generate_cache_key(&request);

        assert_eq!(key1, key2, "Same request should generate same cache key");

        let other_request = MemoryAwareSearchRequest {
            base_request: SearchRequest {
                query_text: Some("another query".to_string()),
                ..Default::default()
            },
            include_lineage: Some(true),
            include_insights: Some(true),
            ..Default::default()
        };

        assert_ne!(
            key1,
            cache.generate_cache_key(&other_request),
            "Different query text should generate a different cache key"
        );
    }

    /// An entry is served while younger than the TTL and reported missing once
    /// the TTL has elapsed.
    #[tokio::test]
    async fn test_cache_expiration() {
        let mut config = EnhancedRetrievalConfig::default();
        // One-second TTL keeps the test short.
        config.cache_ttl_seconds = 1;
        let cache = QueryPatternCache::new(config);

        let results = vec![];
        cache.set("test_key".to_string(), results).await;

        assert!(cache.get("test_key").await.is_some());

        // Wait past the TTL.
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

        assert!(cache.get("test_key").await.is_none());
    }

    /// `RelationshipType` survives a JSON round trip.
    #[test]
    fn test_relationship_type_serialization() {
        let rel_type = RelationshipType::ParentChild;
        let serialized = serde_json::to_string(&rel_type).unwrap();
        let deserialized: RelationshipType = serde_json::from_str(&serialized).unwrap();
        assert_eq!(rel_type, deserialized);
    }

    /// `BoostExplanation` can be constructed and its fields read back.
    #[test]
    fn test_boost_explanation_creation() {
        let explanation = BoostExplanation {
            consolidation_boost_applied: 2.0,
            insight_boost_applied: 1.5,
            lineage_boost_applied: 1.0,
            recent_consolidation_factor: 1.0,
            total_boost_multiplier: 2.0,
            boost_reasons: vec!["Recently consolidated".to_string()],
        };

        assert_eq!(explanation.total_boost_multiplier, 2.0);
        assert_eq!(explanation.boost_reasons.len(), 1);
    }
}