Skip to main content

zeph_memory/graph/
implicit_conflict.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Implicit conflict detection for SYNAPSE recall (spec 004-17, STALE/CUPMem).
5//!
6//! [`ImplicitConflictDetector`] runs at write time to detect predicate pairs
7//! that are semantically similar but not identical, staging them in
8//! `implicit_conflict_candidates` for later resolution or annotation.
9//!
10//! SYNAPSE recall uses [`annotate_conflicts`] to mark retrieved [`ActivatedFact`]s
11//! that have pending conflict candidates.
12
// SQLite limits bind parameters to 999 per statement (the default
// `SQLITE_MAX_VARIABLE_NUMBER` in older builds; newer builds may allow more,
// so 999 is the safe lower bound). `annotate_conflicts` binds each ID twice
// (once per `IN` clause), so a chunk of 499 IDs uses 998 parameters and stays
// under that limit.
const MAX_IDS_PER_QUERY: usize = 499;
16
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use zeph_config::ImplicitConflictConfig;
20use zeph_db::DbTransaction;
21
22use crate::error::MemoryError;
23use crate::graph::activation::ActivatedFact;
24
/// A candidate conflict pair detected at write time.
///
/// Produced by [`ImplicitConflictDetector::detect_candidates`] and persisted by
/// [`ImplicitConflictDetector::stage_candidates`].
#[derive(Debug, Clone)]
pub struct ConflictCandidate {
    /// ID of the newly inserted edge.
    pub edge_a_id: i64,
    /// ID of the existing edge with a similar (but not identical) predicate.
    pub edge_b_id: i64,
    /// Similarity score in `[0.0, 1.0]`; `1.0` would mean identical predicates.
    pub similarity: f64,
    /// The similarity method that produced this candidate
    /// (the detector in this file emits `"levenshtein"`).
    pub method: String,
}
37
/// Write-time implicit conflict detector.
///
/// Compares a new edge's predicate against existing active edges on the same
/// source entity using the configured similarity method and threshold.
///
/// Detection ([`Self::detect_candidates`]) is synchronous and touches no
/// database; persisting the result is a separate step
/// ([`Self::stage_candidates`]).
///
/// # Examples
///
/// ```rust,no_run
/// use zeph_config::ImplicitConflictConfig;
/// use zeph_memory::graph::implicit_conflict::ImplicitConflictDetector;
///
/// let config = ImplicitConflictConfig {
///     enabled: true,
///     conflict_similarity_threshold: 0.80,
///     ..Default::default()
/// };
/// let detector = ImplicitConflictDetector::new(config);
/// // "employ" vs "employs" scores about 0.857, above the 0.80 threshold.
/// let candidates = detector.detect_candidates(42, "employ", &[(1, "employs")], false);
/// assert!(!candidates.is_empty());
/// ```
pub struct ImplicitConflictDetector {
    /// Detection settings; this file reads `enabled`,
    /// `conflict_similarity_threshold` and `candidate_ttl_days` from it.
    config: ImplicitConflictConfig,
}
57
58impl ImplicitConflictDetector {
    /// Create a new detector with the given configuration.
    ///
    /// The configuration is stored as-is; no validation happens here.
    #[must_use]
    pub fn new(config: ImplicitConflictConfig) -> Self {
        Self { config }
    }
64
65    /// Detect implicit conflict candidates for a new predicate against existing ones.
66    ///
67    /// Returns pairs where normalized Levenshtein similarity is
68    /// `>= conflict_similarity_threshold` **and** the predicates differ (identical
69    /// predicates are already handled by APEX-MEM explicit supersession).
70    ///
71    /// Returns an empty vec when `enabled = false` or when the cardinality flag
72    /// `is_cardinality_n` is set (FR-011).
73    ///
74    /// # Arguments
75    ///
76    /// * `new_edge_id` — database ID of the newly inserted edge
77    /// * `new_predicate` — canonical relation of the new edge
78    /// * `existing` — slice of `(edge_id, canonical_relation)` for all other active
79    ///   edges on the same source entity
80    /// * `is_cardinality_n` — set to `true` for multi-valued predicates; skips detection
81    #[must_use]
82    pub fn detect_candidates(
83        &self,
84        new_edge_id: i64,
85        new_predicate: &str,
86        existing: &[(i64, &str)],
87        is_cardinality_n: bool,
88    ) -> Vec<ConflictCandidate> {
89        let _span = tracing::info_span!(
90            "memory.graph.implicit_conflict.detect",
91            predicate = new_predicate,
92        )
93        .entered();
94
95        if !self.config.enabled || is_cardinality_n || existing.is_empty() {
96            return Vec::new();
97        }
98
99        let threshold = self.config.conflict_similarity_threshold;
100        let mut candidates = Vec::new();
101
102        for &(edge_id, predicate) in existing {
103            if predicate == new_predicate {
104                // Identical predicate: handled by APEX-MEM explicit supersession.
105                continue;
106            }
107            let sim = Self::normalized_levenshtein(new_predicate, predicate);
108            if sim >= threshold {
109                candidates.push(ConflictCandidate {
110                    edge_a_id: new_edge_id,
111                    edge_b_id: edge_id,
112                    similarity: sim,
113                    method: "levenshtein".to_owned(),
114                });
115            }
116        }
117
118        candidates
119    }
120
121    /// Persist conflict candidates into `implicit_conflict_candidates`.
122    ///
123    /// Each candidate is inserted with `status = 'pending'` and an expiry of
124    /// `now + ttl_days * 86400` seconds.
125    ///
126    /// # Errors
127    ///
128    /// Returns a [`MemoryError`] on database write failure.
129    pub async fn stage_candidates(
130        &self,
131        candidates: &[ConflictCandidate],
132        tx: &mut DbTransaction<'_>,
133        ttl_days: u32,
134    ) -> Result<(), MemoryError> {
135        if candidates.is_empty() {
136            return Ok(());
137        }
138
139        let _span = tracing::info_span!(
140            "memory.graph.implicit_conflict.stage",
141            count = candidates.len(),
142        )
143        .entered();
144
145        #[allow(clippy::cast_possible_wrap)]
146        let now = SystemTime::now()
147            .duration_since(UNIX_EPOCH)
148            .unwrap_or_default()
149            .as_secs() as i64;
150        let ttl_secs = i64::from(ttl_days) * 86_400;
151        let expires_at = now + ttl_secs;
152
153        for c in candidates {
154            sqlx::query(
155                "INSERT INTO implicit_conflict_candidates
156                 (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at)
157                 VALUES (?, ?, ?, ?, 'pending', ?, ?)",
158            )
159            .bind(c.edge_a_id)
160            .bind(c.edge_b_id)
161            .bind(c.similarity)
162            .bind(&c.method)
163            .bind(now)
164            .bind(expires_at)
165            .execute(&mut **tx)
166            .await
167            .map_err(MemoryError::from)?;
168        }
169
170        Ok(())
171    }
172
173    /// Compute normalized Levenshtein similarity between two strings.
174    ///
175    /// Returns a value in `[0.0, 1.0]` where `1.0` means identical.
176    /// Returns `1.0` if both strings are empty, `0.0` if only one is empty.
177    #[must_use]
178    pub fn normalized_levenshtein(a: &str, b: &str) -> f64 {
179        if a == b {
180            return 1.0;
181        }
182        let len_a = a.chars().count();
183        let len_b = b.chars().count();
184        if len_a == 0 && len_b == 0 {
185            return 1.0;
186        }
187        if len_a == 0 || len_b == 0 {
188            return 0.0;
189        }
190        let dist = levenshtein_distance(a, b);
191        let max_len = len_a.max(len_b);
192        #[allow(clippy::cast_precision_loss)]
193        let result = 1.0 - (dist as f64 / max_len as f64);
194        result
195    }
196
    /// Returns `true` when detection is enabled.
    ///
    /// Mirrors the `enabled` flag of the configuration passed to [`Self::new`];
    /// when it is `false`, [`Self::detect_candidates`] always returns an empty vec.
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }
202
    /// Returns the configured TTL for conflict candidates, in days.
    ///
    /// This is the raw `candidate_ttl_days` configuration value.
    /// [`Self::stage_candidates`] does not read it and takes its TTL as an
    /// explicit argument — presumably callers pass this value through; verify
    /// against the call sites.
    #[must_use]
    pub fn candidate_ttl_days(&self) -> u32 {
        self.config.candidate_ttl_days
    }
208}
209
210/// Annotate retrieved [`ActivatedFact`]s with pending implicit conflict metadata.
211///
212/// Queries `implicit_conflict_candidates` for all edge IDs in `facts` and sets
213/// `is_implicit_conflict = true` and `conflict_candidate_id` on matches.
214///
215/// # Errors
216///
217/// Returns a [`MemoryError`] on database query failure.
218pub async fn annotate_conflicts(
219    facts: &mut [ActivatedFact],
220    tx: &mut DbTransaction<'_>,
221) -> Result<(), MemoryError> {
222    if facts.is_empty() {
223        return Ok(());
224    }
225
226    let _span = tracing::info_span!(
227        "memory.graph.implicit_conflict.annotate",
228        facts = facts.len(),
229    )
230    .entered();
231
232    let edge_ids: Vec<i64> = facts.iter().map(|f| f.edge.id).collect();
233
234    let mut edge_to_candidate: std::collections::HashMap<i64, i64> =
235        std::collections::HashMap::new();
236
237    for chunk in edge_ids.chunks(MAX_IDS_PER_QUERY) {
238        let placeholders: String = chunk.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
239        let query_str = format!(
240            "SELECT id, edge_a_id, edge_b_id
241             FROM implicit_conflict_candidates
242             WHERE status = 'pending'
243               AND (edge_a_id IN ({placeholders}) OR edge_b_id IN ({placeholders}))",
244        );
245
246        let mut q = sqlx::query(&query_str);
247        for id in chunk {
248            q = q.bind(id);
249        }
250        // Bind a second time for the second IN clause.
251        for id in chunk {
252            q = q.bind(id);
253        }
254
255        let rows = q.fetch_all(&mut **tx).await.map_err(MemoryError::from)?;
256
257        for row in rows {
258            use sqlx::Row as _;
259            let candidate_id: i64 = row.try_get("id").map_err(MemoryError::from)?;
260            let ea: i64 = row.try_get("edge_a_id").map_err(MemoryError::from)?;
261            let eb: i64 = row.try_get("edge_b_id").map_err(MemoryError::from)?;
262            edge_to_candidate.entry(ea).or_insert(candidate_id);
263            edge_to_candidate.entry(eb).or_insert(candidate_id);
264        }
265    }
266
267    for fact in facts.iter_mut() {
268        if let Some(&cid) = edge_to_candidate.get(&fact.edge.id) {
269            fact.is_implicit_conflict = true;
270            fact.conflict_candidate_id = Some(cid);
271        }
272    }
273
274    Ok(())
275}
276
/// Levenshtein edit distance between `a` and `b`, counted in `char`s.
///
/// Classic dynamic programme kept in a single rolling row: before processing
/// a character of `a`, `row[j]` holds the distance between the already
/// consumed prefix of `a` and the first `j` characters of `b`.
fn levenshtein_distance(a: &str, b: &str) -> usize {
    let target: Vec<char> = b.chars().collect();
    let width = target.len();

    // Distance from the empty prefix of `a` to each prefix of `b`.
    let mut row: Vec<usize> = (0..=width).collect();

    for (consumed, ca) in a.chars().enumerate() {
        // Value of the previous row one column to the left (the diagonal).
        let mut diagonal = row[0];
        row[0] = consumed + 1;

        for (col, &cb) in target.iter().enumerate() {
            let above = row[col + 1];
            let deletion = above + 1;
            let insertion = row[col] + 1;
            let substitution = diagonal + usize::from(ca != cb);
            row[col + 1] = deletion.min(insertion).min(substitution);
            diagonal = above;
        }
    }

    row[width]
}
298
299#[cfg(test)]
300mod tests {
301    use super::*;
302    use zeph_config::ImplicitConflictConfig;
303
304    fn detector(enabled: bool) -> ImplicitConflictDetector {
305        ImplicitConflictDetector::new(ImplicitConflictConfig {
306            enabled,
307            conflict_similarity_threshold: 0.80,
308            ..Default::default()
309        })
310    }
311
312    #[test]
313    fn normalized_levenshtein_identical() {
314        assert!(
315            (ImplicitConflictDetector::normalized_levenshtein("uses", "uses") - 1.0).abs()
316                < f64::EPSILON
317        );
318    }
319
320    #[test]
321    fn normalized_levenshtein_empty_both() {
322        assert!(
323            (ImplicitConflictDetector::normalized_levenshtein("", "") - 1.0).abs() < f64::EPSILON
324        );
325    }
326
327    #[test]
328    fn normalized_levenshtein_empty_one() {
329        assert!(
330            (ImplicitConflictDetector::normalized_levenshtein("", "abc") - 0.0).abs()
331                < f64::EPSILON
332        );
333        assert!(
334            (ImplicitConflictDetector::normalized_levenshtein("abc", "") - 0.0).abs()
335                < f64::EPSILON
336        );
337    }
338
    #[test]
    fn normalized_levenshtein_completely_different() {
        // 4 chars vs 19 chars: the distance is at least the length gap of 15,
        // so the similarity is at most 1 - 15/19 ≈ 0.21.
        let sim = ImplicitConflictDetector::normalized_levenshtein("uses", "xyz_unrelated_value");
        assert!(sim < 0.5, "expected low similarity, got {sim}");
    }
344
345    #[test]
346    fn detect_candidates_above_threshold_returns_candidate() {
347        let d = detector(true);
348        // "employ" vs "employs": distance = 1, max = 7, sim ≈ 0.857 — above 0.80
349        let candidates = d.detect_candidates(42, "employ", &[(7, "employs")], false);
350        assert_eq!(candidates.len(), 1, "expected one candidate");
351        assert_eq!(candidates[0].edge_a_id, 42);
352        assert_eq!(candidates[0].edge_b_id, 7);
353        assert!(candidates[0].similarity >= 0.80);
354    }
355
356    #[test]
357    fn detect_candidates_below_threshold_returns_empty() {
358        let d = detector(true);
359        let candidates = d.detect_candidates(1, "uses", &[(2, "xyz_unrelated")], false);
360        assert!(
361            candidates.is_empty(),
362            "expected no candidates below threshold"
363        );
364    }
365
366    #[test]
367    fn detect_candidates_identical_predicate_skipped() {
368        let d = detector(true);
369        // Identical predicates are handled by APEX-MEM; detector must skip them.
370        let candidates = d.detect_candidates(1, "uses", &[(2, "uses")], false);
371        assert!(
372            candidates.is_empty(),
373            "identical predicates must not create candidates"
374        );
375    }
376
377    #[test]
378    fn detect_candidates_disabled_returns_empty() {
379        let d = detector(false);
380        // Even with high-similarity predicates, disabled detector returns nothing.
381        let candidates = d.detect_candidates(1, "employ", &[(2, "employs")], false);
382        assert!(candidates.is_empty(), "disabled detector must return empty");
383    }
384
385    #[test]
386    fn detect_candidates_cardinality_n_skipped() {
387        let d = detector(true);
388        let candidates = d.detect_candidates(1, "employ", &[(2, "employs")], true);
389        assert!(
390            candidates.is_empty(),
391            "cardinality-n predicate must be skipped"
392        );
393    }
394
395    // ── annotate_conflicts DB tests ───────────────────────────────────────────
396
    /// Open a fresh `SqliteStore` at `":memory:"`, panicking on failure.
    ///
    /// NOTE(review): presumably `SqliteStore::new` also applies the schema
    /// migrations, since the tests below insert into
    /// `implicit_conflict_candidates` without creating it — confirm.
    async fn setup_test_db() -> crate::store::SqliteStore {
        crate::store::SqliteStore::new(":memory:").await.unwrap()
    }
400
401    fn stub_fact(edge_id: i64) -> ActivatedFact {
402        use crate::graph::types::{Edge, EdgeType};
403        ActivatedFact {
404            edge: Edge {
405                id: edge_id,
406                source_entity_id: 1,
407                target_entity_id: 2,
408                relation: "test".to_owned(),
409                canonical_relation: "test".to_owned(),
410                fact: "test fact".to_owned(),
411                confidence: 1.0,
412                valid_from: "2026-01-01".to_owned(),
413                valid_to: None,
414                created_at: "2026-01-01".to_owned(),
415                expired_at: None,
416                source_message_id: None,
417                qdrant_point_id: None,
418                edge_type: EdgeType::Semantic,
419                retrieval_count: 0,
420                last_retrieved_at: None,
421                superseded_by: None,
422                supersedes: None,
423                weight: 1.0,
424            },
425            activation_score: 1.0,
426            is_implicit_conflict: false,
427            conflict_candidate_id: None,
428        }
429    }
430
    /// A fact whose edge is the `edge_a_id` of a pending candidate is flagged;
    /// an unrelated fact in the same call is left untouched.
    #[tokio::test]
    async fn annotate_conflicts_marks_flagged_edges() {
        let db = setup_test_db().await;
        let pool = db.pool();

        // Insert a pending candidate pair (edge_a_id=1, edge_b_id=2).
        // Edges 1 and 2 are not real graph_edges rows, so the PRAGMA turns
        // foreign-key enforcement off in the same call as the raw INSERT.
        sqlx::query(
            "PRAGMA foreign_keys = OFF;
             INSERT INTO implicit_conflict_candidates
             (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at)
             VALUES (1, 2, 0.90, 'levenshtein', 'pending', 1000000, 9999999)",
        )
        .execute(pool)
        .await
        .unwrap();

        // Edge 1 is part of the candidate pair; edge 3 is not.
        let mut facts = vec![stub_fact(1), stub_fact(3)];

        let mut tx = zeph_db::begin(pool).await.unwrap();
        annotate_conflicts(&mut facts, &mut tx).await.unwrap();
        tx.commit().await.unwrap();

        assert!(facts[0].is_implicit_conflict, "edge 1 must be flagged");
        assert!(facts[0].conflict_candidate_id.is_some());
        assert!(!facts[1].is_implicit_conflict, "edge 3 must not be flagged");
        assert!(facts[1].conflict_candidate_id.is_none());
    }
459
    /// With no candidate rows inserted, annotation leaves the facts unflagged.
    #[tokio::test]
    async fn annotate_conflicts_empty_candidates_no_annotation() {
        let db = setup_test_db().await;
        let pool = db.pool();

        // No rows are inserted into implicit_conflict_candidates in this test.
        let mut facts = vec![stub_fact(10), stub_fact(20)];

        let mut tx = zeph_db::begin(pool).await.unwrap();
        annotate_conflicts(&mut facts, &mut tx).await.unwrap();
        tx.commit().await.unwrap();

        assert!(
            !facts[0].is_implicit_conflict,
            "no candidates → no annotation"
        );
        assert!(facts[1].conflict_candidate_id.is_none());
    }
477
    /// A fact is flagged when its edge appears only as `edge_b_id` of a pending
    /// candidate, i.e. the second `IN` clause of the lookup query matters.
    #[tokio::test]
    async fn annotate_conflicts_edge_b_side_also_flagged() {
        let db = setup_test_db().await;
        let pool = db.pool();

        // Insert candidate with edge_a=5, edge_b=7. Pass edge 7 in facts.
        // Foreign keys are switched off in the same call because edges 5 and 7
        // are not inserted as graph edges here.
        sqlx::query(
            "PRAGMA foreign_keys = OFF;
             INSERT INTO implicit_conflict_candidates
             (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at)
             VALUES (5, 7, 0.85, 'levenshtein', 'pending', 1000000, 9999999)",
        )
        .execute(pool)
        .await
        .unwrap();

        let mut facts = vec![stub_fact(7)];

        let mut tx = zeph_db::begin(pool).await.unwrap();
        annotate_conflicts(&mut facts, &mut tx).await.unwrap();
        tx.commit().await.unwrap();

        assert!(
            facts[0].is_implicit_conflict,
            "edge on edge_b side must also be flagged"
        );
    }
505}