agentic_memory/engine/maintenance.rs
1//! Memory consolidation — query 15.
2//!
3//! Provides deduplication, orphan pruning, contradiction linking,
4//! episode compression, and inference promotion operations on a
5//! [`MemoryGraph`]. This is the only query type that mutates the
6//! graph.
7
8use std::collections::HashSet;
9use std::path::PathBuf;
10
11use crate::graph::MemoryGraph;
12use crate::index::cosine_similarity;
13use crate::types::{AmemResult, Edge, EdgeType, EventType};
14
15use super::tokenizer::Tokenizer;
16
17// ---------------------------------------------------------------------------
18// Public types
19// ---------------------------------------------------------------------------
20
/// A single consolidation operation to run.
///
/// All payload fields are plain numbers, so the enum is `Copy`; deriving
/// `Debug` makes operation lists loggable by callers.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ConsolidationOp {
    /// Merge near-duplicate Fact nodes.
    /// `threshold` is the minimum cosine similarity to consider two facts
    /// duplicates (typically 0.90 -- 0.98).
    DeduplicateFacts { threshold: f32 },

    /// Report orphaned nodes that could be pruned.
    /// `max_decay` is the ceiling on `decay_score` for a node to be
    /// considered orphaned (e.g. 0.1).
    ///
    /// **V1: dry-run only** -- the consolidation method will never remove
    /// nodes, only report them.
    PruneOrphans { max_decay: f32 },

    /// Discover contradictory pairs and link them with `Contradicts` edges.
    /// `threshold` is the minimum cosine similarity between two Fact/Inference
    /// nodes for them to be candidates (the method additionally checks for
    /// negation words).
    LinkContradictions { threshold: f32 },

    /// Report groups of Episode nodes that could be compressed.
    /// `group_size` is the minimum number of contiguous episodes to consider
    /// compressible.
    ///
    /// **V1: dry-run only** -- the consolidation method will never compress
    /// episodes, only report them.
    CompressEpisodes { group_size: u32 },

    /// Promote well-established Inference nodes to Fact.
    /// Requires `access_count >= min_access` **and** `confidence >=
    /// min_confidence`.
    PromoteInferences {
        min_access: u32,
        min_confidence: f32,
    },
}
58
/// Parameters for a consolidation run.
pub struct ConsolidationParams {
    /// If set, only consider nodes whose `session_id` falls in
    /// `[start, end]` (inclusive). `None` leaves every session in scope.
    pub session_range: Option<(u32, u32)>,

    /// The operations to execute, in order. Each operation sees the graph
    /// as left by the previous one.
    pub operations: Vec<ConsolidationOp>,

    /// When `true`, no mutations are applied -- the report describes what
    /// *would* happen.
    pub dry_run: bool,

    /// Optional path for the caller to store a pre-consolidation backup.
    /// The consolidation method itself does **not** write files; it simply
    /// copies this value into the report for the caller to act on.
    pub backup_path: Option<PathBuf>,
}
77
/// A single action taken (or proposed) during consolidation.
///
/// All fields are owned standard-library types, so the full set of value
/// derives is available; `Debug` in particular lets callers log actions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsolidationAction {
    /// Human-readable operation name (e.g. "deduplicate_facts").
    pub operation: String,
    /// Human-readable description of the action.
    pub description: String,
    /// Node IDs affected by this action.
    pub affected_nodes: Vec<u64>,
}
87
/// Summary report returned after consolidation.
///
/// On a dry run, the counters reflect *proposed* actions; per-action detail
/// is always available in `actions`.
pub struct ConsolidationReport {
    /// Detailed list of every action taken (or proposed).
    pub actions: Vec<ConsolidationAction>,
    /// Number of duplicate pairs resolved.
    pub deduplicated: usize,
    /// Number of orphaned nodes reported (never actually removed in V1).
    pub pruned: usize,
    /// Number of new `Contradicts` edges added (or proposed).
    pub contradictions_linked: usize,
    /// Total number of episodes covered by reported compression groups --
    /// note: counts episodes, not groups (never actually compressed in V1).
    pub episodes_compressed: usize,
    /// Number of Inference nodes promoted to Fact.
    pub inferences_promoted: usize,
    /// Echoed back from [`ConsolidationParams::backup_path`].
    pub backup_path: Option<PathBuf>,
}
105
106// ---------------------------------------------------------------------------
107// Negation words used by the contradiction detector.
108// ---------------------------------------------------------------------------
109
const NEGATION_WORDS: &[&str] = &[
    // Plain negators.
    "not", "never", "no", "neither", "nor",
    // Negated auxiliaries and contractions.
    "cannot", "can't", "won't", "doesn't", "don't", "didn't", "isn't",
    "aren't", "wasn't", "weren't", "shouldn't", "wouldn't", "couldn't",
    // Near-negators.
    "hardly", "barely",
    // Falsity / disagreement markers.
    "false", "incorrect", "wrong", "untrue", "impossible", "deny", "denied",
    "disagree", "unlike", "opposite",
];
142
143// ---------------------------------------------------------------------------
144// Implementation on QueryEngine
145// ---------------------------------------------------------------------------
146
147impl super::query::QueryEngine {
148 /// Run a set of consolidation operations against `graph`.
149 ///
150 /// If `params.dry_run` is `true`, the graph is not mutated; the returned
151 /// report describes what *would* happen.
152 ///
153 /// When `dry_run` is `false`:
154 /// * `DeduplicateFacts` adds `Supersedes` edges from the surviving node to
155 /// each duplicate.
156 /// * `LinkContradictions` adds `Contradicts` edges.
157 /// * `PromoteInferences` changes `event_type` from `Inference` to `Fact`.
158 /// * `PruneOrphans` and `CompressEpisodes` are always dry-run-only in V1.
159 pub fn consolidate(
160 &self,
161 graph: &mut MemoryGraph,
162 params: ConsolidationParams,
163 ) -> AmemResult<ConsolidationReport> {
164 let mut report = ConsolidationReport {
165 actions: Vec::new(),
166 deduplicated: 0,
167 pruned: 0,
168 contradictions_linked: 0,
169 episodes_compressed: 0,
170 inferences_promoted: 0,
171 backup_path: params.backup_path.clone(),
172 };
173
174 // Pre-compute the set of in-scope node IDs when a session range is
175 // specified.
176 let session_filter: Option<(u32, u32)> = params.session_range;
177
178 for op in ¶ms.operations {
179 match op {
180 ConsolidationOp::DeduplicateFacts { threshold } => {
181 self.op_deduplicate_facts(
182 graph,
183 *threshold,
184 session_filter,
185 params.dry_run,
186 &mut report,
187 );
188 }
189 ConsolidationOp::PruneOrphans { max_decay } => {
190 // Always dry-run in V1.
191 self.op_prune_orphans(graph, *max_decay, session_filter, &mut report);
192 }
193 ConsolidationOp::LinkContradictions { threshold } => {
194 self.op_link_contradictions(
195 graph,
196 *threshold,
197 session_filter,
198 params.dry_run,
199 &mut report,
200 );
201 }
202 ConsolidationOp::CompressEpisodes { group_size } => {
203 // Always dry-run in V1.
204 self.op_compress_episodes(graph, *group_size, session_filter, &mut report);
205 }
206 ConsolidationOp::PromoteInferences {
207 min_access,
208 min_confidence,
209 } => {
210 self.op_promote_inferences(
211 graph,
212 *min_access,
213 *min_confidence,
214 session_filter,
215 params.dry_run,
216 &mut report,
217 );
218 }
219 }
220 }
221
222 Ok(report)
223 }
224
225 // -----------------------------------------------------------------------
226 // DeduplicateFacts
227 // -----------------------------------------------------------------------
228
229 fn op_deduplicate_facts(
230 &self,
231 graph: &mut MemoryGraph,
232 threshold: f32,
233 session_filter: Option<(u32, u32)>,
234 dry_run: bool,
235 report: &mut ConsolidationReport,
236 ) {
237 let tokenizer = Tokenizer::new();
238
239 // Collect Fact node IDs, respecting the session filter.
240 let fact_ids: Vec<u64> = graph
241 .nodes()
242 .iter()
243 .filter(|n| {
244 n.event_type == EventType::Fact && in_session_range(n.session_id, session_filter)
245 })
246 .map(|n| n.id)
247 .collect();
248
249 // Group facts by cluster so we only compare within-cluster pairs.
250 let cluster_count = graph.cluster_map().cluster_count();
251 let fact_set: HashSet<u64> = fact_ids.iter().copied().collect();
252
253 // Build cluster -> [fact ids in that cluster].
254 let mut cluster_groups: Vec<Vec<u64>> = Vec::new();
255 if cluster_count > 0 {
256 for ci in 0..cluster_count {
257 let members: Vec<u64> = graph
258 .cluster_map()
259 .get_cluster(ci)
260 .iter()
261 .copied()
262 .filter(|id| fact_set.contains(id))
263 .collect();
264 if members.len() >= 2 {
265 cluster_groups.push(members);
266 }
267 }
268 }
269
270 // Fallback: if no clusters, treat all facts as one group.
271 if cluster_groups.is_empty() && fact_ids.len() >= 2 {
272 cluster_groups.push(fact_ids.clone());
273 }
274
275 // Track which nodes have already been marked as duplicates so we
276 // don't supersede the same node twice.
277 let mut superseded: HashSet<u64> = HashSet::new();
278
279 for group in &cluster_groups {
280 for i in 0..group.len() {
281 if superseded.contains(&group[i]) {
282 continue;
283 }
284 for j in (i + 1)..group.len() {
285 if superseded.contains(&group[j]) {
286 continue;
287 }
288
289 // Borrow two separate snapshots so we don't alias &graph.
290 let (vec_a, conf_a, content_a) = match graph.get_node(group[i]) {
291 Some(n) => (n.feature_vec.clone(), n.confidence, n.content.clone()),
292 None => continue,
293 };
294 let (vec_b, conf_b, content_b) = match graph.get_node(group[j]) {
295 Some(n) => (n.feature_vec.clone(), n.confidence, n.content.clone()),
296 None => continue,
297 };
298
299 let sim = cosine_similarity(&vec_a, &vec_b);
300 if sim < threshold {
301 continue;
302 }
303
304 // Also require high token-level overlap.
305 let tokens_a: HashSet<String> =
306 tokenizer.tokenize(&content_a).into_iter().collect();
307 let tokens_b: HashSet<String> =
308 tokenizer.tokenize(&content_b).into_iter().collect();
309
310 if tokens_a.is_empty() && tokens_b.is_empty() {
311 continue;
312 }
313
314 let intersection = tokens_a.intersection(&tokens_b).count();
315 let union = tokens_a.union(&tokens_b).count();
316 let jaccard = if union > 0 {
317 intersection as f32 / union as f32
318 } else {
319 0.0
320 };
321
322 if jaccard < 0.5 {
323 continue;
324 }
325
326 // Determine winner (higher confidence survives).
327 let (winner, loser) = if conf_a >= conf_b {
328 (group[i], group[j])
329 } else {
330 (group[j], group[i])
331 };
332
333 superseded.insert(loser);
334
335 report.actions.push(ConsolidationAction {
336 operation: "deduplicate_facts".to_string(),
337 description: format!(
338 "Node {} supersedes duplicate node {} (cosine={:.3}, jaccard={:.3})",
339 winner, loser, sim, jaccard,
340 ),
341 affected_nodes: vec![winner, loser],
342 });
343 report.deduplicated += 1;
344
345 if !dry_run {
346 let edge = Edge {
347 source_id: winner,
348 target_id: loser,
349 edge_type: EdgeType::Supersedes,
350 weight: sim,
351 created_at: crate::types::now_micros(),
352 };
353 // Ignore error if the edge cannot be added (e.g.
354 // duplicate or limit reached).
355 let _ = graph.add_edge(edge);
356 }
357 }
358 }
359 }
360 }
361
362 // -----------------------------------------------------------------------
363 // PruneOrphans (dry-run only in V1)
364 // -----------------------------------------------------------------------
365
366 fn op_prune_orphans(
367 &self,
368 graph: &MemoryGraph,
369 max_decay: f32,
370 session_filter: Option<(u32, u32)>,
371 report: &mut ConsolidationReport,
372 ) {
373 let orphan_ids: Vec<u64> = graph
374 .nodes()
375 .iter()
376 .filter(|n| {
377 n.access_count == 0
378 && n.decay_score < max_decay
379 && in_session_range(n.session_id, session_filter)
380 && graph.edges_to(n.id).is_empty()
381 })
382 .map(|n| n.id)
383 .collect();
384
385 if !orphan_ids.is_empty() {
386 report.actions.push(ConsolidationAction {
387 operation: "prune_orphans".to_string(),
388 description: format!(
389 "Would prune {} orphaned node(s) with decay_score < {:.2} and no incoming edges",
390 orphan_ids.len(),
391 max_decay,
392 ),
393 affected_nodes: orphan_ids.clone(),
394 });
395 report.pruned += orphan_ids.len();
396 }
397 }
398
399 // -----------------------------------------------------------------------
400 // LinkContradictions
401 // -----------------------------------------------------------------------
402
403 fn op_link_contradictions(
404 &self,
405 graph: &mut MemoryGraph,
406 threshold: f32,
407 session_filter: Option<(u32, u32)>,
408 dry_run: bool,
409 report: &mut ConsolidationReport,
410 ) {
411 let tokenizer = Tokenizer::new();
412
413 // Collect candidate nodes: Facts and Inferences.
414 let candidates: Vec<u64> = graph
415 .nodes()
416 .iter()
417 .filter(|n| {
418 (n.event_type == EventType::Fact || n.event_type == EventType::Inference)
419 && in_session_range(n.session_id, session_filter)
420 })
421 .map(|n| n.id)
422 .collect();
423
424 // Build a set of existing Contradicts pairs for dedup.
425 let mut existing_contradictions: HashSet<(u64, u64)> = HashSet::new();
426 for edge in graph.edges() {
427 if edge.edge_type == EdgeType::Contradicts {
428 let pair = ordered_pair(edge.source_id, edge.target_id);
429 existing_contradictions.insert(pair);
430 }
431 }
432
433 for i in 0..candidates.len() {
434 for j in (i + 1)..candidates.len() {
435 let id_a = candidates[i];
436 let id_b = candidates[j];
437
438 // Skip if already linked.
439 if existing_contradictions.contains(&ordered_pair(id_a, id_b)) {
440 continue;
441 }
442
443 let (vec_a, content_a) = match graph.get_node(id_a) {
444 Some(n) => (n.feature_vec.clone(), n.content.clone()),
445 None => continue,
446 };
447 let (vec_b, content_b) = match graph.get_node(id_b) {
448 Some(n) => (n.feature_vec.clone(), n.content.clone()),
449 None => continue,
450 };
451
452 let sim = cosine_similarity(&vec_a, &vec_b);
453 if sim < threshold {
454 continue;
455 }
456
457 // Check for negation: at least one of the two contents must
458 // contain a negation word that does NOT appear in the other.
459 let tokens_a: HashSet<String> =
460 tokenizer.tokenize(&content_a).into_iter().collect();
461 let tokens_b: HashSet<String> =
462 tokenizer.tokenize(&content_b).into_iter().collect();
463
464 let neg_set: HashSet<&str> = NEGATION_WORDS.iter().copied().collect();
465
466 let neg_in_a = tokens_a.iter().any(|t| neg_set.contains(t.as_str()));
467 let neg_in_b = tokens_b.iter().any(|t| neg_set.contains(t.as_str()));
468
469 // Contradiction signal: high similarity but exactly one side
470 // uses negation, OR both use negation words that differ.
471 if !(neg_in_a ^ neg_in_b) {
472 continue;
473 }
474
475 existing_contradictions.insert(ordered_pair(id_a, id_b));
476
477 report.actions.push(ConsolidationAction {
478 operation: "link_contradictions".to_string(),
479 description: format!(
480 "Nodes {} and {} appear contradictory (cosine={:.3})",
481 id_a, id_b, sim,
482 ),
483 affected_nodes: vec![id_a, id_b],
484 });
485 report.contradictions_linked += 1;
486
487 if !dry_run {
488 let edge = Edge {
489 source_id: id_a,
490 target_id: id_b,
491 edge_type: EdgeType::Contradicts,
492 weight: sim,
493 created_at: crate::types::now_micros(),
494 };
495 let _ = graph.add_edge(edge);
496 }
497 }
498 }
499 }
500
501 // -----------------------------------------------------------------------
502 // CompressEpisodes (dry-run only in V1)
503 // -----------------------------------------------------------------------
504
505 fn op_compress_episodes(
506 &self,
507 graph: &MemoryGraph,
508 group_size: u32,
509 session_filter: Option<(u32, u32)>,
510 report: &mut ConsolidationReport,
511 ) {
512 // Collect Episode nodes sorted by creation time.
513 let mut episodes: Vec<(u64, u64, u32)> = graph
514 .nodes()
515 .iter()
516 .filter(|n| {
517 n.event_type == EventType::Episode && in_session_range(n.session_id, session_filter)
518 })
519 .map(|n| (n.id, n.created_at, n.session_id))
520 .collect();
521
522 episodes.sort_by_key(|&(_, ts, _)| ts);
523
524 if episodes.len() < group_size as usize {
525 return;
526 }
527
528 // Group contiguous episodes from the same session.
529 let mut groups: Vec<Vec<u64>> = Vec::new();
530 let mut current_group: Vec<u64> = vec![episodes[0].0];
531 let mut current_session = episodes[0].2;
532
533 for &(id, _, session) in &episodes[1..] {
534 if session == current_session {
535 current_group.push(id);
536 } else {
537 if current_group.len() >= group_size as usize {
538 groups.push(std::mem::take(&mut current_group));
539 } else {
540 current_group.clear();
541 }
542 current_group.push(id);
543 current_session = session;
544 }
545 }
546 if current_group.len() >= group_size as usize {
547 groups.push(current_group);
548 }
549
550 for group in &groups {
551 report.actions.push(ConsolidationAction {
552 operation: "compress_episodes".to_string(),
553 description: format!(
554 "Would compress {} contiguous episode(s) into a summary",
555 group.len(),
556 ),
557 affected_nodes: group.clone(),
558 });
559 report.episodes_compressed += group.len();
560 }
561 }
562
563 // -----------------------------------------------------------------------
564 // PromoteInferences
565 // -----------------------------------------------------------------------
566
567 fn op_promote_inferences(
568 &self,
569 graph: &mut MemoryGraph,
570 min_access: u32,
571 min_confidence: f32,
572 session_filter: Option<(u32, u32)>,
573 dry_run: bool,
574 report: &mut ConsolidationReport,
575 ) {
576 // First pass: collect IDs of eligible Inference nodes.
577 let eligible: Vec<u64> = graph
578 .nodes()
579 .iter()
580 .filter(|n| {
581 n.event_type == EventType::Inference
582 && n.access_count >= min_access
583 && n.confidence >= min_confidence
584 && in_session_range(n.session_id, session_filter)
585 })
586 .map(|n| n.id)
587 .collect();
588
589 for &id in &eligible {
590 report.actions.push(ConsolidationAction {
591 operation: "promote_inferences".to_string(),
592 description: format!("Promote inference node {} to fact", id),
593 affected_nodes: vec![id],
594 });
595 report.inferences_promoted += 1;
596
597 if !dry_run {
598 if let Some(node) = graph.get_node_mut(id) {
599 node.event_type = EventType::Fact;
600 }
601 }
602 }
603 }
604}
605
606// ---------------------------------------------------------------------------
607// Helpers
608// ---------------------------------------------------------------------------
609
/// Check whether `session_id` falls within an optional inclusive range.
///
/// `None` means "no filter" and matches every session. An inverted range
/// (`lo > hi`) matches nothing, exactly like the original comparison chain.
fn in_session_range(session_id: u32, range: Option<(u32, u32)>) -> bool {
    range.map_or(true, |(lo, hi)| (lo..=hi).contains(&session_id))
}
617
/// Return the pair `(min, max)` so we can use it as a canonical key.
fn ordered_pair(a: u64, b: u64) -> (u64, u64) {
    (a.min(b), a.max(b))
}