// codemem_engine/consolidation/cluster.rs
1use super::union_find::UnionFind;
2use super::ConsolidationResult;
3use crate::CodememEngine;
4use codemem_core::CodememError;
5use codemem_storage::vector::cosine_similarity;
6use serde_json::json;
7use std::collections::{HashMap, HashSet};
8
9impl CodememEngine {
10 /// Consolidate cluster: semantic deduplication using vector KNN + cosine similarity.
11 ///
12 /// Groups memories by namespace and memory_type for candidate grouping. Within each
13 /// group, uses vector KNN search to find candidate duplicates (avoiding O(n^2) pairwise
14 /// comparison), then verifies with cosine similarity + union-find to cluster
15 /// transitively-similar memories. Keeps the highest-importance memory per cluster.
16 ///
17 /// For small groups (<=50 members), falls back to pairwise comparison since the
18 /// overhead of KNN setup is not worth it.
19 pub fn consolidate_cluster(
20 &self,
21 similarity_threshold: Option<f64>,
22 ) -> Result<ConsolidationResult, CodememError> {
23 let similarity_threshold = similarity_threshold.unwrap_or(0.92);
24
25 let ids = self.storage.list_memory_ids()?;
26 let id_refs: Vec<&str> = ids.iter().map(|s| s.as_str()).collect();
27 let memories = self.storage.get_memories_batch(&id_refs)?;
28
29 // M11: Group by namespace+memory_type instead of hash prefix (SHA-256 prefix is
30 // uniformly distributed, making it a no-op as a pre-filter). Grouping by semantic
31 // attributes ensures we only compare memories that could plausibly be duplicates.
32 let mut groups: HashMap<String, Vec<usize>> = HashMap::new();
33 for (idx, m) in memories.iter().enumerate() {
34 let key = format!(
35 "{}:{}",
36 m.namespace.as_deref().unwrap_or("default"),
37 m.memory_type
38 );
39 groups.entry(key).or_default().push(idx);
40 }
41
42 // Union-find for transitive clustering
43 let n = memories.len();
44 let mut uf = UnionFind::new(n);
45
46 // X3: For large groups, use vector KNN to find candidate duplicates
47 // instead of O(n^2) pairwise comparison. For small groups (<=50),
48 // pairwise is fine and avoids KNN overhead.
49 let vector = self.lock_vector()?;
50
51 // Build index from memory idx to id for quick lookup
52 let id_to_idx: HashMap<&str, usize> = memories
53 .iter()
54 .enumerate()
55 .map(|(i, m)| (m.id.as_str(), i))
56 .collect();
57
58 for member_indices in groups.values() {
59 if member_indices.len() <= 1 {
60 continue;
61 }
62
63 if member_indices.len() <= 50 {
64 // O(n^2) pairwise comparison is acceptable here — groups are capped at <=50 members,
65 // so worst case is ~1250 comparisons which completes in microseconds.
66 for i in 0..member_indices.len() {
67 for j in (i + 1)..member_indices.len() {
68 let idx_a = member_indices[i];
69 let idx_b = member_indices[j];
70
71 let id_a = &memories[idx_a].id;
72 let id_b = &memories[idx_b].id;
73
74 let sim = match (
75 self.storage.get_embedding(id_a).ok().flatten(),
76 self.storage.get_embedding(id_b).ok().flatten(),
77 ) {
78 (Some(emb_a), Some(emb_b)) => cosine_similarity(&emb_a, &emb_b),
79 _ => {
80 if memories[idx_a].content_hash == memories[idx_b].content_hash {
81 1.0
82 } else {
83 0.0
84 }
85 }
86 };
87
88 if sim >= similarity_threshold {
89 uf.union(idx_a, idx_b);
90 }
91 }
92 }
93 } else {
94 // Large group: use vector KNN per member to find candidates
95 // Search for K nearest neighbors where K is small (e.g. 10)
96 let k_neighbors = 10.min(member_indices.len());
97
98 // Build a set of IDs in this group for filtering
99 let group_ids: HashSet<&str> = member_indices
100 .iter()
101 .map(|&idx| memories[idx].id.as_str())
102 .collect();
103
104 for &idx_a in member_indices {
105 let id_a = &memories[idx_a].id;
106 let embedding = match self.storage.get_embedding(id_a).ok().flatten() {
107 Some(e) => e,
108 None => continue,
109 };
110
111 // Use vector KNN to find nearest neighbors
112 let neighbors = vector
113 .search(&embedding, k_neighbors + 1)
114 .unwrap_or_default();
115
116 for (neighbor_id, _) in &neighbors {
117 if neighbor_id == id_a {
118 continue;
119 }
120 // Only consider neighbors within the same group
121 if !group_ids.contains(neighbor_id.as_str()) {
122 continue;
123 }
124
125 let idx_b = match id_to_idx.get(neighbor_id.as_str()) {
126 Some(&idx) => idx,
127 None => continue,
128 };
129
130 // Verify with cosine similarity
131 let sim = match self.storage.get_embedding(neighbor_id).ok().flatten() {
132 Some(emb_b) => cosine_similarity(&embedding, &emb_b),
133 None => {
134 if memories[idx_a].content_hash == memories[idx_b].content_hash {
135 1.0
136 } else {
137 0.0
138 }
139 }
140 };
141
142 if sim >= similarity_threshold {
143 uf.union(idx_a, idx_b);
144 }
145 }
146 }
147 }
148 }
149 drop(vector);
150
151 let clusters = uf.groups(n);
152
153 let mut merged_count = 0usize;
154 let mut kept_count = 0usize;
155 let mut ids_to_delete: Vec<String> = Vec::new();
156
157 for cluster in &clusters {
158 if cluster.len() <= 1 {
159 kept_count += 1;
160 continue;
161 }
162
163 // Tiered winner selection: agent-curated/verified memories win over
164 // raw static-analysis, which wins over archived. Within each tier,
165 // highest importance wins. This prevents enrichment outputs from
166 // displacing agent-refined analysis during dedup.
167 let mut members: Vec<(usize, u8, f64)> = cluster
168 .iter()
169 .map(|&idx| {
170 let tags = &memories[idx].tags;
171 let tier = if tags.contains(&"agent-curated".to_string())
172 || tags.contains(&"agent-verified".to_string())
173 || tags.contains(&"human-verified".to_string())
174 {
175 0 // highest priority: agent/human reviewed
176 } else if tags.contains(&"archived".to_string()) {
177 2 // lowest: archived noise
178 } else {
179 1 // middle: unreviewed (including raw static-analysis)
180 };
181 (idx, tier, memories[idx].importance)
182 })
183 .collect();
184 members.sort_by(|a, b| {
185 a.1.cmp(&b.1)
186 .then(b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal))
187 });
188 kept_count += 1;
189
190 for &(idx, _, _) in members.iter().skip(1) {
191 ids_to_delete.push(memories[idx].id.clone());
192 merged_count += 1;
193 }
194 }
195
196 // H2: Batch deletes in groups of 100, releasing all locks between batches.
197 // SQLite cascade (memory + graph nodes/edges + embeddings) is batched into
198 // a single transaction per chunk; in-memory indices are updated afterwards.
199 for batch in ids_to_delete.chunks(100) {
200 let batch_refs: Vec<&str> = batch.iter().map(|s| s.as_str()).collect();
201 if let Err(e) = self.storage.delete_memories_batch_cascade(&batch_refs) {
202 tracing::warn!(
203 "Failed to batch-delete {} memories during cluster consolidation: {e}",
204 batch.len()
205 );
206 }
207
208 // C1: Lock ordering: graph first, then vector, then bm25
209 let mut graph = self.lock_graph()?;
210 let mut vector = self.lock_vector()?;
211 let mut bm25 = self.lock_bm25()?;
212 for id in batch {
213 if let Err(e) = vector.remove(id) {
214 tracing::warn!(
215 "Failed to remove {id} from vector index during cluster consolidation: {e}"
216 );
217 }
218 if let Err(e) = graph.remove_node(id) {
219 tracing::warn!(
220 "Failed to remove {id} from graph during cluster consolidation: {e}"
221 );
222 }
223 bm25.remove_document(id);
224 }
225 drop(bm25);
226 drop(vector);
227 drop(graph);
228 }
229
230 // Rebuild vector index if we deleted anything
231 if merged_count > 0 {
232 let mut vector = self.lock_vector()?;
233 self.rebuild_vector_index_internal(&mut **vector);
234 drop(vector);
235 }
236
237 self.save_index();
238
239 if let Err(e) = self
240 .storage
241 .insert_consolidation_log("cluster", merged_count)
242 {
243 tracing::warn!("Failed to log cluster consolidation: {e}");
244 }
245
246 Ok(ConsolidationResult {
247 cycle: "cluster".to_string(),
248 affected: merged_count,
249 details: json!({
250 "merged": merged_count,
251 "kept": kept_count,
252 "similarity_threshold": similarity_threshold,
253 "algorithm": "semantic_cosine",
254 }),
255 })
256 }
257}