Skip to main content

zeph_memory/graph/
implicit_conflict.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Implicit conflict detection for SYNAPSE recall (spec 004-17, STALE/CUPMem).
5//!
6//! [`ImplicitConflictDetector`] runs at write time to detect predicate pairs
7//! that are semantically similar but not identical, staging them in
8//! `implicit_conflict_candidates` for later resolution or annotation.
9//!
10//! SYNAPSE recall uses [`annotate_conflicts`] to mark retrieved [`ActivatedFact`]s
11//! that have pending conflict candidates.
12
// Upper bound on edge IDs handled by one lookup query.
//
// The lookup binds every ID twice (once per IN clause), and SQLite's
// conservative bind-parameter limit is 999, so a single query may carry at
// most floor(999 / 2) = 499 IDs.
const MAX_IDS_PER_QUERY: usize = 999 / 2;
16
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use zeph_config::ImplicitConflictConfig;
20use zeph_db::DbTransaction;
21
22use crate::error::MemoryError;
23use crate::graph::activation::ActivatedFact;
24
/// One pair of edges flagged at write time as a possible implicit conflict.
///
/// The pair links a freshly written edge to an already stored edge whose
/// predicate is similar, but not identical, to the new one.
#[derive(Clone, Debug)]
pub struct ConflictCandidate {
    /// Database ID of the edge that was just inserted.
    pub edge_a_id: i64,
    /// Database ID of the pre-existing edge carrying the similar predicate.
    pub edge_b_id: i64,
    /// How similar the two predicates are, within `[0.0, 1.0]`.
    pub similarity: f64,
    /// Name of the similarity method that yielded this pair
    /// (for example `"levenshtein"`).
    pub method: String,
}
37
38/// Write-time implicit conflict detector.
39///
40/// Compares a new edge's predicate against existing active edges on the same
41/// source entity using the configured similarity method and threshold.
42///
43/// # Examples
44///
45/// ```rust,no_run
46/// use zeph_config::ImplicitConflictConfig;
47/// use zeph_memory::graph::implicit_conflict::ImplicitConflictDetector;
48///
49/// let config = ImplicitConflictConfig { enabled: true, ..Default::default() };
50/// let detector = ImplicitConflictDetector::new(config);
51/// let candidates = detector.detect_candidates(42, "employ", &[(1, "employs")], false);
52/// assert!(!candidates.is_empty());
53/// ```
54pub struct ImplicitConflictDetector {
55    config: ImplicitConflictConfig,
56}
57
58impl ImplicitConflictDetector {
59    /// Create a new detector with the given configuration.
60    #[must_use]
61    pub fn new(config: ImplicitConflictConfig) -> Self {
62        Self { config }
63    }
64
65    /// Detect implicit conflict candidates for a new predicate against existing ones.
66    ///
67    /// Returns pairs where normalized Levenshtein similarity is
68    /// `>= conflict_similarity_threshold` **and** the predicates differ (identical
69    /// predicates are already handled by APEX-MEM explicit supersession).
70    ///
71    /// Returns an empty vec when `enabled = false` or when the cardinality flag
72    /// `is_cardinality_n` is set (FR-011).
73    ///
74    /// # Arguments
75    ///
76    /// * `new_edge_id` — database ID of the newly inserted edge
77    /// * `new_predicate` — canonical relation of the new edge
78    /// * `existing` — slice of `(edge_id, canonical_relation)` for all other active
79    ///   edges on the same source entity
80    /// * `is_cardinality_n` — set to `true` for multi-valued predicates; skips detection
81    #[must_use]
82    pub fn detect_candidates(
83        &self,
84        new_edge_id: i64,
85        new_predicate: &str,
86        existing: &[(i64, &str)],
87        is_cardinality_n: bool,
88    ) -> Vec<ConflictCandidate> {
89        let _span = tracing::info_span!(
90            "memory.graph.implicit_conflict.detect",
91            predicate = new_predicate,
92        )
93        .entered();
94
95        if !self.config.enabled || is_cardinality_n || existing.is_empty() {
96            return Vec::new();
97        }
98
99        let threshold = self.config.conflict_similarity_threshold;
100        let mut candidates = Vec::new();
101
102        for &(edge_id, predicate) in existing {
103            if predicate == new_predicate {
104                // Identical predicate: handled by APEX-MEM explicit supersession.
105                continue;
106            }
107            let sim = Self::normalized_levenshtein(new_predicate, predicate);
108            if sim >= threshold {
109                candidates.push(ConflictCandidate {
110                    edge_a_id: new_edge_id,
111                    edge_b_id: edge_id,
112                    similarity: sim,
113                    method: "levenshtein".to_owned(),
114                });
115            }
116        }
117
118        candidates
119    }
120
121    /// Persist conflict candidates into `implicit_conflict_candidates`.
122    ///
123    /// Each candidate is inserted with `status = 'pending'` and an expiry of
124    /// `now + ttl_days * 86400` seconds.
125    ///
126    /// # Errors
127    ///
128    /// Returns a [`MemoryError`] on database write failure.
129    #[tracing::instrument(skip_all, name = "memory.graph.implicit_conflict.stage", fields(count = candidates.len()))]
130    pub async fn stage_candidates(
131        &self,
132        candidates: &[ConflictCandidate],
133        tx: &mut DbTransaction<'_>,
134        ttl_days: u32,
135    ) -> Result<(), MemoryError> {
136        if candidates.is_empty() {
137            return Ok(());
138        }
139
140        #[allow(clippy::cast_possible_wrap)]
141        let now = SystemTime::now()
142            .duration_since(UNIX_EPOCH)
143            .unwrap_or_default()
144            .as_secs() as i64;
145        let ttl_secs = i64::from(ttl_days) * 86_400;
146        let expires_at = now + ttl_secs;
147
148        for c in candidates {
149            sqlx::query(
150                "INSERT INTO implicit_conflict_candidates
151                 (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at)
152                 VALUES (?, ?, ?, ?, 'pending', ?, ?)",
153            )
154            .bind(c.edge_a_id)
155            .bind(c.edge_b_id)
156            .bind(c.similarity)
157            .bind(&c.method)
158            .bind(now)
159            .bind(expires_at)
160            .execute(&mut **tx)
161            .await
162            .map_err(MemoryError::from)?;
163        }
164
165        Ok(())
166    }
167
168    /// Compute normalized Levenshtein similarity between two strings.
169    ///
170    /// Returns a value in `[0.0, 1.0]` where `1.0` means identical.
171    /// Returns `1.0` if both strings are empty, `0.0` if only one is empty.
172    #[must_use]
173    pub fn normalized_levenshtein(a: &str, b: &str) -> f64 {
174        if a == b {
175            return 1.0;
176        }
177        let len_a = a.chars().count();
178        let len_b = b.chars().count();
179        if len_a == 0 && len_b == 0 {
180            return 1.0;
181        }
182        if len_a == 0 || len_b == 0 {
183            return 0.0;
184        }
185        let dist = levenshtein_distance(a, b);
186        let max_len = len_a.max(len_b);
187        #[allow(clippy::cast_precision_loss)]
188        let result = 1.0 - (dist as f64 / max_len as f64);
189        result
190    }
191
192    /// Returns `true` when detection is enabled.
193    #[must_use]
194    pub fn is_enabled(&self) -> bool {
195        self.config.enabled
196    }
197
198    /// Returns the configured TTL for conflict candidates, in days.
199    #[must_use]
200    pub fn candidate_ttl_days(&self) -> u32 {
201        self.config.candidate_ttl_days
202    }
203}
204
205/// Annotate retrieved [`ActivatedFact`]s with pending implicit conflict metadata.
206///
207/// Queries `implicit_conflict_candidates` for all edge IDs in `facts` and sets
208/// `is_implicit_conflict = true` and `conflict_candidate_id` on matches.
209///
210/// # Errors
211///
212/// Returns a [`MemoryError`] on database query failure.
213#[tracing::instrument(skip_all, name = "memory.graph.implicit_conflict.annotate", fields(facts = facts.len()))]
214pub async fn annotate_conflicts(
215    facts: &mut [ActivatedFact],
216    tx: &mut DbTransaction<'_>,
217) -> Result<(), MemoryError> {
218    if facts.is_empty() {
219        return Ok(());
220    }
221
222    let edge_ids: Vec<i64> = facts.iter().map(|f| f.edge.id).collect();
223
224    let mut edge_to_candidate: std::collections::HashMap<i64, i64> =
225        std::collections::HashMap::new();
226
227    for chunk in edge_ids.chunks(MAX_IDS_PER_QUERY) {
228        let placeholders: String = chunk.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
229        let query_str = format!(
230            "SELECT id, edge_a_id, edge_b_id
231             FROM implicit_conflict_candidates
232             WHERE status = 'pending'
233               AND (edge_a_id IN ({placeholders}) OR edge_b_id IN ({placeholders}))",
234        );
235
236        let mut q = sqlx::query(&query_str);
237        for id in chunk {
238            q = q.bind(id);
239        }
240        // Bind a second time for the second IN clause.
241        for id in chunk {
242            q = q.bind(id);
243        }
244
245        let rows = q.fetch_all(&mut **tx).await.map_err(MemoryError::from)?;
246
247        for row in rows {
248            use sqlx::Row as _;
249            let candidate_id: i64 = row.try_get("id").map_err(MemoryError::from)?;
250            let ea: i64 = row.try_get("edge_a_id").map_err(MemoryError::from)?;
251            let eb: i64 = row.try_get("edge_b_id").map_err(MemoryError::from)?;
252            edge_to_candidate.entry(ea).or_insert(candidate_id);
253            edge_to_candidate.entry(eb).or_insert(candidate_id);
254        }
255    }
256
257    for fact in facts.iter_mut() {
258        if let Some(&cid) = edge_to_candidate.get(&fact.edge.id) {
259            fact.is_implicit_conflict = true;
260            fact.conflict_candidate_id = Some(cid);
261        }
262    }
263
264    Ok(())
265}
266
/// Levenshtein edit distance between `a` and `b`, measured in `char`s.
///
/// Counts the minimum number of single-character insertions, deletions and
/// substitutions that turn `a` into `b`. Implemented as a dynamic program
/// over a single row that is updated in place.
fn levenshtein_distance(a: &str, b: &str) -> usize {
    let target: Vec<char> = b.chars().collect();

    // row[j] = distance between the consumed prefix of `a` and the first
    // `j` chars of `b`; initially the prefix of `a` is empty.
    let mut row: Vec<usize> = (0..=target.len()).collect();

    for (consumed, ch_a) in a.chars().enumerate() {
        // Value of the cell up-and-left of the one being filled.
        let mut diagonal = row[0];
        row[0] = consumed + 1;

        for (j, &ch_b) in target.iter().enumerate() {
            let above = row[j + 1];
            let left = row[j];
            let substitution = diagonal + usize::from(ch_a != ch_b);

            row[j + 1] = (above + 1).min(left + 1).min(substitution);
            diagonal = above;
        }
    }

    row[target.len()]
}
288
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::SqliteStore;
    use zeph_config::ImplicitConflictConfig;

    /// Similarity threshold used by every detector built in this module.
    const TEST_THRESHOLD: f64 = 0.80;

    /// Detector with the test threshold and the requested enable switch.
    fn detector(enabled: bool) -> ImplicitConflictDetector {
        let config = ImplicitConflictConfig {
            enabled,
            conflict_similarity_threshold: TEST_THRESHOLD,
            ..Default::default()
        };
        ImplicitConflictDetector::new(config)
    }

    /// Shorthand for the similarity function under test.
    fn similarity(a: &str, b: &str) -> f64 {
        ImplicitConflictDetector::normalized_levenshtein(a, b)
    }

    /// `true` when `actual` is within `f64::EPSILON` of `expected`.
    fn approx_eq(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < f64::EPSILON
    }

    // ── normalized_levenshtein ────────────────────────────────────────────────

    #[test]
    fn normalized_levenshtein_identical() {
        assert!(approx_eq(similarity("uses", "uses"), 1.0));
    }

    #[test]
    fn normalized_levenshtein_empty_both() {
        assert!(approx_eq(similarity("", ""), 1.0));
    }

    #[test]
    fn normalized_levenshtein_empty_one() {
        assert!(approx_eq(similarity("", "abc"), 0.0));
        assert!(approx_eq(similarity("abc", ""), 0.0));
    }

    #[test]
    fn normalized_levenshtein_completely_different() {
        let sim = similarity("uses", "xyz_unrelated_value");
        assert!(sim < 0.5, "expected low similarity, got {sim}");
    }

    // ── detect_candidates ─────────────────────────────────────────────────────

    #[test]
    fn detect_candidates_above_threshold_returns_candidate() {
        // "employ" vs "employs": one edit over seven chars, sim ≈ 0.857 — above 0.80.
        let found = detector(true).detect_candidates(42, "employ", &[(7, "employs")], false);
        assert_eq!(found.len(), 1, "expected one candidate");

        let only = &found[0];
        assert_eq!(only.edge_a_id, 42);
        assert_eq!(only.edge_b_id, 7);
        assert!(only.similarity >= 0.80);
    }

    #[test]
    fn detect_candidates_below_threshold_returns_empty() {
        let found = detector(true).detect_candidates(1, "uses", &[(2, "xyz_unrelated")], false);
        assert!(found.is_empty(), "expected no candidates below threshold");
    }

    #[test]
    fn detect_candidates_identical_predicate_skipped() {
        // Equal predicates are APEX-MEM's responsibility, so nothing may be reported.
        let found = detector(true).detect_candidates(1, "uses", &[(2, "uses")], false);
        assert!(
            found.is_empty(),
            "identical predicates must not create candidates"
        );
    }

    #[test]
    fn detect_candidates_disabled_returns_empty() {
        // A disabled detector stays silent even for highly similar predicates.
        let found = detector(false).detect_candidates(1, "employ", &[(2, "employs")], false);
        assert!(found.is_empty(), "disabled detector must return empty");
    }

    #[test]
    fn detect_candidates_cardinality_n_skipped() {
        let found = detector(true).detect_candidates(1, "employ", &[(2, "employs")], true);
        assert!(found.is_empty(), "cardinality-n predicate must be skipped");
    }

    // ── annotate_conflicts DB tests ───────────────────────────────────────────

    /// Fresh in-memory store for one test.
    async fn setup_test_db() -> SqliteStore {
        SqliteStore::new(":memory:").await.unwrap()
    }

    /// Execute a raw SQL script against the store's pool, panicking on failure.
    async fn exec_raw(db: &SqliteStore, sql: &str) {
        sqlx::query(sql).execute(db.pool()).await.unwrap();
    }

    /// Run `annotate_conflicts` over `facts` inside its own committed transaction.
    async fn annotate(db: &SqliteStore, facts: &mut [ActivatedFact]) {
        let mut tx = zeph_db::begin(db.pool()).await.unwrap();
        annotate_conflicts(facts, &mut tx).await.unwrap();
        tx.commit().await.unwrap();
    }

    /// Minimal, not-yet-annotated fact whose edge carries `edge_id`.
    fn stub_fact(edge_id: i64) -> ActivatedFact {
        use crate::graph::types::{Edge, EdgeType};

        let edge = Edge {
            id: edge_id,
            source_entity_id: 1,
            target_entity_id: 2,
            relation: String::from("test"),
            canonical_relation: String::from("test"),
            fact: String::from("test fact"),
            confidence: 1.0,
            valid_from: String::from("2026-01-01"),
            valid_to: None,
            created_at: String::from("2026-01-01"),
            expired_at: None,
            source_message_id: None,
            qdrant_point_id: None,
            edge_type: EdgeType::Semantic,
            retrieval_count: 0,
            last_retrieved_at: None,
            superseded_by: None,
            supersedes: None,
            weight: 1.0,
            confidence_fast: 1.0,
            confidence_slow: 1.0,
            turn_index: None,
        };

        ActivatedFact {
            edge,
            activation_score: 1.0,
            is_implicit_conflict: false,
            conflict_candidate_id: None,
        }
    }

    #[tokio::test]
    async fn annotate_conflicts_marks_flagged_edges() {
        let db = setup_test_db().await;

        // Pending pair (edge_a_id=1, edge_b_id=2). The IDs do not reference
        // real graph_edges rows, hence the raw SQL with FK enforcement off.
        exec_raw(
            &db,
            "PRAGMA foreign_keys = OFF;
             INSERT INTO implicit_conflict_candidates
             (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at)
             VALUES (1, 2, 0.90, 'levenshtein', 'pending', 1000000, 9999999)",
        )
        .await;

        let mut facts = vec![stub_fact(1), stub_fact(3)];
        annotate(&db, &mut facts).await;

        assert!(facts[0].is_implicit_conflict, "edge 1 must be flagged");
        assert!(facts[0].conflict_candidate_id.is_some());
        assert!(!facts[1].is_implicit_conflict, "edge 3 must not be flagged");
        assert!(facts[1].conflict_candidate_id.is_none());
    }

    #[tokio::test]
    async fn annotate_conflicts_empty_candidates_no_annotation() {
        let db = setup_test_db().await;

        let mut facts = vec![stub_fact(10), stub_fact(20)];
        annotate(&db, &mut facts).await;

        assert!(
            !facts[0].is_implicit_conflict,
            "no candidates → no annotation"
        );
        assert!(facts[1].conflict_candidate_id.is_none());
    }

    #[tokio::test]
    async fn annotate_conflicts_edge_b_side_also_flagged() {
        let db = setup_test_db().await;

        // Candidate with edge_a=5, edge_b=7; only edge 7 is among the facts.
        exec_raw(
            &db,
            "PRAGMA foreign_keys = OFF;
             INSERT INTO implicit_conflict_candidates
             (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at)
             VALUES (5, 7, 0.85, 'levenshtein', 'pending', 1000000, 9999999)",
        )
        .await;

        let mut facts = vec![stub_fact(7)];
        annotate(&db, &mut facts).await;

        assert!(
            facts[0].is_implicit_conflict,
            "edge on edge_b side must also be flagged"
        );
    }
}