Skip to main content

agentic_memory/engine/
maintenance.rs

1//! Memory consolidation — query 15.
2//!
3//! Provides deduplication, orphan pruning, contradiction linking,
4//! episode compression, and inference promotion operations on a
5//! [`MemoryGraph`].  This is the only query type that mutates the
6//! graph.
7
8use std::collections::HashSet;
9use std::path::PathBuf;
10
11use crate::graph::MemoryGraph;
12use crate::index::cosine_similarity;
13use crate::types::{AmemResult, Edge, EdgeType, EventType};
14
15use super::tokenizer::Tokenizer;
16
17// ---------------------------------------------------------------------------
18// Public types
19// ---------------------------------------------------------------------------
20
/// A single consolidation operation to run.
///
/// Operations are executed in the order they appear in the parameter
/// list; see [`ConsolidationParams`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ConsolidationOp {
    /// Merge near-duplicate Fact nodes.
    /// `threshold` is the minimum cosine similarity to consider two facts
    /// duplicates (typically 0.90 -- 0.98).
    DeduplicateFacts { threshold: f32 },

    /// Report orphaned nodes that could be pruned.
    /// `max_decay` is the ceiling on `decay_score` for a node to be
    /// considered orphaned (e.g. 0.1).
    ///
    /// **V1: dry-run only** -- the consolidation method will never remove
    /// nodes, only report them.
    PruneOrphans { max_decay: f32 },

    /// Discover contradictory pairs and link them with `Contradicts` edges.
    /// `threshold` is the minimum cosine similarity between two Fact/Inference
    /// nodes for them to be candidates (the method additionally checks for
    /// negation words).
    LinkContradictions { threshold: f32 },

    /// Report groups of Episode nodes that could be compressed.
    /// `group_size` is the minimum number of contiguous episodes to consider
    /// compressible.
    ///
    /// **V1: dry-run only** -- the consolidation method will never compress
    /// episodes, only report them.
    CompressEpisodes { group_size: u32 },

    /// Promote well-established Inference nodes to Fact.
    /// Requires `access_count >= min_access` **and** `confidence >=
    /// min_confidence`.
    PromoteInferences {
        min_access: u32,
        min_confidence: f32,
    },
}
58
/// Parameters for a consolidation run.
pub struct ConsolidationParams {
    /// If set, only consider nodes whose `session_id` falls in
    /// `[start, end]` (inclusive).  `None` means every session is in
    /// scope.
    pub session_range: Option<(u32, u32)>,

    /// The operations to execute, in order.  An empty list produces an
    /// empty report.
    pub operations: Vec<ConsolidationOp>,

    /// When `true`, no mutations are applied -- the report describes what
    /// *would* happen.
    pub dry_run: bool,

    /// Optional path for the caller to store a pre-consolidation backup.
    /// The consolidation method itself does **not** write files; it simply
    /// copies this value into the report for the caller to act on.
    pub backup_path: Option<PathBuf>,
}
77
/// A single action taken (or proposed) during consolidation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsolidationAction {
    /// Human-readable operation name (e.g. "deduplicate_facts").
    pub operation: String,
    /// Human-readable description of the action.
    pub description: String,
    /// Node IDs affected by this action.
    pub affected_nodes: Vec<u64>,
}
87
/// Summary report returned after consolidation.
///
/// The per-operation counters below are incremented even on a dry run,
/// in which case they count *proposed* rather than applied changes.
pub struct ConsolidationReport {
    /// Detailed list of every action taken (or proposed).
    pub actions: Vec<ConsolidationAction>,
    /// Number of duplicate pairs resolved.
    pub deduplicated: usize,
    /// Number of orphaned nodes reported (never actually removed in V1).
    pub pruned: usize,
    /// Number of new `Contradicts` edges added (or proposed).
    pub contradictions_linked: usize,
    /// Number of episode groups reported (never actually compressed in V1).
    pub episodes_compressed: usize,
    /// Number of Inference nodes promoted to Fact.
    pub inferences_promoted: usize,
    /// Echoed back from [`ConsolidationParams::backup_path`].
    pub backup_path: Option<PathBuf>,
}
105
106// ---------------------------------------------------------------------------
107// Negation words used by the contradiction detector.
108// ---------------------------------------------------------------------------
109
/// Lexicon treated as negation markers by the contradiction detector.
/// Entry order is preserved from the original definition.
const NEGATION_WORDS: &[&str] = &[
    // Plain negators.
    "not", "never", "no", "neither", "nor",
    // Negated auxiliaries and contractions.
    "cannot", "can't", "won't", "doesn't", "don't", "didn't", "isn't",
    "aren't", "wasn't", "weren't", "shouldn't", "wouldn't", "couldn't",
    // Weak / approximate negation.
    "hardly", "barely",
    // Judgements of falsity.
    "false", "incorrect", "wrong", "untrue", "impossible",
    // Denial and opposition.
    "deny", "denied", "disagree", "unlike", "opposite",
];
142
143// ---------------------------------------------------------------------------
144// Implementation on QueryEngine
145// ---------------------------------------------------------------------------
146
147impl super::query::QueryEngine {
148    /// Run a set of consolidation operations against `graph`.
149    ///
150    /// If `params.dry_run` is `true`, the graph is not mutated; the returned
151    /// report describes what *would* happen.
152    ///
153    /// When `dry_run` is `false`:
154    /// * `DeduplicateFacts` adds `Supersedes` edges from the surviving node to
155    ///   each duplicate.
156    /// * `LinkContradictions` adds `Contradicts` edges.
157    /// * `PromoteInferences` changes `event_type` from `Inference` to `Fact`.
158    /// * `PruneOrphans` and `CompressEpisodes` are always dry-run-only in V1.
159    pub fn consolidate(
160        &self,
161        graph: &mut MemoryGraph,
162        params: ConsolidationParams,
163    ) -> AmemResult<ConsolidationReport> {
164        let mut report = ConsolidationReport {
165            actions: Vec::new(),
166            deduplicated: 0,
167            pruned: 0,
168            contradictions_linked: 0,
169            episodes_compressed: 0,
170            inferences_promoted: 0,
171            backup_path: params.backup_path.clone(),
172        };
173
174        // Pre-compute the set of in-scope node IDs when a session range is
175        // specified.
176        let session_filter: Option<(u32, u32)> = params.session_range;
177
178        for op in &params.operations {
179            match op {
180                ConsolidationOp::DeduplicateFacts { threshold } => {
181                    self.op_deduplicate_facts(
182                        graph,
183                        *threshold,
184                        session_filter,
185                        params.dry_run,
186                        &mut report,
187                    );
188                }
189                ConsolidationOp::PruneOrphans { max_decay } => {
190                    // Always dry-run in V1.
191                    self.op_prune_orphans(graph, *max_decay, session_filter, &mut report);
192                }
193                ConsolidationOp::LinkContradictions { threshold } => {
194                    self.op_link_contradictions(
195                        graph,
196                        *threshold,
197                        session_filter,
198                        params.dry_run,
199                        &mut report,
200                    );
201                }
202                ConsolidationOp::CompressEpisodes { group_size } => {
203                    // Always dry-run in V1.
204                    self.op_compress_episodes(graph, *group_size, session_filter, &mut report);
205                }
206                ConsolidationOp::PromoteInferences {
207                    min_access,
208                    min_confidence,
209                } => {
210                    self.op_promote_inferences(
211                        graph,
212                        *min_access,
213                        *min_confidence,
214                        session_filter,
215                        params.dry_run,
216                        &mut report,
217                    );
218                }
219            }
220        }
221
222        Ok(report)
223    }
224
225    // -----------------------------------------------------------------------
226    // DeduplicateFacts
227    // -----------------------------------------------------------------------
228
229    fn op_deduplicate_facts(
230        &self,
231        graph: &mut MemoryGraph,
232        threshold: f32,
233        session_filter: Option<(u32, u32)>,
234        dry_run: bool,
235        report: &mut ConsolidationReport,
236    ) {
237        let tokenizer = Tokenizer::new();
238
239        // Collect Fact node IDs, respecting the session filter.
240        let fact_ids: Vec<u64> = graph
241            .nodes()
242            .iter()
243            .filter(|n| {
244                n.event_type == EventType::Fact && in_session_range(n.session_id, session_filter)
245            })
246            .map(|n| n.id)
247            .collect();
248
249        // Group facts by cluster so we only compare within-cluster pairs.
250        let cluster_count = graph.cluster_map().cluster_count();
251        let fact_set: HashSet<u64> = fact_ids.iter().copied().collect();
252
253        // Build cluster -> [fact ids in that cluster].
254        let mut cluster_groups: Vec<Vec<u64>> = Vec::new();
255        if cluster_count > 0 {
256            for ci in 0..cluster_count {
257                let members: Vec<u64> = graph
258                    .cluster_map()
259                    .get_cluster(ci)
260                    .iter()
261                    .copied()
262                    .filter(|id| fact_set.contains(id))
263                    .collect();
264                if members.len() >= 2 {
265                    cluster_groups.push(members);
266                }
267            }
268        }
269
270        // Fallback: if no clusters, treat all facts as one group.
271        if cluster_groups.is_empty() && fact_ids.len() >= 2 {
272            cluster_groups.push(fact_ids.clone());
273        }
274
275        // Track which nodes have already been marked as duplicates so we
276        // don't supersede the same node twice.
277        let mut superseded: HashSet<u64> = HashSet::new();
278
279        for group in &cluster_groups {
280            for i in 0..group.len() {
281                if superseded.contains(&group[i]) {
282                    continue;
283                }
284                for j in (i + 1)..group.len() {
285                    if superseded.contains(&group[j]) {
286                        continue;
287                    }
288
289                    // Borrow two separate snapshots so we don't alias &graph.
290                    let (vec_a, conf_a, content_a) = match graph.get_node(group[i]) {
291                        Some(n) => (n.feature_vec.clone(), n.confidence, n.content.clone()),
292                        None => continue,
293                    };
294                    let (vec_b, conf_b, content_b) = match graph.get_node(group[j]) {
295                        Some(n) => (n.feature_vec.clone(), n.confidence, n.content.clone()),
296                        None => continue,
297                    };
298
299                    let sim = cosine_similarity(&vec_a, &vec_b);
300                    if sim < threshold {
301                        continue;
302                    }
303
304                    // Also require high token-level overlap.
305                    let tokens_a: HashSet<String> =
306                        tokenizer.tokenize(&content_a).into_iter().collect();
307                    let tokens_b: HashSet<String> =
308                        tokenizer.tokenize(&content_b).into_iter().collect();
309
310                    if tokens_a.is_empty() && tokens_b.is_empty() {
311                        continue;
312                    }
313
314                    let intersection = tokens_a.intersection(&tokens_b).count();
315                    let union = tokens_a.union(&tokens_b).count();
316                    let jaccard = if union > 0 {
317                        intersection as f32 / union as f32
318                    } else {
319                        0.0
320                    };
321
322                    if jaccard < 0.5 {
323                        continue;
324                    }
325
326                    // Determine winner (higher confidence survives).
327                    let (winner, loser) = if conf_a >= conf_b {
328                        (group[i], group[j])
329                    } else {
330                        (group[j], group[i])
331                    };
332
333                    superseded.insert(loser);
334
335                    report.actions.push(ConsolidationAction {
336                        operation: "deduplicate_facts".to_string(),
337                        description: format!(
338                            "Node {} supersedes duplicate node {} (cosine={:.3}, jaccard={:.3})",
339                            winner, loser, sim, jaccard,
340                        ),
341                        affected_nodes: vec![winner, loser],
342                    });
343                    report.deduplicated += 1;
344
345                    if !dry_run {
346                        let edge = Edge {
347                            source_id: winner,
348                            target_id: loser,
349                            edge_type: EdgeType::Supersedes,
350                            weight: sim,
351                            created_at: crate::types::now_micros(),
352                        };
353                        // Ignore error if the edge cannot be added (e.g.
354                        // duplicate or limit reached).
355                        let _ = graph.add_edge(edge);
356                    }
357                }
358            }
359        }
360    }
361
362    // -----------------------------------------------------------------------
363    // PruneOrphans (dry-run only in V1)
364    // -----------------------------------------------------------------------
365
366    fn op_prune_orphans(
367        &self,
368        graph: &MemoryGraph,
369        max_decay: f32,
370        session_filter: Option<(u32, u32)>,
371        report: &mut ConsolidationReport,
372    ) {
373        let orphan_ids: Vec<u64> = graph
374            .nodes()
375            .iter()
376            .filter(|n| {
377                n.access_count == 0
378                    && n.decay_score < max_decay
379                    && in_session_range(n.session_id, session_filter)
380                    && graph.edges_to(n.id).is_empty()
381            })
382            .map(|n| n.id)
383            .collect();
384
385        if !orphan_ids.is_empty() {
386            report.actions.push(ConsolidationAction {
387                operation: "prune_orphans".to_string(),
388                description: format!(
389                    "Would prune {} orphaned node(s) with decay_score < {:.2} and no incoming edges",
390                    orphan_ids.len(),
391                    max_decay,
392                ),
393                affected_nodes: orphan_ids.clone(),
394            });
395            report.pruned += orphan_ids.len();
396        }
397    }
398
399    // -----------------------------------------------------------------------
400    // LinkContradictions
401    // -----------------------------------------------------------------------
402
403    fn op_link_contradictions(
404        &self,
405        graph: &mut MemoryGraph,
406        threshold: f32,
407        session_filter: Option<(u32, u32)>,
408        dry_run: bool,
409        report: &mut ConsolidationReport,
410    ) {
411        let tokenizer = Tokenizer::new();
412
413        // Collect candidate nodes: Facts and Inferences.
414        let candidates: Vec<u64> = graph
415            .nodes()
416            .iter()
417            .filter(|n| {
418                (n.event_type == EventType::Fact || n.event_type == EventType::Inference)
419                    && in_session_range(n.session_id, session_filter)
420            })
421            .map(|n| n.id)
422            .collect();
423
424        // Build a set of existing Contradicts pairs for dedup.
425        let mut existing_contradictions: HashSet<(u64, u64)> = HashSet::new();
426        for edge in graph.edges() {
427            if edge.edge_type == EdgeType::Contradicts {
428                let pair = ordered_pair(edge.source_id, edge.target_id);
429                existing_contradictions.insert(pair);
430            }
431        }
432
433        for i in 0..candidates.len() {
434            for j in (i + 1)..candidates.len() {
435                let id_a = candidates[i];
436                let id_b = candidates[j];
437
438                // Skip if already linked.
439                if existing_contradictions.contains(&ordered_pair(id_a, id_b)) {
440                    continue;
441                }
442
443                let (vec_a, content_a) = match graph.get_node(id_a) {
444                    Some(n) => (n.feature_vec.clone(), n.content.clone()),
445                    None => continue,
446                };
447                let (vec_b, content_b) = match graph.get_node(id_b) {
448                    Some(n) => (n.feature_vec.clone(), n.content.clone()),
449                    None => continue,
450                };
451
452                let sim = cosine_similarity(&vec_a, &vec_b);
453                if sim < threshold {
454                    continue;
455                }
456
457                // Check for negation: at least one of the two contents must
458                // contain a negation word that does NOT appear in the other.
459                let tokens_a: HashSet<String> =
460                    tokenizer.tokenize(&content_a).into_iter().collect();
461                let tokens_b: HashSet<String> =
462                    tokenizer.tokenize(&content_b).into_iter().collect();
463
464                let neg_set: HashSet<&str> = NEGATION_WORDS.iter().copied().collect();
465
466                let neg_in_a = tokens_a.iter().any(|t| neg_set.contains(t.as_str()));
467                let neg_in_b = tokens_b.iter().any(|t| neg_set.contains(t.as_str()));
468
469                // Contradiction signal: high similarity but exactly one side
470                // uses negation, OR both use negation words that differ.
471                if !(neg_in_a ^ neg_in_b) {
472                    continue;
473                }
474
475                existing_contradictions.insert(ordered_pair(id_a, id_b));
476
477                report.actions.push(ConsolidationAction {
478                    operation: "link_contradictions".to_string(),
479                    description: format!(
480                        "Nodes {} and {} appear contradictory (cosine={:.3})",
481                        id_a, id_b, sim,
482                    ),
483                    affected_nodes: vec![id_a, id_b],
484                });
485                report.contradictions_linked += 1;
486
487                if !dry_run {
488                    let edge = Edge {
489                        source_id: id_a,
490                        target_id: id_b,
491                        edge_type: EdgeType::Contradicts,
492                        weight: sim,
493                        created_at: crate::types::now_micros(),
494                    };
495                    let _ = graph.add_edge(edge);
496                }
497            }
498        }
499    }
500
501    // -----------------------------------------------------------------------
502    // CompressEpisodes (dry-run only in V1)
503    // -----------------------------------------------------------------------
504
505    fn op_compress_episodes(
506        &self,
507        graph: &MemoryGraph,
508        group_size: u32,
509        session_filter: Option<(u32, u32)>,
510        report: &mut ConsolidationReport,
511    ) {
512        // Collect Episode nodes sorted by creation time.
513        let mut episodes: Vec<(u64, u64, u32)> = graph
514            .nodes()
515            .iter()
516            .filter(|n| {
517                n.event_type == EventType::Episode && in_session_range(n.session_id, session_filter)
518            })
519            .map(|n| (n.id, n.created_at, n.session_id))
520            .collect();
521
522        episodes.sort_by_key(|&(_, ts, _)| ts);
523
524        if episodes.len() < group_size as usize {
525            return;
526        }
527
528        // Group contiguous episodes from the same session.
529        let mut groups: Vec<Vec<u64>> = Vec::new();
530        let mut current_group: Vec<u64> = vec![episodes[0].0];
531        let mut current_session = episodes[0].2;
532
533        for &(id, _, session) in &episodes[1..] {
534            if session == current_session {
535                current_group.push(id);
536            } else {
537                if current_group.len() >= group_size as usize {
538                    groups.push(std::mem::take(&mut current_group));
539                } else {
540                    current_group.clear();
541                }
542                current_group.push(id);
543                current_session = session;
544            }
545        }
546        if current_group.len() >= group_size as usize {
547            groups.push(current_group);
548        }
549
550        for group in &groups {
551            report.actions.push(ConsolidationAction {
552                operation: "compress_episodes".to_string(),
553                description: format!(
554                    "Would compress {} contiguous episode(s) into a summary",
555                    group.len(),
556                ),
557                affected_nodes: group.clone(),
558            });
559            report.episodes_compressed += group.len();
560        }
561    }
562
563    // -----------------------------------------------------------------------
564    // PromoteInferences
565    // -----------------------------------------------------------------------
566
567    fn op_promote_inferences(
568        &self,
569        graph: &mut MemoryGraph,
570        min_access: u32,
571        min_confidence: f32,
572        session_filter: Option<(u32, u32)>,
573        dry_run: bool,
574        report: &mut ConsolidationReport,
575    ) {
576        // First pass: collect IDs of eligible Inference nodes.
577        let eligible: Vec<u64> = graph
578            .nodes()
579            .iter()
580            .filter(|n| {
581                n.event_type == EventType::Inference
582                    && n.access_count >= min_access
583                    && n.confidence >= min_confidence
584                    && in_session_range(n.session_id, session_filter)
585            })
586            .map(|n| n.id)
587            .collect();
588
589        for &id in &eligible {
590            report.actions.push(ConsolidationAction {
591                operation: "promote_inferences".to_string(),
592                description: format!("Promote inference node {} to fact", id),
593                affected_nodes: vec![id],
594            });
595            report.inferences_promoted += 1;
596
597            if !dry_run {
598                if let Some(node) = graph.get_node_mut(id) {
599                    node.event_type = EventType::Fact;
600                }
601            }
602        }
603    }
604}
605
606// ---------------------------------------------------------------------------
607// Helpers
608// ---------------------------------------------------------------------------
609
/// Check whether `session_id` falls within an optional inclusive range.
/// `None` means "no filter" and always matches.
fn in_session_range(session_id: u32, range: Option<(u32, u32)>) -> bool {
    range.map_or(true, |(lo, hi)| (lo..=hi).contains(&session_id))
}
617
/// Return the pair as `(min, max)` so it can serve as a canonical,
/// direction-independent key.
fn ordered_pair(a: u64, b: u64) -> (u64, u64) {
    (a.min(b), a.max(b))
}