ceylon_next/memory/advanced/
consolidation.rs

//! Memory Consolidation - Background Processing and Maintenance
//!
//! This module handles:
//! - Background memory processing
//! - Duplicate detection and merging
//! - Memory aging and decay
//! - Memory cleanup and optimization

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use tokio::sync::RwLock;
use tokio::time::{interval, Duration};

use super::{EnhancedMemoryEntry, ImportanceLevel, MemoryConfig};

/// The kinds of maintenance work a [`Consolidator`] can perform.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConsolidationTask {
    /// Detect duplicate memories and merge them into a single entry.
    DeduplicateMemories,
    /// Apply age-based decay to old memories.
    ApplyDecay,
    /// Remove low-value (low relevance or fully decayed) memories.
    Cleanup,
    /// Rebuild retrieval indices.
    Reindex,
    /// Run every consolidation task in sequence.
    All,
}
29
30/// Result of a consolidation operation
31#[derive(Debug, Clone)]
32pub struct ConsolidationResult {
33    pub task: ConsolidationTask,
34    pub memories_processed: usize,
35    pub memories_merged: usize,
36    pub memories_deleted: usize,
37    pub duration_ms: u64,
38}
39
40/// Memory consolidator
41pub struct Consolidator {
42    /// Memories being managed
43    memories: Arc<RwLock<HashMap<String, EnhancedMemoryEntry>>>,
44    /// Configuration
45    config: MemoryConfig,
46    /// Running flag
47    running: Arc<RwLock<bool>>,
48}
49
50impl Consolidator {
51    /// Create a new consolidator
52    pub fn new(config: MemoryConfig) -> Self {
53        Self {
54            memories: Arc::new(RwLock::new(HashMap::new())),
55            config,
56            running: Arc::new(RwLock::new(false)),
57        }
58    }
59
60    /// Start background consolidation process
61    pub async fn start_background_consolidation(&self) {
62        let memories = self.memories.clone();
63        let config = self.config.clone();
64        let running = self.running.clone();
65
66        // Set running flag
67        *running.write().await = true;
68
69        tokio::spawn(async move {
70            let mut ticker = interval(Duration::from_secs(config.consolidation_interval));
71
72            loop {
73                ticker.tick().await;
74
75                // Check if still running
76                if !*running.read().await {
77                    break;
78                }
79
80                // Perform consolidation
81                let consolidator = Consolidator {
82                    memories: memories.clone(),
83                    config: config.clone(),
84                    running: running.clone(),
85                };
86
87                if let Err(e) = consolidator.consolidate(ConsolidationTask::All).await {
88                    eprintln!("Consolidation error: {}", e);
89                }
90            }
91        });
92    }
93
94    /// Stop background consolidation
95    pub async fn stop_background_consolidation(&self) {
96        *self.running.write().await = false;
97    }
98
99    /// Perform consolidation tasks
100    pub async fn consolidate(&self, task: ConsolidationTask) -> Result<ConsolidationResult, String> {
101        let start = std::time::Instant::now();
102        let mut result = ConsolidationResult {
103            task: task.clone(),
104            memories_processed: 0,
105            memories_merged: 0,
106            memories_deleted: 0,
107            duration_ms: 0,
108        };
109
110        match task {
111            ConsolidationTask::DeduplicateMemories => {
112                result = self.deduplicate_memories().await?;
113            }
114            ConsolidationTask::ApplyDecay => {
115                result = self.apply_decay().await?;
116            }
117            ConsolidationTask::Cleanup => {
118                result = self.cleanup_memories().await?;
119            }
120            ConsolidationTask::Reindex => {
121                result = self.reindex_memories().await?;
122            }
123            ConsolidationTask::All => {
124                // Run all tasks
125                let dedup = self.deduplicate_memories().await?;
126                let decay = self.apply_decay().await?;
127                let cleanup = self.cleanup_memories().await?;
128
129                result.memories_processed = dedup.memories_processed + decay.memories_processed + cleanup.memories_processed;
130                result.memories_merged = dedup.memories_merged;
131                result.memories_deleted = cleanup.memories_deleted;
132            }
133        }
134
135        result.duration_ms = start.elapsed().as_millis() as u64;
136        Ok(result)
137    }
138
139    /// Detect and merge duplicate memories
140    async fn deduplicate_memories(&self) -> Result<ConsolidationResult, String> {
141        let mut memories = self.memories.write().await;
142        let memory_list: Vec<EnhancedMemoryEntry> = memories.values().cloned().collect();
143
144        let mut processed = 0;
145        let mut merged = 0;
146        let mut to_remove = Vec::new();
147
148        // Compare each pair of memories
149        for i in 0..memory_list.len() {
150            for j in (i + 1)..memory_list.len() {
151                processed += 1;
152
153                let similarity = self.calculate_similarity(&memory_list[i], &memory_list[j]);
154
155                if similarity >= self.config.duplicate_threshold {
156                    // Merge the two memories
157                    let (keep_id, remove_id) = if memory_list[i].relevance_score() >= memory_list[j].relevance_score() {
158                        (&memory_list[i].entry.id, &memory_list[j].entry.id)
159                    } else {
160                        (&memory_list[j].entry.id, &memory_list[i].entry.id)
161                    };
162
163                    // Merge metadata - clone data first to avoid borrow conflicts
164                    let merge_data = memories.get(remove_id).map(|remove_entry| {
165                        (
166                            remove_entry.access_count,
167                            remove_entry.key_points.clone(),
168                            remove_entry.entities.clone(),
169                        )
170                    });
171
172                    if let Some((access_count, key_points, entities)) = merge_data {
173                        if let Some(keep_entry) = memories.get_mut(keep_id) {
174                            keep_entry.access_count += access_count;
175                            keep_entry.related_memories.push(remove_id.clone());
176
177                            // Merge key points
178                            for point in &key_points {
179                                if !keep_entry.key_points.contains(point) {
180                                    keep_entry.key_points.push(point.clone());
181                                }
182                            }
183
184                            // Merge entities
185                            for entity in &entities {
186                                if !keep_entry.entities.contains(entity) {
187                                    keep_entry.entities.push(entity.clone());
188                                }
189                            }
190                        }
191                    }
192
193                    to_remove.push(remove_id.clone());
194                    merged += 1;
195                }
196            }
197        }
198
199        // Remove duplicates
200        for id in to_remove {
201            memories.remove(&id);
202        }
203
204        Ok(ConsolidationResult {
205            task: ConsolidationTask::DeduplicateMemories,
206            memories_processed: processed,
207            memories_merged: merged,
208            memories_deleted: 0,
209            duration_ms: 0,
210        })
211    }
212
213    /// Apply decay to memories
214    async fn apply_decay(&self) -> Result<ConsolidationResult, String> {
215        if !self.config.enable_decay {
216            return Ok(ConsolidationResult {
217                task: ConsolidationTask::ApplyDecay,
218                memories_processed: 0,
219                memories_merged: 0,
220                memories_deleted: 0,
221                duration_ms: 0,
222            });
223        }
224
225        let mut memories = self.memories.write().await;
226        let mut processed = 0;
227        let current_time = Self::current_timestamp();
228
229        for memory in memories.values_mut() {
230            // Don't decay important memories
231            if memory.importance >= self.config.min_importance_for_preservation {
232                continue;
233            }
234
235            // Calculate age in days
236            let age_seconds = current_time.saturating_sub(memory.entry.created_at);
237            let age_days = age_seconds as f32 / 86400.0;
238
239            // Apply exponential decay
240            let decay_amount = 1.0 - (self.config.decay_rate * age_days);
241            memory.decay_factor = (memory.decay_factor * decay_amount).max(0.0);
242
243            processed += 1;
244        }
245
246        Ok(ConsolidationResult {
247            task: ConsolidationTask::ApplyDecay,
248            memories_processed: processed,
249            memories_merged: 0,
250            memories_deleted: 0,
251            duration_ms: 0,
252        })
253    }
254
255    /// Clean up low-value memories
256    async fn cleanup_memories(&self) -> Result<ConsolidationResult, String> {
257        let mut memories = self.memories.write().await;
258        let mut processed = 0;
259        let mut deleted = 0;
260
261        // Collect IDs to remove
262        let to_remove: Vec<String> = memories
263            .iter()
264            .filter(|(_, memory)| {
265                processed += 1;
266                // Remove memories with very low relevance score or fully decayed
267                memory.relevance_score() < 0.1 || memory.decay_factor < 0.1
268            })
269            .map(|(id, _)| id.clone())
270            .collect();
271
272        deleted = to_remove.len();
273
274        // Remove them
275        for id in to_remove {
276            memories.remove(&id);
277        }
278
279        Ok(ConsolidationResult {
280            task: ConsolidationTask::Cleanup,
281            memories_processed: processed,
282            memories_merged: 0,
283            memories_deleted: deleted,
284            duration_ms: 0,
285        })
286    }
287
288    /// Reindex memories for faster retrieval
289    async fn reindex_memories(&self) -> Result<ConsolidationResult, String> {
290        let memories = self.memories.read().await;
291
292        Ok(ConsolidationResult {
293            task: ConsolidationTask::Reindex,
294            memories_processed: memories.len(),
295            memories_merged: 0,
296            memories_deleted: 0,
297            duration_ms: 0,
298        })
299    }
300
301    /// Calculate similarity between two memories
302    fn calculate_similarity(&self, mem1: &EnhancedMemoryEntry, mem2: &EnhancedMemoryEntry) -> f32 {
303        // Simple similarity based on:
304        // 1. Time proximity
305        // 2. Task ID match
306        // 3. Content overlap (key points, entities)
307
308        let mut similarity = 0.0;
309        let mut factors = 0;
310
311        // Time proximity (within 1 hour = high similarity)
312        let time_diff = (mem1.entry.created_at as i64 - mem2.entry.created_at as i64).abs();
313        if time_diff < 3600 {
314            similarity += 0.3;
315        } else if time_diff < 86400 {
316            similarity += 0.1;
317        }
318        factors += 1;
319
320        // Task ID match
321        if mem1.entry.task_id == mem2.entry.task_id {
322            similarity += 0.3;
323            factors += 1;
324        }
325
326        // Agent ID match
327        if mem1.entry.agent_id == mem2.entry.agent_id {
328            similarity += 0.1;
329            factors += 1;
330        }
331
332        // Key points overlap
333        let common_points = mem1
334            .key_points
335            .iter()
336            .filter(|p| mem2.key_points.contains(p))
337            .count();
338        if !mem1.key_points.is_empty() || !mem2.key_points.is_empty() {
339            let max_points = mem1.key_points.len().max(mem2.key_points.len());
340            if max_points > 0 {
341                similarity += 0.2 * (common_points as f32 / max_points as f32);
342                factors += 1;
343            }
344        }
345
346        // Entities overlap
347        let common_entities = mem1
348            .entities
349            .iter()
350            .filter(|e| mem2.entities.contains(e))
351            .count();
352        if !mem1.entities.is_empty() || !mem2.entities.is_empty() {
353            let max_entities = mem1.entities.len().max(mem2.entities.len());
354            if max_entities > 0 {
355                similarity += 0.1 * (common_entities as f32 / max_entities as f32);
356                factors += 1;
357            }
358        }
359
360        if factors > 0 {
361            similarity / factors as f32
362        } else {
363            0.0
364        }
365    }
366
367    /// Update memories reference
368    pub async fn set_memories(&self, memories: HashMap<String, EnhancedMemoryEntry>) {
369        *self.memories.write().await = memories;
370    }
371
372    /// Get current memories
373    pub async fn get_memories(&self) -> HashMap<String, EnhancedMemoryEntry> {
374        self.memories.read().await.clone()
375    }
376
377    fn current_timestamp() -> u64 {
378        std::time::SystemTime::now()
379            .duration_since(std::time::UNIX_EPOCH)
380            .unwrap()
381            .as_secs()
382    }
383}
384
#[cfg(test)]
mod tests {
    use super::super::MemoryType;
    use super::*;
    use crate::memory::MemoryEntry;

    /// Build `n` enhanced episodic memories, customizing each via `tweak`.
    fn build_memories(
        n: usize,
        mut tweak: impl FnMut(usize, &mut EnhancedMemoryEntry),
    ) -> HashMap<String, EnhancedMemoryEntry> {
        (0..n)
            .map(|i| {
                let entry =
                    MemoryEntry::new("agent-1".to_string(), format!("task-{}", i), vec![]);
                let mut enhanced = EnhancedMemoryEntry::new(entry, MemoryType::Episodic);
                tweak(i, &mut enhanced);
                (enhanced.entry.id.clone(), enhanced)
            })
            .collect()
    }

    /// Decay must skip entries at or above the preservation threshold.
    #[tokio::test]
    async fn test_consolidator_decay() {
        let config = MemoryConfig {
            enable_decay: true,
            decay_rate: 0.1,
            min_importance_for_preservation: ImportanceLevel::High,
            ..Default::default()
        };
        let consolidator = Consolidator::new(config);

        // Two critical (preserved) entries, three low-importance ones.
        let store = build_memories(5, |i, m| {
            m.importance = if i < 2 {
                ImportanceLevel::Critical
            } else {
                ImportanceLevel::Low
            };
        });
        consolidator.set_memories(store).await;

        let result = consolidator.apply_decay().await.unwrap();
        assert_eq!(result.memories_processed, 3); // Only low importance ones
    }

    /// Cleanup must delete exactly the fully-decayed entries.
    #[tokio::test]
    async fn test_consolidator_cleanup() {
        let consolidator = Consolidator::new(MemoryConfig::default());

        // Two entries below the decay floor, three healthy ones.
        let store = build_memories(5, |i, m| {
            m.decay_factor = if i < 2 { 0.05 } else { 0.8 };
        });
        consolidator.set_memories(store).await;

        let result = consolidator.cleanup_memories().await.unwrap();
        assert_eq!(result.memories_deleted, 2); // Two with low decay factor
    }
}