ceylon_next/memory/advanced/
consolidation.rs1use super::{EnhancedMemoryEntry, ImportanceLevel, MemoryConfig};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use tokio::time::{interval, Duration};
14
15#[derive(Debug, Clone, PartialEq, Eq)]
17pub enum ConsolidationTask {
18 DeduplicateMemories,
20 ApplyDecay,
22 Cleanup,
24 Reindex,
26 All,
28}
29
30#[derive(Debug, Clone)]
32pub struct ConsolidationResult {
33 pub task: ConsolidationTask,
34 pub memories_processed: usize,
35 pub memories_merged: usize,
36 pub memories_deleted: usize,
37 pub duration_ms: u64,
38}
39
40pub struct Consolidator {
42 memories: Arc<RwLock<HashMap<String, EnhancedMemoryEntry>>>,
44 config: MemoryConfig,
46 running: Arc<RwLock<bool>>,
48}
49
50impl Consolidator {
51 pub fn new(config: MemoryConfig) -> Self {
53 Self {
54 memories: Arc::new(RwLock::new(HashMap::new())),
55 config,
56 running: Arc::new(RwLock::new(false)),
57 }
58 }
59
60 pub async fn start_background_consolidation(&self) {
62 let memories = self.memories.clone();
63 let config = self.config.clone();
64 let running = self.running.clone();
65
66 *running.write().await = true;
68
69 tokio::spawn(async move {
70 let mut ticker = interval(Duration::from_secs(config.consolidation_interval));
71
72 loop {
73 ticker.tick().await;
74
75 if !*running.read().await {
77 break;
78 }
79
80 let consolidator = Consolidator {
82 memories: memories.clone(),
83 config: config.clone(),
84 running: running.clone(),
85 };
86
87 if let Err(e) = consolidator.consolidate(ConsolidationTask::All).await {
88 eprintln!("Consolidation error: {}", e);
89 }
90 }
91 });
92 }
93
94 pub async fn stop_background_consolidation(&self) {
96 *self.running.write().await = false;
97 }
98
99 pub async fn consolidate(&self, task: ConsolidationTask) -> Result<ConsolidationResult, String> {
101 let start = std::time::Instant::now();
102 let mut result = ConsolidationResult {
103 task: task.clone(),
104 memories_processed: 0,
105 memories_merged: 0,
106 memories_deleted: 0,
107 duration_ms: 0,
108 };
109
110 match task {
111 ConsolidationTask::DeduplicateMemories => {
112 result = self.deduplicate_memories().await?;
113 }
114 ConsolidationTask::ApplyDecay => {
115 result = self.apply_decay().await?;
116 }
117 ConsolidationTask::Cleanup => {
118 result = self.cleanup_memories().await?;
119 }
120 ConsolidationTask::Reindex => {
121 result = self.reindex_memories().await?;
122 }
123 ConsolidationTask::All => {
124 let dedup = self.deduplicate_memories().await?;
126 let decay = self.apply_decay().await?;
127 let cleanup = self.cleanup_memories().await?;
128
129 result.memories_processed = dedup.memories_processed + decay.memories_processed + cleanup.memories_processed;
130 result.memories_merged = dedup.memories_merged;
131 result.memories_deleted = cleanup.memories_deleted;
132 }
133 }
134
135 result.duration_ms = start.elapsed().as_millis() as u64;
136 Ok(result)
137 }
138
139 async fn deduplicate_memories(&self) -> Result<ConsolidationResult, String> {
141 let mut memories = self.memories.write().await;
142 let memory_list: Vec<EnhancedMemoryEntry> = memories.values().cloned().collect();
143
144 let mut processed = 0;
145 let mut merged = 0;
146 let mut to_remove = Vec::new();
147
148 for i in 0..memory_list.len() {
150 for j in (i + 1)..memory_list.len() {
151 processed += 1;
152
153 let similarity = self.calculate_similarity(&memory_list[i], &memory_list[j]);
154
155 if similarity >= self.config.duplicate_threshold {
156 let (keep_id, remove_id) = if memory_list[i].relevance_score() >= memory_list[j].relevance_score() {
158 (&memory_list[i].entry.id, &memory_list[j].entry.id)
159 } else {
160 (&memory_list[j].entry.id, &memory_list[i].entry.id)
161 };
162
163 let merge_data = memories.get(remove_id).map(|remove_entry| {
165 (
166 remove_entry.access_count,
167 remove_entry.key_points.clone(),
168 remove_entry.entities.clone(),
169 )
170 });
171
172 if let Some((access_count, key_points, entities)) = merge_data {
173 if let Some(keep_entry) = memories.get_mut(keep_id) {
174 keep_entry.access_count += access_count;
175 keep_entry.related_memories.push(remove_id.clone());
176
177 for point in &key_points {
179 if !keep_entry.key_points.contains(point) {
180 keep_entry.key_points.push(point.clone());
181 }
182 }
183
184 for entity in &entities {
186 if !keep_entry.entities.contains(entity) {
187 keep_entry.entities.push(entity.clone());
188 }
189 }
190 }
191 }
192
193 to_remove.push(remove_id.clone());
194 merged += 1;
195 }
196 }
197 }
198
199 for id in to_remove {
201 memories.remove(&id);
202 }
203
204 Ok(ConsolidationResult {
205 task: ConsolidationTask::DeduplicateMemories,
206 memories_processed: processed,
207 memories_merged: merged,
208 memories_deleted: 0,
209 duration_ms: 0,
210 })
211 }
212
213 async fn apply_decay(&self) -> Result<ConsolidationResult, String> {
215 if !self.config.enable_decay {
216 return Ok(ConsolidationResult {
217 task: ConsolidationTask::ApplyDecay,
218 memories_processed: 0,
219 memories_merged: 0,
220 memories_deleted: 0,
221 duration_ms: 0,
222 });
223 }
224
225 let mut memories = self.memories.write().await;
226 let mut processed = 0;
227 let current_time = Self::current_timestamp();
228
229 for memory in memories.values_mut() {
230 if memory.importance >= self.config.min_importance_for_preservation {
232 continue;
233 }
234
235 let age_seconds = current_time.saturating_sub(memory.entry.created_at);
237 let age_days = age_seconds as f32 / 86400.0;
238
239 let decay_amount = 1.0 - (self.config.decay_rate * age_days);
241 memory.decay_factor = (memory.decay_factor * decay_amount).max(0.0);
242
243 processed += 1;
244 }
245
246 Ok(ConsolidationResult {
247 task: ConsolidationTask::ApplyDecay,
248 memories_processed: processed,
249 memories_merged: 0,
250 memories_deleted: 0,
251 duration_ms: 0,
252 })
253 }
254
255 async fn cleanup_memories(&self) -> Result<ConsolidationResult, String> {
257 let mut memories = self.memories.write().await;
258 let mut processed = 0;
259 let mut deleted = 0;
260
261 let to_remove: Vec<String> = memories
263 .iter()
264 .filter(|(_, memory)| {
265 processed += 1;
266 memory.relevance_score() < 0.1 || memory.decay_factor < 0.1
268 })
269 .map(|(id, _)| id.clone())
270 .collect();
271
272 deleted = to_remove.len();
273
274 for id in to_remove {
276 memories.remove(&id);
277 }
278
279 Ok(ConsolidationResult {
280 task: ConsolidationTask::Cleanup,
281 memories_processed: processed,
282 memories_merged: 0,
283 memories_deleted: deleted,
284 duration_ms: 0,
285 })
286 }
287
288 async fn reindex_memories(&self) -> Result<ConsolidationResult, String> {
290 let memories = self.memories.read().await;
291
292 Ok(ConsolidationResult {
293 task: ConsolidationTask::Reindex,
294 memories_processed: memories.len(),
295 memories_merged: 0,
296 memories_deleted: 0,
297 duration_ms: 0,
298 })
299 }
300
301 fn calculate_similarity(&self, mem1: &EnhancedMemoryEntry, mem2: &EnhancedMemoryEntry) -> f32 {
303 let mut similarity = 0.0;
309 let mut factors = 0;
310
311 let time_diff = (mem1.entry.created_at as i64 - mem2.entry.created_at as i64).abs();
313 if time_diff < 3600 {
314 similarity += 0.3;
315 } else if time_diff < 86400 {
316 similarity += 0.1;
317 }
318 factors += 1;
319
320 if mem1.entry.task_id == mem2.entry.task_id {
322 similarity += 0.3;
323 factors += 1;
324 }
325
326 if mem1.entry.agent_id == mem2.entry.agent_id {
328 similarity += 0.1;
329 factors += 1;
330 }
331
332 let common_points = mem1
334 .key_points
335 .iter()
336 .filter(|p| mem2.key_points.contains(p))
337 .count();
338 if !mem1.key_points.is_empty() || !mem2.key_points.is_empty() {
339 let max_points = mem1.key_points.len().max(mem2.key_points.len());
340 if max_points > 0 {
341 similarity += 0.2 * (common_points as f32 / max_points as f32);
342 factors += 1;
343 }
344 }
345
346 let common_entities = mem1
348 .entities
349 .iter()
350 .filter(|e| mem2.entities.contains(e))
351 .count();
352 if !mem1.entities.is_empty() || !mem2.entities.is_empty() {
353 let max_entities = mem1.entities.len().max(mem2.entities.len());
354 if max_entities > 0 {
355 similarity += 0.1 * (common_entities as f32 / max_entities as f32);
356 factors += 1;
357 }
358 }
359
360 if factors > 0 {
361 similarity / factors as f32
362 } else {
363 0.0
364 }
365 }
366
367 pub async fn set_memories(&self, memories: HashMap<String, EnhancedMemoryEntry>) {
369 *self.memories.write().await = memories;
370 }
371
372 pub async fn get_memories(&self) -> HashMap<String, EnhancedMemoryEntry> {
374 self.memories.read().await.clone()
375 }
376
377 fn current_timestamp() -> u64 {
378 std::time::SystemTime::now()
379 .duration_since(std::time::UNIX_EPOCH)
380 .unwrap()
381 .as_secs()
382 }
383}
384
385#[cfg(test)]
386mod tests {
387 use super::*;
388 use crate::memory::MemoryEntry;
389 use super::super::MemoryType;
390
391 #[tokio::test]
392 async fn test_consolidator_decay() {
393 let config = MemoryConfig {
394 enable_decay: true,
395 decay_rate: 0.1,
396 min_importance_for_preservation: ImportanceLevel::High,
397 ..Default::default()
398 };
399
400 let consolidator = Consolidator::new(config);
401
402 let mut memories = HashMap::new();
404 for i in 0..5 {
405 let entry = MemoryEntry::new(
406 "agent-1".to_string(),
407 format!("task-{}", i),
408 vec![],
409 );
410 let mut enhanced = EnhancedMemoryEntry::new(entry, MemoryType::Episodic);
411 enhanced.importance = if i < 2 {
412 ImportanceLevel::Critical
413 } else {
414 ImportanceLevel::Low
415 };
416 memories.insert(enhanced.entry.id.clone(), enhanced);
417 }
418
419 consolidator.set_memories(memories).await;
420
421 let result = consolidator.apply_decay().await.unwrap();
422 assert_eq!(result.memories_processed, 3); }
424
425 #[tokio::test]
426 async fn test_consolidator_cleanup() {
427 let config = MemoryConfig::default();
428 let consolidator = Consolidator::new(config);
429
430 let mut memories = HashMap::new();
432 for i in 0..5 {
433 let entry = MemoryEntry::new(
434 "agent-1".to_string(),
435 format!("task-{}", i),
436 vec![],
437 );
438 let mut enhanced = EnhancedMemoryEntry::new(entry, MemoryType::Episodic);
439 enhanced.decay_factor = if i < 2 { 0.05 } else { 0.8 }; memories.insert(enhanced.entry.id.clone(), enhanced);
441 }
442
443 consolidator.set_memories(memories).await;
444
445 let result = consolidator.cleanup_memories().await.unwrap();
446 assert_eq!(result.memories_deleted, 2); }
448}