1use super::error::{MemoryError, Result};
2use super::models::CreateMemoryRequest;
3use super::models::*;
4use super::repository::MemoryRepository;
5use crate::embedding::EmbeddingService;
6use chrono::{DateTime, Duration, Utc};
7use pgvector::Vector;
8use serde::{Deserialize, Serialize};
9use sqlx::{Postgres, Row, Transaction};
10use std::collections::{HashMap, HashSet};
11use std::hash::Hash;
12use std::sync::Arc;
13use std::time::Instant;
14use tokio::sync::{Mutex, RwLock};
15use tracing::{debug, error, info, warn};
16use uuid::Uuid;
17
/// Tunables for the semantic deduplication engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SemanticDeduplicationConfig {
    /// Minimum cosine similarity (0..1) for two memories to count as duplicates.
    pub similarity_threshold: f32,
    /// Batch size used when splitting large deduplication runs.
    pub batch_size: usize,
    /// Hard cap on memories touched by a single operation (upper bound for batch sizing).
    pub max_memories_per_operation: usize,
    // Minimum age before a memory is eligible — NOTE(review): not referenced
    // in this file; confirm it is enforced elsewhere.
    pub min_memory_age_hours: i64,
    /// Score threshold passed to the auto-pruner; memories below it may be pruned.
    pub prune_threshold: f64,
    /// Only memories older than this many days are eligible for pruning.
    pub prune_age_days: i64,
    /// Fraction of storage to keep free (0.2 = 20% headroom).
    pub target_memory_headroom: f32,
    // Desired compression ratio per tier — presumably consumed by the
    // CompressionManager; not referenced in this file.
    pub compression_targets: HashMap<MemoryTier, f32>,
    /// When true, high-importance/recently-accessed memories are merged losslessly.
    pub lossless_critical: bool,
    /// Soft time budget for a single deduplication operation.
    pub max_operation_time_seconds: u64,
}
42
43impl Default for SemanticDeduplicationConfig {
44 fn default() -> Self {
45 let mut compression_targets = HashMap::new();
46 compression_targets.insert(MemoryTier::Working, 2.0);
47 compression_targets.insert(MemoryTier::Warm, 3.0);
48 compression_targets.insert(MemoryTier::Cold, 5.0);
49 compression_targets.insert(MemoryTier::Frozen, 10.0);
50
51 Self {
52 similarity_threshold: 0.85,
53 batch_size: 100,
54 max_memories_per_operation: 10_000,
55 min_memory_age_hours: 1,
56 prune_threshold: 0.2,
57 prune_age_days: 30,
58 target_memory_headroom: 0.2,
59 compression_targets,
60 lossless_critical: true,
61 max_operation_time_seconds: 30,
62 }
63 }
64}
65
/// Orchestrates semantic deduplication of stored memories: clusters
/// near-duplicates by embedding similarity, merges each cluster, prunes
/// low-value memories, and records audit entries plus aggregate metrics.
#[allow(dead_code)]
pub struct SemanticDeduplicationEngine {
    config: SemanticDeduplicationConfig,
    repository: Arc<MemoryRepository>,
    #[allow(dead_code)]
    embedding_service: Arc<dyn EmbeddingService>,
    // Performs the actual merge of a similar-memory group.
    merger: Arc<MemoryMerger>,
    #[allow(dead_code)]
    compression_manager: Arc<CompressionManager>,
    // Records begin/complete audit entries around every merge.
    audit_trail: Arc<AuditTrail>,
    auto_pruner: Arc<AutoPruner>,
    // Aggregate counters returned by `get_metrics`.
    metrics: Arc<RwLock<DeduplicationMetrics>>,
    // In-process serialization: one deduplication operation per engine at a time.
    operation_lock: Arc<Mutex<()>>,
    // Ids of operations currently running in this process.
    active_operations: Arc<RwLock<HashSet<String>>>,
}
84
/// Guard pairing a Postgres advisory lock with an entry in the engine's
/// in-process `active_operations` set. Prefer calling `release()` explicitly;
/// `Drop` only spawns a best-effort async cleanup task.
struct OperationLockGuard {
    operation_id: String,
    // Key passed to pg_try_advisory_lock / pg_advisory_unlock.
    lock_key: i64,
    pool: sqlx::PgPool,
    active_operations: Arc<RwLock<HashSet<String>>>,
    // Set once released so release() is idempotent and Drop can skip cleanup.
    released: bool,
}
93
94impl OperationLockGuard {
95 async fn release(&mut self) -> Result<()> {
96 if self.released {
97 return Ok(());
98 }
99
100 self.released = true;
101
102 {
104 let mut active_ops = self.active_operations.write().await;
105 active_ops.remove(&self.operation_id);
106 }
107
108 sqlx::query("SELECT pg_advisory_unlock($1)")
110 .bind(self.lock_key)
111 .execute(&self.pool)
112 .await
113 .map_err(|e| MemoryError::DatabaseError {
114 message: format!("Failed to release advisory lock: {e}"),
115 })?;
116
117 debug!(
118 "Released distributed lock for operation: {}",
119 self.operation_id
120 );
121 Ok(())
122 }
123}
124
impl Drop for OperationLockGuard {
    /// Best-effort cleanup for guards dropped without an explicit `release()`:
    /// spawns an async task to deregister the operation and free the advisory
    /// lock, since `Drop` itself cannot `.await`.
    ///
    /// NOTE(review): `tokio::spawn` panics if no runtime is active, and the
    /// spawned task may still be pending at shutdown — prefer `release()`.
    fn drop(&mut self) {
        if !self.released {
            // Clone everything the detached task needs ('static future).
            let operation_id = self.operation_id.clone();
            let lock_key = self.lock_key;
            let pool = self.pool.clone();
            let active_operations = self.active_operations.clone();

            tokio::spawn(async move {
                {
                    let mut active_ops = active_operations.write().await;
                    active_ops.remove(&operation_id);
                }

                if let Err(e) = sqlx::query("SELECT pg_advisory_unlock($1)")
                    .bind(lock_key)
                    .execute(&pool)
                    .await
                {
                    error!(
                        "Failed to release advisory lock for operation {}: {}",
                        operation_id, e
                    );
                } else {
                    debug!("Released distributed lock for operation: {}", operation_id);
                }
            });
        }
    }
}
158
159impl SemanticDeduplicationEngine {
160 pub fn new(
161 config: SemanticDeduplicationConfig,
162 repository: Arc<MemoryRepository>,
163 embedding_service: Arc<dyn EmbeddingService>,
164 ) -> Self {
165 let merger = Arc::new(MemoryMerger::new(config.clone(), repository.clone()));
166 let compression_manager = Arc::new(CompressionManager::new(config.clone()));
167 let audit_trail = Arc::new(AuditTrail::new(repository.clone()));
168 let auto_pruner = Arc::new(AutoPruner::new(config.clone(), repository.clone()));
169 let metrics = Arc::new(RwLock::new(DeduplicationMetrics::default()));
170
171 Self {
172 config,
173 repository,
174 embedding_service,
175 merger,
176 compression_manager,
177 audit_trail,
178 auto_pruner,
179 metrics,
180 operation_lock: Arc::new(Mutex::new(())),
181 active_operations: Arc::new(RwLock::new(HashSet::new())),
182 }
183 }
184
    /// Runs one end-to-end deduplication pass over `memory_ids`.
    ///
    /// Steps: acquire the distributed + in-process locks, open a single
    /// transaction, find and merge similar groups inside it, then commit
    /// (rollback on any error). Phase timings and aggregate metrics are
    /// recorded; the advisory lock is released explicitly on both paths.
    ///
    /// # Errors
    /// Returns the first failure from lock acquisition, the transaction, or
    /// group processing; the transaction is rolled back on failure.
    pub async fn deduplicate_batch(&self, memory_ids: &[Uuid]) -> Result<DeduplicationResult> {
        let operation_id = format!("dedup_{}", Uuid::new_v4());
        let start_time = Instant::now();
        let mut result;

        // Holds both the in-process mutex and the Postgres advisory lock for
        // the lifetime of this operation.
        let (_mutex_guard, mut _lock_guard) = self.acquire_operation_lock(&operation_id).await?;

        let mut performance_metrics = PerformanceMetrics {
            operation_id: operation_id.clone(),
            start_time,
            phase_timings: HashMap::new(),
            memory_count: memory_ids.len(),
            target_time_seconds: self.config.max_operation_time_seconds,
        };

        info!(
            "Starting deduplication operation {} for {} memories with threshold {}",
            operation_id,
            memory_ids.len(),
            self.config.similarity_threshold
        );

        // NOTE(review): elapsed() here is effectively zero — this check can
        // only trip if lock acquisition above was extremely slow.
        let timeout_check = start_time.elapsed().as_secs();
        if timeout_check > self.config.max_operation_time_seconds {
            return Err(MemoryError::OperationTimeout {
                message: format!("Operation timed out after {timeout_check} seconds"),
            });
        }

        // Large batches may blow the 30s performance target; warn up front.
        if memory_ids.len() > 10_000 && self.config.max_operation_time_seconds > 30 {
            warn!(
                "Processing {} memories may exceed performance target of 30 seconds",
                memory_ids.len()
            );
        }

        performance_metrics.record_phase("lock_acquisition", start_time.elapsed());

        let transaction_start = Instant::now();
        let mut transaction =
            self.repository
                .pool()
                .begin()
                .await
                .map_err(|e| MemoryError::DatabaseError {
                    message: format!("Failed to begin transaction: {e}"),
                })?;

        performance_metrics.record_phase("transaction_begin", transaction_start.elapsed());

        // All load/cluster/merge work happens inside this one transaction.
        let processing_result = self
            .execute_deduplication_with_transaction(&mut transaction, memory_ids, &operation_id)
            .await;

        match processing_result {
            Ok(dedup_result) => {
                let commit_start = Instant::now();
                transaction
                    .commit()
                    .await
                    .map_err(|e| MemoryError::DatabaseError {
                        message: format!("Failed to commit transaction: {e}"),
                    })?;
                performance_metrics.record_phase("transaction_commit", commit_start.elapsed());

                result = dedup_result;
                info!(
                    "Deduplication operation {} committed successfully",
                    operation_id
                );
            }
            Err(e) => {
                // Roll back, release the lock, then surface the original error.
                let rollback_start = Instant::now();
                if let Err(rollback_err) = transaction.rollback().await {
                    error!(
                        "Failed to rollback transaction for operation {}: {}",
                        operation_id, rollback_err
                    );
                } else {
                    performance_metrics
                        .record_phase("transaction_rollback", rollback_start.elapsed());
                }

                if let Err(cleanup_err) = _lock_guard.release().await {
                    error!(
                        "Failed to cleanup lock for operation {}: {}",
                        operation_id, cleanup_err
                    );
                }

                error!("Deduplication operation {} failed: {}", operation_id, e);
                return Err(e);
            }
        }

        // Success path: release explicitly rather than relying on the
        // Drop-spawned cleanup task.
        if let Err(cleanup_err) = _lock_guard.release().await {
            error!(
                "Failed to cleanup lock for operation {}: {}",
                operation_id, cleanup_err
            );
        }

        result.execution_time_ms = start_time.elapsed().as_millis() as u64;
        performance_metrics.record_phase("total_operation", start_time.elapsed());

        if result.execution_time_ms > (self.config.max_operation_time_seconds * 1000) {
            warn!(
                "Operation {} exceeded time limit: {}ms > {}ms",
                operation_id,
                result.execution_time_ms,
                self.config.max_operation_time_seconds * 1000
            );
        }

        // Hard performance contract: 10k memories in under 30 seconds.
        if memory_ids.len() >= 10_000 && result.execution_time_ms > 30_000 {
            error!(
                "PERFORMANCE VIOLATION: {} memories processed in {}ms (> 30s target)",
                memory_ids.len(),
                result.execution_time_ms
            );
        }

        self.update_metrics_with_performance(&result, &performance_metrics)
            .await;

        info!(
            "Deduplication operation {} completed: {} memories processed, {} merged, {:.2}% storage saved in {}ms (phases: {})",
            operation_id,
            result.total_processed,
            result.memories_merged,
            (result.storage_saved_bytes as f64 / (result.total_processed as f64 * 1024.0)) * 100.0,
            result.execution_time_ms,
            performance_metrics.format_phase_summary()
        );

        Ok(result)
    }
335
336 async fn find_similar_groups_optimized(
338 &self,
339 memories: &[Memory],
340 ) -> Result<Vec<SimilarMemoryGroup>> {
341 let mut groups = Vec::new();
342 let mut processed_ids = HashSet::new();
343
344 info!(
345 "Finding similar groups for {} memories using pgvector optimization",
346 memories.len()
347 );
348
349 for memory in memories {
350 if processed_ids.contains(&memory.id) {
351 continue;
352 }
353
354 let embedding = match &memory.embedding {
355 Some(emb) => emb,
356 None => continue,
357 };
358
359 let similar_memories = self
361 .find_similar_memories_pgvector(
362 memory,
363 embedding,
364 &memories.iter().map(|m| m.id).collect::<Vec<_>>(),
365 self.config.similarity_threshold,
366 )
367 .await?;
368
369 if similar_memories.len() > 1 {
370 for sim_memory in &similar_memories {
372 processed_ids.insert(sim_memory.id);
373 }
374
375 let average_similarity =
376 self.calculate_average_similarity(&similar_memories).await?;
377 let merge_strategy = self.determine_merge_strategy(&similar_memories);
378
379 groups.push(SimilarMemoryGroup {
380 memories: similar_memories,
381 average_similarity,
382 merge_strategy,
383 });
384 } else {
385 processed_ids.insert(memory.id);
386 }
387 }
388
389 info!(
390 "Found {} similar groups using optimized search",
391 groups.len()
392 );
393 Ok(groups)
394 }
395
396 async fn find_similar_memories_pgvector(
398 &self,
399 _query_memory: &Memory,
400 query_embedding: &Vector,
401 candidate_ids: &[Uuid],
402 threshold: f32,
403 ) -> Result<Vec<Memory>> {
404 let query = r#"
405 SELECT m.*, (m.embedding <=> $1) as similarity_distance
406 FROM memories m
407 WHERE m.id = ANY($2)
408 AND m.status = 'active'
409 AND m.embedding IS NOT NULL
410 AND (m.embedding <=> $1) <= $3
411 ORDER BY m.embedding <=> $1
412 LIMIT 100
413 "#;
414
415 let distance_threshold = 1.0 - threshold;
418
419 let rows = sqlx::query(query)
420 .bind(query_embedding)
421 .bind(candidate_ids)
422 .bind(distance_threshold)
423 .fetch_all(self.repository.pool())
424 .await
425 .map_err(|e| MemoryError::DatabaseError {
426 message: format!("Failed to execute pgvector similarity search: {e}"),
427 })?;
428
429 let mut similar_memories = Vec::new();
430 for row in rows {
431 let memory = Memory {
432 id: row.get("id"),
433 content: row.get("content"),
434 content_hash: row.get("content_hash"),
435 embedding: row.get("embedding"),
436 tier: row.get("tier"),
437 status: row.get("status"),
438 importance_score: row.get("importance_score"),
439 access_count: row.get("access_count"),
440 last_accessed_at: row.get("last_accessed_at"),
441 metadata: row.get("metadata"),
442 parent_id: row.get("parent_id"),
443 created_at: row.get("created_at"),
444 updated_at: row.get("updated_at"),
445 expires_at: row.get("expires_at"),
446 consolidation_strength: row.get("consolidation_strength"),
447 decay_rate: row.get("decay_rate"),
448 recall_probability: row.get("recall_probability"),
449 last_recall_interval: row.get("last_recall_interval"),
450 recency_score: row.get("recency_score"),
451 relevance_score: row.get("relevance_score"),
452 };
453 similar_memories.push(memory);
454 }
455
456 Ok(similar_memories)
457 }
458
    /// Compatibility shim — presumably kept for older call sites; delegates
    /// to the pgvector-optimized implementation.
    async fn find_similar_groups(&self, memories: &[Memory]) -> Result<Vec<SimilarMemoryGroup>> {
        self.find_similar_groups_optimized(memories).await
    }
464
465 pub fn calculate_cosine_similarity(&self, a: &Vector, b: &Vector) -> Result<f32> {
467 let a_slice = a.as_slice();
468 let b_slice = b.as_slice();
469
470 if a_slice.len() != b_slice.len() {
471 return Err(MemoryError::InvalidData {
472 message: "Embedding dimensions don't match".to_string(),
473 });
474 }
475
476 let dot_product: f32 = a_slice.iter().zip(b_slice.iter()).map(|(x, y)| x * y).sum();
477 let norm_a: f32 = a_slice.iter().map(|x| x * x).sum::<f32>().sqrt();
478 let norm_b: f32 = b_slice.iter().map(|x| x * x).sum::<f32>().sqrt();
479
480 if norm_a == 0.0 || norm_b == 0.0 {
481 return Ok(0.0);
482 }
483
484 Ok(dot_product / (norm_a * norm_b))
485 }
486
487 async fn calculate_average_similarity(&self, memories: &[Memory]) -> Result<f32> {
489 if memories.len() < 2 {
490 return Ok(1.0);
491 }
492
493 let mut total_similarity = 0.0;
494 let mut comparisons = 0;
495
496 for i in 0..memories.len() {
497 for j in (i + 1)..memories.len() {
498 if let (Some(emb_i), Some(emb_j)) = (&memories[i].embedding, &memories[j].embedding)
499 {
500 total_similarity += self.calculate_cosine_similarity(emb_i, emb_j)?;
501 comparisons += 1;
502 }
503 }
504 }
505
506 Ok(if comparisons > 0 {
507 total_similarity / comparisons as f32
508 } else {
509 0.0
510 })
511 }
512
513 fn determine_merge_strategy(&self, memories: &[Memory]) -> MergeStrategy {
515 let has_critical = memories.iter().any(|m| {
517 m.importance_score > 0.8
518 || m.last_accessed_at.is_some_and(|last| {
519 Utc::now().signed_duration_since(last) < Duration::hours(24)
520 })
521 });
522
523 let has_working_tier = memories
525 .iter()
526 .any(|m| matches!(m.tier, MemoryTier::Working));
527 let has_different_tiers = memories
528 .iter()
529 .map(|m| m.tier)
530 .collect::<HashSet<_>>()
531 .len()
532 > 1;
533
534 if has_critical && self.config.lossless_critical {
535 MergeStrategy::LosslessPreservation
536 } else if has_working_tier || has_different_tiers {
537 MergeStrategy::MetadataConsolidation
538 } else {
539 MergeStrategy::ContentSummarization
540 }
541 }
542
    /// Merges one similar-memory group inside the caller's transaction,
    /// bracketing the merge with create/complete audit-trail entries.
    async fn process_similar_group_tx(
        &self,
        transaction: &mut Transaction<'_, Postgres>,
        group: SimilarMemoryGroup,
        operation_id: &str,
    ) -> Result<GroupMergeResult> {
        debug!(
            "Processing similar group with {} memories, strategy: {:?}, operation: {}",
            group.memories.len(),
            group.merge_strategy,
            operation_id
        );

        // Open the audit entry before merging, complete it afterwards.
        let audit_entry = self
            .audit_trail
            .create_merge_entry_tx(transaction, &group)
            .await?;

        let merge_result = self.merger.merge_group_tx(transaction, &group).await?;

        self.audit_trail
            .complete_merge_entry_tx(transaction, audit_entry.id, &merge_result)
            .await?;

        Ok(GroupMergeResult {
            merged_memory_id: merge_result.merged_memory.id,
            original_memory_ids: group.memories.iter().map(|m| m.id).collect(),
            memories_merged: group.memories.len(),
            storage_saved: merge_result.storage_saved,
            compression_ratio: merge_result.compression_ratio,
            merge_strategy: group.merge_strategy,
        })
    }
580
    /// Non-transactional counterpart of [`Self::process_similar_group_tx`]:
    /// audits and merges one group using the repository's shared pool.
    async fn process_similar_group(&self, group: SimilarMemoryGroup) -> Result<GroupMergeResult> {
        debug!(
            "Processing similar group with {} memories, strategy: {:?}",
            group.memories.len(),
            group.merge_strategy
        );

        // Open the audit entry before merging, complete it afterwards.
        let audit_entry = self.audit_trail.create_merge_entry(&group).await?;

        let merge_result = self.merger.merge_group(&group).await?;

        self.audit_trail
            .complete_merge_entry(audit_entry.id, &merge_result)
            .await?;

        Ok(GroupMergeResult {
            merged_memory_id: merge_result.merged_memory.id,
            original_memory_ids: group.memories.iter().map(|m| m.id).collect(),
            memories_merged: group.memories.len(),
            storage_saved: merge_result.storage_saved,
            compression_ratio: merge_result.compression_ratio,
            merge_strategy: group.merge_strategy,
        })
    }
609
610 async fn load_memories_with_embeddings(&self, memory_ids: &[Uuid]) -> Result<Vec<Memory>> {
612 let query = r#"
613 SELECT * FROM memories
614 WHERE id = ANY($1)
615 AND status = 'active'
616 AND embedding IS NOT NULL
617 ORDER BY importance_score DESC, last_accessed_at DESC NULLS LAST
618 "#;
619
620 let memories = sqlx::query_as::<_, Memory>(query)
621 .bind(memory_ids)
622 .fetch_all(self.repository.pool())
623 .await?;
624
625 Ok(memories)
626 }
627
628 async fn load_memories_with_embeddings_tx(
630 &self,
631 transaction: &mut Transaction<'_, Postgres>,
632 memory_ids: &[Uuid],
633 ) -> Result<Vec<Memory>> {
634 let query = r#"
635 SELECT * FROM memories
636 WHERE id = ANY($1)
637 AND status = 'active'
638 AND embedding IS NOT NULL
639 ORDER BY importance_score DESC, last_accessed_at DESC NULLS LAST
640 "#;
641
642 let memories = sqlx::query_as::<_, Memory>(query)
643 .bind(memory_ids)
644 .fetch_all(&mut **transaction)
645 .await
646 .map_err(|e| MemoryError::DatabaseError {
647 message: format!("Failed to load memories in transaction: {e}"),
648 })?;
649
650 Ok(memories)
651 }
652
653 pub async fn run_auto_pruning(&self) -> Result<PruningResult> {
655 info!(
656 "Starting auto-pruning with threshold {}",
657 self.config.prune_threshold
658 );
659
660 let cutoff_date = Utc::now() - Duration::days(self.config.prune_age_days);
661
662 let prune_result = self
663 .auto_pruner
664 .prune_memories(self.config.prune_threshold, cutoff_date)
665 .await?;
666
667 info!(
668 "Auto-pruning completed: {} memories pruned, {} bytes freed",
669 prune_result.memories_pruned, prune_result.storage_freed
670 );
671
672 Ok(prune_result)
673 }
674
675 pub async fn maintain_memory_headroom(&self) -> Result<HeadroomMaintenanceResult> {
677 let stats = self.get_memory_statistics().await?;
678 let current_utilization =
679 1.0 - (stats.free_space_bytes as f32 / stats.total_space_bytes as f32);
680
681 if current_utilization > (1.0 - self.config.target_memory_headroom) {
682 info!(
683 "Memory utilization {:.2}% exceeds target, starting aggressive cleanup",
684 current_utilization * 100.0
685 );
686
687 let all_memory_ids = self.get_all_active_memory_ids().await?;
689 let dedup_result = self.deduplicate_batch(&all_memory_ids).await?;
690 let prune_result = self.run_auto_pruning().await?;
691
692 Ok(HeadroomMaintenanceResult {
693 initial_utilization: current_utilization,
694 final_utilization: self.get_memory_utilization().await?,
695 memories_processed: dedup_result.total_processed,
696 memories_merged: dedup_result.memories_merged,
697 memories_pruned: prune_result.memories_pruned,
698 storage_freed: dedup_result.storage_saved_bytes + prune_result.storage_freed,
699 })
700 } else {
701 Ok(HeadroomMaintenanceResult {
702 initial_utilization: current_utilization,
703 final_utilization: current_utilization,
704 memories_processed: 0,
705 memories_merged: 0,
706 memories_pruned: 0,
707 storage_freed: 0,
708 })
709 }
710 }
711
    /// Current storage utilization as a fraction, derived from the estimated
    /// statistics. NOTE(review): yields NaN when estimated total space is 0.
    async fn get_memory_utilization(&self) -> Result<f32> {
        let stats = self.get_memory_statistics().await?;
        Ok(1.0 - (stats.free_space_bytes as f32 / stats.total_space_bytes as f32))
    }
717
718 pub async fn get_memory_statistics(&self) -> Result<MemoryStatistics> {
720 let query = r#"
721 SELECT
722 COUNT(*) as total_memories,
723 SUM(LENGTH(content)) as total_content_bytes,
724 AVG(importance_score) as avg_importance,
725 COUNT(CASE WHEN tier = 'working' THEN 1 END) as working_count,
726 COUNT(CASE WHEN tier = 'warm' THEN 1 END) as warm_count,
727 COUNT(CASE WHEN tier = 'cold' THEN 1 END) as cold_count,
728 COUNT(CASE WHEN tier = 'frozen' THEN 1 END) as frozen_count
729 FROM memories
730 WHERE status = 'active'
731 "#;
732
733 let row = sqlx::query(query).fetch_one(self.repository.pool()).await?;
734
735 let total_content_bytes: i64 = row.get("total_content_bytes");
737 let estimated_total_space = total_content_bytes * 5; let estimated_free_space = estimated_total_space / 5; Ok(MemoryStatistics {
741 total_memories: row.get("total_memories"),
742 total_content_bytes,
743 total_space_bytes: estimated_total_space,
744 free_space_bytes: estimated_free_space,
745 avg_importance: row.get::<Option<f64>, _>("avg_importance").unwrap_or(0.0),
746 working_count: row.get("working_count"),
747 warm_count: row.get("warm_count"),
748 cold_count: row.get("cold_count"),
749 frozen_count: row.get("frozen_count"),
750 })
751 }
752
753 async fn get_all_active_memory_ids(&self) -> Result<Vec<Uuid>> {
755 let query = "SELECT id FROM memories WHERE status = 'active' ORDER BY created_at ASC";
756
757 let rows = sqlx::query(query).fetch_all(self.repository.pool()).await?;
758
759 Ok(rows.into_iter().map(|row| row.get("id")).collect())
760 }
761
762 async fn update_metrics_with_performance(
764 &self,
765 result: &DeduplicationResult,
766 performance: &PerformanceMetrics,
767 ) {
768 let mut metrics = self.metrics.write().await;
769 metrics.total_operations += 1;
770 metrics.total_memories_processed += result.total_processed;
771 metrics.total_memories_merged += result.memories_merged;
772 metrics.total_storage_saved += result.storage_saved_bytes;
773 metrics.total_execution_time_ms += result.execution_time_ms;
774 metrics.errors_encountered += result.errors_encountered;
775 metrics.last_operation_timestamp = Some(Utc::now());
776
777 if result.compression_ratio > 0.0 {
778 metrics.average_compression_ratio =
779 (metrics.average_compression_ratio + result.compression_ratio) / 2.0;
780 }
781
782 let violations = performance.get_performance_violations();
784 if !violations.is_empty() {
785 error!(
786 "Performance violations in operation {}: {}",
787 performance.operation_id,
788 violations.join("; ")
789 );
790 }
791
792 if let Err(e) = self.record_performance_metrics(performance, result).await {
794 warn!("Failed to record performance metrics: {}", e);
795 }
796 }
797
    /// Folds one operation's results into the aggregate engine metrics.
    async fn update_metrics(&self, result: &DeduplicationResult) {
        let mut metrics = self.metrics.write().await;
        metrics.total_operations += 1;
        metrics.total_memories_processed += result.total_processed;
        metrics.total_memories_merged += result.memories_merged;
        metrics.total_storage_saved += result.storage_saved_bytes;
        metrics.total_execution_time_ms += result.execution_time_ms;
        metrics.errors_encountered += result.errors_encountered;
        metrics.last_operation_timestamp = Some(Utc::now());

        // Running blend (new value weighted 50%) — not a true mean across
        // all operations.
        if result.compression_ratio > 0.0 {
            metrics.average_compression_ratio =
                (metrics.average_compression_ratio + result.compression_ratio) / 2.0;
        }
    }
814
    /// Persists one operation's performance profile as a JSON row in the
    /// `deduplication_metrics` table.
    async fn record_performance_metrics(
        &self,
        performance: &PerformanceMetrics,
        result: &DeduplicationResult,
    ) -> Result<()> {
        // Phase durations are serialized in milliseconds.
        let metrics_data = serde_json::json!({
            "operation_id": performance.operation_id,
            "memory_count": performance.memory_count,
            "execution_time_ms": result.execution_time_ms,
            "target_time_seconds": performance.target_time_seconds,
            "phase_timings": performance.phase_timings.iter().map(|(k, v)| (k, v.as_millis())).collect::<HashMap<_, _>>(),
            "memories_processed": result.total_processed,
            "memories_merged": result.memories_merged,
            "storage_saved_bytes": result.storage_saved_bytes,
            "compression_ratio": result.compression_ratio,
            "errors_encountered": result.errors_encountered,
            "performance_violations": performance.get_performance_violations()
        });

        sqlx::query(
            r#"
            INSERT INTO deduplication_metrics (
                measurement_type, metrics_data, recorded_at
            ) VALUES ($1, $2, $3)
            "#,
        )
        .bind("operation_performance")
        .bind(metrics_data)
        .bind(Utc::now())
        .execute(self.repository.pool())
        .await
        .map_err(|e| MemoryError::DatabaseError {
            message: format!("Failed to record performance metrics: {e}"),
        })?;

        Ok(())
    }
853
    /// Returns a snapshot of the engine's aggregate deduplication metrics.
    pub async fn get_metrics(&self) -> DeduplicationMetrics {
        self.metrics.read().await.clone()
    }
858
    /// Deduplicates an arbitrarily large memory set by splitting it into
    /// batches and running [`Self::deduplicate_batch`] on each.
    ///
    /// Batches run sequentially; a failed batch is counted and skipped, and
    /// processing stops early once 3/4 of the time budget is spent. Results
    /// are accumulated into a single combined summary.
    pub async fn deduplicate_large_batch(
        &self,
        memory_ids: &[Uuid],
    ) -> Result<DeduplicationResult> {
        let total_memories = memory_ids.len();
        info!(
            "Starting large batch deduplication for {} memories",
            total_memories
        );

        // Small inputs go straight through the single-batch path.
        if total_memories <= self.config.batch_size {
            return self.deduplicate_batch(memory_ids).await;
        }

        let optimal_batch_size = self.calculate_optimal_batch_size(total_memories);
        let batches: Vec<_> = memory_ids
            .chunks(optimal_batch_size)
            .map(|chunk| chunk.to_vec())
            .collect();

        info!(
            "Split {} memories into {} batches of ~{} memories each",
            total_memories,
            batches.len(),
            optimal_batch_size
        );

        let mut combined_result = DeduplicationResult::default();
        let start_time = Instant::now();

        let mut successful_batches = 0;
        let mut failed_batches = 0;

        for (batch_index, batch_memory_ids) in batches.into_iter().enumerate() {
            info!(
                "Processing batch {} with {} memories",
                batch_index,
                batch_memory_ids.len()
            );

            let batch_start = Instant::now();

            // Stop at 75% of the budget so accounting can still finish in time.
            if start_time.elapsed().as_secs() > (self.config.max_operation_time_seconds * 3 / 4) {
                warn!(
                    "Approaching time limit, stopping batch processing after {} batches",
                    batch_index
                );
                break;
            }

            match self.deduplicate_batch(&batch_memory_ids).await {
                Ok(batch_result) => {
                    successful_batches += 1;
                    combined_result.total_processed += batch_result.total_processed;
                    combined_result.groups_identified += batch_result.groups_identified;
                    combined_result.memories_merged += batch_result.memories_merged;
                    combined_result.storage_saved_bytes += batch_result.storage_saved_bytes;
                    combined_result.errors_encountered += batch_result.errors_encountered;

                    // Incremental mean of per-batch compression ratios.
                    if batch_result.compression_ratio > 0.0 {
                        combined_result.compression_ratio = (combined_result.compression_ratio
                            * (successful_batches - 1) as f32
                            + batch_result.compression_ratio)
                            / successful_batches as f32;
                    }

                    info!(
                        "Batch {} completed in {}ms: {} processed, {} merged",
                        batch_index,
                        batch_start.elapsed().as_millis(),
                        batch_result.total_processed,
                        batch_result.memories_merged
                    );
                }
                Err(e) => {
                    // Best-effort: record the failure and continue with the
                    // remaining batches.
                    failed_batches += 1;
                    combined_result.errors_encountered += 1;
                    error!("Batch {} failed: {}", batch_index, e);
                }
            }
        }

        combined_result.execution_time_ms = start_time.elapsed().as_millis() as u64;

        info!(
            "Large batch deduplication completed: {}/{} batches successful, {} total memories processed, {} merged, {}ms total time",
            successful_batches, successful_batches + failed_batches,
            combined_result.total_processed, combined_result.memories_merged,
            combined_result.execution_time_ms
        );

        self.update_metrics(&combined_result).await;

        Ok(combined_result)
    }
964
965 fn calculate_optimal_batch_size(&self, total_memories: usize) -> usize {
967 let mut batch_size = self.config.batch_size;
969
970 if total_memories > 50_000 {
972 batch_size = std::cmp::max(batch_size * 2, 1000); } else if total_memories > 20_000 {
974 batch_size = std::cmp::max(batch_size * 3 / 2, 500); }
976
977 batch_size = std::cmp::min(batch_size, self.config.max_memories_per_operation);
979
980 std::cmp::max(batch_size, 50)
982 }
983
    /// Acquires both locks guarding a deduplication operation: a Postgres
    /// advisory lock (cross-process) keyed from `operation_id`, and this
    /// engine's tokio mutex (in-process).
    ///
    /// Returns the mutex guard plus an [`OperationLockGuard`] whose
    /// `release()` frees the advisory lock and deregisters the operation.
    ///
    /// NOTE(review): the advisory lock is taken *before* awaiting the local
    /// mutex, so a contended mutex keeps the DB lock held while waiting —
    /// confirm this ordering is intentional.
    async fn acquire_operation_lock(
        &self,
        operation_id: &str,
    ) -> Result<(tokio::sync::MutexGuard<'_, ()>, OperationLockGuard)> {
        // Fast-fail if this exact operation id is already running locally.
        {
            let active_ops = self.active_operations.read().await;
            if active_ops.contains(operation_id) {
                return Err(MemoryError::ConcurrencyError {
                    message: format!("Operation {operation_id} is already in progress"),
                });
            }
        }

        // Non-blocking cross-process exclusion via pg_try_advisory_lock.
        let lock_key = self.calculate_lock_key(operation_id);
        let acquired = sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_lock($1)")
            .bind(lock_key)
            .fetch_one(self.repository.pool())
            .await
            .map_err(|e| MemoryError::DatabaseError {
                message: format!("Failed to acquire advisory lock: {e}"),
            })?;

        if !acquired {
            return Err(MemoryError::ConcurrencyError {
                message: "Another deduplication operation is in progress".to_string(),
            });
        }

        // Register locally so a duplicate id is rejected above.
        {
            let mut active_ops = self.active_operations.write().await;
            active_ops.insert(operation_id.to_string());
        }

        let mutex_guard = self.operation_lock.lock().await;

        info!("Acquired distributed lock for operation: {}", operation_id);

        let lock_guard = OperationLockGuard {
            operation_id: operation_id.to_string(),
            lock_key,
            pool: self.repository.pool().clone(),
            active_operations: self.active_operations.clone(),
            released: false,
        };

        Ok((mutex_guard, lock_guard))
    }
1036
1037 fn calculate_lock_key(&self, operation_id: &str) -> i64 {
1039 use std::collections::hash_map::DefaultHasher;
1040 use std::hash::{Hash, Hasher};
1041
1042 let mut hasher = DefaultHasher::new();
1043 operation_id.hash(&mut hasher);
1044 hasher.finish() as i64
1045 }
1046
    /// Core deduplication pipeline, run entirely inside `transaction`: load
    /// candidates, cluster them by similarity, merge each cluster.
    ///
    /// Fails fast: the first group error aborts the whole operation (the
    /// caller rolls back), discarding the partial `result` counters.
    async fn execute_deduplication_with_transaction(
        &self,
        transaction: &mut Transaction<'_, Postgres>,
        memory_ids: &[Uuid],
        operation_id: &str,
    ) -> Result<DeduplicationResult> {
        let mut result = DeduplicationResult::default();

        let memories = self
            .load_memories_with_embeddings_tx(transaction, memory_ids)
            .await?;
        // Nothing to do when no candidate is active with an embedding.
        if memories.is_empty() {
            return Ok(result);
        }

        result.total_processed = memories.len();

        let similar_groups = self.find_similar_groups_optimized(&memories).await?;
        result.groups_identified = similar_groups.len();

        for group in similar_groups {
            match self
                .process_similar_group_tx(transaction, group, operation_id)
                .await
            {
                Ok(merge_result) => {
                    result.memories_merged += merge_result.memories_merged;
                    result.storage_saved_bytes += merge_result.storage_saved;
                    // Running blend of ratios (new value weighted 50%), not a
                    // true mean.
                    result.compression_ratio =
                        (result.compression_ratio + merge_result.compression_ratio) / 2.0;
                }
                Err(e) => {
                    warn!(
                        "Failed to process similar group in operation {}: {}",
                        operation_id, e
                    );
                    result.errors_encountered += 1;
                    // Abort the operation; partial counts drop with the Err.
                    return Err(e);
                }
            }
        }

        Ok(result)
    }
1096}
1097
/// Merges groups of similar memories into a single memory using one of the
/// `MergeStrategy` variants.
pub struct MemoryMerger {
    #[allow(dead_code)]
    config: SemanticDeduplicationConfig,
    repository: Arc<MemoryRepository>,
}
1104
1105impl MemoryMerger {
    /// Creates a merger bound to the given configuration and repository.
    pub fn new(config: SemanticDeduplicationConfig, repository: Arc<MemoryRepository>) -> Self {
        Self { config, repository }
    }
1109
    /// Dispatches a group merge to the implementation matching its strategy.
    pub async fn merge_group(&self, group: &SimilarMemoryGroup) -> Result<MergeResult> {
        match group.merge_strategy {
            MergeStrategy::LosslessPreservation => self.merge_lossless(group).await,
            MergeStrategy::MetadataConsolidation => {
                self.merge_with_metadata_consolidation(group).await
            }
            MergeStrategy::ContentSummarization => self.merge_with_summarization(group).await,
        }
    }
1120
    /// Transactional variant of [`Self::merge_group`]: dispatches to the
    /// strategy implementation that runs inside the caller's transaction.
    pub async fn merge_group_tx(
        &self,
        transaction: &mut Transaction<'_, Postgres>,
        group: &SimilarMemoryGroup,
    ) -> Result<MergeResult> {
        match group.merge_strategy {
            MergeStrategy::LosslessPreservation => self.merge_lossless_tx(transaction, group).await,
            MergeStrategy::MetadataConsolidation => {
                self.merge_with_metadata_consolidation_tx(transaction, group)
                    .await
            }
            MergeStrategy::ContentSummarization => {
                self.merge_with_summarization_tx(transaction, group).await
            }
        }
    }
1138
1139 async fn merge_lossless(&self, group: &SimilarMemoryGroup) -> Result<MergeResult> {
1141 let memories = &group.memories;
1142 let primary_memory = &memories[0]; let combined_content = memories
1146 .iter()
1147 .enumerate()
1148 .map(|(i, m)| format!("--- Memory {} (ID: {}) ---\n{}", i + 1, m.id, m.content))
1149 .collect::<Vec<_>>()
1150 .join("\n\n");
1151
1152 let combined_metadata = self.merge_metadata_lossless(memories)?;
1154
1155 let combined_embedding = self.calculate_combined_embedding(memories)?;
1157
1158 let combined_importance = self.calculate_weighted_importance(memories);
1160
1161 let combined_content_len = combined_content.len();
1163
1164 let merged_memory = self
1166 .create_merged_memory(
1167 combined_content,
1168 combined_embedding,
1169 combined_importance,
1170 combined_metadata,
1171 primary_memory,
1172 )
1173 .await?;
1174
1175 let storage_saved = self.archive_original_memories(memories).await?;
1177
1178 Ok(MergeResult {
1179 merged_memory,
1180 storage_saved,
1181 compression_ratio: storage_saved as f32 / combined_content_len as f32,
1182 original_count: memories.len(),
1183 })
1184 }
1185
1186 async fn merge_with_metadata_consolidation(
1188 &self,
1189 group: &SimilarMemoryGroup,
1190 ) -> Result<MergeResult> {
1191 let memories = &group.memories;
1192 let primary_memory = &memories[0];
1193
1194 let mut combined_content = primary_memory.content.clone();
1196 combined_content.push_str("\n\n--- Related Content References ---\n");
1197
1198 for (i, memory) in memories.iter().skip(1).enumerate() {
1199 combined_content.push_str(&format!(
1200 "Related Memory {}: {} (similarity: {:.3})\n",
1201 i + 1,
1202 memory.content.chars().take(100).collect::<String>(),
1203 group.average_similarity
1204 ));
1205 }
1206
1207 let combined_metadata = self.merge_metadata_consolidated(memories)?;
1208 let combined_embedding = self.calculate_combined_embedding(memories)?;
1209 let combined_importance = self.calculate_weighted_importance(memories);
1210
1211 let merged_memory = self
1212 .create_merged_memory(
1213 combined_content,
1214 combined_embedding,
1215 combined_importance,
1216 combined_metadata,
1217 primary_memory,
1218 )
1219 .await?;
1220
1221 let storage_saved = self.archive_original_memories(memories).await?;
1222
1223 Ok(MergeResult {
1224 merged_memory,
1225 storage_saved,
1226 compression_ratio: 2.0, original_count: memories.len(),
1228 })
1229 }
1230
1231 async fn merge_with_summarization(&self, group: &SimilarMemoryGroup) -> Result<MergeResult> {
1233 let memories = &group.memories;
1234 let primary_memory = &memories[0];
1235
1236 let summary_content = self.create_content_summary(memories).await?;
1238
1239 let combined_metadata = self.merge_metadata_summarized(memories)?;
1240 let combined_embedding = self.calculate_combined_embedding(memories)?;
1241 let combined_importance = self.calculate_weighted_importance(memories);
1242
1243 let merged_memory = self
1244 .create_merged_memory(
1245 summary_content,
1246 combined_embedding,
1247 combined_importance,
1248 combined_metadata,
1249 primary_memory,
1250 )
1251 .await?;
1252
1253 let storage_saved = self.archive_original_memories(memories).await?;
1254
1255 Ok(MergeResult {
1256 merged_memory,
1257 storage_saved,
1258 compression_ratio: 5.0, original_count: memories.len(),
1260 })
1261 }
1262
1263 async fn create_content_summary(&self, memories: &[Memory]) -> Result<String> {
1265 let mut key_sentences = Vec::new();
1267
1268 for memory in memories {
1269 let sentences: Vec<&str> = memory.content.split('.').collect();
1270 if !sentences.is_empty() {
1271 key_sentences.push(sentences[0]); }
1273 }
1274
1275 let summary = format!(
1276 "Summary of {} related memories:\n{}",
1277 memories.len(),
1278 key_sentences.join(". ")
1279 );
1280
1281 Ok(summary)
1282 }
1283
1284 fn merge_metadata_lossless(&self, memories: &[Memory]) -> Result<serde_json::Value> {
1286 let mut combined = serde_json::Map::new();
1287
1288 combined.insert(
1290 "merge_info".to_string(),
1291 serde_json::json!({
1292 "merge_type": "lossless",
1293 "original_count": memories.len(),
1294 "merged_at": Utc::now(),
1295 "original_ids": memories.iter().map(|m| m.id).collect::<Vec<_>>()
1296 }),
1297 );
1298
1299 for (i, memory) in memories.iter().enumerate() {
1301 if let serde_json::Value::Object(metadata_map) = &memory.metadata {
1302 for (key, value) in metadata_map {
1303 let prefixed_key = format!("memory_{i}_{key}");
1304 combined.insert(prefixed_key, value.clone());
1305 }
1306 }
1307 }
1308
1309 Ok(serde_json::Value::Object(combined))
1310 }
1311
1312 fn merge_metadata_consolidated(&self, memories: &[Memory]) -> Result<serde_json::Value> {
1314 let mut combined = serde_json::Map::new();
1315
1316 combined.insert(
1317 "merge_info".to_string(),
1318 serde_json::json!({
1319 "merge_type": "consolidated",
1320 "original_count": memories.len(),
1321 "merged_at": Utc::now(),
1322 "original_ids": memories.iter().map(|m| m.id).collect::<Vec<_>>()
1323 }),
1324 );
1325
1326 let mut common_keys = HashMap::new();
1328 for memory in memories {
1329 if let serde_json::Value::Object(metadata_map) = &memory.metadata {
1330 for (key, value) in metadata_map {
1331 common_keys
1332 .entry(key.clone())
1333 .or_insert_with(Vec::new)
1334 .push(value.clone());
1335 }
1336 }
1337 }
1338
1339 for (key, values) in common_keys {
1340 if values.len() == memories.len() && values.iter().all(|v| v == &values[0]) {
1341 combined.insert(key, values[0].clone());
1343 } else {
1344 combined.insert(key, serde_json::Value::Array(values));
1346 }
1347 }
1348
1349 Ok(serde_json::Value::Object(combined))
1350 }
1351
1352 fn merge_metadata_summarized(&self, memories: &[Memory]) -> Result<serde_json::Value> {
1354 let mut combined = serde_json::Map::new();
1355
1356 combined.insert(
1357 "merge_info".to_string(),
1358 serde_json::json!({
1359 "merge_type": "summarized",
1360 "original_count": memories.len(),
1361 "merged_at": Utc::now(),
1362 "original_ids": memories.iter().map(|m| m.id).collect::<Vec<_>>(),
1363 "compression_applied": true
1364 }),
1365 );
1366
1367 if let serde_json::Value::Object(primary_metadata) = &memories[0].metadata {
1369 for (key, value) in primary_metadata {
1370 if ["tags", "category", "type", "priority"].contains(&key.as_str()) {
1371 combined.insert(key.clone(), value.clone());
1372 }
1373 }
1374 }
1375
1376 Ok(serde_json::Value::Object(combined))
1377 }
1378
1379 fn calculate_combined_embedding(&self, memories: &[Memory]) -> Result<Option<Vector>> {
1381 let embeddings: Vec<&Vector> = memories
1382 .iter()
1383 .filter_map(|m| m.embedding.as_ref())
1384 .collect();
1385
1386 if embeddings.is_empty() {
1387 return Ok(None);
1388 }
1389
1390 let dimension = embeddings[0].as_slice().len();
1391 let mut combined = vec![0.0f32; dimension];
1392
1393 for embedding in &embeddings {
1394 let slice = embedding.as_slice();
1395 for (i, &value) in slice.iter().enumerate() {
1396 combined[i] += value;
1397 }
1398 }
1399
1400 for value in &mut combined {
1402 *value /= embeddings.len() as f32;
1403 }
1404
1405 Ok(Some(Vector::from(combined)))
1406 }
1407
1408 fn calculate_weighted_importance(&self, memories: &[Memory]) -> f64 {
1410 let total_weight: f64 = memories.iter().map(|m| m.access_count as f64 + 1.0).sum();
1411 let weighted_sum: f64 = memories
1412 .iter()
1413 .map(|m| m.importance_score * (m.access_count as f64 + 1.0))
1414 .sum();
1415
1416 weighted_sum / total_weight
1417 }
1418
    /// Inserts a brand-new merged-memory row inside `transaction` and returns
    /// the created [`Memory`], hydrated from `RETURNING *`.
    ///
    /// Tier, parent, expiry and the consolidation/decay/recall parameters are
    /// inherited from `primary_memory`; access statistics are reset,
    /// `is_merged_result` is set, and `merge_generation` becomes the primary's
    /// recorded generation (0 when absent) plus one.
    ///
    /// NOTE(review): `original_memory_count` is bound as a constant 1 — it
    /// looks like it should carry the number of merged source memories;
    /// confirm against the schema's intent.
    async fn create_merged_memory_tx(
        &self,
        transaction: &mut Transaction<'_, Postgres>,
        content: String,
        embedding: Option<Vector>,
        importance: f64,
        metadata: serde_json::Value,
        primary_memory: &Memory,
    ) -> Result<Memory> {
        let memory_id = Uuid::new_v4();
        // Hash of the merged content (used elsewhere for duplicate detection).
        let content_hash = format!("{:x}", md5::compute(&content));
        let now = Utc::now();

        let query = r#"
            INSERT INTO memories (
                id, content, content_hash, embedding, tier, status,
                importance_score, access_count, last_accessed_at, metadata,
                parent_id, created_at, updated_at, expires_at,
                consolidation_strength, decay_rate, recall_probability,
                recency_score, relevance_score, is_merged_result,
                original_memory_count, merge_generation
            ) VALUES (
                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
            ) RETURNING *
        "#;

        // Bind order must match the column list above exactly.
        let row = sqlx::query(query)
            .bind(memory_id)
            .bind(&content)
            .bind(&content_hash)
            .bind(embedding)
            .bind(primary_memory.tier)
            .bind(MemoryStatus::Active)
            .bind(importance)
            // access_count reset to 0; last_accessed_at set to creation time.
            .bind(0i32) .bind(now) .bind(&metadata)
            .bind(primary_memory.parent_id)
            // created_at / updated_at / expires_at.
            .bind(now) .bind(now) .bind(primary_memory.expires_at)
            .bind(primary_memory.consolidation_strength)
            .bind(primary_memory.decay_rate)
            .bind(primary_memory.recall_probability)
            .bind(primary_memory.recency_score)
            .bind(primary_memory.relevance_score)
            // is_merged_result = true; original_memory_count = 1 (see NOTE);
            // then the incremented merge generation.
            .bind(true) .bind(1i32) .bind(
                primary_memory
                    .metadata
                    .get("merge_generation")
                    .and_then(|v| v.as_i64())
                    .unwrap_or(0)
                    + 1,
            ) .fetch_one(&mut **transaction)
            .await
            .map_err(|e| MemoryError::DatabaseError {
                message: format!("Failed to create merged memory: {e}"),
            })?;

        Ok(Memory {
            id: row.get("id"),
            content: row.get("content"),
            content_hash: row.get("content_hash"),
            embedding: row.get("embedding"),
            tier: row.get("tier"),
            status: row.get("status"),
            importance_score: row.get("importance_score"),
            access_count: row.get("access_count"),
            last_accessed_at: row.get("last_accessed_at"),
            metadata: row.get("metadata"),
            parent_id: row.get("parent_id"),
            created_at: row.get("created_at"),
            updated_at: row.get("updated_at"),
            expires_at: row.get("expires_at"),
            consolidation_strength: row.get("consolidation_strength"),
            decay_rate: row.get("decay_rate"),
            recall_probability: row.get("recall_probability"),
            // Not in the INSERT column list — comes from the row's default.
            last_recall_interval: row.get("last_recall_interval"),
            recency_score: row.get("recency_score"),
            relevance_score: row.get("relevance_score"),
        })
    }
1506
1507 async fn create_merged_memory(
1509 &self,
1510 content: String,
1511 embedding: Option<Vector>,
1512 importance: f64,
1513 metadata: serde_json::Value,
1514 primary_memory: &Memory,
1515 ) -> Result<Memory> {
1516 let create_request = CreateMemoryRequest {
1517 content,
1518 embedding: embedding.map(|v| v.as_slice().to_vec()),
1519 tier: Some(primary_memory.tier),
1520 importance_score: Some(importance),
1521 metadata: Some(metadata),
1522 parent_id: None,
1523 expires_at: primary_memory.expires_at,
1524 };
1525
1526 self.repository.create_memory(create_request).await
1527 }
1528
1529 async fn merge_lossless_tx(
1531 &self,
1532 transaction: &mut Transaction<'_, Postgres>,
1533 group: &SimilarMemoryGroup,
1534 ) -> Result<MergeResult> {
1535 let memories = &group.memories;
1536 let primary_memory = &memories[0];
1537
1538 let combined_content = memories
1540 .iter()
1541 .enumerate()
1542 .map(|(i, m)| format!("--- Memory {} (ID: {}) ---\n{}", i + 1, m.id, m.content))
1543 .collect::<Vec<_>>()
1544 .join("\n\n");
1545
1546 let combined_metadata = self.merge_metadata_lossless(memories)?;
1548 let combined_embedding = self.calculate_combined_embedding(memories)?;
1549 let combined_importance = self.calculate_weighted_importance(memories);
1550
1551 let merged_memory = self
1553 .create_merged_memory_tx(
1554 transaction,
1555 combined_content,
1556 combined_embedding,
1557 combined_importance,
1558 combined_metadata,
1559 primary_memory,
1560 )
1561 .await?;
1562
1563 let storage_saved = self
1565 .archive_original_memories_tx(transaction, memories, merged_memory.id)
1566 .await?;
1567
1568 Ok(MergeResult {
1569 merged_memory: merged_memory.clone(),
1570 storage_saved,
1571 compression_ratio: storage_saved as f32 / merged_memory.content.len() as f32,
1572 original_count: memories.len(),
1573 })
1574 }
1575
1576 async fn merge_with_metadata_consolidation_tx(
1578 &self,
1579 transaction: &mut Transaction<'_, Postgres>,
1580 group: &SimilarMemoryGroup,
1581 ) -> Result<MergeResult> {
1582 let memories = &group.memories;
1583 let primary_memory = &memories[0];
1584
1585 let mut combined_content = primary_memory.content.clone();
1587 combined_content.push_str("\n\n--- Related Content References ---\n");
1588
1589 for (i, memory) in memories.iter().skip(1).enumerate() {
1590 combined_content.push_str(&format!(
1591 "Related Memory {}: {} (similarity: {:.3})\n",
1592 i + 1,
1593 memory.content.chars().take(100).collect::<String>(),
1594 group.average_similarity
1595 ));
1596 }
1597
1598 let combined_metadata = self.merge_metadata_consolidated(memories)?;
1599 let combined_embedding = self.calculate_combined_embedding(memories)?;
1600 let combined_importance = self.calculate_weighted_importance(memories);
1601
1602 let merged_memory = self
1603 .create_merged_memory_tx(
1604 transaction,
1605 combined_content,
1606 combined_embedding,
1607 combined_importance,
1608 combined_metadata,
1609 primary_memory,
1610 )
1611 .await?;
1612
1613 let storage_saved = self
1614 .archive_original_memories_tx(transaction, memories, merged_memory.id)
1615 .await?;
1616
1617 Ok(MergeResult {
1618 merged_memory,
1619 storage_saved,
1620 compression_ratio: 2.0, original_count: memories.len(),
1622 })
1623 }
1624
1625 async fn merge_with_summarization_tx(
1627 &self,
1628 transaction: &mut Transaction<'_, Postgres>,
1629 group: &SimilarMemoryGroup,
1630 ) -> Result<MergeResult> {
1631 let memories = &group.memories;
1632 let primary_memory = &memories[0];
1633
1634 let summary_content = self.create_content_summary(memories).await?;
1636
1637 let combined_metadata = self.merge_metadata_summarized(memories)?;
1638 let combined_embedding = self.calculate_combined_embedding(memories)?;
1639 let combined_importance = self.calculate_weighted_importance(memories);
1640
1641 let merged_memory = self
1642 .create_merged_memory_tx(
1643 transaction,
1644 summary_content,
1645 combined_embedding,
1646 combined_importance,
1647 combined_metadata,
1648 primary_memory,
1649 )
1650 .await?;
1651
1652 let storage_saved = self
1653 .archive_original_memories_tx(transaction, memories, merged_memory.id)
1654 .await?;
1655
1656 Ok(MergeResult {
1657 merged_memory,
1658 storage_saved,
1659 compression_ratio: 5.0, original_count: memories.len(),
1661 })
1662 }
1663
    /// Safely archives the source memories of a merge inside `transaction`,
    /// returning the total content bytes moved out of active storage.
    ///
    /// Safety protocol:
    /// 1. Verify the merged replacement row exists and is 'active' before
    ///    touching any original (otherwise `SafetyViolation`).
    /// 2. Re-read each memory's live status; only 'active' rows are archived.
    /// 3. Write a reversible backup (7-day window) to
    ///    `memory_compression_log` before flipping the status.
    async fn archive_original_memories_tx(
        &self,
        transaction: &mut Transaction<'_, Postgres>,
        memories: &[Memory],
        merged_memory_id: Uuid,
    ) -> Result<u64> {
        let mut total_size = 0u64;

        let merged_exists = sqlx::query_scalar::<_, bool>(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE id = $1 AND status = 'active')",
        )
        .bind(merged_memory_id)
        .fetch_one(&mut **transaction)
        .await
        .map_err(|e| MemoryError::DatabaseError {
            message: format!("Failed to verify merged memory exists: {e}"),
        })?;

        if !merged_exists {
            return Err(MemoryError::SafetyViolation {
                message: format!(
                    "Cannot archive original memories: merged memory {merged_memory_id} does not exist"
                ),
            });
        }

        for memory in memories {
            // Status may have changed since the group was collected; trust
            // the live row, not the in-memory snapshot.
            let current_status =
                sqlx::query_scalar::<_, String>("SELECT status FROM memories WHERE id = $1")
                    .bind(memory.id)
                    .fetch_optional(&mut **transaction)
                    .await
                    .map_err(|e| MemoryError::DatabaseError {
                        message: format!("Failed to check memory status: {e}"),
                    })?;

            match current_status {
                Some(status) if status == "active" => {
                    // NOTE(review): counted before the guarded UPDATE below —
                    // bytes are included even if rows_affected ends up 0;
                    // confirm that is intended.
                    total_size += memory.content.len() as u64;

                    // Backup first so the archival is reversible for 7 days.
                    sqlx::query(
                        r#"
                        INSERT INTO memory_compression_log (
                            memory_id, original_content, original_metadata,
                            compression_type, compression_ratio,
                            reversible_until
                        ) VALUES ($1, $2, $3, $4, $5, $6)
                        "#,
                    )
                    .bind(memory.id)
                    .bind(&memory.content)
                    .bind(&memory.metadata)
                    .bind("archive")
                    .bind(1.0)
                    .bind(Utc::now() + Duration::days(7))
                    .execute(&mut **transaction)
                    .await
                    .map_err(|e| MemoryError::DatabaseError {
                        message: format!("Failed to create archive backup: {e}"),
                    })?;

                    // Guarded update: the status predicate makes concurrent
                    // modification detectable via rows_affected.
                    let archive_result = sqlx::query(
                        "UPDATE memories SET status = 'archived', updated_at = NOW() WHERE id = $1 AND status = 'active'",
                    )
                    .bind(memory.id)
                    .execute(&mut **transaction)
                    .await
                    .map_err(|e| MemoryError::DatabaseError {
                        message: format!("Failed to archive memory: {e}"),
                    })?;

                    if archive_result.rows_affected() == 0 {
                        warn!(
                            "Memory {} was not archived (may have been modified concurrently)",
                            memory.id
                        );
                    } else {
                        debug!("Successfully archived memory: {}", memory.id);
                    }
                }
                Some(status) => {
                    warn!(
                        "Skipping archival of memory {} - status is {}, not active",
                        memory.id, status
                    );
                }
                None => {
                    warn!("Memory {} no longer exists, cannot archive", memory.id);
                }
            }
        }

        Ok(total_size)
    }
1763
1764 async fn archive_original_memories(&self, memories: &[Memory]) -> Result<u64> {
1766 let mut total_size = 0u64;
1767
1768 for memory in memories {
1769 total_size += memory.content.len() as u64;
1770
1771 sqlx::query(
1773 "UPDATE memories SET status = 'archived', updated_at = NOW() WHERE id = $1",
1774 )
1775 .bind(memory.id)
1776 .execute(self.repository.pool())
1777 .await?;
1778 }
1779
1780 Ok(total_size)
1781 }
1782}
1783
/// Applies per-memory content compression, choosing the strategy from the
/// deduplication configuration.
pub struct CompressionManager {
    // `lossless_critical` drives the lossless-vs-lossy decision.
    config: SemanticDeduplicationConfig,
}
1788
1789impl CompressionManager {
1790 pub fn new(config: SemanticDeduplicationConfig) -> Self {
1791 Self { config }
1792 }
1793
1794 pub async fn compress_memory(&self, memory: &Memory) -> Result<CompressionResult> {
1796 let is_critical =
1797 memory.importance_score > 0.8 || matches!(memory.tier, MemoryTier::Working);
1798
1799 if is_critical && self.config.lossless_critical {
1800 self.apply_lossless_compression(memory).await
1801 } else {
1802 self.apply_lossy_compression(memory).await
1803 }
1804 }
1805
1806 async fn apply_lossless_compression(&self, memory: &Memory) -> Result<CompressionResult> {
1808 let compressed_content = self.normalize_content(&memory.content);
1810 let compression_ratio = memory.content.len() as f32 / compressed_content.len() as f32;
1811
1812 Ok(CompressionResult {
1813 original_size: memory.content.len(),
1814 compressed_size: compressed_content.len(),
1815 compression_ratio,
1816 is_lossless: true,
1817 compressed_content,
1818 })
1819 }
1820
1821 async fn apply_lossy_compression(&self, memory: &Memory) -> Result<CompressionResult> {
1823 let compressed_content = self.extract_key_information(&memory.content);
1825 let compression_ratio = memory.content.len() as f32 / compressed_content.len() as f32;
1826
1827 Ok(CompressionResult {
1828 original_size: memory.content.len(),
1829 compressed_size: compressed_content.len(),
1830 compression_ratio,
1831 is_lossless: false,
1832 compressed_content,
1833 })
1834 }
1835
1836 fn normalize_content(&self, content: &str) -> String {
1838 content
1840 .lines()
1841 .map(|line| line.trim())
1842 .filter(|line| !line.is_empty())
1843 .collect::<Vec<_>>()
1844 .join("\n")
1845 }
1846
1847 fn extract_key_information(&self, content: &str) -> String {
1849 let sentences: Vec<&str> = content
1851 .split('.')
1852 .filter(|s| !s.trim().is_empty())
1853 .collect();
1854
1855 if sentences.len() <= 2 {
1856 return content.to_string();
1857 }
1858
1859 format!(
1860 "{}. ... {} (content compressed from {} to 2 key sentences)",
1861 sentences[0].trim(),
1862 sentences.last().unwrap().trim(),
1863 sentences.len()
1864 )
1865 }
1866}
1867
/// Records deduplication operations (merges, prunes) in an audit log and
/// supports reversing them while their backup window is still open.
pub struct AuditTrail {
    // Supplies the Postgres pool for audit-log reads and writes.
    repository: Arc<MemoryRepository>,
}
1872
1873impl AuditTrail {
1874 pub fn new(repository: Arc<MemoryRepository>) -> Self {
1875 Self { repository }
1876 }
1877
    /// Writes an 'in_progress' merge entry to the audit log within the
    /// caller's transaction and returns a handle to it.
    pub async fn create_merge_entry_tx(
        &self,
        transaction: &mut Transaction<'_, Postgres>,
        group: &SimilarMemoryGroup,
    ) -> Result<AuditEntry> {
        let entry_id = Uuid::new_v4();
        // Snapshot of what is being merged — consumed later by reversal.
        let operation_data = serde_json::json!({
            "operation_type": "merge",
            "memory_ids": group.memories.iter().map(|m| m.id).collect::<Vec<_>>(),
            "strategy": group.merge_strategy,
            "similarity": group.average_similarity,
            "memory_count": group.memories.len()
        });

        sqlx::query(
            r#"
            INSERT INTO deduplication_audit_log
            (id, operation_type, operation_data, created_at, status)
            VALUES ($1, $2, $3, $4, $5)
            "#,
        )
        .bind(entry_id)
        .bind("merge")
        .bind(operation_data)
        .bind(Utc::now())
        .bind("in_progress")
        .execute(&mut **transaction)
        .await
        .map_err(|e| MemoryError::DatabaseError {
            message: format!("Failed to create merge audit entry: {e}"),
        })?;

        Ok(AuditEntry {
            id: entry_id,
            operation_type: "merge".to_string(),
            created_at: Utc::now(),
            status: "in_progress".to_string(),
        })
    }
1918
    /// Non-transactional variant of [`Self::create_merge_entry_tx`]: writes
    /// an 'in_progress' merge entry directly through the pool.
    ///
    /// Unlike the tx variant, database errors propagate via `?` (the
    /// `From<sqlx::Error>` conversion) rather than an explicit
    /// `DatabaseError` wrapper.
    pub async fn create_merge_entry(&self, group: &SimilarMemoryGroup) -> Result<AuditEntry> {
        let entry_id = Uuid::new_v4();
        let operation_data = serde_json::json!({
            "operation_type": "merge",
            "memory_ids": group.memories.iter().map(|m| m.id).collect::<Vec<_>>(),
            "strategy": group.merge_strategy,
            "similarity": group.average_similarity,
            "memory_count": group.memories.len()
        });

        sqlx::query(
            r#"
            INSERT INTO deduplication_audit_log
            (id, operation_type, operation_data, created_at, status)
            VALUES ($1, $2, $3, $4, $5)
            "#,
        )
        .bind(entry_id)
        .bind("merge")
        .bind(operation_data)
        .bind(Utc::now())
        .bind("in_progress")
        .execute(self.repository.pool())
        .await?;

        Ok(AuditEntry {
            id: entry_id,
            operation_type: "merge".to_string(),
            created_at: Utc::now(),
            status: "in_progress".to_string(),
        })
    }
1952
    /// Marks a merge audit entry as completed (inside the caller's
    /// transaction), attaching the merge metrics as `completion_data`.
    pub async fn complete_merge_entry_tx(
        &self,
        transaction: &mut Transaction<'_, Postgres>,
        entry_id: Uuid,
        result: &MergeResult,
    ) -> Result<()> {
        let completion_data = serde_json::json!({
            "merged_memory_id": result.merged_memory.id,
            "storage_saved": result.storage_saved,
            "compression_ratio": result.compression_ratio,
            "original_count": result.original_count
        });

        sqlx::query(
            r#"
            UPDATE deduplication_audit_log
            SET status = $1, completion_data = $2, completed_at = $3
            WHERE id = $4
            "#,
        )
        .bind("completed")
        .bind(completion_data)
        .bind(Utc::now())
        .bind(entry_id)
        .execute(&mut **transaction)
        .await
        .map_err(|e| MemoryError::DatabaseError {
            message: format!("Failed to complete merge audit entry: {e}"),
        })?;

        Ok(())
    }
1986
    /// Non-transactional variant of [`Self::complete_merge_entry_tx`]: marks
    /// the entry completed directly through the pool, with errors propagating
    /// via `?` instead of an explicit `DatabaseError` wrapper.
    pub async fn complete_merge_entry(&self, entry_id: Uuid, result: &MergeResult) -> Result<()> {
        let completion_data = serde_json::json!({
            "merged_memory_id": result.merged_memory.id,
            "storage_saved": result.storage_saved,
            "compression_ratio": result.compression_ratio,
            "original_count": result.original_count
        });

        sqlx::query(
            r#"
            UPDATE deduplication_audit_log
            SET status = $1, completion_data = $2, completed_at = $3
            WHERE id = $4
            "#,
        )
        .bind("completed")
        .bind(completion_data)
        .bind(Utc::now())
        .bind(entry_id)
        .execute(self.repository.pool())
        .await?;

        Ok(())
    }
2012
    /// Records an already-performed pruning pass in the audit log (the entry
    /// is created directly in 'completed' state) and returns its handle.
    pub async fn record_pruning(&self, pruned_ids: &[Uuid], reason: &str) -> Result<AuditEntry> {
        let entry_id = Uuid::new_v4();
        // The pruned id list is what a later reversal uses to restore rows.
        let operation_data = serde_json::json!({
            "operation_type": "prune",
            "pruned_memory_ids": pruned_ids,
            "reason": reason,
            "count": pruned_ids.len()
        });

        sqlx::query(
            r#"
            INSERT INTO deduplication_audit_log
            (id, operation_type, operation_data, created_at, status, completed_at)
            VALUES ($1, $2, $3, $4, $5, $6)
            "#,
        )
        .bind(entry_id)
        .bind("prune")
        .bind(operation_data)
        .bind(Utc::now())
        .bind("completed")
        .bind(Utc::now())
        .execute(self.repository.pool())
        .await?;

        Ok(AuditEntry {
            id: entry_id,
            operation_type: "prune".to_string(),
            created_at: Utc::now(),
            status: "completed".to_string(),
        })
    }
2046
2047 pub async fn get_reversible_operations(&self) -> Result<Vec<ReversibleOperation>> {
2049 let cutoff_date = Utc::now() - Duration::days(7);
2050
2051 let query = r#"
2052 SELECT id, operation_type, operation_data, completion_data, created_at
2053 FROM deduplication_audit_log
2054 WHERE created_at > $1
2055 AND operation_type IN ('merge', 'prune')
2056 AND status = 'completed'
2057 AND reversible_until IS NOT NULL
2058 AND reversible_until > NOW()
2059 ORDER BY created_at DESC
2060 "#;
2061
2062 let rows = sqlx::query(query)
2063 .bind(cutoff_date)
2064 .fetch_all(self.repository.pool())
2065 .await?;
2066
2067 let mut operations = Vec::new();
2068 for row in rows {
2069 operations.push(ReversibleOperation {
2070 id: row.get("id"),
2071 operation_type: row.get("operation_type"),
2072 operation_data: row.get("operation_data"),
2073 completion_data: row.get("completion_data"),
2074 created_at: row.get("created_at"),
2075 });
2076 }
2077
2078 Ok(operations)
2079 }
2080
    /// Reverses a previously completed operation inside its own transaction:
    /// commits on success, rolls back (best-effort) on failure.
    pub async fn reverse_operation(&self, operation_id: Uuid) -> Result<ReversalResult> {
        info!("Starting reversal of operation: {}", operation_id);

        let mut transaction =
            self.repository
                .pool()
                .begin()
                .await
                .map_err(|e| MemoryError::DatabaseError {
                    message: format!("Failed to begin reversal transaction: {e}"),
                })?;

        let result = self
            .execute_operation_reversal(&mut transaction, operation_id)
            .await;

        match result {
            Ok(reversal_result) => {
                transaction
                    .commit()
                    .await
                    .map_err(|e| MemoryError::DatabaseError {
                        message: format!("Failed to commit reversal transaction: {e}"),
                    })?;
                info!("Successfully reversed operation: {}", operation_id);
                Ok(reversal_result)
            }
            Err(e) => {
                // Rollback failure is only logged; the original error is what
                // the caller sees.
                if let Err(rollback_err) = transaction.rollback().await {
                    error!("Failed to rollback reversal transaction: {}", rollback_err);
                }
                error!("Reversal failed for operation {}: {}", operation_id, e);
                Err(e)
            }
        }
    }
2119
    /// Core reversal logic, run inside the caller's transaction.
    ///
    /// Validates that the operation exists, is 'completed', and is still
    /// inside its `reversible_until` window; dispatches to the type-specific
    /// reversal; finally marks the log entry 'reversed' and clears its window
    /// so it cannot be reversed twice.
    async fn execute_operation_reversal(
        &self,
        transaction: &mut Transaction<'_, Postgres>,
        operation_id: Uuid,
    ) -> Result<ReversalResult> {
        let operation_query = r#"
            SELECT operation_type, operation_data, completion_data, status, reversible_until
            FROM deduplication_audit_log
            WHERE id = $1
        "#;

        let operation_row = sqlx::query(operation_query)
            .bind(operation_id)
            .fetch_optional(&mut **transaction)
            .await
            .map_err(|e| MemoryError::DatabaseError {
                message: format!("Failed to fetch operation details: {e}"),
            })?
            .ok_or_else(|| MemoryError::NotFound {
                id: operation_id.to_string(),
            })?;

        let operation_type: String = operation_row.get("operation_type");
        let operation_data: serde_json::Value = operation_row.get("operation_data");
        let status: String = operation_row.get("status");
        let reversible_until: Option<DateTime<Utc>> = operation_row.get("reversible_until");

        // Only completed operations can be reversed.
        if status != "completed" {
            return Err(MemoryError::InvalidRequest {
                message: format!("Operation {operation_id} is not in completed state: {status}"),
            });
        }

        // A NULL window means the operation was never reversible (or has
        // already been reversed, which clears the window).
        if let Some(cutoff) = reversible_until {
            if Utc::now() > cutoff {
                return Err(MemoryError::InvalidRequest {
                    message: format!("Operation {operation_id} is past its reversal deadline"),
                });
            }
        } else {
            return Err(MemoryError::InvalidRequest {
                message: format!("Operation {operation_id} is not reversible"),
            });
        }

        let reversal_result = match operation_type.as_str() {
            "merge" => {
                self.reverse_merge_operation(transaction, operation_id, &operation_data)
                    .await?
            }
            "prune" => {
                self.reverse_prune_operation(transaction, operation_id, &operation_data)
                    .await?
            }
            _ => {
                return Err(MemoryError::InvalidRequest {
                    message: format!("Unsupported operation type for reversal: {operation_type}"),
                });
            }
        };

        // Mark reversed and close the window to prevent double reversal.
        sqlx::query(
            r#"
            UPDATE deduplication_audit_log
            SET status = 'reversed', reversible_until = NULL
            WHERE id = $1
            "#,
        )
        .bind(operation_id)
        .execute(&mut **transaction)
        .await
        .map_err(|e| MemoryError::DatabaseError {
            message: format!("Failed to mark operation as reversed: {e}"),
        })?;

        Ok(reversal_result)
    }
2202
    /// Reverses a merge: re-activates the archived source memories listed in
    /// the audit entry's `operation_data`, then archives the merged result
    /// recorded in `memory_merge_history`.
    ///
    /// NOTE(review): restoration is driven by `operation_data.memory_ids`,
    /// while the merged result id comes from `memory_merge_history` (whose
    /// `original_memory_id` column is fetched but unused) — confirm the two
    /// sources always agree.
    async fn reverse_merge_operation(
        &self,
        transaction: &mut Transaction<'_, Postgres>,
        operation_id: Uuid,
        operation_data: &serde_json::Value,
    ) -> Result<ReversalResult> {
        info!("Reversing merge operation: {}", operation_id);

        // Parse the source-memory ids out of the stored JSON; any malformed
        // entry aborts the reversal with InvalidData.
        let memory_ids: Vec<Uuid> = operation_data["memory_ids"]
            .as_array()
            .ok_or_else(|| MemoryError::InvalidData {
                message: "Missing memory_ids in merge operation data".to_string(),
            })?
            .iter()
            .map(|id| {
                Uuid::parse_str(id.as_str().unwrap_or("")).map_err(|e| MemoryError::InvalidData {
                    message: format!("Invalid UUID in memory_ids: {e}"),
                })
            })
            .collect::<Result<Vec<_>>>()?;

        let merge_history_query = r#"
            SELECT merged_memory_id, original_memory_id
            FROM memory_merge_history
            WHERE merge_operation_id = $1
        "#;

        let merge_rows = sqlx::query(merge_history_query)
            .bind(operation_id)
            .fetch_all(&mut **transaction)
            .await
            .map_err(|e| MemoryError::DatabaseError {
                message: format!("Failed to fetch merge history: {e}"),
            })?;

        if merge_rows.is_empty() {
            return Err(MemoryError::InvalidData {
                message: "No merge history found for operation".to_string(),
            });
        }

        let merged_memory_id: Uuid = merge_rows[0].get("merged_memory_id");
        let mut restored_count = 0;

        for memory_id in &memory_ids {
            // Guarded update: only memories still 'archived' can be restored;
            // anything else is reported but left untouched.
            let restore_result = sqlx::query(
                r#"
                UPDATE memories
                SET status = 'active', updated_at = NOW()
                WHERE id = $1 AND status = 'archived'
                "#,
            )
            .bind(memory_id)
            .execute(&mut **transaction)
            .await
            .map_err(|e| MemoryError::DatabaseError {
                message: format!("Failed to restore memory {memory_id}: {e}"),
            })?;

            if restore_result.rows_affected() > 0 {
                restored_count += 1;
                info!("Restored original memory: {}", memory_id);
            } else {
                warn!(
                    "Memory {} was not in archived status, cannot restore",
                    memory_id
                );
            }
        }

        // Retire the merged result so the restored originals become canonical.
        sqlx::query(
            r#"
            UPDATE memories
            SET status = 'archived', updated_at = NOW()
            WHERE id = $1
            "#,
        )
        .bind(merged_memory_id)
        .execute(&mut **transaction)
        .await
        .map_err(|e| MemoryError::DatabaseError {
            message: format!("Failed to archive merged memory: {e}"),
        })?;

        info!("Archived merged memory result: {}", merged_memory_id);

        Ok(ReversalResult {
            operation_id,
            operation_type: "merge".to_string(),
            memories_restored: restored_count,
            // Declared successful if at least one original was restored.
            success: restored_count > 0,
            message: format!(
                "Restored {} of {} original memories from merge operation",
                restored_count,
                memory_ids.len()
            ),
        })
    }
2306
    /// Reverses a prune: re-activates memories listed in the audit entry's
    /// `pruned_memory_ids` that are still in 'deleted' status.
    async fn reverse_prune_operation(
        &self,
        transaction: &mut Transaction<'_, Postgres>,
        operation_id: Uuid,
        operation_data: &serde_json::Value,
    ) -> Result<ReversalResult> {
        info!("Reversing prune operation: {}", operation_id);

        // Parse the pruned ids from the stored JSON; any malformed entry
        // aborts the reversal with InvalidData.
        let pruned_memory_ids: Vec<Uuid> = operation_data["pruned_memory_ids"]
            .as_array()
            .ok_or_else(|| MemoryError::InvalidData {
                message: "Missing pruned_memory_ids in prune operation data".to_string(),
            })?
            .iter()
            .map(|id| {
                Uuid::parse_str(id.as_str().unwrap_or("")).map_err(|e| MemoryError::InvalidData {
                    message: format!("Invalid UUID in pruned_memory_ids: {e}"),
                })
            })
            .collect::<Result<Vec<_>>>()?;

        let mut restored_count = 0;

        for memory_id in &pruned_memory_ids {
            // Guarded update: only 'deleted' rows are restored; anything else
            // is reported but left untouched.
            let restore_result = sqlx::query(
                r#"
                UPDATE memories
                SET status = 'active', updated_at = NOW()
                WHERE id = $1 AND status = 'deleted'
                "#,
            )
            .bind(memory_id)
            .execute(&mut **transaction)
            .await
            .map_err(|e| MemoryError::DatabaseError {
                message: format!("Failed to restore pruned memory {memory_id}: {e}"),
            })?;

            if restore_result.rows_affected() > 0 {
                restored_count += 1;
                info!("Restored pruned memory: {}", memory_id);
            } else {
                warn!(
                    "Memory {} was not in deleted status, cannot restore",
                    memory_id
                );
            }
        }

        Ok(ReversalResult {
            operation_id,
            operation_type: "prune".to_string(),
            memories_restored: restored_count,
            // Declared successful if at least one memory was restored.
            success: restored_count > 0,
            message: format!(
                "Restored {} of {} pruned memories",
                restored_count,
                pruned_memory_ids.len()
            ),
        })
    }
2371}
2372
/// Background pruner that soft-deletes low-value memories under layered
/// safety constraints, all inside a single database transaction.
pub struct AutoPruner {
    /// Pruning/dedup tuning knobs; not read by this type yet (hence the
    /// `dead_code` allowance) — kept for configuration parity.
    #[allow(dead_code)]
    config: SemanticDeduplicationConfig,
    /// Data-access layer providing the Postgres connection pool.
    repository: Arc<MemoryRepository>,
}
2379
impl AutoPruner {
    /// Creates a pruner backed by the given configuration and repository.
    pub fn new(config: SemanticDeduplicationConfig, repository: Arc<MemoryRepository>) -> Self {
        Self { config, repository }
    }

    /// Soft-deletes memories created before `cutoff_date` whose recall
    /// probability is below `threshold`, subject to the safety filters in
    /// `execute_safe_pruning`.
    ///
    /// The whole run executes in one transaction: committed on success,
    /// rolled back on any error, so pruning is all-or-nothing.
    ///
    /// # Errors
    /// Returns `MemoryError::DatabaseError` if the transaction cannot be
    /// started or committed, or propagates any error from the pruning body.
    pub async fn prune_memories(
        &self,
        threshold: f64,
        cutoff_date: DateTime<Utc>,
    ) -> Result<PruningResult> {
        info!(
            "Starting safe pruning with threshold {} and cutoff date {}",
            threshold, cutoff_date
        );

        let mut transaction =
            self.repository
                .pool()
                .begin()
                .await
                .map_err(|e| MemoryError::DatabaseError {
                    message: format!("Failed to begin pruning transaction: {e}"),
                })?;

        let result = self
            .execute_safe_pruning(&mut transaction, threshold, cutoff_date)
            .await;

        match result {
            Ok(pruning_result) => {
                transaction
                    .commit()
                    .await
                    .map_err(|e| MemoryError::DatabaseError {
                        message: format!("Failed to commit pruning transaction: {e}"),
                    })?;
                info!(
                    "Safe pruning completed: {} memories pruned, {} bytes freed",
                    pruning_result.memories_pruned, pruning_result.storage_freed
                );
                Ok(pruning_result)
            }
            Err(e) => {
                // A rollback failure is logged but the original pruning error
                // is what gets returned to the caller.
                if let Err(rollback_err) = transaction.rollback().await {
                    error!("Failed to rollback pruning transaction: {}", rollback_err);
                }
                error!("Pruning failed: {}", e);
                Err(e)
            }
        }
    }

    /// Transaction body for `prune_memories`.
    ///
    /// Flow per batch (capped at 500 candidates):
    /// 1. Select low-recall, old, cold/frozen, unimportant, rarely-accessed
    ///    memories without critical metadata flags.
    /// 2. Re-validate each row in Rust (`validate_pruning_safety`) and
    ///    re-check eligibility in SQL before touching it.
    /// 3. Write an entry to `memory_pruning_log`, then mark the memory
    ///    `deleted` (soft delete — reversible via `reverse_prune_operation`).
    async fn execute_safe_pruning(
        &self,
        transaction: &mut Transaction<'_, Postgres>,
        threshold: f64,
        cutoff_date: DateTime<Utc>,
    ) -> Result<PruningResult> {
        let query = r#"
            SELECT
                id,
                content,
                recall_probability,
                importance_score,
                access_count,
                last_accessed_at,
                tier,
                metadata,
                consolidation_strength
            FROM memories
            WHERE recall_probability IS NOT NULL
                AND recall_probability < $1
                AND created_at < $2
                AND status = 'active'
                AND tier IN ('cold', 'frozen')
                -- Additional safety checks
                AND importance_score < 0.3 -- Don't prune important memories
                AND access_count < 10 -- Don't prune frequently accessed memories
                AND (
                    last_accessed_at IS NULL OR
                    last_accessed_at < (NOW() - INTERVAL '30 days')
                ) -- Don't prune recently accessed memories
                AND consolidation_strength < 0.5 -- Don't prune well-consolidated memories
                -- Exclude memories with critical metadata markers
                AND NOT (
                    metadata ? 'critical' OR
                    metadata ? 'important' OR
                    metadata ? 'permanent' OR
                    metadata ? 'do_not_prune'
                )
            ORDER BY
                recall_probability ASC,
                importance_score ASC,
                last_accessed_at ASC NULLS FIRST
            LIMIT 500 -- Conservative batch size for safety
        "#;

        let candidates = sqlx::query(query)
            .bind(threshold)
            .bind(cutoff_date)
            .fetch_all(&mut **transaction)
            .await
            .map_err(|e| MemoryError::DatabaseError {
                message: format!("Failed to fetch pruning candidates: {e}"),
            })?;

        info!(
            "Found {} pruning candidates after safety filtering",
            candidates.len()
        );

        let mut pruned_ids = Vec::new();
        let mut storage_freed = 0u64;
        let mut safety_violations = 0;

        for row in candidates {
            let memory_id: Uuid = row.get("id");
            let content: String = row.get("content");
            let importance_score: f64 = row.get("importance_score");
            let access_count: i32 = row.get("access_count");
            let metadata: serde_json::Value = row.get("metadata");
            let tier: MemoryTier = row.get("tier");

            // Defense in depth: re-run the SQL filters in Rust; a violation
            // skips the row instead of aborting the whole batch.
            if let Err(violation) = self.validate_pruning_safety(
                memory_id,
                importance_score,
                access_count,
                &metadata,
                tier,
            ) {
                warn!("Skipping pruning due to safety violation: {}", violation);
                safety_violations += 1;
                continue;
            }

            // Second SQL check guards against the row changing between the
            // candidate select and the delete (within this transaction's view).
            let recheck_query = r#"
                SELECT status, tier, importance_score
                FROM memories
                WHERE id = $1
                AND status = 'active'
                AND tier IN ('cold', 'frozen')
                AND importance_score < 0.3
            "#;

            let recheck_result = sqlx::query(recheck_query)
                .bind(memory_id)
                .fetch_optional(&mut **transaction)
                .await
                .map_err(|e| MemoryError::DatabaseError {
                    message: format!("Failed to recheck memory eligibility: {e}"),
                })?;

            if recheck_result.is_none() {
                warn!(
                    "Memory {} no longer eligible for pruning, skipping",
                    memory_id
                );
                continue;
            }

            // "Storage freed" is approximated by the content's byte length.
            storage_freed += content.len() as u64;
            pruned_ids.push(memory_id);

            // Audit entry is written before the soft delete so the delete is
            // never unlogged within the transaction.
            sqlx::query(
                r#"
                INSERT INTO memory_pruning_log (
                    memory_id, recall_probability, age_days, tier,
                    importance_score, access_count, content_size_bytes,
                    pruning_reason
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                "#,
            )
            .bind(memory_id)
            .bind(
                row.get::<Option<f64>, _>("recall_probability")
                    .unwrap_or(0.0),
            )
            // NOTE(review): this logs the length of the cutoff window (now -
            // cutoff_date), which is the same value for every row — not the
            // memory's actual age. Confirm whether age should come from
            // created_at instead.
            .bind((Utc::now() - cutoff_date).num_days())
            // Lowercased Debug name of the tier (e.g. "cold") — assumed to
            // match the DB's tier labels; TODO confirm against the schema.
            .bind(format!("{tier:?}").to_lowercase())
            .bind(importance_score)
            .bind(access_count)
            .bind(content.len() as i32)
            .bind(format!(
                "Auto-pruning: threshold={threshold}, cutoff={cutoff_date}"
            ))
            .execute(&mut **transaction)
            .await
            .map_err(|e| MemoryError::DatabaseError {
                message: format!("Failed to create pruning audit entry: {e}"),
            })?;

            // Soft delete: status flip only, so the row can be restored later.
            sqlx::query("UPDATE memories SET status = 'deleted', updated_at = NOW() WHERE id = $1")
                .bind(memory_id)
                .execute(&mut **transaction)
                .await
                .map_err(|e| MemoryError::DatabaseError {
                    message: format!("Failed to mark memory as deleted: {e}"),
                })?;
        }

        if safety_violations > 0 {
            warn!(
                "Encountered {} safety violations during pruning",
                safety_violations
            );
        }

        Ok(PruningResult {
            memories_pruned: pruned_ids.len(),
            storage_freed,
            pruned_memory_ids: pruned_ids,
        })
    }

    /// Rust-side mirror of the SQL safety filters; returns
    /// `MemoryError::SafetyViolation` describing the first rule a memory
    /// breaks, or `Ok(())` when it may be pruned.
    fn validate_pruning_safety(
        &self,
        memory_id: Uuid,
        importance_score: f64,
        access_count: i32,
        metadata: &serde_json::Value,
        tier: MemoryTier,
    ) -> Result<()> {
        // Mirrors `importance_score < 0.3` in the candidate query.
        if importance_score >= 0.3 {
            return Err(MemoryError::SafetyViolation {
                message: format!(
                    "Memory {memory_id} has high importance score: {importance_score}"
                ),
            });
        }

        // Mirrors `access_count < 10` in the candidate query.
        if access_count >= 10 {
            return Err(MemoryError::SafetyViolation {
                message: format!("Memory {memory_id} has high access count: {access_count}"),
            });
        }

        // Only cold/frozen tiers are prunable.
        if matches!(tier, MemoryTier::Working | MemoryTier::Warm) {
            return Err(MemoryError::SafetyViolation {
                message: format!("Memory {memory_id} is in protected tier: {tier:?}"),
            });
        }

        // Presence of any protective metadata key blocks pruning; the value
        // under the key is irrelevant.
        if let serde_json::Value::Object(obj) = metadata {
            for key in ["critical", "important", "permanent", "do_not_prune"] {
                if obj.contains_key(key) {
                    return Err(MemoryError::SafetyViolation {
                        message: format!("Memory {memory_id} has critical metadata flag: {key}"),
                    });
                }
            }
        }

        Ok(())
    }
}
2648
/// Running totals of deduplication activity, accumulated across operations.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DeduplicationMetrics {
    /// Number of deduplication operations run.
    pub total_operations: u64,
    /// Memories examined across all operations.
    pub total_memories_processed: usize,
    /// Memories merged away across all operations.
    pub total_memories_merged: usize,
    /// Total storage reclaimed, in bytes.
    pub total_storage_saved: u64,
    /// Total wall-clock time spent, in milliseconds.
    pub total_execution_time_ms: u64,
    /// Average compression ratio (presumably across operations — the
    /// producer is not visible here).
    pub average_compression_ratio: f32,
    /// Count of errors encountered.
    pub errors_encountered: u64,
    /// Timestamp of the most recent operation, if any has run.
    pub last_operation_timestamp: Option<DateTime<Utc>>,
}
2661
/// Outcome of a single deduplication pass.
#[derive(Debug, Clone, Default)]
pub struct DeduplicationResult {
    /// Memories examined during the pass.
    pub total_processed: usize,
    /// Similar-memory groups identified.
    pub groups_identified: usize,
    /// Memories merged away.
    pub memories_merged: usize,
    /// Storage reclaimed by the pass, in bytes.
    pub storage_saved_bytes: u64,
    /// Compression ratio achieved by the pass.
    pub compression_ratio: f32,
    /// Duration of the pass, in milliseconds.
    pub execution_time_ms: u64,
    /// Errors encountered during the pass.
    pub errors_encountered: u64,
}
2673
/// A cluster of memories judged similar enough to be merged together.
#[derive(Debug, Clone)]
pub struct SimilarMemoryGroup {
    /// The member memories of the group.
    pub memories: Vec<Memory>,
    /// Average similarity score across the group (how it is computed is not
    /// visible here).
    pub average_similarity: f32,
    /// Strategy to use when merging this group.
    pub merge_strategy: MergeStrategy,
}
2680
/// Strategy for collapsing a group of similar memories into one.
/// Variant semantics are implemented elsewhere (see `MemoryMerger`); names
/// suggest decreasing fidelity from top to bottom.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum MergeStrategy {
    /// Merge without losing any original content.
    LosslessPreservation,
    /// Consolidate metadata across the group.
    MetadataConsolidation,
    /// Summarize content (lossy).
    ContentSummarization,
}
2687
/// Outcome of merging a group of similar memories.
#[derive(Debug, Clone)]
pub struct MergeResult {
    /// The memory produced by the merge.
    pub merged_memory: Memory,
    /// Storage saved by the merge, in bytes.
    pub storage_saved: u64,
    /// Compression ratio achieved by the merge.
    pub compression_ratio: f32,
    /// How many memories were merged together.
    pub original_count: usize,
}
2695
/// Id-level record of one group merge, suitable for auditing/reversal.
#[derive(Debug, Clone)]
pub struct GroupMergeResult {
    /// Id of the memory produced by the merge.
    pub merged_memory_id: Uuid,
    /// Ids of the memories that were merged away.
    pub original_memory_ids: Vec<Uuid>,
    /// How many memories were merged.
    pub memories_merged: usize,
    /// Storage saved by the merge, in bytes.
    pub storage_saved: u64,
    /// Compression ratio achieved by the merge.
    pub compression_ratio: f32,
    /// Strategy that was applied.
    pub merge_strategy: MergeStrategy,
}
2705
/// Outcome of compressing a memory's content.
#[derive(Debug, Clone)]
pub struct CompressionResult {
    /// Content size before compression, in bytes.
    pub original_size: usize,
    /// Content size after compression, in bytes.
    pub compressed_size: usize,
    /// Compression ratio (orientation — original/compressed vs. the inverse —
    /// is determined by the producer, not visible here).
    pub compression_ratio: f32,
    /// True when no information was lost.
    pub is_lossless: bool,
    /// The compressed content itself.
    pub compressed_content: String,
}
2714
/// Lightweight view of one audit-trail record.
#[derive(Debug, Clone)]
pub struct AuditEntry {
    /// Unique identifier of the audit record.
    pub id: Uuid,
    /// Kind of operation the entry describes.
    pub operation_type: String,
    /// When the entry was created.
    pub created_at: DateTime<Utc>,
    /// Status of the recorded operation (free-form string; the value set is
    /// not visible here).
    pub status: String,
}
2722
/// A recorded operation with enough captured data to undo it later.
#[derive(Debug, Clone)]
pub struct ReversibleOperation {
    /// Unique identifier of the operation.
    pub id: Uuid,
    /// Kind of operation (the reversal handlers dispatch on values such as
    /// "merge" and "prune").
    pub operation_type: String,
    /// JSON payload captured when the operation ran (e.g. the affected
    /// memory ids, read back during reversal).
    pub operation_data: serde_json::Value,
    /// JSON payload captured on completion.
    pub completion_data: serde_json::Value,
    /// When the operation was recorded.
    pub created_at: DateTime<Utc>,
}
2731
/// Summary of one pruning pass (produced by `AutoPruner`).
#[derive(Debug, Clone)]
pub struct PruningResult {
    /// Number of memories soft-deleted.
    pub memories_pruned: usize,
    /// Bytes freed, approximated as the sum of pruned content lengths.
    pub storage_freed: u64,
    /// Ids of the pruned memories (needed to reverse the prune).
    pub pruned_memory_ids: Vec<Uuid>,
}
2738
/// Outcome of a storage-headroom maintenance run.
#[derive(Debug, Clone)]
pub struct HeadroomMaintenanceResult {
    /// Storage utilization before the run (presumably a 0–1 fraction, per
    /// `target_memory_headroom` — confirm against the producer).
    pub initial_utilization: f32,
    /// Storage utilization after the run.
    pub final_utilization: f32,
    /// Memories examined during the run.
    pub memories_processed: usize,
    /// Memories merged away.
    pub memories_merged: usize,
    /// Memories pruned.
    pub memories_pruned: usize,
    /// Bytes of storage freed.
    pub storage_freed: u64,
}
2748
/// Aggregate storage and tier statistics for the memory store.
#[derive(Debug, Clone)]
pub struct MemoryStatistics {
    /// Total number of memories.
    pub total_memories: i64,
    /// Combined size of memory content, in bytes.
    pub total_content_bytes: i64,
    /// Total storage space, in bytes.
    pub total_space_bytes: i64,
    /// Remaining free space, in bytes.
    pub free_space_bytes: i64,
    /// Mean importance score across memories.
    pub avg_importance: f64,
    /// Number of memories in the working tier.
    pub working_count: i64,
    /// Number of memories in the warm tier.
    pub warm_count: i64,
    /// Number of memories in the cold tier.
    pub cold_count: i64,
    /// Number of memories in the frozen tier.
    pub frozen_count: i64,
}
2761
/// Outcome of undoing a previously recorded operation.
#[derive(Debug, Clone)]
pub struct ReversalResult {
    /// Identifier of the operation that was reversed.
    pub operation_id: Uuid,
    /// Kind of operation reversed ("merge" or "prune").
    pub operation_type: String,
    /// Number of memories restored to active status.
    pub memories_restored: usize,
    /// True when at least one memory was restored.
    pub success: bool,
    /// Human-readable summary of the reversal.
    pub message: String,
}
2770
/// Per-operation timing instrumentation used to flag slow runs.
#[derive(Debug)]
struct PerformanceMetrics {
    /// Identifier of the operation being measured (used in log messages).
    operation_id: String,
    /// When the operation began; total elapsed time is measured from here.
    start_time: Instant,
    /// Recorded wall-clock duration of each named phase.
    phase_timings: HashMap<String, std::time::Duration>,
    /// Number of memories the operation processed.
    memory_count: usize,
    /// Soft time budget for the whole operation, in seconds.
    target_time_seconds: u64,
}
2780
2781impl PerformanceMetrics {
2782 fn record_phase(&mut self, phase: &str, duration: std::time::Duration) {
2783 self.phase_timings.insert(phase.to_string(), duration);
2784
2785 if duration.as_millis() > 1000 {
2787 warn!(
2788 "Slow phase '{}' in operation {}: {}ms",
2789 phase,
2790 self.operation_id,
2791 duration.as_millis()
2792 );
2793 }
2794 }
2795
2796 fn format_phase_summary(&self) -> String {
2797 let mut phases: Vec<_> = self.phase_timings.iter().collect();
2798 phases.sort_by_key(|(_, duration)| *duration);
2799
2800 phases
2801 .iter()
2802 .map(|(name, duration)| format!("{}:{}ms", name, duration.as_millis()))
2803 .collect::<Vec<_>>()
2804 .join(", ")
2805 }
2806
2807 fn get_performance_violations(&self) -> Vec<String> {
2808 let mut violations = Vec::new();
2809
2810 let total_duration = self.start_time.elapsed();
2811 if total_duration.as_secs() > self.target_time_seconds {
2812 violations.push(format!(
2813 "Total time {}s exceeds target {}s",
2814 total_duration.as_secs(),
2815 self.target_time_seconds
2816 ));
2817 }
2818
2819 if self.memory_count >= 10_000 && total_duration.as_secs() > 30 {
2820 violations.push(format!(
2821 "Large batch ({} memories) took {}s, exceeds 30s target",
2822 self.memory_count,
2823 total_duration.as_secs()
2824 ));
2825 }
2826
2827 for (phase, duration) in &self.phase_timings {
2829 if duration.as_millis() > 5000 {
2830 violations.push(format!(
2831 "Phase '{}' took {}ms, may need optimization",
2832 phase,
2833 duration.as_millis()
2834 ));
2835 }
2836 }
2837
2838 violations
2839 }
2840}