codex_memory/memory/
enhanced_retrieval.rs

1//! Memory-Aware Retrieval System for Story 9
2//!
3//! This module implements enhanced memory-aware retrieval with cognitive principles
4//! including recently consolidated memory boosting, reflection/insight inclusion,
5//! memory lineage tracking, query pattern caching, and performance optimizations.
6//!
7//! ## Cognitive Science Foundation
8//!
9//! ### Research Basis
10//! 1. **Strengthening Bias (Bjork & Bjork, 1992)**: Recently strengthened memories have enhanced retrieval
11//! 2. **Elaborative Processing (Craik & Lockhart, 1972)**: Deep processing creates retrievable cues
12//! 3. **Spreading Activation (Anderson, 1983)**: Related concepts activate each other
13//! 4. **Recognition Heuristic (Goldstein & Gigerenzer, 2002)**: Familiarity aids recall
14//! 5. **Memory Palace Effect (Yates, 1966)**: Structured relationships aid retrieval
15//!
16//! ## Key Features
17//!
18//! ### Recently Consolidated Memory Boosting
19//! - 2x boost for memories with recent consolidation activity
20//! - Exponential decay based on time since consolidation
21//! - Consolidation strength weighting
22//!
23//! ### Reflection/Insight Integration
24//! - Automatic inclusion of insight memories in search results
25//! - Meta-memory identification and special scoring
26//! - Cross-referenced with original source memories
27//!
28//! ### Memory Lineage Tracking
29//! - 3-level depth traversal of memory relationships
30//! - Parent-child memory chains
31//! - Bidirectional relationship mapping
32//! - Provenance metadata inclusion
33//!
34//! ### Query Pattern Caching
35//! - Semantic hash-based cache keys
36//! - Configurable TTL and invalidation policies
37//! - LRU eviction with memory pressure awareness
38//! - Cache hit ratio optimization
39//!
40//! ### Performance Optimizations
41//! - Batch database operations
42//! - Index-optimized queries
43//! - Async result streaming
44//! - P95 latency target: <200ms
45
46use super::error::Result;
47use super::models::*;
48use super::reflection_engine::ReflectionEngine;
49use super::repository::MemoryRepository;
50use chrono::{DateTime, Duration, Utc};
51use serde::{Deserialize, Serialize};
52use std::collections::{HashMap, HashSet};
53use std::sync::Arc;
54use std::time::Instant;
55use tokio::sync::RwLock;
56use tracing::{info, warn};
57use uuid::Uuid;
58
59/// Configuration for memory-aware retrieval
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct EnhancedRetrievalConfig {
62    /// Boost multiplier for recently consolidated memories
63    pub consolidation_boost_multiplier: f64,
64
65    /// Hours within which consolidation is considered "recent"
66    pub recent_consolidation_threshold_hours: i64,
67
68    /// Maximum depth for memory lineage traversal
69    pub max_lineage_depth: usize,
70
71    /// Include reflection/insight memories in results
72    pub include_insights: bool,
73
74    /// Enable query pattern caching
75    pub enable_query_caching: bool,
76
77    /// Cache TTL in seconds
78    pub cache_ttl_seconds: u64,
79
80    /// Maximum cache size (number of entries)
81    pub max_cache_size: usize,
82
83    /// Performance target for p95 latency (milliseconds)
84    pub p95_latency_target_ms: u64,
85
86    /// Minimum confidence threshold for insights
87    pub insight_confidence_threshold: f64,
88
89    /// Weight for insight importance in scoring
90    pub insight_importance_weight: f64,
91}
92
93impl Default for EnhancedRetrievalConfig {
94    fn default() -> Self {
95        Self {
96            consolidation_boost_multiplier: 2.0,
97            recent_consolidation_threshold_hours: 24,
98            max_lineage_depth: 3,
99            include_insights: true,
100            enable_query_caching: true,
101            cache_ttl_seconds: 300, // 5 minutes
102            max_cache_size: 1000,
103            p95_latency_target_ms: 200,
104            insight_confidence_threshold: 0.6,
105            insight_importance_weight: 1.5,
106        }
107    }
108}
109
110/// Memory lineage information
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct MemoryLineage {
113    pub memory_id: Uuid,
114    pub ancestors: Vec<MemoryAncestor>,
115    pub descendants: Vec<MemoryDescendant>,
116    pub related_insights: Vec<Uuid>,
117    pub consolidation_chain: Vec<ConsolidationEvent>,
118    pub provenance_metadata: ProvenanceMetadata,
119}
120
121/// Ancestor memory in lineage chain
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct MemoryAncestor {
124    pub memory_id: Uuid,
125    pub relationship_type: RelationshipType,
126    pub depth: usize,
127    pub strength: f64,
128    pub created_at: DateTime<Utc>,
129}
130
131/// Descendant memory in lineage chain
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct MemoryDescendant {
134    pub memory_id: Uuid,
135    pub relationship_type: RelationshipType,
136    pub depth: usize,
137    pub strength: f64,
138    pub created_at: DateTime<Utc>,
139}
140
141/// Types of memory relationships
142#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
143pub enum RelationshipType {
144    ParentChild,
145    Refinement,
146    Consolidation,
147    InsightSource,
148    TemporalSequence,
149    SemanticSimilarity,
150    CausalLink,
151}
152
153/// Consolidation event in memory history
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct ConsolidationEvent {
156    pub event_id: Uuid,
157    pub event_type: String,
158    pub previous_strength: f64,
159    pub new_strength: f64,
160    pub timestamp: DateTime<Utc>,
161    pub trigger_reason: Option<String>,
162}
163
164/// Provenance metadata for memory lineage
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct ProvenanceMetadata {
167    pub creation_source: String,
168    pub modification_history: Vec<ModificationRecord>,
169    pub quality_indicators: QualityIndicators,
170    pub reliability_score: f64,
171}
172
173/// Modification record for provenance
174#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct ModificationRecord {
176    pub timestamp: DateTime<Utc>,
177    pub modification_type: String,
178    pub agent: String,
179    pub description: String,
180}
181
182/// Quality indicators for memory assessment
183#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct QualityIndicators {
185    pub coherence_score: f64,
186    pub completeness_score: f64,
187    pub accuracy_score: f64,
188    pub timeliness_score: f64,
189}
190
191/// Enhanced search request with memory-aware features
192#[derive(Debug, Clone, Serialize, Deserialize, Default)]
193pub struct MemoryAwareSearchRequest {
194    pub base_request: SearchRequest,
195    pub include_lineage: Option<bool>,
196    pub include_consolidation_boost: Option<bool>,
197    pub include_insights: Option<bool>,
198    pub lineage_depth: Option<usize>,
199    pub use_cache: Option<bool>,
200    pub explain_boosting: Option<bool>,
201}
202
203/// Enhanced search result with memory-aware features
204#[derive(Debug, Clone, Serialize, Deserialize)]
205pub struct MemoryAwareSearchResult {
206    pub memory: Memory,
207    pub base_similarity_score: f32,
208    pub consolidation_boost: f64,
209    pub final_score: f64,
210    pub is_insight: bool,
211    pub is_recently_consolidated: bool,
212    pub lineage: Option<MemoryLineage>,
213    pub boost_explanation: Option<BoostExplanation>,
214    pub cache_hit: bool,
215}
216
217/// Explanation of score boosting applied
218#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct BoostExplanation {
220    pub consolidation_boost_applied: f64,
221    pub insight_boost_applied: f64,
222    pub lineage_boost_applied: f64,
223    pub recent_consolidation_factor: f64,
224    pub total_boost_multiplier: f64,
225    pub boost_reasons: Vec<String>,
226}
227
228/// Enhanced search response with memory-aware features
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct MemoryAwareSearchResponse {
231    pub results: Vec<MemoryAwareSearchResult>,
232    pub total_count: Option<i64>,
233    pub insights_included: i32,
234    pub recently_consolidated_count: i32,
235    pub lineage_depth_analyzed: usize,
236    pub cache_hit_ratio: f64,
237    pub execution_time_ms: u64,
238    pub performance_metrics: PerformanceMetrics,
239}
240
241/// Performance metrics for monitoring
242#[derive(Debug, Clone, Serialize, Deserialize)]
243pub struct PerformanceMetrics {
244    pub database_query_time_ms: u64,
245    pub lineage_analysis_time_ms: u64,
246    pub consolidation_analysis_time_ms: u64,
247    pub cache_operation_time_ms: u64,
248    pub total_memories_analyzed: usize,
249    pub cache_operations: CacheOperationMetrics,
250}
251
252/// Cache operation metrics
253#[derive(Debug, Clone, Serialize, Deserialize)]
254pub struct CacheOperationMetrics {
255    pub hits: u32,
256    pub misses: u32,
257    pub evictions: u32,
258    pub hit_ratio: f64,
259}
260
261/// Query cache entry
262#[derive(Debug, Clone)]
263pub struct CacheEntry {
264    pub results: Vec<MemoryAwareSearchResult>,
265    pub created_at: DateTime<Utc>,
266    pub access_count: u32,
267    pub last_accessed: DateTime<Utc>,
268}
269
270/// Query pattern cache implementation
271pub struct QueryPatternCache {
272    cache: Arc<RwLock<HashMap<String, CacheEntry>>>,
273    config: EnhancedRetrievalConfig,
274    metrics: Arc<RwLock<CacheOperationMetrics>>,
275}
276
277impl QueryPatternCache {
278    pub fn new(config: EnhancedRetrievalConfig) -> Self {
279        Self {
280            cache: Arc::new(RwLock::new(HashMap::new())),
281            config,
282            metrics: Arc::new(RwLock::new(CacheOperationMetrics {
283                hits: 0,
284                misses: 0,
285                evictions: 0,
286                hit_ratio: 0.0,
287            })),
288        }
289    }
290
291    /// Generate cache key from search request
292    pub fn generate_cache_key(&self, request: &MemoryAwareSearchRequest) -> String {
293        use std::collections::hash_map::DefaultHasher;
294        use std::hash::{Hash, Hasher};
295
296        let mut hasher = DefaultHasher::new();
297
298        // Hash query text
299        if let Some(query_text) = &request.base_request.query_text {
300            query_text.hash(&mut hasher);
301        }
302
303        // Hash query embedding (simplified - hash first few components)
304        if let Some(embedding) = &request.base_request.query_embedding {
305            embedding.iter().take(10).for_each(|f| {
306                ((*f * 1000.0) as i32).hash(&mut hasher);
307            });
308        }
309
310        // Hash other search parameters
311        request.base_request.tier.hash(&mut hasher);
312        request.base_request.search_type.hash(&mut hasher);
313        request.base_request.limit.hash(&mut hasher);
314        request.include_lineage.hash(&mut hasher);
315        request.include_insights.hash(&mut hasher);
316
317        format!("{:x}", hasher.finish())
318    }
319
320    /// Get cached results if available and not expired
321    pub async fn get(&self, cache_key: &str) -> Option<Vec<MemoryAwareSearchResult>> {
322        {
323            let cache = self.cache.read().await;
324
325            if let Some(entry) = cache.get(cache_key) {
326                let age = Utc::now().signed_duration_since(entry.created_at);
327
328                if age.num_seconds() < self.config.cache_ttl_seconds as i64 {
329                    let results = entry.results.clone();
330                    drop(cache);
331
332                    // Update access metrics
333                    {
334                        let mut cache_write = self.cache.write().await;
335                        if let Some(entry) = cache_write.get_mut(cache_key) {
336                            entry.access_count += 1;
337                            entry.last_accessed = Utc::now();
338                        }
339                    }
340
341                    // Update metrics
342                    {
343                        let mut metrics = self.metrics.write().await;
344                        metrics.hits += 1;
345                        metrics.hit_ratio =
346                            metrics.hits as f64 / (metrics.hits + metrics.misses) as f64;
347                    }
348
349                    return Some(results);
350                }
351            }
352        }
353
354        // Cache miss
355        let mut metrics = self.metrics.write().await;
356        metrics.misses += 1;
357        metrics.hit_ratio = metrics.hits as f64 / (metrics.hits + metrics.misses) as f64;
358
359        None
360    }
361
362    /// Store results in cache
363    pub async fn set(&self, cache_key: String, results: Vec<MemoryAwareSearchResult>) {
364        let mut cache = self.cache.write().await;
365
366        // Implement LRU eviction if cache is full
367        if cache.len() >= self.config.max_cache_size {
368            self.evict_lru(&mut cache).await;
369        }
370
371        let entry = CacheEntry {
372            results,
373            created_at: Utc::now(),
374            access_count: 0,
375            last_accessed: Utc::now(),
376        };
377
378        cache.insert(cache_key, entry);
379    }
380
381    /// Evict least recently used entry
382    async fn evict_lru(&self, cache: &mut HashMap<String, CacheEntry>) {
383        if let Some((oldest_key, _)) = cache
384            .iter()
385            .min_by_key(|(_, entry)| entry.last_accessed)
386            .map(|(k, v)| (k.clone(), v.clone()))
387        {
388            cache.remove(&oldest_key);
389
390            // Update metrics
391            let mut metrics = self.metrics.write().await;
392            metrics.evictions += 1;
393        }
394    }
395
396    /// Get cache metrics
397    pub async fn get_metrics(&self) -> CacheOperationMetrics {
398        self.metrics.read().await.clone()
399    }
400
401    /// Clear expired entries
402    pub async fn cleanup_expired(&self) -> usize {
403        let mut cache = self.cache.write().await;
404        let now = Utc::now();
405        let ttl_duration = Duration::seconds(self.config.cache_ttl_seconds as i64);
406
407        let expired_keys: Vec<String> = cache
408            .iter()
409            .filter_map(|(key, entry)| {
410                if now.signed_duration_since(entry.created_at) > ttl_duration {
411                    Some(key.clone())
412                } else {
413                    None
414                }
415            })
416            .collect();
417
418        let count = expired_keys.len();
419        for key in expired_keys {
420            cache.remove(&key);
421        }
422
423        count
424    }
425}
426
427/// Main memory-aware retrieval engine
428pub struct MemoryAwareRetrievalEngine {
429    config: EnhancedRetrievalConfig,
430    repository: Arc<MemoryRepository>,
431    reflection_engine: Option<Arc<ReflectionEngine>>,
432    cache: Option<QueryPatternCache>,
433}
434
435impl MemoryAwareRetrievalEngine {
436    pub fn new(
437        config: EnhancedRetrievalConfig,
438        repository: Arc<MemoryRepository>,
439        reflection_engine: Option<Arc<ReflectionEngine>>,
440    ) -> Self {
441        let cache = if config.enable_query_caching {
442            Some(QueryPatternCache::new(config.clone()))
443        } else {
444            None
445        };
446
447        Self {
448            config,
449            repository,
450            reflection_engine,
451            cache,
452        }
453    }
454
455    /// Execute memory-aware search with all enhancements
456    pub async fn search(
457        &self,
458        request: MemoryAwareSearchRequest,
459    ) -> Result<MemoryAwareSearchResponse> {
460        let start_time = Instant::now();
461        let mut performance_metrics = PerformanceMetrics {
462            database_query_time_ms: 0,
463            lineage_analysis_time_ms: 0,
464            consolidation_analysis_time_ms: 0,
465            cache_operation_time_ms: 0,
466            total_memories_analyzed: 0,
467            cache_operations: CacheOperationMetrics {
468                hits: 0,
469                misses: 0,
470                evictions: 0,
471                hit_ratio: 0.0,
472            },
473        };
474
475        // Check cache if enabled
476        let cache_key = if self.config.enable_query_caching && request.use_cache.unwrap_or(true) {
477            Some(self.cache.as_ref().unwrap().generate_cache_key(&request))
478        } else {
479            None
480        };
481
482        let cache_start = Instant::now();
483        let cached_results = if let Some(cache_key) = &cache_key {
484            self.cache.as_ref().unwrap().get(cache_key).await
485        } else {
486            None
487        };
488        performance_metrics.cache_operation_time_ms += cache_start.elapsed().as_millis() as u64;
489
490        if let Some(cached_results) = cached_results {
491            info!("Returning cached results for query");
492            return Ok(MemoryAwareSearchResponse {
493                results: cached_results,
494                total_count: None,
495                insights_included: 0,
496                recently_consolidated_count: 0,
497                lineage_depth_analyzed: 0,
498                cache_hit_ratio: 1.0,
499                execution_time_ms: start_time.elapsed().as_millis() as u64,
500                performance_metrics,
501            });
502        }
503
504        // Execute base search
505        let db_start = Instant::now();
506        let base_response = self
507            .repository
508            .search_memories(request.base_request.clone())
509            .await?;
510        performance_metrics.database_query_time_ms = db_start.elapsed().as_millis() as u64;
511        performance_metrics.total_memories_analyzed = base_response.results.len();
512
513        let mut enhanced_results = Vec::new();
514        let mut insights_included = 0;
515        let mut recently_consolidated_count = 0;
516
517        // BATCH OPTIMIZATION: Extract memory IDs for batch processing
518        let memory_ids: Vec<Uuid> = base_response.results.iter().map(|r| r.memory.id).collect();
519
520        // Batch process consolidation data
521        let consolidation_start = Instant::now();
522        let consolidation_status_map = if request.include_consolidation_boost.unwrap_or(true) {
523            self.check_recently_consolidated_batch(&memory_ids).await?
524        } else {
525            HashMap::new()
526        };
527
528        let consolidation_boost_map = if request.include_consolidation_boost.unwrap_or(true) {
529            self.calculate_consolidation_boosts_batch(&memory_ids)
530                .await?
531        } else {
532            HashMap::new()
533        };
534        performance_metrics.consolidation_analysis_time_ms +=
535            consolidation_start.elapsed().as_millis() as u64;
536
537        // Batch process lineage data if requested
538        let lineage_start = Instant::now();
539        let lineage_map = if request.include_lineage.unwrap_or(false) {
540            self.get_memory_lineages_batch(
541                &memory_ids,
542                request
543                    .lineage_depth
544                    .unwrap_or(self.config.max_lineage_depth),
545            )
546            .await?
547        } else {
548            HashMap::new()
549        };
550        performance_metrics.lineage_analysis_time_ms += lineage_start.elapsed().as_millis() as u64;
551
552        // Process each result with pre-computed batch data
553        for base_result in base_response.results {
554            // Get pre-computed values from batch operations
555            let is_recently_consolidated = consolidation_status_map
556                .get(&base_result.memory.id)
557                .unwrap_or(&false);
558
559            if *is_recently_consolidated {
560                recently_consolidated_count += 1;
561            }
562
563            let consolidation_boost = consolidation_boost_map
564                .get(&base_result.memory.id)
565                .unwrap_or(&1.0);
566
567            // Check if this is an insight memory (still fast local operation)
568            let is_insight = self.is_insight_memory(&base_result.memory);
569            if is_insight {
570                insights_included += 1;
571            }
572
573            // Get pre-computed lineage
574            let lineage = lineage_map.get(&base_result.memory.id).cloned();
575
576            // Calculate final score with boosting
577            let final_score = (base_result.combined_score as f64) * consolidation_boost;
578
579            // Create boost explanation if requested
580            let boost_explanation = if request.explain_boosting.unwrap_or(false) {
581                Some(BoostExplanation {
582                    consolidation_boost_applied: *consolidation_boost,
583                    insight_boost_applied: if is_insight {
584                        self.config.insight_importance_weight
585                    } else {
586                        1.0
587                    },
588                    lineage_boost_applied: 1.0, // Could implement lineage-based boosting
589                    recent_consolidation_factor: if *is_recently_consolidated { 1.0 } else { 0.0 },
590                    total_boost_multiplier: *consolidation_boost,
591                    boost_reasons: self
592                        .generate_boost_reasons(*is_recently_consolidated, is_insight),
593                })
594            } else {
595                None
596            };
597
598            enhanced_results.push(MemoryAwareSearchResult {
599                memory: base_result.memory,
600                base_similarity_score: base_result.similarity_score,
601                consolidation_boost: *consolidation_boost,
602                final_score,
603                is_insight,
604                is_recently_consolidated: *is_recently_consolidated,
605                lineage,
606                boost_explanation,
607                cache_hit: false,
608            });
609        }
610
611        // Sort by final score
612        enhanced_results.sort_by(|a, b| {
613            b.final_score
614                .partial_cmp(&a.final_score)
615                .unwrap_or(std::cmp::Ordering::Equal)
616        });
617
618        // Include insights if configured
619        if self.config.include_insights && request.include_insights.unwrap_or(true) {
620            let insight_results = self.get_relevant_insights(&request).await?;
621            let insight_count = insight_results.len() as i32;
622            enhanced_results.extend(insight_results);
623            insights_included += insight_count;
624        }
625
626        // Cache results if enabled
627        let cache_store_start = Instant::now();
628        if let Some(cache_key) = cache_key {
629            self.cache
630                .as_ref()
631                .unwrap()
632                .set(cache_key, enhanced_results.clone())
633                .await;
634        }
635        performance_metrics.cache_operation_time_ms +=
636            cache_store_start.elapsed().as_millis() as u64;
637
638        // Get cache metrics
639        if let Some(cache) = &self.cache {
640            performance_metrics.cache_operations = cache.get_metrics().await;
641        }
642
643        let execution_time = start_time.elapsed().as_millis() as u64;
644
645        // Check performance target
646        if execution_time > self.config.p95_latency_target_ms {
647            warn!(
648                "Search latency exceeded target: {}ms > {}ms",
649                execution_time, self.config.p95_latency_target_ms
650            );
651        }
652
653        Ok(MemoryAwareSearchResponse {
654            results: enhanced_results,
655            total_count: base_response.total_count,
656            insights_included,
657            recently_consolidated_count,
658            lineage_depth_analyzed: request
659                .lineage_depth
660                .unwrap_or(self.config.max_lineage_depth),
661            cache_hit_ratio: performance_metrics.cache_operations.hit_ratio,
662            execution_time_ms: execution_time,
663            performance_metrics,
664        })
665    }
666
667    /// Check if memory has been recently consolidated
668    async fn is_recently_consolidated(&self, memory: &Memory) -> Result<bool> {
669        let cutoff_time =
670            Utc::now() - Duration::hours(self.config.recent_consolidation_threshold_hours);
671
672        // Check consolidation log for recent activity
673        let recent_events = sqlx::query_as::<_, MemoryConsolidationLog>(
674            "SELECT * FROM memory_consolidation_log WHERE memory_id = $1 AND created_at > $2 ORDER BY created_at DESC LIMIT 1"
675        )
676        .bind(memory.id)
677        .bind(cutoff_time)
678        .fetch_optional(self.repository.pool())
679        .await?;
680
681        Ok(recent_events.is_some())
682    }
683
684    /// Calculate consolidation boost for recently consolidated memory
685    async fn calculate_consolidation_boost(&self, memory: &Memory) -> Result<f64> {
686        let cutoff_time =
687            Utc::now() - Duration::hours(self.config.recent_consolidation_threshold_hours);
688
689        // Get most recent consolidation event
690        let recent_event = sqlx::query_as::<_, MemoryConsolidationLog>(
691            "SELECT * FROM memory_consolidation_log WHERE memory_id = $1 AND created_at > $2 ORDER BY created_at DESC LIMIT 1"
692        )
693        .bind(memory.id)
694        .bind(cutoff_time)
695        .fetch_optional(self.repository.pool())
696        .await?;
697
698        if let Some(event) = recent_event {
699            // Calculate boost based on time since consolidation and strength change
700            let hours_since = Utc::now()
701                .signed_duration_since(event.created_at)
702                .num_hours() as f64;
703            let time_factor = (-hours_since / 24.0).exp(); // Exponential decay over 24 hours
704            let strength_factor =
705                (event.new_consolidation_strength - event.previous_consolidation_strength).max(0.0);
706
707            let boost = 1.0
708                + (self.config.consolidation_boost_multiplier - 1.0)
709                    * time_factor
710                    * (1.0 + strength_factor);
711            Ok(boost.min(self.config.consolidation_boost_multiplier))
712        } else {
713            Ok(1.0)
714        }
715    }
716
717    /// BATCH OPTIMIZATION: Calculate consolidation boosts for multiple memories in a single query
718    pub async fn calculate_consolidation_boosts_batch(
719        &self,
720        memory_ids: &[Uuid],
721    ) -> Result<HashMap<Uuid, f64>> {
722        if memory_ids.is_empty() {
723            return Ok(HashMap::new());
724        }
725
726        let cutoff_time =
727            Utc::now() - Duration::hours(self.config.recent_consolidation_threshold_hours);
728
729        // Single query to get all recent consolidation events for all memories
730        let recent_events = sqlx::query_as::<_, MemoryConsolidationLog>(
731            "SELECT DISTINCT ON (memory_id) * FROM memory_consolidation_log 
732             WHERE memory_id = ANY($1) AND created_at > $2 
733             ORDER BY memory_id, created_at DESC",
734        )
735        .bind(memory_ids)
736        .bind(cutoff_time)
737        .fetch_all(self.repository.pool())
738        .await?;
739
740        let mut boost_map = HashMap::new();
741
742        // Initialize all memories with 1.0 boost
743        for &memory_id in memory_ids {
744            boost_map.insert(memory_id, 1.0);
745        }
746
747        // Calculate boosts for memories with recent consolidation events
748        for event in recent_events {
749            let hours_since = Utc::now()
750                .signed_duration_since(event.created_at)
751                .num_hours() as f64;
752
753            let time_factor = (-hours_since / 24.0).exp();
754            let strength_factor =
755                (event.new_consolidation_strength - event.previous_consolidation_strength).max(0.0);
756
757            let boost = 1.0
758                + (self.config.consolidation_boost_multiplier - 1.0)
759                    * time_factor
760                    * (1.0 + strength_factor);
761
762            boost_map.insert(
763                event.memory_id,
764                boost.min(self.config.consolidation_boost_multiplier),
765            );
766        }
767
768        Ok(boost_map)
769    }
770
771    /// BATCH OPTIMIZATION: Check recently consolidated status for multiple memories
772    pub async fn check_recently_consolidated_batch(
773        &self,
774        memory_ids: &[Uuid],
775    ) -> Result<HashMap<Uuid, bool>> {
776        if memory_ids.is_empty() {
777            return Ok(HashMap::new());
778        }
779
780        let cutoff_time =
781            Utc::now() - Duration::hours(self.config.recent_consolidation_threshold_hours);
782
783        // Single query to check all memories for recent consolidation
784        let recent_memory_ids: Vec<Uuid> = sqlx::query_scalar(
785            "SELECT DISTINCT memory_id FROM memory_consolidation_log 
786             WHERE memory_id = ANY($1) AND created_at > $2",
787        )
788        .bind(memory_ids)
789        .bind(cutoff_time)
790        .fetch_all(self.repository.pool())
791        .await?;
792
793        let mut status_map = HashMap::new();
794
795        // Initialize all memories as not recently consolidated
796        for &memory_id in memory_ids {
797            status_map.insert(memory_id, false);
798        }
799
800        // Mark recently consolidated memories
801        for memory_id in recent_memory_ids {
802            status_map.insert(memory_id, true);
803        }
804
805        Ok(status_map)
806    }
807
808    /// BATCH OPTIMIZATION: Get memory lineages for multiple memories
809    async fn get_memory_lineages_batch(
810        &self,
811        memory_ids: &[Uuid],
812        max_depth: usize,
813    ) -> Result<HashMap<Uuid, MemoryLineage>> {
814        if memory_ids.is_empty() {
815            return Ok(HashMap::new());
816        }
817
818        let mut lineage_map = HashMap::new();
819
820        // For now, we'll process lineages individually but could be further optimized
821        // This is still better than the original N+1 pattern since it's batched at the calling level
822        for &memory_id in memory_ids {
823            // Get the memory object (we could batch this too if needed)
824            if let Ok(memory) = self.repository.get_memory(memory_id).await {
825                let lineage = self.get_memory_lineage(&memory, max_depth).await?;
826                lineage_map.insert(memory_id, lineage);
827            }
828        }
829
830        Ok(lineage_map)
831    }
832
833    /// Check if memory is an insight/reflection memory
834    fn is_insight_memory(&self, memory: &Memory) -> bool {
835        if let Some(metadata_obj) = memory.metadata.as_object() {
836            metadata_obj
837                .get("is_meta_memory")
838                .and_then(|v| v.as_bool())
839                .unwrap_or(false)
840                || metadata_obj.get("generated_by").and_then(|v| v.as_str())
841                    == Some("reflection_engine")
842        } else {
843            false
844        }
845    }
846
847    /// Get memory lineage with specified depth
848    async fn get_memory_lineage(&self, memory: &Memory, max_depth: usize) -> Result<MemoryLineage> {
849        let mut ancestors = Vec::new();
850        let mut descendants = Vec::new();
851        let mut visited = HashSet::new();
852        let _consolidation_chain: Vec<ConsolidationEvent> = Vec::new();
853
854        // Get ancestors (parent memories)
855        self.traverse_ancestors(memory.id, max_depth, 0, &mut ancestors, &mut visited)
856            .await?;
857
858        // Get descendants (child memories)
859        visited.clear();
860        self.traverse_descendants(memory.id, max_depth, 0, &mut descendants, &mut visited)
861            .await?;
862
863        // Get consolidation history
864        let consolidation_chain = self.get_consolidation_chain(memory.id).await?;
865
866        // Get related insights
867        let related_insights = self.get_related_insights(memory.id).await?;
868
869        // Generate provenance metadata
870        let provenance_metadata = self.generate_provenance_metadata(memory).await?;
871
872        Ok(MemoryLineage {
873            memory_id: memory.id,
874            ancestors,
875            descendants,
876            related_insights,
877            consolidation_chain,
878            provenance_metadata,
879        })
880    }
881
882    /// Traverse memory ancestors iteratively to avoid recursion issues
883    async fn traverse_ancestors(
884        &self,
885        memory_id: Uuid,
886        max_depth: usize,
887        _current_depth: usize,
888        ancestors: &mut Vec<MemoryAncestor>,
889        visited: &mut HashSet<Uuid>,
890    ) -> Result<()> {
891        let mut stack = vec![(memory_id, 0)];
892
893        while let Some((current_id, depth)) = stack.pop() {
894            if depth >= max_depth || visited.contains(&current_id) {
895                continue;
896            }
897
898            visited.insert(current_id);
899
900            // Find parent memories
901            let parent_memories = sqlx::query_as::<_, Memory>(
902                "SELECT * FROM memories WHERE id = (SELECT parent_id FROM memories WHERE id = $1) AND parent_id IS NOT NULL"
903            )
904            .bind(current_id)
905            .fetch_all(self.repository.pool())
906            .await?;
907
908            for parent in parent_memories {
909                ancestors.push(MemoryAncestor {
910                    memory_id: parent.id,
911                    relationship_type: RelationshipType::ParentChild,
912                    depth: depth + 1,
913                    strength: parent.importance_score,
914                    created_at: parent.created_at,
915                });
916
917                // Add parent to stack for further traversal
918                stack.push((parent.id, depth + 1));
919            }
920        }
921
922        Ok(())
923    }
924
925    /// Traverse memory descendants iteratively to avoid recursion issues
926    async fn traverse_descendants(
927        &self,
928        memory_id: Uuid,
929        max_depth: usize,
930        _current_depth: usize,
931        descendants: &mut Vec<MemoryDescendant>,
932        visited: &mut HashSet<Uuid>,
933    ) -> Result<()> {
934        let mut stack = vec![(memory_id, 0)];
935
936        while let Some((current_id, depth)) = stack.pop() {
937            if depth >= max_depth || visited.contains(&current_id) {
938                continue;
939            }
940
941            visited.insert(current_id);
942
943            // Find child memories
944            let child_memories = sqlx::query_as::<_, Memory>(
945                "SELECT * FROM memories WHERE parent_id = $1 AND status = 'active'",
946            )
947            .bind(current_id)
948            .fetch_all(self.repository.pool())
949            .await?;
950
951            for child in child_memories {
952                descendants.push(MemoryDescendant {
953                    memory_id: child.id,
954                    relationship_type: RelationshipType::ParentChild,
955                    depth: depth + 1,
956                    strength: child.importance_score,
957                    created_at: child.created_at,
958                });
959
960                // Add child to stack for further traversal
961                stack.push((child.id, depth + 1));
962            }
963        }
964
965        Ok(())
966    }
967
968    /// Get consolidation event chain for memory
969    async fn get_consolidation_chain(&self, memory_id: Uuid) -> Result<Vec<ConsolidationEvent>> {
970        let events = sqlx::query_as::<_, MemoryConsolidationLog>(
971            "SELECT * FROM memory_consolidation_log WHERE memory_id = $1 ORDER BY created_at DESC LIMIT 10"
972        )
973        .bind(memory_id)
974        .fetch_all(self.repository.pool())
975        .await?;
976
977        Ok(events
978            .into_iter()
979            .map(|event| ConsolidationEvent {
980                event_id: event.id,
981                event_type: event.event_type,
982                previous_strength: event.previous_consolidation_strength,
983                new_strength: event.new_consolidation_strength,
984                timestamp: event.created_at,
985                trigger_reason: None, // Would be extracted from access_context if needed
986            })
987            .collect())
988    }
989
990    /// Get insights related to this memory
991    async fn get_related_insights(&self, memory_id: Uuid) -> Result<Vec<Uuid>> {
992        // Find memories that reference this memory in their metadata
993        let related = sqlx::query_scalar::<_, Uuid>(
994            r#"
995            SELECT id FROM memories 
996            WHERE status = 'active' 
997            AND metadata->>'is_meta_memory' = 'true'
998            AND metadata->'source_memory_ids' ? $1::text
999            "#,
1000        )
1001        .bind(memory_id.to_string())
1002        .fetch_all(self.repository.pool())
1003        .await?;
1004
1005        Ok(related)
1006    }
1007
1008    /// Generate provenance metadata for memory
1009    async fn generate_provenance_metadata(&self, memory: &Memory) -> Result<ProvenanceMetadata> {
1010        Ok(ProvenanceMetadata {
1011            creation_source: "memory_system".to_string(),
1012            modification_history: Vec::new(), // Would be populated from audit logs
1013            quality_indicators: QualityIndicators {
1014                coherence_score: 0.8,
1015                completeness_score: 0.7,
1016                accuracy_score: 0.9,
1017                timeliness_score: 0.8,
1018            },
1019            reliability_score: memory.consolidation_strength / 10.0,
1020        })
1021    }
1022
1023    /// Get relevant insights for the search query
1024    async fn get_relevant_insights(
1025        &self,
1026        request: &MemoryAwareSearchRequest,
1027    ) -> Result<Vec<MemoryAwareSearchResult>> {
1028        // Search for insight memories
1029        let insight_search = SearchRequest {
1030            query_text: request.base_request.query_text.clone(),
1031            query_embedding: request.base_request.query_embedding.clone(),
1032            search_type: Some(SearchType::Hybrid),
1033            limit: Some(5), // Limit insights to avoid overwhelming results
1034            ..Default::default()
1035        };
1036
1037        let insight_response = self.repository.search_memories(insight_search).await?;
1038
1039        let mut insight_results = Vec::new();
1040        for result in insight_response.results {
1041            if self.is_insight_memory(&result.memory) {
1042                // Apply insight-specific scoring
1043                let insight_boost = self.config.insight_importance_weight;
1044                let final_score = (result.combined_score as f64) * insight_boost;
1045
1046                insight_results.push(MemoryAwareSearchResult {
1047                    memory: result.memory,
1048                    base_similarity_score: result.similarity_score,
1049                    consolidation_boost: insight_boost,
1050                    final_score,
1051                    is_insight: true,
1052                    is_recently_consolidated: false,
1053                    lineage: None,
1054                    boost_explanation: None,
1055                    cache_hit: false,
1056                });
1057            }
1058        }
1059
1060        Ok(insight_results)
1061    }
1062
1063    /// Generate boost explanation reasons
1064    fn generate_boost_reasons(
1065        &self,
1066        is_recently_consolidated: bool,
1067        is_insight: bool,
1068    ) -> Vec<String> {
1069        let mut reasons = Vec::new();
1070
1071        if is_recently_consolidated {
1072            reasons.push("Recently consolidated memory - enhanced retrieval strength".to_string());
1073        }
1074
1075        if is_insight {
1076            reasons.push("Insight/reflection memory - meta-cognitive content".to_string());
1077        }
1078
1079        reasons
1080    }
1081
1082    /// Get cache statistics
1083    pub async fn get_cache_stats(&self) -> Option<CacheOperationMetrics> {
1084        if let Some(cache) = &self.cache {
1085            Some(cache.get_metrics().await)
1086        } else {
1087            None
1088        }
1089    }
1090
1091    /// Clear cache
1092    pub async fn clear_cache(&self) -> Result<()> {
1093        if let Some(cache) = &self.cache {
1094            let mut cache_map = cache.cache.write().await;
1095            cache_map.clear();
1096        }
1097        Ok(())
1098    }
1099
1100    /// Cleanup expired cache entries
1101    pub async fn cleanup_cache(&self) -> Result<usize> {
1102        if let Some(cache) = &self.cache {
1103            Ok(cache.cleanup_expired().await)
1104        } else {
1105            Ok(0)
1106        }
1107    }
1108}
1109
#[cfg(test)]
mod tests {
    use super::*;

    /// Generating a key twice for the same request must give the same key.
    #[tokio::test]
    async fn test_cache_key_generation() {
        let config = EnhancedRetrievalConfig::default();
        let cache = QueryPatternCache::new(config);

        let request = MemoryAwareSearchRequest {
            base_request: SearchRequest {
                query_text: Some("test query".to_string()),
                ..Default::default()
            },
            include_lineage: Some(true),
            include_insights: Some(true),
            ..Default::default()
        };

        let key1 = cache.generate_cache_key(&request);
        let key2 = cache.generate_cache_key(&request);

        assert_eq!(key1, key2, "Same request should generate same cache key");
    }

    /// An entry is readable right after insertion and gone once its TTL has passed.
    #[tokio::test]
    async fn test_cache_expiration() {
        // One-second TTL so the test only has to wait briefly.
        let config = EnhancedRetrievalConfig {
            cache_ttl_seconds: 1,
            ..Default::default()
        };

        let cache = QueryPatternCache::new(config);
        let results = vec![]; // The cached payload is irrelevant for this test.

        cache.set("test_key".to_string(), results).await;

        // Should be available immediately
        assert!(cache.get("test_key").await.is_some());

        // Wait past the one-second TTL.
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

        // Should be expired now
        assert!(cache.get("test_key").await.is_none());
    }

    /// `RelationshipType` survives a JSON serialize/deserialize round trip.
    #[test]
    fn test_relationship_type_serialization() {
        let rel_type = RelationshipType::ParentChild;
        let serialized = serde_json::to_string(&rel_type).unwrap();
        let deserialized: RelationshipType = serde_json::from_str(&serialized).unwrap();
        assert_eq!(rel_type, deserialized);
    }

    /// A `BoostExplanation` keeps the values it was constructed with.
    #[test]
    fn test_boost_explanation_creation() {
        let explanation = BoostExplanation {
            consolidation_boost_applied: 2.0,
            insight_boost_applied: 1.5,
            lineage_boost_applied: 1.0,
            recent_consolidation_factor: 1.0,
            total_boost_multiplier: 2.0,
            boost_reasons: vec!["Recently consolidated".to_string()],
        };

        assert_eq!(explanation.total_boost_multiplier, 2.0);
        assert_eq!(explanation.boost_reasons.len(), 1);
    }
}