codex_memory/memory/
semantic_deduplication.rs

1use super::error::{MemoryError, Result};
2use super::models::CreateMemoryRequest;
3use super::models::*;
4use super::repository::MemoryRepository;
5use crate::embedding::EmbeddingService;
6use chrono::{DateTime, Duration, Utc};
7use pgvector::Vector;
8use serde::{Deserialize, Serialize};
9use sqlx::{Postgres, Row, Transaction};
10use std::collections::{HashMap, HashSet};
11use std::hash::Hash;
12use std::sync::Arc;
13use std::time::Instant;
14use tokio::sync::{Mutex, RwLock};
15use tracing::{debug, error, info, warn};
16use uuid::Uuid;
17
18/// Configuration for the semantic deduplication system
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct SemanticDeduplicationConfig {
21    /// Cosine similarity threshold for considering memories as duplicates
22    pub similarity_threshold: f32,
23    /// Batch size for processing memories
24    pub batch_size: usize,
25    /// Maximum memories to process in a single operation
26    pub max_memories_per_operation: usize,
27    /// Minimum age before a memory can be considered for merging
28    pub min_memory_age_hours: i64,
29    /// Recall probability threshold for auto-pruning
30    pub prune_threshold: f64,
31    /// Days after which memories with low recall probability can be pruned
32    pub prune_age_days: i64,
33    /// Target memory headroom percentage (0.0 to 1.0)
34    pub target_memory_headroom: f32,
35    /// Compression ratio targets by tier
36    pub compression_targets: HashMap<MemoryTier, f32>,
37    /// Enable/disable lossless compression for critical memories
38    pub lossless_critical: bool,
39    /// Maximum time for deduplication operation in seconds
40    pub max_operation_time_seconds: u64,
41}
42
43impl Default for SemanticDeduplicationConfig {
44    fn default() -> Self {
45        let mut compression_targets = HashMap::new();
46        compression_targets.insert(MemoryTier::Working, 2.0);
47        compression_targets.insert(MemoryTier::Warm, 3.0);
48        compression_targets.insert(MemoryTier::Cold, 5.0);
49        compression_targets.insert(MemoryTier::Frozen, 10.0);
50
51        Self {
52            similarity_threshold: 0.85,
53            batch_size: 100,
54            max_memories_per_operation: 10_000,
55            min_memory_age_hours: 1,
56            prune_threshold: 0.2,
57            prune_age_days: 30,
58            target_memory_headroom: 0.2,
59            compression_targets,
60            lossless_critical: true,
61            max_operation_time_seconds: 30,
62        }
63    }
64}
65
66/// Main semantic deduplication engine with production-ready safety and concurrency controls
67#[allow(dead_code)]
68pub struct SemanticDeduplicationEngine {
69    config: SemanticDeduplicationConfig,
70    repository: Arc<MemoryRepository>,
71    #[allow(dead_code)]
72    embedding_service: Arc<dyn EmbeddingService>,
73    merger: Arc<MemoryMerger>,
74    #[allow(dead_code)]
75    compression_manager: Arc<CompressionManager>,
76    audit_trail: Arc<AuditTrail>,
77    auto_pruner: Arc<AutoPruner>,
78    metrics: Arc<RwLock<DeduplicationMetrics>>,
79    /// Distributed lock for preventing concurrent deduplication operations
80    operation_lock: Arc<Mutex<()>>,
81    /// Active operations tracking for concurrent control
82    active_operations: Arc<RwLock<HashSet<String>>>,
83}
84
85/// Simple lock guard that performs cleanup on drop
86struct OperationLockGuard {
87    operation_id: String,
88    lock_key: i64,
89    pool: sqlx::PgPool,
90    active_operations: Arc<RwLock<HashSet<String>>>,
91    released: bool,
92}
93
94impl OperationLockGuard {
95    async fn release(&mut self) -> Result<()> {
96        if self.released {
97            return Ok(());
98        }
99
100        self.released = true;
101
102        // Remove from active operations
103        {
104            let mut active_ops = self.active_operations.write().await;
105            active_ops.remove(&self.operation_id);
106        }
107
108        // Release PostgreSQL advisory lock
109        sqlx::query("SELECT pg_advisory_unlock($1)")
110            .bind(self.lock_key)
111            .execute(&self.pool)
112            .await
113            .map_err(|e| MemoryError::DatabaseError {
114                message: format!("Failed to release advisory lock: {e}"),
115            })?;
116
117        debug!(
118            "Released distributed lock for operation: {}",
119            self.operation_id
120        );
121        Ok(())
122    }
123}
124
125impl Drop for OperationLockGuard {
126    fn drop(&mut self) {
127        if !self.released {
128            let operation_id = self.operation_id.clone();
129            let lock_key = self.lock_key;
130            let pool = self.pool.clone();
131            let active_operations = self.active_operations.clone();
132
133            // Spawn cleanup task that doesn't block the drop
134            tokio::spawn(async move {
135                // Remove from active operations
136                {
137                    let mut active_ops = active_operations.write().await;
138                    active_ops.remove(&operation_id);
139                }
140
141                // Release PostgreSQL advisory lock
142                if let Err(e) = sqlx::query("SELECT pg_advisory_unlock($1)")
143                    .bind(lock_key)
144                    .execute(&pool)
145                    .await
146                {
147                    error!(
148                        "Failed to release advisory lock for operation {}: {}",
149                        operation_id, e
150                    );
151                } else {
152                    debug!("Released distributed lock for operation: {}", operation_id);
153                }
154            });
155        }
156    }
157}
158
159impl SemanticDeduplicationEngine {
160    pub fn new(
161        config: SemanticDeduplicationConfig,
162        repository: Arc<MemoryRepository>,
163        embedding_service: Arc<dyn EmbeddingService>,
164    ) -> Self {
165        let merger = Arc::new(MemoryMerger::new(config.clone(), repository.clone()));
166        let compression_manager = Arc::new(CompressionManager::new(config.clone()));
167        let audit_trail = Arc::new(AuditTrail::new(repository.clone()));
168        let auto_pruner = Arc::new(AutoPruner::new(config.clone(), repository.clone()));
169        let metrics = Arc::new(RwLock::new(DeduplicationMetrics::default()));
170
171        Self {
172            config,
173            repository,
174            embedding_service,
175            merger,
176            compression_manager,
177            audit_trail,
178            auto_pruner,
179            metrics,
180            operation_lock: Arc::new(Mutex::new(())),
181            active_operations: Arc::new(RwLock::new(HashSet::new())),
182        }
183    }
184
185    /// Perform semantic deduplication on a batch of memories with distributed locking and transaction safety
186    pub async fn deduplicate_batch(&self, memory_ids: &[Uuid]) -> Result<DeduplicationResult> {
187        let operation_id = format!("dedup_{}", Uuid::new_v4());
188        let start_time = Instant::now();
189        let mut result;
190
191        // Acquire distributed lock to prevent concurrent deduplication operations
192        let (_mutex_guard, mut _lock_guard) = self.acquire_operation_lock(&operation_id).await?;
193
194        // Performance monitoring: start time tracking
195        let mut performance_metrics = PerformanceMetrics {
196            operation_id: operation_id.clone(),
197            start_time,
198            phase_timings: HashMap::new(),
199            memory_count: memory_ids.len(),
200            target_time_seconds: self.config.max_operation_time_seconds,
201        };
202
203        info!(
204            "Starting deduplication operation {} for {} memories with threshold {}",
205            operation_id,
206            memory_ids.len(),
207            self.config.similarity_threshold
208        );
209
210        // Check for operation timeout
211        let timeout_check = start_time.elapsed().as_secs();
212        if timeout_check > self.config.max_operation_time_seconds {
213            return Err(MemoryError::OperationTimeout {
214                message: format!("Operation timed out after {timeout_check} seconds"),
215            });
216        }
217
218        // Performance check: validate we can meet the 10K memories < 30 seconds requirement
219        if memory_ids.len() > 10_000 && self.config.max_operation_time_seconds > 30 {
220            warn!(
221                "Processing {} memories may exceed performance target of 30 seconds",
222                memory_ids.len()
223            );
224        }
225
226        performance_metrics.record_phase("lock_acquisition", start_time.elapsed());
227
228        // Begin a database transaction for atomic operations
229        let transaction_start = Instant::now();
230        let mut transaction =
231            self.repository
232                .pool()
233                .begin()
234                .await
235                .map_err(|e| MemoryError::DatabaseError {
236                    message: format!("Failed to begin transaction: {e}"),
237                })?;
238
239        performance_metrics.record_phase("transaction_begin", transaction_start.elapsed());
240
241        let processing_result = self
242            .execute_deduplication_with_transaction(&mut transaction, memory_ids, &operation_id)
243            .await;
244
245        match processing_result {
246            Ok(dedup_result) => {
247                // Commit transaction if all operations succeeded
248                let commit_start = Instant::now();
249                transaction
250                    .commit()
251                    .await
252                    .map_err(|e| MemoryError::DatabaseError {
253                        message: format!("Failed to commit transaction: {e}"),
254                    })?;
255                performance_metrics.record_phase("transaction_commit", commit_start.elapsed());
256
257                result = dedup_result;
258                info!(
259                    "Deduplication operation {} committed successfully",
260                    operation_id
261                );
262            }
263            Err(e) => {
264                // Rollback transaction on any error
265                let rollback_start = Instant::now();
266                if let Err(rollback_err) = transaction.rollback().await {
267                    error!(
268                        "Failed to rollback transaction for operation {}: {}",
269                        operation_id, rollback_err
270                    );
271                } else {
272                    performance_metrics
273                        .record_phase("transaction_rollback", rollback_start.elapsed());
274                }
275
276                // Ensure lock cleanup on error
277                if let Err(cleanup_err) = _lock_guard.release().await {
278                    error!(
279                        "Failed to cleanup lock for operation {}: {}",
280                        operation_id, cleanup_err
281                    );
282                }
283
284                error!("Deduplication operation {} failed: {}", operation_id, e);
285                return Err(e);
286            }
287        }
288
289        // Clean up lock explicitly
290        if let Err(cleanup_err) = _lock_guard.release().await {
291            error!(
292                "Failed to cleanup lock for operation {}: {}",
293                operation_id, cleanup_err
294            );
295        }
296
297        result.execution_time_ms = start_time.elapsed().as_millis() as u64;
298        performance_metrics.record_phase("total_operation", start_time.elapsed());
299
300        // Performance validation
301        if result.execution_time_ms > (self.config.max_operation_time_seconds * 1000) {
302            warn!(
303                "Operation {} exceeded time limit: {}ms > {}ms",
304                operation_id,
305                result.execution_time_ms,
306                self.config.max_operation_time_seconds * 1000
307            );
308        }
309
310        // Validate performance target for large batches
311        if memory_ids.len() >= 10_000 && result.execution_time_ms > 30_000 {
312            error!(
313                "PERFORMANCE VIOLATION: {} memories processed in {}ms (> 30s target)",
314                memory_ids.len(),
315                result.execution_time_ms
316            );
317        }
318
319        // Update metrics with performance data
320        self.update_metrics_with_performance(&result, &performance_metrics)
321            .await;
322
323        info!(
324            "Deduplication operation {} completed: {} memories processed, {} merged, {:.2}% storage saved in {}ms (phases: {})",
325            operation_id,
326            result.total_processed,
327            result.memories_merged,
328            (result.storage_saved_bytes as f64 / (result.total_processed as f64 * 1024.0)) * 100.0,
329            result.execution_time_ms,
330            performance_metrics.format_phase_summary()
331        );
332
333        Ok(result)
334    }
335
336    /// Find similar groups using optimized pgvector nearest neighbor search instead of O(n²) comparison
337    async fn find_similar_groups_optimized(
338        &self,
339        memories: &[Memory],
340    ) -> Result<Vec<SimilarMemoryGroup>> {
341        let mut groups = Vec::new();
342        let mut processed_ids = HashSet::new();
343
344        info!(
345            "Finding similar groups for {} memories using pgvector optimization",
346            memories.len()
347        );
348
349        for memory in memories {
350            if processed_ids.contains(&memory.id) {
351                continue;
352            }
353
354            let embedding = match &memory.embedding {
355                Some(emb) => emb,
356                None => continue,
357            };
358
359            // Use pgvector to find similar memories efficiently
360            let similar_memories = self
361                .find_similar_memories_pgvector(
362                    memory,
363                    embedding,
364                    &memories.iter().map(|m| m.id).collect::<Vec<_>>(),
365                    self.config.similarity_threshold,
366                )
367                .await?;
368
369            if similar_memories.len() > 1 {
370                // Mark all memories in this group as processed
371                for sim_memory in &similar_memories {
372                    processed_ids.insert(sim_memory.id);
373                }
374
375                let average_similarity =
376                    self.calculate_average_similarity(&similar_memories).await?;
377                let merge_strategy = self.determine_merge_strategy(&similar_memories);
378
379                groups.push(SimilarMemoryGroup {
380                    memories: similar_memories,
381                    average_similarity,
382                    merge_strategy,
383                });
384            } else {
385                processed_ids.insert(memory.id);
386            }
387        }
388
389        info!(
390            "Found {} similar groups using optimized search",
391            groups.len()
392        );
393        Ok(groups)
394    }
395
396    /// Use pgvector's efficient similarity search to find similar memories
397    async fn find_similar_memories_pgvector(
398        &self,
399        _query_memory: &Memory,
400        query_embedding: &Vector,
401        candidate_ids: &[Uuid],
402        threshold: f32,
403    ) -> Result<Vec<Memory>> {
404        let query = r#"
405            SELECT m.*, (m.embedding <=> $1) as similarity_distance
406            FROM memories m
407            WHERE m.id = ANY($2)
408            AND m.status = 'active'
409            AND m.embedding IS NOT NULL
410            AND (m.embedding <=> $1) <= $3
411            ORDER BY m.embedding <=> $1
412            LIMIT 100
413        "#;
414
415        // Convert cosine similarity threshold to distance threshold
416        // Distance = 1 - cosine_similarity, so distance <= 1 - threshold
417        let distance_threshold = 1.0 - threshold;
418
419        let rows = sqlx::query(query)
420            .bind(query_embedding)
421            .bind(candidate_ids)
422            .bind(distance_threshold)
423            .fetch_all(self.repository.pool())
424            .await
425            .map_err(|e| MemoryError::DatabaseError {
426                message: format!("Failed to execute pgvector similarity search: {e}"),
427            })?;
428
429        let mut similar_memories = Vec::new();
430        for row in rows {
431            let memory = Memory {
432                id: row.get("id"),
433                content: row.get("content"),
434                content_hash: row.get("content_hash"),
435                embedding: row.get("embedding"),
436                tier: row.get("tier"),
437                status: row.get("status"),
438                importance_score: row.get("importance_score"),
439                access_count: row.get("access_count"),
440                last_accessed_at: row.get("last_accessed_at"),
441                metadata: row.get("metadata"),
442                parent_id: row.get("parent_id"),
443                created_at: row.get("created_at"),
444                updated_at: row.get("updated_at"),
445                expires_at: row.get("expires_at"),
446                consolidation_strength: row.get("consolidation_strength"),
447                decay_rate: row.get("decay_rate"),
448                recall_probability: row.get("recall_probability"),
449                last_recall_interval: row.get("last_recall_interval"),
450                recency_score: row.get("recency_score"),
451                relevance_score: row.get("relevance_score"),
452            };
453            similar_memories.push(memory);
454        }
455
456        Ok(similar_memories)
457    }
458
459    /// Find all memories that exceed the similarity threshold (legacy method for backward compatibility)
460    async fn find_similar_groups(&self, memories: &[Memory]) -> Result<Vec<SimilarMemoryGroup>> {
461        // Delegate to optimized version
462        self.find_similar_groups_optimized(memories).await
463    }
464
465    /// Calculate cosine similarity between two embeddings
466    pub fn calculate_cosine_similarity(&self, a: &Vector, b: &Vector) -> Result<f32> {
467        let a_slice = a.as_slice();
468        let b_slice = b.as_slice();
469
470        if a_slice.len() != b_slice.len() {
471            return Err(MemoryError::InvalidData {
472                message: "Embedding dimensions don't match".to_string(),
473            });
474        }
475
476        let dot_product: f32 = a_slice.iter().zip(b_slice.iter()).map(|(x, y)| x * y).sum();
477        let norm_a: f32 = a_slice.iter().map(|x| x * x).sum::<f32>().sqrt();
478        let norm_b: f32 = b_slice.iter().map(|x| x * x).sum::<f32>().sqrt();
479
480        if norm_a == 0.0 || norm_b == 0.0 {
481            return Ok(0.0);
482        }
483
484        Ok(dot_product / (norm_a * norm_b))
485    }
486
487    /// Calculate average similarity within a group
488    async fn calculate_average_similarity(&self, memories: &[Memory]) -> Result<f32> {
489        if memories.len() < 2 {
490            return Ok(1.0);
491        }
492
493        let mut total_similarity = 0.0;
494        let mut comparisons = 0;
495
496        for i in 0..memories.len() {
497            for j in (i + 1)..memories.len() {
498                if let (Some(emb_i), Some(emb_j)) = (&memories[i].embedding, &memories[j].embedding)
499                {
500                    total_similarity += self.calculate_cosine_similarity(emb_i, emb_j)?;
501                    comparisons += 1;
502                }
503            }
504        }
505
506        Ok(if comparisons > 0 {
507            total_similarity / comparisons as f32
508        } else {
509            0.0
510        })
511    }
512
513    /// Determine the best merge strategy for a group of similar memories
514    fn determine_merge_strategy(&self, memories: &[Memory]) -> MergeStrategy {
515        // Check if any memory is critical (high importance or recent access)
516        let has_critical = memories.iter().any(|m| {
517            m.importance_score > 0.8
518                || m.last_accessed_at.is_some_and(|last| {
519                    Utc::now().signed_duration_since(last) < Duration::hours(24)
520                })
521        });
522
523        // Check tier distribution
524        let has_working_tier = memories
525            .iter()
526            .any(|m| matches!(m.tier, MemoryTier::Working));
527        let has_different_tiers = memories
528            .iter()
529            .map(|m| m.tier)
530            .collect::<HashSet<_>>()
531            .len()
532            > 1;
533
534        if has_critical && self.config.lossless_critical {
535            MergeStrategy::LosslessPreservation
536        } else if has_working_tier || has_different_tiers {
537            MergeStrategy::MetadataConsolidation
538        } else {
539            MergeStrategy::ContentSummarization
540        }
541    }
542
543    /// Process a group of similar memories for merging with transaction safety
544    async fn process_similar_group_tx(
545        &self,
546        transaction: &mut Transaction<'_, Postgres>,
547        group: SimilarMemoryGroup,
548        operation_id: &str,
549    ) -> Result<GroupMergeResult> {
550        debug!(
551            "Processing similar group with {} memories, strategy: {:?}, operation: {}",
552            group.memories.len(),
553            group.merge_strategy,
554            operation_id
555        );
556
557        // Record audit entry before merging
558        let audit_entry = self
559            .audit_trail
560            .create_merge_entry_tx(transaction, &group)
561            .await?;
562
563        // Perform the merge with transaction safety
564        let merge_result = self.merger.merge_group_tx(transaction, &group).await?;
565
566        // Complete audit entry
567        self.audit_trail
568            .complete_merge_entry_tx(transaction, audit_entry.id, &merge_result)
569            .await?;
570
571        Ok(GroupMergeResult {
572            merged_memory_id: merge_result.merged_memory.id,
573            original_memory_ids: group.memories.iter().map(|m| m.id).collect(),
574            memories_merged: group.memories.len(),
575            storage_saved: merge_result.storage_saved,
576            compression_ratio: merge_result.compression_ratio,
577            merge_strategy: group.merge_strategy,
578        })
579    }
580
581    /// Process a group of similar memories for merging (legacy method)
582    async fn process_similar_group(&self, group: SimilarMemoryGroup) -> Result<GroupMergeResult> {
583        debug!(
584            "Processing similar group with {} memories, strategy: {:?}",
585            group.memories.len(),
586            group.merge_strategy
587        );
588
589        // Record audit entry before merging
590        let audit_entry = self.audit_trail.create_merge_entry(&group).await?;
591
592        // Perform the merge
593        let merge_result = self.merger.merge_group(&group).await?;
594
595        // Complete audit entry
596        self.audit_trail
597            .complete_merge_entry(audit_entry.id, &merge_result)
598            .await?;
599
600        Ok(GroupMergeResult {
601            merged_memory_id: merge_result.merged_memory.id,
602            original_memory_ids: group.memories.iter().map(|m| m.id).collect(),
603            memories_merged: group.memories.len(),
604            storage_saved: merge_result.storage_saved,
605            compression_ratio: merge_result.compression_ratio,
606            merge_strategy: group.merge_strategy,
607        })
608    }
609
610    /// Load memories with their embeddings
611    async fn load_memories_with_embeddings(&self, memory_ids: &[Uuid]) -> Result<Vec<Memory>> {
612        let query = r#"
613            SELECT * FROM memories 
614            WHERE id = ANY($1) 
615            AND status = 'active' 
616            AND embedding IS NOT NULL
617            ORDER BY importance_score DESC, last_accessed_at DESC NULLS LAST
618        "#;
619
620        let memories = sqlx::query_as::<_, Memory>(query)
621            .bind(memory_ids)
622            .fetch_all(self.repository.pool())
623            .await?;
624
625        Ok(memories)
626    }
627
628    /// Load memories with their embeddings using a transaction
629    async fn load_memories_with_embeddings_tx(
630        &self,
631        transaction: &mut Transaction<'_, Postgres>,
632        memory_ids: &[Uuid],
633    ) -> Result<Vec<Memory>> {
634        let query = r#"
635            SELECT * FROM memories 
636            WHERE id = ANY($1) 
637            AND status = 'active' 
638            AND embedding IS NOT NULL
639            ORDER BY importance_score DESC, last_accessed_at DESC NULLS LAST
640        "#;
641
642        let memories = sqlx::query_as::<_, Memory>(query)
643            .bind(memory_ids)
644            .fetch_all(&mut **transaction)
645            .await
646            .map_err(|e| MemoryError::DatabaseError {
647                message: format!("Failed to load memories in transaction: {e}"),
648            })?;
649
650        Ok(memories)
651    }
652
653    /// Run auto-pruning based on recall probability thresholds
654    pub async fn run_auto_pruning(&self) -> Result<PruningResult> {
655        info!(
656            "Starting auto-pruning with threshold {}",
657            self.config.prune_threshold
658        );
659
660        let cutoff_date = Utc::now() - Duration::days(self.config.prune_age_days);
661
662        let prune_result = self
663            .auto_pruner
664            .prune_memories(self.config.prune_threshold, cutoff_date)
665            .await?;
666
667        info!(
668            "Auto-pruning completed: {} memories pruned, {} bytes freed",
669            prune_result.memories_pruned, prune_result.storage_freed
670        );
671
672        Ok(prune_result)
673    }
674
675    /// Check and maintain memory headroom
676    pub async fn maintain_memory_headroom(&self) -> Result<HeadroomMaintenanceResult> {
677        let stats = self.get_memory_statistics().await?;
678        let current_utilization =
679            1.0 - (stats.free_space_bytes as f32 / stats.total_space_bytes as f32);
680
681        if current_utilization > (1.0 - self.config.target_memory_headroom) {
682            info!(
683                "Memory utilization {:.2}% exceeds target, starting aggressive cleanup",
684                current_utilization * 100.0
685            );
686
687            // Run more aggressive deduplication and pruning
688            let all_memory_ids = self.get_all_active_memory_ids().await?;
689            let dedup_result = self.deduplicate_batch(&all_memory_ids).await?;
690            let prune_result = self.run_auto_pruning().await?;
691
692            Ok(HeadroomMaintenanceResult {
693                initial_utilization: current_utilization,
694                final_utilization: self.get_memory_utilization().await?,
695                memories_processed: dedup_result.total_processed,
696                memories_merged: dedup_result.memories_merged,
697                memories_pruned: prune_result.memories_pruned,
698                storage_freed: dedup_result.storage_saved_bytes + prune_result.storage_freed,
699            })
700        } else {
701            Ok(HeadroomMaintenanceResult {
702                initial_utilization: current_utilization,
703                final_utilization: current_utilization,
704                memories_processed: 0,
705                memories_merged: 0,
706                memories_pruned: 0,
707                storage_freed: 0,
708            })
709        }
710    }
711
712    /// Get current memory utilization percentage
713    async fn get_memory_utilization(&self) -> Result<f32> {
714        let stats = self.get_memory_statistics().await?;
715        Ok(1.0 - (stats.free_space_bytes as f32 / stats.total_space_bytes as f32))
716    }
717
718    /// Get comprehensive memory statistics
719    pub async fn get_memory_statistics(&self) -> Result<MemoryStatistics> {
720        let query = r#"
721            SELECT 
722                COUNT(*) as total_memories,
723                SUM(LENGTH(content)) as total_content_bytes,
724                AVG(importance_score) as avg_importance,
725                COUNT(CASE WHEN tier = 'working' THEN 1 END) as working_count,
726                COUNT(CASE WHEN tier = 'warm' THEN 1 END) as warm_count,
727                COUNT(CASE WHEN tier = 'cold' THEN 1 END) as cold_count,
728                COUNT(CASE WHEN tier = 'frozen' THEN 1 END) as frozen_count
729            FROM memories 
730            WHERE status = 'active'
731        "#;
732
733        let row = sqlx::query(query).fetch_one(self.repository.pool()).await?;
734
735        // Simulate total space calculation (would need actual disk space monitoring)
736        let total_content_bytes: i64 = row.get("total_content_bytes");
737        let estimated_total_space = total_content_bytes * 5; // Rough estimate including indexes, metadata
738        let estimated_free_space = estimated_total_space / 5; // Simulate 20% free
739
740        Ok(MemoryStatistics {
741            total_memories: row.get("total_memories"),
742            total_content_bytes,
743            total_space_bytes: estimated_total_space,
744            free_space_bytes: estimated_free_space,
745            avg_importance: row.get::<Option<f64>, _>("avg_importance").unwrap_or(0.0),
746            working_count: row.get("working_count"),
747            warm_count: row.get("warm_count"),
748            cold_count: row.get("cold_count"),
749            frozen_count: row.get("frozen_count"),
750        })
751    }
752
753    /// Get all active memory IDs for batch processing
754    async fn get_all_active_memory_ids(&self) -> Result<Vec<Uuid>> {
755        let query = "SELECT id FROM memories WHERE status = 'active' ORDER BY created_at ASC";
756
757        let rows = sqlx::query(query).fetch_all(self.repository.pool()).await?;
758
759        Ok(rows.into_iter().map(|row| row.get("id")).collect())
760    }
761
762    /// Update internal metrics with performance data
763    async fn update_metrics_with_performance(
764        &self,
765        result: &DeduplicationResult,
766        performance: &PerformanceMetrics,
767    ) {
768        let mut metrics = self.metrics.write().await;
769        metrics.total_operations += 1;
770        metrics.total_memories_processed += result.total_processed;
771        metrics.total_memories_merged += result.memories_merged;
772        metrics.total_storage_saved += result.storage_saved_bytes;
773        metrics.total_execution_time_ms += result.execution_time_ms;
774        metrics.errors_encountered += result.errors_encountered;
775        metrics.last_operation_timestamp = Some(Utc::now());
776
777        if result.compression_ratio > 0.0 {
778            metrics.average_compression_ratio =
779                (metrics.average_compression_ratio + result.compression_ratio) / 2.0;
780        }
781
782        // Log performance violations
783        let violations = performance.get_performance_violations();
784        if !violations.is_empty() {
785            error!(
786                "Performance violations in operation {}: {}",
787                performance.operation_id,
788                violations.join("; ")
789            );
790        }
791
792        // Record performance metrics in database for monitoring
793        if let Err(e) = self.record_performance_metrics(performance, result).await {
794            warn!("Failed to record performance metrics: {}", e);
795        }
796    }
797
798    /// Update internal metrics (legacy method)
799    async fn update_metrics(&self, result: &DeduplicationResult) {
800        let mut metrics = self.metrics.write().await;
801        metrics.total_operations += 1;
802        metrics.total_memories_processed += result.total_processed;
803        metrics.total_memories_merged += result.memories_merged;
804        metrics.total_storage_saved += result.storage_saved_bytes;
805        metrics.total_execution_time_ms += result.execution_time_ms;
806        metrics.errors_encountered += result.errors_encountered;
807        metrics.last_operation_timestamp = Some(Utc::now());
808
809        if result.compression_ratio > 0.0 {
810            metrics.average_compression_ratio =
811                (metrics.average_compression_ratio + result.compression_ratio) / 2.0;
812        }
813    }
814
815    /// Record performance metrics in database for monitoring and alerting
816    async fn record_performance_metrics(
817        &self,
818        performance: &PerformanceMetrics,
819        result: &DeduplicationResult,
820    ) -> Result<()> {
821        let metrics_data = serde_json::json!({
822            "operation_id": performance.operation_id,
823            "memory_count": performance.memory_count,
824            "execution_time_ms": result.execution_time_ms,
825            "target_time_seconds": performance.target_time_seconds,
826            "phase_timings": performance.phase_timings.iter().map(|(k, v)| (k, v.as_millis())).collect::<HashMap<_, _>>(),
827            "memories_processed": result.total_processed,
828            "memories_merged": result.memories_merged,
829            "storage_saved_bytes": result.storage_saved_bytes,
830            "compression_ratio": result.compression_ratio,
831            "errors_encountered": result.errors_encountered,
832            "performance_violations": performance.get_performance_violations()
833        });
834
835        sqlx::query(
836            r#"
837            INSERT INTO deduplication_metrics (
838                measurement_type, metrics_data, recorded_at
839            ) VALUES ($1, $2, $3)
840        "#,
841        )
842        .bind("operation_performance")
843        .bind(metrics_data)
844        .bind(Utc::now())
845        .execute(self.repository.pool())
846        .await
847        .map_err(|e| MemoryError::DatabaseError {
848            message: format!("Failed to record performance metrics: {e}"),
849        })?;
850
851        Ok(())
852    }
853
854    /// Get current deduplication metrics
855    pub async fn get_metrics(&self) -> DeduplicationMetrics {
856        self.metrics.read().await.clone()
857    }
858
859    /// Process large memory batches with intelligent batching and concurrency control
860    pub async fn deduplicate_large_batch(
861        &self,
862        memory_ids: &[Uuid],
863    ) -> Result<DeduplicationResult> {
864        let total_memories = memory_ids.len();
865        info!(
866            "Starting large batch deduplication for {} memories",
867            total_memories
868        );
869
870        // If batch is small enough, use regular processing
871        if total_memories <= self.config.batch_size {
872            return self.deduplicate_batch(memory_ids).await;
873        }
874
875        // Split into optimal batch sizes for performance
876        let optimal_batch_size = self.calculate_optimal_batch_size(total_memories);
877        let batches: Vec<_> = memory_ids
878            .chunks(optimal_batch_size)
879            .map(|chunk| chunk.to_vec())
880            .collect();
881
882        info!(
883            "Split {} memories into {} batches of ~{} memories each",
884            total_memories,
885            batches.len(),
886            optimal_batch_size
887        );
888
889        let mut combined_result = DeduplicationResult::default();
890        let start_time = Instant::now();
891
892        // Process batches sequentially for safety and simplicity
893        let mut successful_batches = 0;
894        let mut failed_batches = 0;
895
896        for (batch_index, batch_memory_ids) in batches.into_iter().enumerate() {
897            info!(
898                "Processing batch {} with {} memories",
899                batch_index,
900                batch_memory_ids.len()
901            );
902
903            let batch_start = Instant::now();
904
905            // Check if we're approaching time limit
906            if start_time.elapsed().as_secs() > (self.config.max_operation_time_seconds * 3 / 4) {
907                warn!(
908                    "Approaching time limit, stopping batch processing after {} batches",
909                    batch_index
910                );
911                break;
912            }
913
914            match self.deduplicate_batch(&batch_memory_ids).await {
915                Ok(batch_result) => {
916                    successful_batches += 1;
917                    combined_result.total_processed += batch_result.total_processed;
918                    combined_result.groups_identified += batch_result.groups_identified;
919                    combined_result.memories_merged += batch_result.memories_merged;
920                    combined_result.storage_saved_bytes += batch_result.storage_saved_bytes;
921                    combined_result.errors_encountered += batch_result.errors_encountered;
922
923                    // Calculate weighted average compression ratio
924                    if batch_result.compression_ratio > 0.0 {
925                        combined_result.compression_ratio = (combined_result.compression_ratio
926                            * (successful_batches - 1) as f32
927                            + batch_result.compression_ratio)
928                            / successful_batches as f32;
929                    }
930
931                    info!(
932                        "Batch {} completed in {}ms: {} processed, {} merged",
933                        batch_index,
934                        batch_start.elapsed().as_millis(),
935                        batch_result.total_processed,
936                        batch_result.memories_merged
937                    );
938                }
939                Err(e) => {
940                    failed_batches += 1;
941                    combined_result.errors_encountered += 1;
942                    error!("Batch {} failed: {}", batch_index, e);
943
944                    // Continue processing other batches even if one fails
945                    // This provides better resilience in large batch operations
946                }
947            }
948        }
949
950        combined_result.execution_time_ms = start_time.elapsed().as_millis() as u64;
951
952        info!(
953            "Large batch deduplication completed: {}/{} batches successful, {} total memories processed, {} merged, {}ms total time",
954            successful_batches, successful_batches + failed_batches,
955            combined_result.total_processed, combined_result.memories_merged,
956            combined_result.execution_time_ms
957        );
958
959        // Update metrics
960        self.update_metrics(&combined_result).await;
961
962        Ok(combined_result)
963    }
964
965    /// Calculate optimal batch size based on system capacity and memory count
966    fn calculate_optimal_batch_size(&self, total_memories: usize) -> usize {
967        // Base batch size from config
968        let mut batch_size = self.config.batch_size;
969
970        // Scale batch size based on total memory count for better performance
971        if total_memories > 50_000 {
972            batch_size = std::cmp::max(batch_size * 2, 1000); // Larger batches for very large datasets
973        } else if total_memories > 20_000 {
974            batch_size = std::cmp::max(batch_size * 3 / 2, 500); // Medium batches
975        }
976
977        // Ensure we don't exceed max memories per operation
978        batch_size = std::cmp::min(batch_size, self.config.max_memories_per_operation);
979
980        // Ensure minimum viable batch size
981        std::cmp::max(batch_size, 50)
982    }
983
984    /// Acquire distributed lock for deduplication operations with automatic cleanup
985    async fn acquire_operation_lock(
986        &self,
987        operation_id: &str,
988    ) -> Result<(tokio::sync::MutexGuard<'_, ()>, OperationLockGuard)> {
989        // Check if operation is already running
990        {
991            let active_ops = self.active_operations.read().await;
992            if active_ops.contains(operation_id) {
993                return Err(MemoryError::ConcurrencyError {
994                    message: format!("Operation {operation_id} is already in progress"),
995                });
996            }
997        }
998
999        // Acquire PostgreSQL advisory lock for distributed locking
1000        let lock_key = self.calculate_lock_key(operation_id);
1001        let acquired = sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_lock($1)")
1002            .bind(lock_key)
1003            .fetch_one(self.repository.pool())
1004            .await
1005            .map_err(|e| MemoryError::DatabaseError {
1006                message: format!("Failed to acquire advisory lock: {e}"),
1007            })?;
1008
1009        if !acquired {
1010            return Err(MemoryError::ConcurrencyError {
1011                message: "Another deduplication operation is in progress".to_string(),
1012            });
1013        }
1014
1015        // Add to active operations
1016        {
1017            let mut active_ops = self.active_operations.write().await;
1018            active_ops.insert(operation_id.to_string());
1019        }
1020
1021        // Acquire local mutex guard
1022        let mutex_guard = self.operation_lock.lock().await;
1023
1024        info!("Acquired distributed lock for operation: {}", operation_id);
1025
1026        let lock_guard = OperationLockGuard {
1027            operation_id: operation_id.to_string(),
1028            lock_key,
1029            pool: self.repository.pool().clone(),
1030            active_operations: self.active_operations.clone(),
1031            released: false,
1032        };
1033
1034        Ok((mutex_guard, lock_guard))
1035    }
1036
1037    /// Calculate lock key from operation ID for PostgreSQL advisory locks
1038    fn calculate_lock_key(&self, operation_id: &str) -> i64 {
1039        use std::collections::hash_map::DefaultHasher;
1040        use std::hash::{Hash, Hasher};
1041
1042        let mut hasher = DefaultHasher::new();
1043        operation_id.hash(&mut hasher);
1044        hasher.finish() as i64
1045    }
1046
1047    /// Execute deduplication within a database transaction
1048    async fn execute_deduplication_with_transaction(
1049        &self,
1050        transaction: &mut Transaction<'_, Postgres>,
1051        memory_ids: &[Uuid],
1052        operation_id: &str,
1053    ) -> Result<DeduplicationResult> {
1054        let mut result = DeduplicationResult::default();
1055
1056        // Load memories with embeddings using transaction
1057        let memories = self
1058            .load_memories_with_embeddings_tx(transaction, memory_ids)
1059            .await?;
1060        if memories.is_empty() {
1061            return Ok(result);
1062        }
1063
1064        result.total_processed = memories.len();
1065
1066        // Find similar memory groups using efficient pgvector search
1067        let similar_groups = self.find_similar_groups_optimized(&memories).await?;
1068        result.groups_identified = similar_groups.len();
1069
1070        // Process each group for merging within the transaction
1071        for group in similar_groups {
1072            match self
1073                .process_similar_group_tx(transaction, group, operation_id)
1074                .await
1075            {
1076                Ok(merge_result) => {
1077                    result.memories_merged += merge_result.memories_merged;
1078                    result.storage_saved_bytes += merge_result.storage_saved;
1079                    result.compression_ratio =
1080                        (result.compression_ratio + merge_result.compression_ratio) / 2.0;
1081                }
1082                Err(e) => {
1083                    warn!(
1084                        "Failed to process similar group in operation {}: {}",
1085                        operation_id, e
1086                    );
1087                    result.errors_encountered += 1;
1088                    // Return error to trigger transaction rollback
1089                    return Err(e);
1090                }
1091            }
1092        }
1093
1094        Ok(result)
1095    }
1096}
1097
/// Memory merger component for intelligent merging algorithms.
///
/// Bundles the deduplication configuration with a shared handle to the memory
/// repository. The merge strategy applied to a group is taken from the group itself
/// (see `merge_group` / `merge_group_tx`).
pub struct MemoryMerger {
    /// Deduplication settings supplied at construction time.
    // NOTE(review): the lint allowance suggests no method currently reads this
    // field — confirm before removing either the field or the attribute.
    #[allow(dead_code)]
    config: SemanticDeduplicationConfig,
    /// Shared repository handle; the non-transactional merge path creates the merged
    /// record through it.
    repository: Arc<MemoryRepository>,
}
1104
1105impl MemoryMerger {
1106    pub fn new(config: SemanticDeduplicationConfig, repository: Arc<MemoryRepository>) -> Self {
1107        Self { config, repository }
1108    }
1109
1110    /// Merge a group of similar memories
1111    pub async fn merge_group(&self, group: &SimilarMemoryGroup) -> Result<MergeResult> {
1112        match group.merge_strategy {
1113            MergeStrategy::LosslessPreservation => self.merge_lossless(group).await,
1114            MergeStrategy::MetadataConsolidation => {
1115                self.merge_with_metadata_consolidation(group).await
1116            }
1117            MergeStrategy::ContentSummarization => self.merge_with_summarization(group).await,
1118        }
1119    }
1120
1121    /// Merge a group of similar memories within a transaction
1122    pub async fn merge_group_tx(
1123        &self,
1124        transaction: &mut Transaction<'_, Postgres>,
1125        group: &SimilarMemoryGroup,
1126    ) -> Result<MergeResult> {
1127        match group.merge_strategy {
1128            MergeStrategy::LosslessPreservation => self.merge_lossless_tx(transaction, group).await,
1129            MergeStrategy::MetadataConsolidation => {
1130                self.merge_with_metadata_consolidation_tx(transaction, group)
1131                    .await
1132            }
1133            MergeStrategy::ContentSummarization => {
1134                self.merge_with_summarization_tx(transaction, group).await
1135            }
1136        }
1137    }
1138
1139    /// Lossless merging preserving all content and metadata
1140    async fn merge_lossless(&self, group: &SimilarMemoryGroup) -> Result<MergeResult> {
1141        let memories = &group.memories;
1142        let primary_memory = &memories[0]; // Use highest importance as primary
1143
1144        // Combine all content with clear delineation
1145        let combined_content = memories
1146            .iter()
1147            .enumerate()
1148            .map(|(i, m)| format!("--- Memory {} (ID: {}) ---\n{}", i + 1, m.id, m.content))
1149            .collect::<Vec<_>>()
1150            .join("\n\n");
1151
1152        // Merge metadata preserving all information
1153        let combined_metadata = self.merge_metadata_lossless(memories)?;
1154
1155        // Calculate combined embedding (average of all embeddings)
1156        let combined_embedding = self.calculate_combined_embedding(memories)?;
1157
1158        // Calculate merged importance score (weighted average)
1159        let combined_importance = self.calculate_weighted_importance(memories);
1160
1161        // Store the combined content length before moving
1162        let combined_content_len = combined_content.len();
1163
1164        // Create the merged memory
1165        let merged_memory = self
1166            .create_merged_memory(
1167                combined_content,
1168                combined_embedding,
1169                combined_importance,
1170                combined_metadata,
1171                primary_memory,
1172            )
1173            .await?;
1174
1175        // Archive original memories
1176        let storage_saved = self.archive_original_memories(memories).await?;
1177
1178        Ok(MergeResult {
1179            merged_memory,
1180            storage_saved,
1181            compression_ratio: storage_saved as f32 / combined_content_len as f32,
1182            original_count: memories.len(),
1183        })
1184    }
1185
1186    /// Merge with metadata consolidation but content preservation
1187    async fn merge_with_metadata_consolidation(
1188        &self,
1189        group: &SimilarMemoryGroup,
1190    ) -> Result<MergeResult> {
1191        let memories = &group.memories;
1192        let primary_memory = &memories[0];
1193
1194        // Keep primary content but add reference to similar memories
1195        let mut combined_content = primary_memory.content.clone();
1196        combined_content.push_str("\n\n--- Related Content References ---\n");
1197
1198        for (i, memory) in memories.iter().skip(1).enumerate() {
1199            combined_content.push_str(&format!(
1200                "Related Memory {}: {} (similarity: {:.3})\n",
1201                i + 1,
1202                memory.content.chars().take(100).collect::<String>(),
1203                group.average_similarity
1204            ));
1205        }
1206
1207        let combined_metadata = self.merge_metadata_consolidated(memories)?;
1208        let combined_embedding = self.calculate_combined_embedding(memories)?;
1209        let combined_importance = self.calculate_weighted_importance(memories);
1210
1211        let merged_memory = self
1212            .create_merged_memory(
1213                combined_content,
1214                combined_embedding,
1215                combined_importance,
1216                combined_metadata,
1217                primary_memory,
1218            )
1219            .await?;
1220
1221        let storage_saved = self.archive_original_memories(memories).await?;
1222
1223        Ok(MergeResult {
1224            merged_memory,
1225            storage_saved,
1226            compression_ratio: 2.0, // Moderate compression
1227            original_count: memories.len(),
1228        })
1229    }
1230
1231    /// Merge with content summarization for maximum compression
1232    async fn merge_with_summarization(&self, group: &SimilarMemoryGroup) -> Result<MergeResult> {
1233        let memories = &group.memories;
1234        let primary_memory = &memories[0];
1235
1236        // Create a summary of all content
1237        let summary_content = self.create_content_summary(memories).await?;
1238
1239        let combined_metadata = self.merge_metadata_summarized(memories)?;
1240        let combined_embedding = self.calculate_combined_embedding(memories)?;
1241        let combined_importance = self.calculate_weighted_importance(memories);
1242
1243        let merged_memory = self
1244            .create_merged_memory(
1245                summary_content,
1246                combined_embedding,
1247                combined_importance,
1248                combined_metadata,
1249                primary_memory,
1250            )
1251            .await?;
1252
1253        let storage_saved = self.archive_original_memories(memories).await?;
1254
1255        Ok(MergeResult {
1256            merged_memory,
1257            storage_saved,
1258            compression_ratio: 5.0, // High compression
1259            original_count: memories.len(),
1260        })
1261    }
1262
1263    /// Create a content summary from multiple memories
1264    async fn create_content_summary(&self, memories: &[Memory]) -> Result<String> {
1265        // Simple extractive summarization
1266        let mut key_sentences = Vec::new();
1267
1268        for memory in memories {
1269            let sentences: Vec<&str> = memory.content.split('.').collect();
1270            if !sentences.is_empty() {
1271                key_sentences.push(sentences[0]); // Take first sentence as key
1272            }
1273        }
1274
1275        let summary = format!(
1276            "Summary of {} related memories:\n{}",
1277            memories.len(),
1278            key_sentences.join(". ")
1279        );
1280
1281        Ok(summary)
1282    }
1283
1284    /// Merge metadata in lossless mode
1285    fn merge_metadata_lossless(&self, memories: &[Memory]) -> Result<serde_json::Value> {
1286        let mut combined = serde_json::Map::new();
1287
1288        // Add merge information
1289        combined.insert(
1290            "merge_info".to_string(),
1291            serde_json::json!({
1292                "merge_type": "lossless",
1293                "original_count": memories.len(),
1294                "merged_at": Utc::now(),
1295                "original_ids": memories.iter().map(|m| m.id).collect::<Vec<_>>()
1296            }),
1297        );
1298
1299        // Preserve all original metadata
1300        for (i, memory) in memories.iter().enumerate() {
1301            if let serde_json::Value::Object(metadata_map) = &memory.metadata {
1302                for (key, value) in metadata_map {
1303                    let prefixed_key = format!("memory_{i}_{key}");
1304                    combined.insert(prefixed_key, value.clone());
1305                }
1306            }
1307        }
1308
1309        Ok(serde_json::Value::Object(combined))
1310    }
1311
1312    /// Merge metadata in consolidated mode
1313    fn merge_metadata_consolidated(&self, memories: &[Memory]) -> Result<serde_json::Value> {
1314        let mut combined = serde_json::Map::new();
1315
1316        combined.insert(
1317            "merge_info".to_string(),
1318            serde_json::json!({
1319                "merge_type": "consolidated",
1320                "original_count": memories.len(),
1321                "merged_at": Utc::now(),
1322                "original_ids": memories.iter().map(|m| m.id).collect::<Vec<_>>()
1323            }),
1324        );
1325
1326        // Merge common keys, keep unique ones with prefixes
1327        let mut common_keys = HashMap::new();
1328        for memory in memories {
1329            if let serde_json::Value::Object(metadata_map) = &memory.metadata {
1330                for (key, value) in metadata_map {
1331                    common_keys
1332                        .entry(key.clone())
1333                        .or_insert_with(Vec::new)
1334                        .push(value.clone());
1335                }
1336            }
1337        }
1338
1339        for (key, values) in common_keys {
1340            if values.len() == memories.len() && values.iter().all(|v| v == &values[0]) {
1341                // Common value across all memories
1342                combined.insert(key, values[0].clone());
1343            } else {
1344                // Different values, store as array
1345                combined.insert(key, serde_json::Value::Array(values));
1346            }
1347        }
1348
1349        Ok(serde_json::Value::Object(combined))
1350    }
1351
1352    /// Merge metadata in summarized mode
1353    fn merge_metadata_summarized(&self, memories: &[Memory]) -> Result<serde_json::Value> {
1354        let mut combined = serde_json::Map::new();
1355
1356        combined.insert(
1357            "merge_info".to_string(),
1358            serde_json::json!({
1359                "merge_type": "summarized",
1360                "original_count": memories.len(),
1361                "merged_at": Utc::now(),
1362                "original_ids": memories.iter().map(|m| m.id).collect::<Vec<_>>(),
1363                "compression_applied": true
1364            }),
1365        );
1366
1367        // Only keep essential metadata
1368        if let serde_json::Value::Object(primary_metadata) = &memories[0].metadata {
1369            for (key, value) in primary_metadata {
1370                if ["tags", "category", "type", "priority"].contains(&key.as_str()) {
1371                    combined.insert(key.clone(), value.clone());
1372                }
1373            }
1374        }
1375
1376        Ok(serde_json::Value::Object(combined))
1377    }
1378
1379    /// Calculate combined embedding from multiple memories
1380    fn calculate_combined_embedding(&self, memories: &[Memory]) -> Result<Option<Vector>> {
1381        let embeddings: Vec<&Vector> = memories
1382            .iter()
1383            .filter_map(|m| m.embedding.as_ref())
1384            .collect();
1385
1386        if embeddings.is_empty() {
1387            return Ok(None);
1388        }
1389
1390        let dimension = embeddings[0].as_slice().len();
1391        let mut combined = vec![0.0f32; dimension];
1392
1393        for embedding in &embeddings {
1394            let slice = embedding.as_slice();
1395            for (i, &value) in slice.iter().enumerate() {
1396                combined[i] += value;
1397            }
1398        }
1399
1400        // Average the embeddings
1401        for value in &mut combined {
1402            *value /= embeddings.len() as f32;
1403        }
1404
1405        Ok(Some(Vector::from(combined)))
1406    }
1407
1408    /// Calculate weighted importance score
1409    fn calculate_weighted_importance(&self, memories: &[Memory]) -> f64 {
1410        let total_weight: f64 = memories.iter().map(|m| m.access_count as f64 + 1.0).sum();
1411        let weighted_sum: f64 = memories
1412            .iter()
1413            .map(|m| m.importance_score * (m.access_count as f64 + 1.0))
1414            .sum();
1415
1416        weighted_sum / total_weight
1417    }
1418
1419    /// Create the merged memory record within a transaction
1420    async fn create_merged_memory_tx(
1421        &self,
1422        transaction: &mut Transaction<'_, Postgres>,
1423        content: String,
1424        embedding: Option<Vector>,
1425        importance: f64,
1426        metadata: serde_json::Value,
1427        primary_memory: &Memory,
1428    ) -> Result<Memory> {
1429        let memory_id = Uuid::new_v4();
1430        let content_hash = format!("{:x}", md5::compute(&content));
1431        let now = Utc::now();
1432
1433        let query = r#"
1434            INSERT INTO memories (
1435                id, content, content_hash, embedding, tier, status,
1436                importance_score, access_count, last_accessed_at, metadata,
1437                parent_id, created_at, updated_at, expires_at,
1438                consolidation_strength, decay_rate, recall_probability,
1439                recency_score, relevance_score, is_merged_result,
1440                original_memory_count, merge_generation
1441            ) VALUES (
1442                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
1443                $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
1444            ) RETURNING *
1445        "#;
1446
1447        let row = sqlx::query(query)
1448            .bind(memory_id)
1449            .bind(&content)
1450            .bind(&content_hash)
1451            .bind(embedding)
1452            .bind(primary_memory.tier)
1453            .bind(MemoryStatus::Active)
1454            .bind(importance)
1455            .bind(0i32) // access_count
1456            .bind(now) // last_accessed_at
1457            .bind(&metadata)
1458            .bind(primary_memory.parent_id)
1459            .bind(now) // created_at
1460            .bind(now) // updated_at
1461            .bind(primary_memory.expires_at)
1462            .bind(primary_memory.consolidation_strength)
1463            .bind(primary_memory.decay_rate)
1464            .bind(primary_memory.recall_probability)
1465            .bind(primary_memory.recency_score)
1466            .bind(primary_memory.relevance_score)
1467            .bind(true) // is_merged_result
1468            .bind(1i32) // original_memory_count (will be updated)
1469            .bind(
1470                primary_memory
1471                    .metadata
1472                    .get("merge_generation")
1473                    .and_then(|v| v.as_i64())
1474                    .unwrap_or(0)
1475                    + 1,
1476            ) // merge_generation
1477            .fetch_one(&mut **transaction)
1478            .await
1479            .map_err(|e| MemoryError::DatabaseError {
1480                message: format!("Failed to create merged memory: {e}"),
1481            })?;
1482
1483        Ok(Memory {
1484            id: row.get("id"),
1485            content: row.get("content"),
1486            content_hash: row.get("content_hash"),
1487            embedding: row.get("embedding"),
1488            tier: row.get("tier"),
1489            status: row.get("status"),
1490            importance_score: row.get("importance_score"),
1491            access_count: row.get("access_count"),
1492            last_accessed_at: row.get("last_accessed_at"),
1493            metadata: row.get("metadata"),
1494            parent_id: row.get("parent_id"),
1495            created_at: row.get("created_at"),
1496            updated_at: row.get("updated_at"),
1497            expires_at: row.get("expires_at"),
1498            consolidation_strength: row.get("consolidation_strength"),
1499            decay_rate: row.get("decay_rate"),
1500            recall_probability: row.get("recall_probability"),
1501            last_recall_interval: row.get("last_recall_interval"),
1502            recency_score: row.get("recency_score"),
1503            relevance_score: row.get("relevance_score"),
1504        })
1505    }
1506
1507    /// Create the merged memory record (legacy method)
1508    async fn create_merged_memory(
1509        &self,
1510        content: String,
1511        embedding: Option<Vector>,
1512        importance: f64,
1513        metadata: serde_json::Value,
1514        primary_memory: &Memory,
1515    ) -> Result<Memory> {
1516        let create_request = CreateMemoryRequest {
1517            content,
1518            embedding: embedding.map(|v| v.as_slice().to_vec()),
1519            tier: Some(primary_memory.tier),
1520            importance_score: Some(importance),
1521            metadata: Some(metadata),
1522            parent_id: None,
1523            expires_at: primary_memory.expires_at,
1524        };
1525
1526        self.repository.create_memory(create_request).await
1527    }
1528
1529    /// Transaction-based lossless merge
1530    async fn merge_lossless_tx(
1531        &self,
1532        transaction: &mut Transaction<'_, Postgres>,
1533        group: &SimilarMemoryGroup,
1534    ) -> Result<MergeResult> {
1535        let memories = &group.memories;
1536        let primary_memory = &memories[0];
1537
1538        // Combine all content with clear delineation
1539        let combined_content = memories
1540            .iter()
1541            .enumerate()
1542            .map(|(i, m)| format!("--- Memory {} (ID: {}) ---\n{}", i + 1, m.id, m.content))
1543            .collect::<Vec<_>>()
1544            .join("\n\n");
1545
1546        // Merge metadata preserving all information
1547        let combined_metadata = self.merge_metadata_lossless(memories)?;
1548        let combined_embedding = self.calculate_combined_embedding(memories)?;
1549        let combined_importance = self.calculate_weighted_importance(memories);
1550
1551        // Create the merged memory within transaction
1552        let merged_memory = self
1553            .create_merged_memory_tx(
1554                transaction,
1555                combined_content,
1556                combined_embedding,
1557                combined_importance,
1558                combined_metadata,
1559                primary_memory,
1560            )
1561            .await?;
1562
1563        // Archive original memories with verification
1564        let storage_saved = self
1565            .archive_original_memories_tx(transaction, memories, merged_memory.id)
1566            .await?;
1567
1568        Ok(MergeResult {
1569            merged_memory: merged_memory.clone(),
1570            storage_saved,
1571            compression_ratio: storage_saved as f32 / merged_memory.content.len() as f32,
1572            original_count: memories.len(),
1573        })
1574    }
1575
1576    /// Transaction-based metadata consolidation merge
1577    async fn merge_with_metadata_consolidation_tx(
1578        &self,
1579        transaction: &mut Transaction<'_, Postgres>,
1580        group: &SimilarMemoryGroup,
1581    ) -> Result<MergeResult> {
1582        let memories = &group.memories;
1583        let primary_memory = &memories[0];
1584
1585        // Keep primary content but add reference to similar memories
1586        let mut combined_content = primary_memory.content.clone();
1587        combined_content.push_str("\n\n--- Related Content References ---\n");
1588
1589        for (i, memory) in memories.iter().skip(1).enumerate() {
1590            combined_content.push_str(&format!(
1591                "Related Memory {}: {} (similarity: {:.3})\n",
1592                i + 1,
1593                memory.content.chars().take(100).collect::<String>(),
1594                group.average_similarity
1595            ));
1596        }
1597
1598        let combined_metadata = self.merge_metadata_consolidated(memories)?;
1599        let combined_embedding = self.calculate_combined_embedding(memories)?;
1600        let combined_importance = self.calculate_weighted_importance(memories);
1601
1602        let merged_memory = self
1603            .create_merged_memory_tx(
1604                transaction,
1605                combined_content,
1606                combined_embedding,
1607                combined_importance,
1608                combined_metadata,
1609                primary_memory,
1610            )
1611            .await?;
1612
1613        let storage_saved = self
1614            .archive_original_memories_tx(transaction, memories, merged_memory.id)
1615            .await?;
1616
1617        Ok(MergeResult {
1618            merged_memory,
1619            storage_saved,
1620            compression_ratio: 2.0, // Moderate compression
1621            original_count: memories.len(),
1622        })
1623    }
1624
1625    /// Transaction-based summarization merge
1626    async fn merge_with_summarization_tx(
1627        &self,
1628        transaction: &mut Transaction<'_, Postgres>,
1629        group: &SimilarMemoryGroup,
1630    ) -> Result<MergeResult> {
1631        let memories = &group.memories;
1632        let primary_memory = &memories[0];
1633
1634        // Create a summary of all content
1635        let summary_content = self.create_content_summary(memories).await?;
1636
1637        let combined_metadata = self.merge_metadata_summarized(memories)?;
1638        let combined_embedding = self.calculate_combined_embedding(memories)?;
1639        let combined_importance = self.calculate_weighted_importance(memories);
1640
1641        let merged_memory = self
1642            .create_merged_memory_tx(
1643                transaction,
1644                summary_content,
1645                combined_embedding,
1646                combined_importance,
1647                combined_metadata,
1648                primary_memory,
1649            )
1650            .await?;
1651
1652        let storage_saved = self
1653            .archive_original_memories_tx(transaction, memories, merged_memory.id)
1654            .await?;
1655
1656        Ok(MergeResult {
1657            merged_memory,
1658            storage_saved,
1659            compression_ratio: 5.0, // High compression
1660            original_count: memories.len(),
1661        })
1662    }
1663
1664    /// Archive the original memories with proper verification and transaction safety
1665    async fn archive_original_memories_tx(
1666        &self,
1667        transaction: &mut Transaction<'_, Postgres>,
1668        memories: &[Memory],
1669        merged_memory_id: Uuid,
1670    ) -> Result<u64> {
1671        let mut total_size = 0u64;
1672
1673        // Verify merged memory was created successfully before archiving originals
1674        let merged_exists = sqlx::query_scalar::<_, bool>(
1675            "SELECT EXISTS(SELECT 1 FROM memories WHERE id = $1 AND status = 'active')",
1676        )
1677        .bind(merged_memory_id)
1678        .fetch_one(&mut **transaction)
1679        .await
1680        .map_err(|e| MemoryError::DatabaseError {
1681            message: format!("Failed to verify merged memory exists: {e}"),
1682        })?;
1683
1684        if !merged_exists {
1685            return Err(MemoryError::SafetyViolation {
1686                message: format!(
1687                    "Cannot archive original memories: merged memory {merged_memory_id} does not exist"
1688                ),
1689            });
1690        }
1691
1692        for memory in memories {
1693            // Verify memory is still active before archiving
1694            let current_status =
1695                sqlx::query_scalar::<_, String>("SELECT status FROM memories WHERE id = $1")
1696                    .bind(memory.id)
1697                    .fetch_optional(&mut **transaction)
1698                    .await
1699                    .map_err(|e| MemoryError::DatabaseError {
1700                        message: format!("Failed to check memory status: {e}"),
1701                    })?;
1702
1703            match current_status {
1704                Some(status) if status == "active" => {
1705                    total_size += memory.content.len() as u64;
1706
1707                    // Create backup in compression log for reversibility
1708                    sqlx::query(
1709                        r#"
1710                        INSERT INTO memory_compression_log (
1711                            memory_id, original_content, original_metadata, 
1712                            compression_type, compression_ratio, 
1713                            reversible_until
1714                        ) VALUES ($1, $2, $3, $4, $5, $6)
1715                    "#,
1716                    )
1717                    .bind(memory.id)
1718                    .bind(&memory.content)
1719                    .bind(&memory.metadata)
1720                    .bind("archive")
1721                    .bind(1.0)
1722                    .bind(Utc::now() + Duration::days(7))
1723                    .execute(&mut **transaction)
1724                    .await
1725                    .map_err(|e| MemoryError::DatabaseError {
1726                        message: format!("Failed to create archive backup: {e}"),
1727                    })?;
1728
1729                    // Update status to archived
1730                    let archive_result = sqlx::query(
1731                        "UPDATE memories SET status = 'archived', updated_at = NOW() WHERE id = $1 AND status = 'active'",
1732                    )
1733                    .bind(memory.id)
1734                    .execute(&mut **transaction)
1735                    .await
1736                    .map_err(|e| MemoryError::DatabaseError {
1737                        message: format!("Failed to archive memory: {e}"),
1738                    })?;
1739
1740                    if archive_result.rows_affected() == 0 {
1741                        warn!(
1742                            "Memory {} was not archived (may have been modified concurrently)",
1743                            memory.id
1744                        );
1745                    } else {
1746                        debug!("Successfully archived memory: {}", memory.id);
1747                    }
1748                }
1749                Some(status) => {
1750                    warn!(
1751                        "Skipping archival of memory {} - status is {}, not active",
1752                        memory.id, status
1753                    );
1754                }
1755                None => {
1756                    warn!("Memory {} no longer exists, cannot archive", memory.id);
1757                }
1758            }
1759        }
1760
1761        Ok(total_size)
1762    }
1763
1764    /// Archive the original memories (legacy method for backward compatibility)
1765    async fn archive_original_memories(&self, memories: &[Memory]) -> Result<u64> {
1766        let mut total_size = 0u64;
1767
1768        for memory in memories {
1769            total_size += memory.content.len() as u64;
1770
1771            // Update status to archived instead of deleting
1772            sqlx::query(
1773                "UPDATE memories SET status = 'archived', updated_at = NOW() WHERE id = $1",
1774            )
1775            .bind(memory.id)
1776            .execute(self.repository.pool())
1777            .await?;
1778        }
1779
1780        Ok(total_size)
1781    }
1782}
1783
1784/// Compression manager for hierarchical compression strategies
1785pub struct CompressionManager {
1786    config: SemanticDeduplicationConfig,
1787}
1788
1789impl CompressionManager {
1790    pub fn new(config: SemanticDeduplicationConfig) -> Self {
1791        Self { config }
1792    }
1793
1794    /// Apply compression based on memory tier and criticality
1795    pub async fn compress_memory(&self, memory: &Memory) -> Result<CompressionResult> {
1796        let is_critical =
1797            memory.importance_score > 0.8 || matches!(memory.tier, MemoryTier::Working);
1798
1799        if is_critical && self.config.lossless_critical {
1800            self.apply_lossless_compression(memory).await
1801        } else {
1802            self.apply_lossy_compression(memory).await
1803        }
1804    }
1805
1806    /// Apply lossless compression (mainly structural optimization)
1807    async fn apply_lossless_compression(&self, memory: &Memory) -> Result<CompressionResult> {
1808        // Lossless compression: remove redundant whitespace, normalize structure
1809        let compressed_content = self.normalize_content(&memory.content);
1810        let compression_ratio = memory.content.len() as f32 / compressed_content.len() as f32;
1811
1812        Ok(CompressionResult {
1813            original_size: memory.content.len(),
1814            compressed_size: compressed_content.len(),
1815            compression_ratio,
1816            is_lossless: true,
1817            compressed_content,
1818        })
1819    }
1820
1821    /// Apply lossy compression (content summarization, metadata reduction)
1822    async fn apply_lossy_compression(&self, memory: &Memory) -> Result<CompressionResult> {
1823        // Lossy compression: extract key information, summarize
1824        let compressed_content = self.extract_key_information(&memory.content);
1825        let compression_ratio = memory.content.len() as f32 / compressed_content.len() as f32;
1826
1827        Ok(CompressionResult {
1828            original_size: memory.content.len(),
1829            compressed_size: compressed_content.len(),
1830            compression_ratio,
1831            is_lossless: false,
1832            compressed_content,
1833        })
1834    }
1835
1836    /// Normalize content for lossless compression
1837    fn normalize_content(&self, content: &str) -> String {
1838        // Remove excessive whitespace while preserving structure
1839        content
1840            .lines()
1841            .map(|line| line.trim())
1842            .filter(|line| !line.is_empty())
1843            .collect::<Vec<_>>()
1844            .join("\n")
1845    }
1846
1847    /// Extract key information for lossy compression
1848    fn extract_key_information(&self, content: &str) -> String {
1849        // Simple extractive approach: keep first and last sentences, key phrases
1850        let sentences: Vec<&str> = content
1851            .split('.')
1852            .filter(|s| !s.trim().is_empty())
1853            .collect();
1854
1855        if sentences.len() <= 2 {
1856            return content.to_string();
1857        }
1858
1859        format!(
1860            "{}. ... {} (content compressed from {} to 2 key sentences)",
1861            sentences[0].trim(),
1862            sentences.last().unwrap().trim(),
1863            sentences.len()
1864        )
1865    }
1866}
1867
1868/// Audit trail for operation tracking and reversibility
1869pub struct AuditTrail {
1870    repository: Arc<MemoryRepository>,
1871}
1872
1873impl AuditTrail {
1874    pub fn new(repository: Arc<MemoryRepository>) -> Self {
1875        Self { repository }
1876    }
1877
1878    /// Create a merge audit entry within a transaction
1879    pub async fn create_merge_entry_tx(
1880        &self,
1881        transaction: &mut Transaction<'_, Postgres>,
1882        group: &SimilarMemoryGroup,
1883    ) -> Result<AuditEntry> {
1884        let entry_id = Uuid::new_v4();
1885        let operation_data = serde_json::json!({
1886            "operation_type": "merge",
1887            "memory_ids": group.memories.iter().map(|m| m.id).collect::<Vec<_>>(),
1888            "strategy": group.merge_strategy,
1889            "similarity": group.average_similarity,
1890            "memory_count": group.memories.len()
1891        });
1892
1893        sqlx::query(
1894            r#"
1895            INSERT INTO deduplication_audit_log 
1896            (id, operation_type, operation_data, created_at, status)
1897            VALUES ($1, $2, $3, $4, $5)
1898            "#,
1899        )
1900        .bind(entry_id)
1901        .bind("merge")
1902        .bind(operation_data)
1903        .bind(Utc::now())
1904        .bind("in_progress")
1905        .execute(&mut **transaction)
1906        .await
1907        .map_err(|e| MemoryError::DatabaseError {
1908            message: format!("Failed to create merge audit entry: {e}"),
1909        })?;
1910
1911        Ok(AuditEntry {
1912            id: entry_id,
1913            operation_type: "merge".to_string(),
1914            created_at: Utc::now(),
1915            status: "in_progress".to_string(),
1916        })
1917    }
1918
1919    /// Create a merge audit entry (legacy method)
1920    pub async fn create_merge_entry(&self, group: &SimilarMemoryGroup) -> Result<AuditEntry> {
1921        let entry_id = Uuid::new_v4();
1922        let operation_data = serde_json::json!({
1923            "operation_type": "merge",
1924            "memory_ids": group.memories.iter().map(|m| m.id).collect::<Vec<_>>(),
1925            "strategy": group.merge_strategy,
1926            "similarity": group.average_similarity,
1927            "memory_count": group.memories.len()
1928        });
1929
1930        sqlx::query(
1931            r#"
1932            INSERT INTO deduplication_audit_log 
1933            (id, operation_type, operation_data, created_at, status)
1934            VALUES ($1, $2, $3, $4, $5)
1935            "#,
1936        )
1937        .bind(entry_id)
1938        .bind("merge")
1939        .bind(operation_data)
1940        .bind(Utc::now())
1941        .bind("in_progress")
1942        .execute(self.repository.pool())
1943        .await?;
1944
1945        Ok(AuditEntry {
1946            id: entry_id,
1947            operation_type: "merge".to_string(),
1948            created_at: Utc::now(),
1949            status: "in_progress".to_string(),
1950        })
1951    }
1952
1953    /// Complete a merge audit entry within a transaction
1954    pub async fn complete_merge_entry_tx(
1955        &self,
1956        transaction: &mut Transaction<'_, Postgres>,
1957        entry_id: Uuid,
1958        result: &MergeResult,
1959    ) -> Result<()> {
1960        let completion_data = serde_json::json!({
1961            "merged_memory_id": result.merged_memory.id,
1962            "storage_saved": result.storage_saved,
1963            "compression_ratio": result.compression_ratio,
1964            "original_count": result.original_count
1965        });
1966
1967        sqlx::query(
1968            r#"
1969            UPDATE deduplication_audit_log 
1970            SET status = $1, completion_data = $2, completed_at = $3
1971            WHERE id = $4
1972            "#,
1973        )
1974        .bind("completed")
1975        .bind(completion_data)
1976        .bind(Utc::now())
1977        .bind(entry_id)
1978        .execute(&mut **transaction)
1979        .await
1980        .map_err(|e| MemoryError::DatabaseError {
1981            message: format!("Failed to complete merge audit entry: {e}"),
1982        })?;
1983
1984        Ok(())
1985    }
1986
1987    /// Complete a merge audit entry (legacy method)
1988    pub async fn complete_merge_entry(&self, entry_id: Uuid, result: &MergeResult) -> Result<()> {
1989        let completion_data = serde_json::json!({
1990            "merged_memory_id": result.merged_memory.id,
1991            "storage_saved": result.storage_saved,
1992            "compression_ratio": result.compression_ratio,
1993            "original_count": result.original_count
1994        });
1995
1996        sqlx::query(
1997            r#"
1998            UPDATE deduplication_audit_log 
1999            SET status = $1, completion_data = $2, completed_at = $3
2000            WHERE id = $4
2001            "#,
2002        )
2003        .bind("completed")
2004        .bind(completion_data)
2005        .bind(Utc::now())
2006        .bind(entry_id)
2007        .execute(self.repository.pool())
2008        .await?;
2009
2010        Ok(())
2011    }
2012
2013    /// Record a pruning operation
2014    pub async fn record_pruning(&self, pruned_ids: &[Uuid], reason: &str) -> Result<AuditEntry> {
2015        let entry_id = Uuid::new_v4();
2016        let operation_data = serde_json::json!({
2017            "operation_type": "prune",
2018            "pruned_memory_ids": pruned_ids,
2019            "reason": reason,
2020            "count": pruned_ids.len()
2021        });
2022
2023        sqlx::query(
2024            r#"
2025            INSERT INTO deduplication_audit_log 
2026            (id, operation_type, operation_data, created_at, status, completed_at)
2027            VALUES ($1, $2, $3, $4, $5, $6)
2028            "#,
2029        )
2030        .bind(entry_id)
2031        .bind("prune")
2032        .bind(operation_data)
2033        .bind(Utc::now())
2034        .bind("completed")
2035        .bind(Utc::now())
2036        .execute(self.repository.pool())
2037        .await?;
2038
2039        Ok(AuditEntry {
2040            id: entry_id,
2041            operation_type: "prune".to_string(),
2042            created_at: Utc::now(),
2043            status: "completed".to_string(),
2044        })
2045    }
2046
2047    /// Get reversible operations (within 7 days)
2048    pub async fn get_reversible_operations(&self) -> Result<Vec<ReversibleOperation>> {
2049        let cutoff_date = Utc::now() - Duration::days(7);
2050
2051        let query = r#"
2052            SELECT id, operation_type, operation_data, completion_data, created_at
2053            FROM deduplication_audit_log 
2054            WHERE created_at > $1 
2055            AND operation_type IN ('merge', 'prune')
2056            AND status = 'completed'
2057            AND reversible_until IS NOT NULL
2058            AND reversible_until > NOW()
2059            ORDER BY created_at DESC
2060        "#;
2061
2062        let rows = sqlx::query(query)
2063            .bind(cutoff_date)
2064            .fetch_all(self.repository.pool())
2065            .await?;
2066
2067        let mut operations = Vec::new();
2068        for row in rows {
2069            operations.push(ReversibleOperation {
2070                id: row.get("id"),
2071                operation_type: row.get("operation_type"),
2072                operation_data: row.get("operation_data"),
2073                completion_data: row.get("completion_data"),
2074                created_at: row.get("created_at"),
2075            });
2076        }
2077
2078        Ok(operations)
2079    }
2080
2081    /// Reverse a deduplication or pruning operation
2082    pub async fn reverse_operation(&self, operation_id: Uuid) -> Result<ReversalResult> {
2083        info!("Starting reversal of operation: {}", operation_id);
2084
2085        // Begin transaction for atomic reversal
2086        let mut transaction =
2087            self.repository
2088                .pool()
2089                .begin()
2090                .await
2091                .map_err(|e| MemoryError::DatabaseError {
2092                    message: format!("Failed to begin reversal transaction: {e}"),
2093                })?;
2094
2095        let result = self
2096            .execute_operation_reversal(&mut transaction, operation_id)
2097            .await;
2098
2099        match result {
2100            Ok(reversal_result) => {
2101                transaction
2102                    .commit()
2103                    .await
2104                    .map_err(|e| MemoryError::DatabaseError {
2105                        message: format!("Failed to commit reversal transaction: {e}"),
2106                    })?;
2107                info!("Successfully reversed operation: {}", operation_id);
2108                Ok(reversal_result)
2109            }
2110            Err(e) => {
2111                if let Err(rollback_err) = transaction.rollback().await {
2112                    error!("Failed to rollback reversal transaction: {}", rollback_err);
2113                }
2114                error!("Reversal failed for operation {}: {}", operation_id, e);
2115                Err(e)
2116            }
2117        }
2118    }
2119
2120    /// Execute operation reversal within a transaction
2121    async fn execute_operation_reversal(
2122        &self,
2123        transaction: &mut Transaction<'_, Postgres>,
2124        operation_id: Uuid,
2125    ) -> Result<ReversalResult> {
2126        // Get operation details
2127        let operation_query = r#"
2128            SELECT operation_type, operation_data, completion_data, status, reversible_until
2129            FROM deduplication_audit_log 
2130            WHERE id = $1
2131        "#;
2132
2133        let operation_row = sqlx::query(operation_query)
2134            .bind(operation_id)
2135            .fetch_optional(&mut **transaction)
2136            .await
2137            .map_err(|e| MemoryError::DatabaseError {
2138                message: format!("Failed to fetch operation details: {e}"),
2139            })?
2140            .ok_or_else(|| MemoryError::NotFound {
2141                id: operation_id.to_string(),
2142            })?;
2143
2144        let operation_type: String = operation_row.get("operation_type");
2145        let operation_data: serde_json::Value = operation_row.get("operation_data");
2146        let status: String = operation_row.get("status");
2147        let reversible_until: Option<DateTime<Utc>> = operation_row.get("reversible_until");
2148
2149        // Validate operation is reversible
2150        if status != "completed" {
2151            return Err(MemoryError::InvalidRequest {
2152                message: format!("Operation {operation_id} is not in completed state: {status}"),
2153            });
2154        }
2155
2156        if let Some(cutoff) = reversible_until {
2157            if Utc::now() > cutoff {
2158                return Err(MemoryError::InvalidRequest {
2159                    message: format!("Operation {operation_id} is past its reversal deadline"),
2160                });
2161            }
2162        } else {
2163            return Err(MemoryError::InvalidRequest {
2164                message: format!("Operation {operation_id} is not reversible"),
2165            });
2166        }
2167
2168        // Perform reversal based on operation type
2169        let reversal_result = match operation_type.as_str() {
2170            "merge" => {
2171                self.reverse_merge_operation(transaction, operation_id, &operation_data)
2172                    .await?
2173            }
2174            "prune" => {
2175                self.reverse_prune_operation(transaction, operation_id, &operation_data)
2176                    .await?
2177            }
2178            _ => {
2179                return Err(MemoryError::InvalidRequest {
2180                    message: format!("Unsupported operation type for reversal: {operation_type}"),
2181                });
2182            }
2183        };
2184
2185        // Mark operation as reversed
2186        sqlx::query(
2187            r#"
2188            UPDATE deduplication_audit_log 
2189            SET status = 'reversed', reversible_until = NULL
2190            WHERE id = $1
2191        "#,
2192        )
2193        .bind(operation_id)
2194        .execute(&mut **transaction)
2195        .await
2196        .map_err(|e| MemoryError::DatabaseError {
2197            message: format!("Failed to mark operation as reversed: {e}"),
2198        })?;
2199
2200        Ok(reversal_result)
2201    }
2202
2203    /// Reverse a merge operation by restoring original memories
2204    async fn reverse_merge_operation(
2205        &self,
2206        transaction: &mut Transaction<'_, Postgres>,
2207        operation_id: Uuid,
2208        operation_data: &serde_json::Value,
2209    ) -> Result<ReversalResult> {
2210        info!("Reversing merge operation: {}", operation_id);
2211
2212        // Extract memory IDs from operation data
2213        let memory_ids: Vec<Uuid> = operation_data["memory_ids"]
2214            .as_array()
2215            .ok_or_else(|| MemoryError::InvalidData {
2216                message: "Missing memory_ids in merge operation data".to_string(),
2217            })?
2218            .iter()
2219            .map(|id| {
2220                Uuid::parse_str(id.as_str().unwrap_or("")).map_err(|e| MemoryError::InvalidData {
2221                    message: format!("Invalid UUID in memory_ids: {e}"),
2222                })
2223            })
2224            .collect::<Result<Vec<_>>>()?;
2225
2226        // Get merge history to find the merged result and original memories
2227        let merge_history_query = r#"
2228            SELECT merged_memory_id, original_memory_id 
2229            FROM memory_merge_history 
2230            WHERE merge_operation_id = $1
2231        "#;
2232
2233        let merge_rows = sqlx::query(merge_history_query)
2234            .bind(operation_id)
2235            .fetch_all(&mut **transaction)
2236            .await
2237            .map_err(|e| MemoryError::DatabaseError {
2238                message: format!("Failed to fetch merge history: {e}"),
2239            })?;
2240
2241        if merge_rows.is_empty() {
2242            return Err(MemoryError::InvalidData {
2243                message: "No merge history found for operation".to_string(),
2244            });
2245        }
2246
2247        let merged_memory_id: Uuid = merge_rows[0].get("merged_memory_id");
2248        let mut restored_count = 0;
2249
2250        // Restore each original memory from archived status
2251        for memory_id in &memory_ids {
2252            let restore_result = sqlx::query(
2253                r#"
2254                UPDATE memories 
2255                SET status = 'active', updated_at = NOW() 
2256                WHERE id = $1 AND status = 'archived'
2257            "#,
2258            )
2259            .bind(memory_id)
2260            .execute(&mut **transaction)
2261            .await
2262            .map_err(|e| MemoryError::DatabaseError {
2263                message: format!("Failed to restore memory {memory_id}: {e}"),
2264            })?;
2265
2266            if restore_result.rows_affected() > 0 {
2267                restored_count += 1;
2268                info!("Restored original memory: {}", memory_id);
2269            } else {
2270                warn!(
2271                    "Memory {} was not in archived status, cannot restore",
2272                    memory_id
2273                );
2274            }
2275        }
2276
2277        // Archive the merged memory result
2278        sqlx::query(
2279            r#"
2280            UPDATE memories 
2281            SET status = 'archived', updated_at = NOW() 
2282            WHERE id = $1
2283        "#,
2284        )
2285        .bind(merged_memory_id)
2286        .execute(&mut **transaction)
2287        .await
2288        .map_err(|e| MemoryError::DatabaseError {
2289            message: format!("Failed to archive merged memory: {e}"),
2290        })?;
2291
2292        info!("Archived merged memory result: {}", merged_memory_id);
2293
2294        Ok(ReversalResult {
2295            operation_id,
2296            operation_type: "merge".to_string(),
2297            memories_restored: restored_count,
2298            success: restored_count > 0,
2299            message: format!(
2300                "Restored {} of {} original memories from merge operation",
2301                restored_count,
2302                memory_ids.len()
2303            ),
2304        })
2305    }
2306
2307    /// Reverse a prune operation by restoring deleted memories
2308    async fn reverse_prune_operation(
2309        &self,
2310        transaction: &mut Transaction<'_, Postgres>,
2311        operation_id: Uuid,
2312        operation_data: &serde_json::Value,
2313    ) -> Result<ReversalResult> {
2314        info!("Reversing prune operation: {}", operation_id);
2315
2316        // Extract pruned memory IDs from operation data
2317        let pruned_memory_ids: Vec<Uuid> = operation_data["pruned_memory_ids"]
2318            .as_array()
2319            .ok_or_else(|| MemoryError::InvalidData {
2320                message: "Missing pruned_memory_ids in prune operation data".to_string(),
2321            })?
2322            .iter()
2323            .map(|id| {
2324                Uuid::parse_str(id.as_str().unwrap_or("")).map_err(|e| MemoryError::InvalidData {
2325                    message: format!("Invalid UUID in pruned_memory_ids: {e}"),
2326                })
2327            })
2328            .collect::<Result<Vec<_>>>()?;
2329
2330        let mut restored_count = 0;
2331
2332        // Restore each pruned memory from deleted status
2333        for memory_id in &pruned_memory_ids {
2334            let restore_result = sqlx::query(
2335                r#"
2336                UPDATE memories 
2337                SET status = 'active', updated_at = NOW() 
2338                WHERE id = $1 AND status = 'deleted'
2339            "#,
2340            )
2341            .bind(memory_id)
2342            .execute(&mut **transaction)
2343            .await
2344            .map_err(|e| MemoryError::DatabaseError {
2345                message: format!("Failed to restore pruned memory {memory_id}: {e}"),
2346            })?;
2347
2348            if restore_result.rows_affected() > 0 {
2349                restored_count += 1;
2350                info!("Restored pruned memory: {}", memory_id);
2351            } else {
2352                warn!(
2353                    "Memory {} was not in deleted status, cannot restore",
2354                    memory_id
2355                );
2356            }
2357        }
2358
2359        Ok(ReversalResult {
2360            operation_id,
2361            operation_type: "prune".to_string(),
2362            memories_restored: restored_count,
2363            success: restored_count > 0,
2364            message: format!(
2365                "Restored {} of {} pruned memories",
2366                restored_count,
2367                pruned_memory_ids.len()
2368            ),
2369        })
2370    }
2371}
2372
2373/// Auto-pruner for automated memory cleanup
2374pub struct AutoPruner {
2375    #[allow(dead_code)]
2376    config: SemanticDeduplicationConfig,
2377    repository: Arc<MemoryRepository>,
2378}
2379
2380impl AutoPruner {
2381    pub fn new(config: SemanticDeduplicationConfig, repository: Arc<MemoryRepository>) -> Self {
2382        Self { config, repository }
2383    }
2384
2385    /// Prune memories based on recall probability and age with comprehensive safety checks
2386    pub async fn prune_memories(
2387        &self,
2388        threshold: f64,
2389        cutoff_date: DateTime<Utc>,
2390    ) -> Result<PruningResult> {
2391        info!(
2392            "Starting safe pruning with threshold {} and cutoff date {}",
2393            threshold, cutoff_date
2394        );
2395
2396        // Begin transaction for atomic pruning operation
2397        let mut transaction =
2398            self.repository
2399                .pool()
2400                .begin()
2401                .await
2402                .map_err(|e| MemoryError::DatabaseError {
2403                    message: format!("Failed to begin pruning transaction: {e}"),
2404                })?;
2405
2406        let result = self
2407            .execute_safe_pruning(&mut transaction, threshold, cutoff_date)
2408            .await;
2409
2410        match result {
2411            Ok(pruning_result) => {
2412                transaction
2413                    .commit()
2414                    .await
2415                    .map_err(|e| MemoryError::DatabaseError {
2416                        message: format!("Failed to commit pruning transaction: {e}"),
2417                    })?;
2418                info!(
2419                    "Safe pruning completed: {} memories pruned, {} bytes freed",
2420                    pruning_result.memories_pruned, pruning_result.storage_freed
2421                );
2422                Ok(pruning_result)
2423            }
2424            Err(e) => {
2425                if let Err(rollback_err) = transaction.rollback().await {
2426                    error!("Failed to rollback pruning transaction: {}", rollback_err);
2427                }
2428                error!("Pruning failed: {}", e);
2429                Err(e)
2430            }
2431        }
2432    }
2433
2434    /// Execute pruning with comprehensive safety checks within a transaction
2435    async fn execute_safe_pruning(
2436        &self,
2437        transaction: &mut Transaction<'_, Postgres>,
2438        threshold: f64,
2439        cutoff_date: DateTime<Utc>,
2440    ) -> Result<PruningResult> {
2441        // Query for pruning candidates with comprehensive safety filtering
2442        let query = r#"
2443            SELECT 
2444                id, 
2445                content, 
2446                recall_probability,
2447                importance_score,
2448                access_count,
2449                last_accessed_at,
2450                tier,
2451                metadata,
2452                consolidation_strength
2453            FROM memories 
2454            WHERE recall_probability IS NOT NULL 
2455            AND recall_probability < $1 
2456            AND created_at < $2 
2457            AND status = 'active'
2458            AND tier IN ('cold', 'frozen')
2459            -- Additional safety checks
2460            AND importance_score < 0.3  -- Don't prune important memories
2461            AND access_count < 10       -- Don't prune frequently accessed memories
2462            AND (
2463                last_accessed_at IS NULL OR 
2464                last_accessed_at < (NOW() - INTERVAL '30 days')
2465            )  -- Don't prune recently accessed memories
2466            AND consolidation_strength < 0.5  -- Don't prune well-consolidated memories
2467            -- Exclude memories with critical metadata markers
2468            AND NOT (
2469                metadata ? 'critical' OR 
2470                metadata ? 'important' OR 
2471                metadata ? 'permanent' OR
2472                metadata ? 'do_not_prune'
2473            )
2474            ORDER BY 
2475                recall_probability ASC, 
2476                importance_score ASC,
2477                last_accessed_at ASC NULLS FIRST
2478            LIMIT 500  -- Conservative batch size for safety
2479        "#;
2480
2481        let candidates = sqlx::query(query)
2482            .bind(threshold)
2483            .bind(cutoff_date)
2484            .fetch_all(&mut **transaction)
2485            .await
2486            .map_err(|e| MemoryError::DatabaseError {
2487                message: format!("Failed to fetch pruning candidates: {e}"),
2488            })?;
2489
2490        info!(
2491            "Found {} pruning candidates after safety filtering",
2492            candidates.len()
2493        );
2494
2495        let mut pruned_ids = Vec::new();
2496        let mut storage_freed = 0u64;
2497        let mut safety_violations = 0;
2498
2499        for row in candidates {
2500            let memory_id: Uuid = row.get("id");
2501            let content: String = row.get("content");
2502            let importance_score: f64 = row.get("importance_score");
2503            let access_count: i32 = row.get("access_count");
2504            let metadata: serde_json::Value = row.get("metadata");
2505            let tier: MemoryTier = row.get("tier");
2506
2507            // Final safety validation before pruning
2508            if let Err(violation) = self.validate_pruning_safety(
2509                memory_id,
2510                importance_score,
2511                access_count,
2512                &metadata,
2513                tier,
2514            ) {
2515                warn!("Skipping pruning due to safety violation: {}", violation);
2516                safety_violations += 1;
2517                continue;
2518            }
2519
2520            // Double-check memory is still eligible (race condition protection)
2521            let recheck_query = r#"
2522                SELECT status, tier, importance_score 
2523                FROM memories 
2524                WHERE id = $1 
2525                AND status = 'active' 
2526                AND tier IN ('cold', 'frozen')
2527                AND importance_score < 0.3
2528            "#;
2529
2530            let recheck_result = sqlx::query(recheck_query)
2531                .bind(memory_id)
2532                .fetch_optional(&mut **transaction)
2533                .await
2534                .map_err(|e| MemoryError::DatabaseError {
2535                    message: format!("Failed to recheck memory eligibility: {e}"),
2536                })?;
2537
2538            if recheck_result.is_none() {
2539                warn!(
2540                    "Memory {} no longer eligible for pruning, skipping",
2541                    memory_id
2542                );
2543                continue;
2544            }
2545
2546            storage_freed += content.len() as u64;
2547            pruned_ids.push(memory_id);
2548
2549            // Create audit entry for reversibility
2550            sqlx::query(
2551                r#"
2552                INSERT INTO memory_pruning_log (
2553                    memory_id, recall_probability, age_days, tier, 
2554                    importance_score, access_count, content_size_bytes, 
2555                    pruning_reason
2556                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
2557            "#,
2558            )
2559            .bind(memory_id)
2560            .bind(
2561                row.get::<Option<f64>, _>("recall_probability")
2562                    .unwrap_or(0.0),
2563            )
2564            .bind((Utc::now() - cutoff_date).num_days())
2565            .bind(format!("{tier:?}").to_lowercase())
2566            .bind(importance_score)
2567            .bind(access_count)
2568            .bind(content.len() as i32)
2569            .bind(format!(
2570                "Auto-pruning: threshold={threshold}, cutoff={cutoff_date}"
2571            ))
2572            .execute(&mut **transaction)
2573            .await
2574            .map_err(|e| MemoryError::DatabaseError {
2575                message: format!("Failed to create pruning audit entry: {e}"),
2576            })?;
2577
2578            // Mark as deleted rather than hard delete for reversibility
2579            sqlx::query("UPDATE memories SET status = 'deleted', updated_at = NOW() WHERE id = $1")
2580                .bind(memory_id)
2581                .execute(&mut **transaction)
2582                .await
2583                .map_err(|e| MemoryError::DatabaseError {
2584                    message: format!("Failed to mark memory as deleted: {e}"),
2585                })?;
2586        }
2587
2588        if safety_violations > 0 {
2589            warn!(
2590                "Encountered {} safety violations during pruning",
2591                safety_violations
2592            );
2593        }
2594
2595        Ok(PruningResult {
2596            memories_pruned: pruned_ids.len(),
2597            storage_freed,
2598            pruned_memory_ids: pruned_ids,
2599        })
2600    }
2601
2602    /// Validate that pruning a memory is safe
2603    fn validate_pruning_safety(
2604        &self,
2605        memory_id: Uuid,
2606        importance_score: f64,
2607        access_count: i32,
2608        metadata: &serde_json::Value,
2609        tier: MemoryTier,
2610    ) -> Result<()> {
2611        // Check importance score threshold
2612        if importance_score >= 0.3 {
2613            return Err(MemoryError::SafetyViolation {
2614                message: format!(
2615                    "Memory {memory_id} has high importance score: {importance_score}"
2616                ),
2617            });
2618        }
2619
2620        // Check access count threshold
2621        if access_count >= 10 {
2622            return Err(MemoryError::SafetyViolation {
2623                message: format!("Memory {memory_id} has high access count: {access_count}"),
2624            });
2625        }
2626
2627        // Check tier restrictions
2628        if matches!(tier, MemoryTier::Working | MemoryTier::Warm) {
2629            return Err(MemoryError::SafetyViolation {
2630                message: format!("Memory {memory_id} is in protected tier: {tier:?}"),
2631            });
2632        }
2633
2634        // Check for critical metadata flags
2635        if let serde_json::Value::Object(obj) = metadata {
2636            for key in ["critical", "important", "permanent", "do_not_prune"] {
2637                if obj.contains_key(key) {
2638                    return Err(MemoryError::SafetyViolation {
2639                        message: format!("Memory {memory_id} has critical metadata flag: {key}"),
2640                    });
2641                }
2642            }
2643        }
2644
2645        Ok(())
2646    }
2647}
2648
2649/// Comprehensive metrics for deduplication operations
2650#[derive(Debug, Clone, Default, Serialize, Deserialize)]
2651pub struct DeduplicationMetrics {
2652    pub total_operations: u64,
2653    pub total_memories_processed: usize,
2654    pub total_memories_merged: usize,
2655    pub total_storage_saved: u64,
2656    pub total_execution_time_ms: u64,
2657    pub average_compression_ratio: f32,
2658    pub errors_encountered: u64,
2659    pub last_operation_timestamp: Option<DateTime<Utc>>,
2660}
2661
/// Outcome figures for a single deduplication operation.
///
/// Derives `Default`, so `DeduplicationResult::default()` is an all-zero result.
/// Field notes follow the field names; the populating code is outside this definition.
#[derive(Debug, Clone, Default)]
pub struct DeduplicationResult {
    /// Number of memories processed.
    pub total_processed: usize,
    /// Number of groups identified.
    pub groups_identified: usize,
    /// Number of memories merged.
    pub memories_merged: usize,
    /// Storage saved, in bytes per the `_bytes` suffix.
    pub storage_saved_bytes: u64,
    /// Compression ratio reported for the operation.
    pub compression_ratio: f32,
    /// Execution time, in milliseconds per the `_ms` suffix.
    pub execution_time_ms: u64,
    /// Number of errors encountered.
    pub errors_encountered: u64,
}
2673
2674#[derive(Debug, Clone)]
2675pub struct SimilarMemoryGroup {
2676    pub memories: Vec<Memory>,
2677    pub average_similarity: f32,
2678    pub merge_strategy: MergeStrategy,
2679}
2680
2681#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
2682pub enum MergeStrategy {
2683    LosslessPreservation,
2684    MetadataConsolidation,
2685    ContentSummarization,
2686}
2687
2688#[derive(Debug, Clone)]
2689pub struct MergeResult {
2690    pub merged_memory: Memory,
2691    pub storage_saved: u64,
2692    pub compression_ratio: f32,
2693    pub original_count: usize,
2694}
2695
2696#[derive(Debug, Clone)]
2697pub struct GroupMergeResult {
2698    pub merged_memory_id: Uuid,
2699    pub original_memory_ids: Vec<Uuid>,
2700    pub memories_merged: usize,
2701    pub storage_saved: u64,
2702    pub compression_ratio: f32,
2703    pub merge_strategy: MergeStrategy,
2704}
2705
/// Result of compressing a piece of memory content.
///
/// Field notes follow the field names; the compression code is outside this definition.
#[derive(Debug, Clone)]
pub struct CompressionResult {
    /// Size of the content before compression.
    pub original_size: usize,
    /// Size of the content after compression.
    pub compressed_size: usize,
    /// Compression ratio reported for this content.
    pub compression_ratio: f32,
    /// Whether the compression was lossless.
    pub is_lossless: bool,
    /// The compressed content itself.
    pub compressed_content: String,
}
2714
2715#[derive(Debug, Clone)]
2716pub struct AuditEntry {
2717    pub id: Uuid,
2718    pub operation_type: String,
2719    pub created_at: DateTime<Utc>,
2720    pub status: String,
2721}
2722
2723#[derive(Debug, Clone)]
2724pub struct ReversibleOperation {
2725    pub id: Uuid,
2726    pub operation_type: String,
2727    pub operation_data: serde_json::Value,
2728    pub completion_data: serde_json::Value,
2729    pub created_at: DateTime<Utc>,
2730}
2731
2732#[derive(Debug, Clone)]
2733pub struct PruningResult {
2734    pub memories_pruned: usize,
2735    pub storage_freed: u64,
2736    pub pruned_memory_ids: Vec<Uuid>,
2737}
2738
/// Before/after figures for a headroom-maintenance pass.
///
/// Field notes follow the field names; the maintenance code is outside this definition.
#[derive(Debug, Clone)]
pub struct HeadroomMaintenanceResult {
    /// Utilization measured before the pass.
    pub initial_utilization: f32,
    /// Utilization measured after the pass.
    pub final_utilization: f32,
    /// Number of memories processed.
    pub memories_processed: usize,
    /// Number of memories merged.
    pub memories_merged: usize,
    /// Number of memories pruned.
    pub memories_pruned: usize,
    /// Storage freed (presumably bytes — confirm at the producer).
    pub storage_freed: u64,
}
2748
/// Aggregate statistics about stored memories.
///
/// Field notes follow the field names; the query that fills them is outside this
/// definition. The four `*_count` fields are named after the memory tiers.
#[derive(Debug, Clone)]
pub struct MemoryStatistics {
    /// Total number of memories.
    pub total_memories: i64,
    /// Total size of memory content, in bytes per the `_bytes` suffix.
    pub total_content_bytes: i64,
    /// Total space, in bytes per the `_bytes` suffix.
    pub total_space_bytes: i64,
    /// Free space, in bytes per the `_bytes` suffix.
    pub free_space_bytes: i64,
    /// Average importance value.
    pub avg_importance: f64,
    /// Count associated with the working tier.
    pub working_count: i64,
    /// Count associated with the warm tier.
    pub warm_count: i64,
    /// Count associated with the cold tier.
    pub cold_count: i64,
    /// Count associated with the frozen tier.
    pub frozen_count: i64,
}
2761
2762#[derive(Debug, Clone)]
2763pub struct ReversalResult {
2764    pub operation_id: Uuid,
2765    pub operation_type: String,
2766    pub memories_restored: usize,
2767    pub success: bool,
2768    pub message: String,
2769}
2770
/// Timing bookkeeping for one deduplication operation.
///
/// Phase durations are stored by phase name; the elapsed total is measured from
/// `start_time` and compared against `target_time_seconds` when violations are
/// computed.
#[derive(Debug)]
struct PerformanceMetrics {
    /// Identifier of the operation, used in log messages.
    operation_id: String,
    /// Instant from which total elapsed time is measured.
    start_time: Instant,
    /// Most recently recorded duration for each phase, keyed by phase name.
    phase_timings: HashMap<String, std::time::Duration>,
    /// Number of memories involved in the operation.
    memory_count: usize,
    /// Time budget for the whole operation, in seconds.
    target_time_seconds: u64,
}
2780
2781impl PerformanceMetrics {
2782    fn record_phase(&mut self, phase: &str, duration: std::time::Duration) {
2783        self.phase_timings.insert(phase.to_string(), duration);
2784
2785        // Log slow phases
2786        if duration.as_millis() > 1000 {
2787            warn!(
2788                "Slow phase '{}' in operation {}: {}ms",
2789                phase,
2790                self.operation_id,
2791                duration.as_millis()
2792            );
2793        }
2794    }
2795
2796    fn format_phase_summary(&self) -> String {
2797        let mut phases: Vec<_> = self.phase_timings.iter().collect();
2798        phases.sort_by_key(|(_, duration)| *duration);
2799
2800        phases
2801            .iter()
2802            .map(|(name, duration)| format!("{}:{}ms", name, duration.as_millis()))
2803            .collect::<Vec<_>>()
2804            .join(", ")
2805    }
2806
2807    fn get_performance_violations(&self) -> Vec<String> {
2808        let mut violations = Vec::new();
2809
2810        let total_duration = self.start_time.elapsed();
2811        if total_duration.as_secs() > self.target_time_seconds {
2812            violations.push(format!(
2813                "Total time {}s exceeds target {}s",
2814                total_duration.as_secs(),
2815                self.target_time_seconds
2816            ));
2817        }
2818
2819        if self.memory_count >= 10_000 && total_duration.as_secs() > 30 {
2820            violations.push(format!(
2821                "Large batch ({} memories) took {}s, exceeds 30s target",
2822                self.memory_count,
2823                total_duration.as_secs()
2824            ));
2825        }
2826
2827        // Check individual phase performance
2828        for (phase, duration) in &self.phase_timings {
2829            if duration.as_millis() > 5000 {
2830                violations.push(format!(
2831                    "Phase '{}' took {}ms, may need optimization",
2832                    phase,
2833                    duration.as_millis()
2834                ));
2835            }
2836        }
2837
2838        violations
2839    }
2840}