Skip to main content

engram/graph/
conflicts.rs

1//! Graph Conflict Detection & Resolution (RML-1217)
2//!
3//! Inspired by Mem0's approach to managing conflicting knowledge in graphs.
4//! Provides:
5//! - Detection of four conflict types: direct contradictions, temporal inconsistencies,
6//!   cyclic dependencies, and orphaned references.
7//! - Resolution strategies: keep newer, keep higher confidence, merge, or manual.
8//! - Persistence of detected conflicts in the `graph_conflicts` table.
9
10use rusqlite::{params, Connection};
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, HashSet};
13
14use crate::error::{EngramError, Result};
15
16// =============================================================================
17// DDL
18// =============================================================================
19
/// DDL for the `graph_conflicts` table and its three lookup indexes.
///
/// Every statement uses `IF NOT EXISTS`, so running the script against a
/// database that already contains these objects changes nothing.
///
/// Column notes:
/// - `edge_ids` stores a JSON array of edge ids and defaults to `'[]'`.
/// - `conflict_type`, `severity` and `resolution_strategy` store the
///   snake_case names written by `ConflictResolver::save_conflict`.
/// - `resolved_at` stays `NULL` until the conflict is resolved.
/// - `created_at` defaults to the current time rendered by SQLite's
///   `strftime('%Y-%m-%dT%H:%M:%fZ', 'now')`.
///
/// NOTE(review): the string holds several statements, so it presumably needs
/// a batch API such as `Connection::execute_batch` — confirm at the call site.
pub const CREATE_CONFLICTS_TABLE: &str = r#"
CREATE TABLE IF NOT EXISTS graph_conflicts (
    id                  INTEGER PRIMARY KEY AUTOINCREMENT,
    conflict_type       TEXT    NOT NULL,
    edge_ids            TEXT    NOT NULL DEFAULT '[]',
    description         TEXT    NOT NULL,
    severity            TEXT    NOT NULL,
    resolved_at         TEXT,
    resolution_strategy TEXT,
    created_at          TEXT    NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);
CREATE INDEX IF NOT EXISTS idx_graph_conflicts_type     ON graph_conflicts(conflict_type);
CREATE INDEX IF NOT EXISTS idx_graph_conflicts_severity ON graph_conflicts(severity);
CREATE INDEX IF NOT EXISTS idx_graph_conflicts_resolved ON graph_conflicts(resolved_at);
"#;
38
39// =============================================================================
40// Types
41// =============================================================================
42
43/// The category of graph conflict.
44#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
45#[serde(rename_all = "snake_case")]
46pub enum ConflictType {
47    /// Two edges between the same pair of nodes carry contradicting relation
48    /// types (e.g. "supports" AND "contradicts" for the same A→B pair).
49    DirectContradiction,
50    /// Two or more edges for the same entity pair have overlapping validity
51    /// periods, indicating a temporal inconsistency.
52    TemporalInconsistency,
53    /// A cycle exists in the directed edge graph (A→B→C→A).
54    CyclicDependency,
55    /// An edge references a `from_id` or `to_id` that does not exist in the
56    /// `memories` table.
57    OrphanedReference,
58}
59
60impl ConflictType {
61    fn as_str(&self) -> &'static str {
62        match self {
63            ConflictType::DirectContradiction => "direct_contradiction",
64            ConflictType::TemporalInconsistency => "temporal_inconsistency",
65            ConflictType::CyclicDependency => "cyclic_dependency",
66            ConflictType::OrphanedReference => "orphaned_reference",
67        }
68    }
69
70    fn from_str(s: &str) -> Option<Self> {
71        match s {
72            "direct_contradiction" => Some(ConflictType::DirectContradiction),
73            "temporal_inconsistency" => Some(ConflictType::TemporalInconsistency),
74            "cyclic_dependency" => Some(ConflictType::CyclicDependency),
75            "orphaned_reference" => Some(ConflictType::OrphanedReference),
76            _ => None,
77        }
78    }
79}
80
81/// Severity level of a detected conflict.
82#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
83#[serde(rename_all = "snake_case")]
84pub enum Severity {
85    Low,
86    Medium,
87    High,
88    Critical,
89}
90
91impl Severity {
92    fn as_str(&self) -> &'static str {
93        match self {
94            Severity::Low => "low",
95            Severity::Medium => "medium",
96            Severity::High => "high",
97            Severity::Critical => "critical",
98        }
99    }
100
101    fn from_str(s: &str) -> Option<Self> {
102        match s {
103            "low" => Some(Severity::Low),
104            "medium" => Some(Severity::Medium),
105            "high" => Some(Severity::High),
106            "critical" => Some(Severity::Critical),
107            _ => None,
108        }
109    }
110}
111
/// A detected conflict in the knowledge graph.
///
/// Detectors build values with `id: 0` and no resolution data; rows read back
/// from `graph_conflicts` carry the stored id and resolution columns.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Conflict {
    /// Row id in `graph_conflicts` (`0` for conflicts that were never saved).
    pub id: i64,
    /// Category of conflict.
    pub conflict_type: ConflictType,
    /// Ids of the `cross_references` edges involved in this conflict.
    /// Persisted as a JSON array.
    pub edge_ids: Vec<i64>,
    /// Human-readable description of the conflict.
    pub description: String,
    /// How severe this conflict is.
    pub severity: Severity,
    /// Timestamp at which the conflict was resolved (`None` = unresolved).
    pub resolved_at: Option<String>,
    /// Strategy that was used to resolve this conflict, if any.
    pub resolution_strategy: Option<ResolutionStrategy>,
}
130
131/// Strategy to apply when resolving a conflict.
132#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
133#[serde(rename_all = "snake_case")]
134pub enum ResolutionStrategy {
135    /// Remove all but the most recently created edge.
136    KeepNewer,
137    /// Remove all but the edge with the highest confidence / importance proxy.
138    KeepHigherConfidence,
139    /// Merge edge metadata into a single edge.
140    Merge,
141    /// Mark the conflict resolved without modifying any edges.
142    Manual,
143}
144
145impl ResolutionStrategy {
146    fn as_str(&self) -> &'static str {
147        match self {
148            ResolutionStrategy::KeepNewer => "keep_newer",
149            ResolutionStrategy::KeepHigherConfidence => "keep_higher_confidence",
150            ResolutionStrategy::Merge => "merge",
151            ResolutionStrategy::Manual => "manual",
152        }
153    }
154
155    fn from_str(s: &str) -> Option<Self> {
156        match s {
157            "keep_newer" => Some(ResolutionStrategy::KeepNewer),
158            "keep_higher_confidence" => Some(ResolutionStrategy::KeepHigherConfidence),
159            "merge" => Some(ResolutionStrategy::Merge),
160            "manual" => Some(ResolutionStrategy::Manual),
161            _ => None,
162        }
163    }
164}
165
/// Outcome of resolving a conflict, as returned by `ConflictResolver::resolve`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResolutionResult {
    /// Id of the `graph_conflicts` row that was resolved.
    pub conflict_id: i64,
    /// Strategy that was applied.
    pub strategy: ResolutionStrategy,
    /// Edge ids that were deleted from `cross_references` during resolution
    /// (empty for the `Manual` strategy).
    pub edges_removed: Vec<i64>,
    /// Edge ids that were kept during resolution.
    pub edges_kept: Vec<i64>,
}
178
179// =============================================================================
180// ConflictDetector
181// =============================================================================
182
/// Detects conflicts in the `cross_references` graph.
///
/// Stateless namespace type: every detector is an associated function that
/// takes the database connection as its argument.
pub struct ConflictDetector;
185
/// Pairs of relation types that are considered direct contradictions.
///
/// `ConflictDetector::detect_contradictions` compares these names against
/// `relation_type` by exact string equality, and reports a conflict when both
/// members of a pair occur on edges sharing the same `(from_id, to_id)`.
const CONTRADICTING_PAIRS: &[(&str, &str)] = &[
    ("supports", "contradicts"),
    ("agrees_with", "disagrees_with"),
    ("confirms", "refutes"),
    ("approves", "rejects"),
    ("enables", "prevents"),
    ("causes", "prevents"),
];
195
196impl ConflictDetector {
197    /// Run all detectors and return a combined, deduplicated list of conflicts.
198    pub fn detect_all(conn: &Connection) -> Result<Vec<Conflict>> {
199        let mut conflicts = Vec::new();
200        conflicts.extend(Self::detect_contradictions(conn)?);
201        conflicts.extend(Self::detect_temporal_inconsistencies(conn)?);
202        conflicts.extend(Self::detect_cycles(conn)?);
203        conflicts.extend(Self::detect_orphans(conn)?);
204        Ok(conflicts)
205    }
206
207    /// Find edges where A→B has contradicting relation types
208    /// (e.g. both "supports" and "contradicts" for the same pair).
209    pub fn detect_contradictions(conn: &Connection) -> Result<Vec<Conflict>> {
210        // Load all edges from cross_references.
211        let edges = load_all_edges(conn)?;
212
213        // Group by (from_id, to_id).
214        let mut by_pair: HashMap<(i64, i64), Vec<EdgeRow>> = HashMap::new();
215        for edge in edges {
216            by_pair
217                .entry((edge.from_id, edge.to_id))
218                .or_default()
219                .push(edge);
220        }
221
222        let mut conflicts = Vec::new();
223
224        for ((from_id, to_id), group) in &by_pair {
225            let relations: Vec<&str> = group.iter().map(|e| e.relation_type.as_str()).collect();
226
227            for &(a, b) in CONTRADICTING_PAIRS {
228                if relations.contains(&a) && relations.contains(&b) {
229                    let involved_ids: Vec<i64> = group.iter().map(|e| e.id).collect();
230                    conflicts.push(Conflict {
231                        id: 0,
232                        conflict_type: ConflictType::DirectContradiction,
233                        edge_ids: involved_ids,
234                        description: format!(
235                            "Contradicting relations '{}' and '{}' between nodes {} and {}",
236                            a, b, from_id, to_id
237                        ),
238                        severity: Severity::High,
239                        resolved_at: None,
240                        resolution_strategy: None,
241                    });
242                }
243            }
244        }
245
246        Ok(conflicts)
247    }
248
249    /// Find edges with overlapping validity periods for the same entity pair.
250    ///
251    /// Queries the `cross_references` table and treats the `created_at` column
252    /// as a proxy for validity start. If two edges share the same
253    /// `(from_id, to_id, relation_type)` triple, that is considered a temporal
254    /// inconsistency — one should have been closed when the next was created.
255    pub fn detect_temporal_inconsistencies(conn: &Connection) -> Result<Vec<Conflict>> {
256        // Self-join: same triple, both are unresolved / open, different IDs.
257        let sql = "
258            SELECT a.id, b.id, a.from_id, a.to_id, a.relation_type
259            FROM   cross_references a
260            JOIN   cross_references b
261              ON   a.from_id       = b.from_id
262             AND   a.to_id         = b.to_id
263             AND   a.relation_type = b.relation_type
264             AND   a.id < b.id
265        ";
266
267        let table_exists = table_exists(conn, "cross_references")?;
268        if !table_exists {
269            return Ok(Vec::new());
270        }
271
272        let mut stmt = conn.prepare(sql).map_err(EngramError::Database)?;
273
274        let pairs = stmt
275            .query_map([], |row| {
276                Ok((
277                    row.get::<_, i64>(0)?,
278                    row.get::<_, i64>(1)?,
279                    row.get::<_, i64>(2)?,
280                    row.get::<_, i64>(3)?,
281                    row.get::<_, String>(4)?,
282                ))
283            })
284            .map_err(EngramError::Database)?
285            .collect::<rusqlite::Result<Vec<_>>>()
286            .map_err(EngramError::Database)?;
287
288        let conflicts = pairs
289            .into_iter()
290            .map(|(id_a, id_b, from_id, to_id, rel)| Conflict {
291                id: 0,
292                conflict_type: ConflictType::TemporalInconsistency,
293                edge_ids: vec![id_a, id_b],
294                description: format!(
295                    "Duplicate '{}' edges between nodes {} and {} (ids {} and {}); possible temporal overlap",
296                    rel, from_id, to_id, id_a, id_b
297                ),
298                severity: Severity::Medium,
299                resolved_at: None,
300                resolution_strategy: None,
301            })
302            .collect();
303
304        Ok(conflicts)
305    }
306
307    /// Detect cycles in the directed edge graph using iterative DFS.
308    ///
309    /// Returns one conflict per cycle found, listing the edge IDs that form
310    /// that cycle.
311    pub fn detect_cycles(conn: &Connection) -> Result<Vec<Conflict>> {
312        let table_exists = table_exists(conn, "cross_references")?;
313        if !table_exists {
314            return Ok(Vec::new());
315        }
316
317        let edges = load_all_edges(conn)?;
318
319        // Build adjacency list: from_id -> Vec<(to_id, edge_id)>
320        let mut adj: HashMap<i64, Vec<(i64, i64)>> = HashMap::new();
321        for edge in &edges {
322            adj.entry(edge.from_id)
323                .or_default()
324                .push((edge.to_id, edge.id));
325        }
326
327        // Build edge lookup: (from_id, to_id) -> edge_id for path reconstruction.
328        let mut edge_map: HashMap<(i64, i64), i64> = HashMap::new();
329        for edge in &edges {
330            edge_map.insert((edge.from_id, edge.to_id), edge.id);
331        }
332
333        let all_nodes: HashSet<i64> = edges.iter().flat_map(|e| [e.from_id, e.to_id]).collect();
334
335        let mut visited: HashSet<i64> = HashSet::new();
336        let mut rec_stack: HashSet<i64> = HashSet::new();
337        let mut conflicts = Vec::new();
338
339        for &start in &all_nodes {
340            if !visited.contains(&start) {
341                dfs_detect_cycle(
342                    start,
343                    &adj,
344                    &edge_map,
345                    &mut visited,
346                    &mut rec_stack,
347                    &mut conflicts,
348                );
349            }
350        }
351
352        Ok(conflicts)
353    }
354
355    /// Find edges whose `from_id` or `to_id` do not exist in the `memories`
356    /// table.
357    pub fn detect_orphans(conn: &Connection) -> Result<Vec<Conflict>> {
358        let cr_exists = table_exists(conn, "cross_references")?;
359        let mem_exists = table_exists(conn, "memories")?;
360
361        if !cr_exists || !mem_exists {
362            return Ok(Vec::new());
363        }
364
365        let sql = "
366            SELECT cr.id, cr.from_id, cr.to_id
367            FROM   cross_references cr
368            WHERE  NOT EXISTS (SELECT 1 FROM memories m WHERE m.id = cr.from_id)
369               OR  NOT EXISTS (SELECT 1 FROM memories m WHERE m.id = cr.to_id)
370        ";
371
372        let mut stmt = conn.prepare(sql).map_err(EngramError::Database)?;
373
374        let rows = stmt
375            .query_map([], |row| {
376                Ok((
377                    row.get::<_, i64>(0)?,
378                    row.get::<_, i64>(1)?,
379                    row.get::<_, i64>(2)?,
380                ))
381            })
382            .map_err(EngramError::Database)?
383            .collect::<rusqlite::Result<Vec<_>>>()
384            .map_err(EngramError::Database)?;
385
386        let conflicts = rows
387            .into_iter()
388            .map(|(edge_id, from_id, to_id)| Conflict {
389                id: 0,
390                conflict_type: ConflictType::OrphanedReference,
391                edge_ids: vec![edge_id],
392                description: format!(
393                    "Edge {} references non-existent memory node(s): from_id={}, to_id={}",
394                    edge_id, from_id, to_id
395                ),
396                severity: Severity::Critical,
397                resolved_at: None,
398                resolution_strategy: None,
399            })
400            .collect();
401
402        Ok(conflicts)
403    }
404}
405
406// =============================================================================
407// ConflictResolver
408// =============================================================================
409
/// Resolves conflicts and persists them to the `graph_conflicts` table.
///
/// Stateless namespace type: saving, listing, fetching and resolving are all
/// associated functions that take the database connection as an argument.
pub struct ConflictResolver;
412
413impl ConflictResolver {
414    /// Resolve a saved conflict by its ID using the given strategy.
415    pub fn resolve(
416        conn: &Connection,
417        conflict_id: i64,
418        strategy: ResolutionStrategy,
419    ) -> Result<ResolutionResult> {
420        let conflict = Self::get_conflict(conn, conflict_id)?
421            .ok_or_else(|| EngramError::NotFound(conflict_id))?;
422
423        if conflict.resolved_at.is_some() {
424            return Err(EngramError::InvalidInput(format!(
425                "Conflict {} is already resolved",
426                conflict_id
427            )));
428        }
429
430        let edge_ids = &conflict.edge_ids;
431
432        let (edges_removed, edges_kept) = match strategy {
433            ResolutionStrategy::KeepNewer => resolve_keep_newer(conn, edge_ids)?,
434            ResolutionStrategy::KeepHigherConfidence => {
435                resolve_keep_higher_confidence(conn, edge_ids)?
436            }
437            ResolutionStrategy::Merge => resolve_merge(conn, edge_ids)?,
438            ResolutionStrategy::Manual => {
439                // No edge modifications — just mark resolved.
440                (Vec::new(), edge_ids.clone())
441            }
442        };
443
444        // Mark the conflict as resolved.
445        let now = chrono_now();
446        conn.execute(
447            "UPDATE graph_conflicts
448             SET resolved_at = ?1, resolution_strategy = ?2
449             WHERE id = ?3",
450            params![now, strategy.as_str(), conflict_id],
451        )
452        .map_err(EngramError::Database)?;
453
454        Ok(ResolutionResult {
455            conflict_id,
456            strategy,
457            edges_removed,
458            edges_kept,
459        })
460    }
461
462    /// Persist a detected conflict to the `graph_conflicts` table.
463    ///
464    /// Returns the generated row ID.
465    pub fn save_conflict(conn: &Connection, conflict: &Conflict) -> Result<i64> {
466        let edge_ids_json = serde_json::to_string(&conflict.edge_ids)?;
467        let resolution_strategy = conflict.resolution_strategy.as_ref().map(|s| s.as_str());
468
469        conn.execute(
470            "INSERT INTO graph_conflicts
471                 (conflict_type, edge_ids, description, severity, resolved_at, resolution_strategy)
472             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
473            params![
474                conflict.conflict_type.as_str(),
475                edge_ids_json,
476                conflict.description,
477                conflict.severity.as_str(),
478                conflict.resolved_at,
479                resolution_strategy,
480            ],
481        )
482        .map_err(EngramError::Database)?;
483
484        Ok(conn.last_insert_rowid())
485    }
486
487    /// List conflicts from the `graph_conflicts` table.
488    ///
489    /// - `resolved = Some(true)`  — only resolved conflicts.
490    /// - `resolved = Some(false)` — only unresolved conflicts.
491    /// - `resolved = None`        — all conflicts.
492    pub fn list_conflicts(conn: &Connection, resolved: Option<bool>) -> Result<Vec<Conflict>> {
493        let sql = match resolved {
494            Some(true) => {
495                "SELECT id, conflict_type, edge_ids, description, severity,
496                        resolved_at, resolution_strategy
497                 FROM   graph_conflicts
498                 WHERE  resolved_at IS NOT NULL
499                 ORDER  BY id ASC"
500            }
501            Some(false) => {
502                "SELECT id, conflict_type, edge_ids, description, severity,
503                        resolved_at, resolution_strategy
504                 FROM   graph_conflicts
505                 WHERE  resolved_at IS NULL
506                 ORDER  BY id ASC"
507            }
508            None => {
509                "SELECT id, conflict_type, edge_ids, description, severity,
510                        resolved_at, resolution_strategy
511                 FROM   graph_conflicts
512                 ORDER  BY id ASC"
513            }
514        };
515
516        let mut stmt = conn.prepare(sql).map_err(EngramError::Database)?;
517
518        let rows = stmt
519            .query_map([], row_to_conflict)
520            .map_err(EngramError::Database)?
521            .collect::<rusqlite::Result<Vec<_>>>()
522            .map_err(EngramError::Database)?;
523
524        Ok(rows)
525    }
526
527    /// Retrieve a single conflict by ID.
528    pub fn get_conflict(conn: &Connection, id: i64) -> Result<Option<Conflict>> {
529        let mut stmt = conn
530            .prepare(
531                "SELECT id, conflict_type, edge_ids, description, severity,
532                        resolved_at, resolution_strategy
533                 FROM   graph_conflicts
534                 WHERE  id = ?1",
535            )
536            .map_err(EngramError::Database)?;
537
538        let mut rows = stmt
539            .query_map(params![id], row_to_conflict)
540            .map_err(EngramError::Database)?;
541
542        match rows.next() {
543            Some(row) => Ok(Some(row.map_err(EngramError::Database)?)),
544            None => Ok(None),
545        }
546    }
547}
548
549// =============================================================================
550// Resolution helpers (private)
551// =============================================================================
552
553/// Keep the edge with the highest ID (most recently inserted) and remove the
554/// rest.  Returns `(removed, kept)`.
555fn resolve_keep_newer(conn: &Connection, edge_ids: &[i64]) -> Result<(Vec<i64>, Vec<i64>)> {
556    if edge_ids.is_empty() {
557        return Ok((Vec::new(), Vec::new()));
558    }
559
560    // Load creation timestamps from cross_references.
561    let mut id_times: Vec<(i64, String)> = edge_ids
562        .iter()
563        .filter_map(|&id| {
564            let ts: rusqlite::Result<String> = conn.query_row(
565                "SELECT created_at FROM cross_references WHERE id = ?1",
566                params![id],
567                |r| r.get(0),
568            );
569            ts.ok().map(|t| (id, t))
570        })
571        .collect();
572
573    // Sort ascending; the last element is the newest.
574    id_times.sort_by(|a, b| a.1.cmp(&b.1));
575
576    if id_times.is_empty() {
577        return Ok((Vec::new(), edge_ids.to_vec()));
578    }
579
580    let newest_id = id_times.last().unwrap().0;
581    let to_remove: Vec<i64> = id_times
582        .iter()
583        .filter(|(id, _)| *id != newest_id)
584        .map(|(id, _)| *id)
585        .collect();
586
587    for &id in &to_remove {
588        conn.execute("DELETE FROM cross_references WHERE id = ?1", params![id])
589            .map_err(EngramError::Database)?;
590    }
591
592    Ok((to_remove, vec![newest_id]))
593}
594
595/// Keep the edge with the highest `strength` (confidence proxy) and remove the
596/// rest.  Returns `(removed, kept)`.
597fn resolve_keep_higher_confidence(
598    conn: &Connection,
599    edge_ids: &[i64],
600) -> Result<(Vec<i64>, Vec<i64>)> {
601    if edge_ids.is_empty() {
602        return Ok((Vec::new(), Vec::new()));
603    }
604
605    // Load strength from cross_references.
606    let mut id_strengths: Vec<(i64, f64)> = edge_ids
607        .iter()
608        .filter_map(|&id| {
609            let s: rusqlite::Result<f64> = conn.query_row(
610                "SELECT strength FROM cross_references WHERE id = ?1",
611                params![id],
612                |r| r.get(0),
613            );
614            s.ok().map(|strength| (id, strength))
615        })
616        .collect();
617
618    // Sort ascending; last element has highest strength.
619    id_strengths.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
620
621    if id_strengths.is_empty() {
622        return Ok((Vec::new(), edge_ids.to_vec()));
623    }
624
625    let best_id = id_strengths.last().unwrap().0;
626    let to_remove: Vec<i64> = id_strengths
627        .iter()
628        .filter(|(id, _)| *id != best_id)
629        .map(|(id, _)| *id)
630        .collect();
631
632    for &id in &to_remove {
633        conn.execute("DELETE FROM cross_references WHERE id = ?1", params![id])
634            .map_err(EngramError::Database)?;
635    }
636
637    Ok((to_remove, vec![best_id]))
638}
639
640/// Merge edges: keep the one with the highest strength, update its metadata to
641/// be the JSON merge of all involved edges, then delete the rest.
642/// Returns `(removed, kept)`.
643fn resolve_merge(conn: &Connection, edge_ids: &[i64]) -> Result<(Vec<i64>, Vec<i64>)> {
644    if edge_ids.is_empty() {
645        return Ok((Vec::new(), Vec::new()));
646    }
647
648    // Load all edge metadata.
649    let mut rows: Vec<(i64, f64, String)> = edge_ids
650        .iter()
651        .filter_map(|&id| {
652            conn.query_row(
653                "SELECT id, strength, metadata FROM cross_references WHERE id = ?1",
654                params![id],
655                |r| {
656                    Ok((
657                        r.get::<_, i64>(0)?,
658                        r.get::<_, f64>(1)?,
659                        r.get::<_, String>(2)?,
660                    ))
661                },
662            )
663            .ok()
664        })
665        .collect();
666
667    if rows.is_empty() {
668        return Ok((Vec::new(), edge_ids.to_vec()));
669    }
670
671    // Sort by strength desc; first element is the keeper.
672    rows.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
673    let (keep_id, keep_strength, keep_meta_str) = rows.remove(0);
674
675    // Merge metadata JSON objects.
676    let mut merged: serde_json::Map<String, serde_json::Value> =
677        serde_json::from_str(&keep_meta_str).unwrap_or_default();
678
679    for (_, _, meta_str) in &rows {
680        if let Ok(serde_json::Value::Object(extra)) = serde_json::from_str(meta_str) {
681            for (k, v) in extra {
682                merged.entry(k).or_insert(v);
683            }
684        }
685    }
686
687    let merged_str = serde_json::to_string(&serde_json::Value::Object(merged))?;
688
689    conn.execute(
690        "UPDATE cross_references SET metadata = ?1, strength = ?2 WHERE id = ?3",
691        params![merged_str, keep_strength, keep_id],
692    )
693    .map_err(EngramError::Database)?;
694
695    let to_remove: Vec<i64> = rows.iter().map(|(id, _, _)| *id).collect();
696
697    for &id in &to_remove {
698        conn.execute("DELETE FROM cross_references WHERE id = ?1", params![id])
699            .map_err(EngramError::Database)?;
700    }
701
702    Ok((to_remove, vec![keep_id]))
703}
704
705// =============================================================================
706// Private helpers
707// =============================================================================
708
/// Minimal representation of a row in `cross_references`, as loaded by
/// `load_all_edges`.
#[derive(Debug)]
struct EdgeRow {
    /// Primary key of the edge.
    id: i64,
    /// Source node of the directed edge.
    from_id: i64,
    /// Target node of the directed edge.
    to_id: i64,
    /// Relation name, compared verbatim against `CONTRADICTING_PAIRS`.
    relation_type: String,
    /// Value of the `created_at` column. It is loaded with each row but no
    /// Rust code in this file reads it, hence the `dead_code` allowance.
    #[allow(dead_code)]
    created_at: String,
}
721
722/// Load all rows from `cross_references`. Returns empty vec if table does not
723/// exist.
724fn load_all_edges(conn: &Connection) -> Result<Vec<EdgeRow>> {
725    if !table_exists(conn, "cross_references")? {
726        return Ok(Vec::new());
727    }
728
729    let mut stmt = conn
730        .prepare(
731            "SELECT id, from_id, to_id, relation_type, created_at
732             FROM   cross_references
733             ORDER  BY id ASC",
734        )
735        .map_err(EngramError::Database)?;
736
737    let rows = stmt
738        .query_map([], |row| {
739            Ok(EdgeRow {
740                id: row.get(0)?,
741                from_id: row.get(1)?,
742                to_id: row.get(2)?,
743                relation_type: row.get(3)?,
744                created_at: row.get(4)?,
745            })
746        })
747        .map_err(EngramError::Database)?
748        .collect::<rusqlite::Result<Vec<_>>>()
749        .map_err(EngramError::Database)?;
750
751    Ok(rows)
752}
753
754/// Iterative DFS for cycle detection. Detected cycles are appended to
755/// `conflicts`.
756fn dfs_detect_cycle(
757    start: i64,
758    adj: &HashMap<i64, Vec<(i64, i64)>>,
759    edge_map: &HashMap<(i64, i64), i64>,
760    visited: &mut HashSet<i64>,
761    rec_stack: &mut HashSet<i64>,
762    conflicts: &mut Vec<Conflict>,
763) {
764    // Stack items: (node, index into adj[node], parent_edge_id)
765    let mut stack: Vec<(i64, usize, Option<i64>)> = vec![(start, 0, None)];
766    let mut path: Vec<i64> = vec![start];
767    let mut path_set: HashSet<i64> = {
768        let mut s = HashSet::new();
769        s.insert(start);
770        s
771    };
772
773    visited.insert(start);
774    rec_stack.insert(start);
775
776    while let Some((node, idx, _parent_edge)) = stack.last_mut() {
777        let node = *node;
778        let neighbors = adj.get(&node).map(|v| v.as_slice()).unwrap_or(&[]);
779
780        if *idx < neighbors.len() {
781            let (neighbor, edge_id) = neighbors[*idx];
782            *idx += 1;
783
784            if !visited.contains(&neighbor) {
785                visited.insert(neighbor);
786                rec_stack.insert(neighbor);
787                path.push(neighbor);
788                path_set.insert(neighbor);
789                stack.push((neighbor, 0, Some(edge_id)));
790            } else if rec_stack.contains(&neighbor) {
791                // Cycle detected — collect the edge IDs that form the cycle.
792                let cycle_start_pos = path.iter().position(|&n| n == neighbor).unwrap_or(0);
793                let cycle_nodes = &path[cycle_start_pos..];
794                let mut cycle_edge_ids: Vec<i64> = Vec::new();
795                for window in cycle_nodes.windows(2) {
796                    if let Some(&eid) = edge_map.get(&(window[0], window[1])) {
797                        cycle_edge_ids.push(eid);
798                    }
799                }
800                // Close the cycle: last node -> neighbor
801                if let Some(&eid) =
802                    edge_map.get(&(*cycle_nodes.last().unwrap_or(&neighbor), neighbor))
803                {
804                    cycle_edge_ids.push(eid);
805                }
806
807                if !cycle_edge_ids.is_empty() {
808                    conflicts.push(Conflict {
809                        id: 0,
810                        conflict_type: ConflictType::CyclicDependency,
811                        edge_ids: cycle_edge_ids.clone(),
812                        description: format!("Cycle detected involving nodes: {:?}", cycle_nodes),
813                        severity: Severity::Medium,
814                        resolved_at: None,
815                        resolution_strategy: None,
816                    });
817                }
818            }
819        } else {
820            // Done with this node — pop.
821            stack.pop();
822            path.pop();
823            path_set.remove(&node);
824            rec_stack.remove(&node);
825        }
826    }
827}
828
829/// Check whether a table exists in the current SQLite database.
830fn table_exists(conn: &Connection, name: &str) -> Result<bool> {
831    let count: i64 = conn
832        .query_row(
833            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
834            params![name],
835            |r| r.get(0),
836        )
837        .map_err(EngramError::Database)?;
838    Ok(count > 0)
839}
840
841/// Map a rusqlite row to a `Conflict`.
842fn row_to_conflict(row: &rusqlite::Row<'_>) -> rusqlite::Result<Conflict> {
843    let id: i64 = row.get(0)?;
844    let conflict_type_str: String = row.get(1)?;
845    let edge_ids_str: String = row.get(2)?;
846    let description: String = row.get(3)?;
847    let severity_str: String = row.get(4)?;
848    let resolved_at: Option<String> = row.get(5)?;
849    let resolution_strategy_str: Option<String> = row.get(6)?;
850
851    let conflict_type =
852        ConflictType::from_str(&conflict_type_str).unwrap_or(ConflictType::DirectContradiction);
853    let edge_ids: Vec<i64> = serde_json::from_str(&edge_ids_str).unwrap_or_default();
854    let severity = Severity::from_str(&severity_str).unwrap_or(Severity::Low);
855    let resolution_strategy = resolution_strategy_str
856        .as_deref()
857        .and_then(ResolutionStrategy::from_str);
858
859    Ok(Conflict {
860        id,
861        conflict_type,
862        edge_ids,
863        description,
864        severity,
865        resolved_at,
866        resolution_strategy,
867    })
868}
869
870/// Return the current UTC timestamp in RFC3339 format.
871fn chrono_now() -> String {
872    use std::time::{SystemTime, UNIX_EPOCH};
873    // Use chrono if available; otherwise fall back to a formatted timestamp.
874    let secs = SystemTime::now()
875        .duration_since(UNIX_EPOCH)
876        .unwrap_or_default()
877        .as_secs();
878    let dt = chrono::DateTime::<chrono::Utc>::from_timestamp(secs as i64, 0)
879        .unwrap_or(chrono::Utc::now());
880    dt.format("%Y-%m-%dT%H:%M:%SZ").to_string()
881}
882
883// =============================================================================
884// Tests
885// =============================================================================
886
887#[cfg(test)]
888mod tests {
889    use super::*;
890    use rusqlite::Connection;
891
892    // -------------------------------------------------------------------------
893    // Helpers
894    // -------------------------------------------------------------------------
895
    /// Test-only DDL for the `cross_references` edge table.
    ///
    /// Each row is one directed edge (`from_id` -> `to_id`) with a
    /// `relation_type` (default `'related'`), a `strength` (default `0.5`),
    /// a `metadata` text column (default `'{}'`) and a `created_at` timestamp
    /// that defaults to the insertion time. Uses `IF NOT EXISTS`.
    const CREATE_CROSS_REFS: &str = "
        CREATE TABLE IF NOT EXISTS cross_references (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            from_id         INTEGER NOT NULL,
            to_id           INTEGER NOT NULL,
            relation_type   TEXT    NOT NULL DEFAULT 'related',
            strength        REAL    NOT NULL DEFAULT 0.5,
            metadata        TEXT    DEFAULT '{}',
            created_at      TEXT    NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
        );
    ";
907
    /// Test-only DDL for a minimal `memories` table: an integer `id`, a
    /// required `content` text column and a `created_at` timestamp that
    /// defaults to the insertion time. Uses `IF NOT EXISTS`.
    const CREATE_MEMORIES: &str = "
        CREATE TABLE IF NOT EXISTS memories (
            id         INTEGER PRIMARY KEY AUTOINCREMENT,
            content    TEXT    NOT NULL,
            created_at TEXT    NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
        );
    ";
915
916    fn setup_db() -> Connection {
917        let conn = Connection::open_in_memory().expect("open in-memory DB");
918        conn.execute_batch(CREATE_CROSS_REFS)
919            .expect("create cross_references");
920        conn.execute_batch(CREATE_MEMORIES)
921            .expect("create memories");
922        conn.execute_batch(CREATE_CONFLICTS_TABLE)
923            .expect("create graph_conflicts");
924        conn
925    }
926
927    fn insert_edge(conn: &Connection, from_id: i64, to_id: i64, rel: &str, strength: f64) -> i64 {
928        conn.execute(
929            "INSERT INTO cross_references (from_id, to_id, relation_type, strength)
930             VALUES (?1, ?2, ?3, ?4)",
931            params![from_id, to_id, rel, strength],
932        )
933        .expect("insert edge");
934        conn.last_insert_rowid()
935    }
936
937    fn insert_memory(conn: &Connection, id: i64) {
938        conn.execute(
939            "INSERT INTO memories (id, content) VALUES (?1, 'test')",
940            params![id],
941        )
942        .expect("insert memory");
943    }
944
945    // -------------------------------------------------------------------------
946    // Test 1: detect_contradictions — finds contradicting relation pair
947    // -------------------------------------------------------------------------
948    #[test]
949    fn test_detect_contradiction() {
950        let conn = setup_db();
951
952        // A --supports--> B
953        insert_edge(&conn, 1, 2, "supports", 0.8);
954        // A --contradicts--> B  (same pair, contradicting semantics)
955        insert_edge(&conn, 1, 2, "contradicts", 0.8);
956
957        let conflicts =
958            ConflictDetector::detect_contradictions(&conn).expect("detect_contradictions");
959
960        assert_eq!(conflicts.len(), 1);
961        assert_eq!(
962            conflicts[0].conflict_type,
963            ConflictType::DirectContradiction
964        );
965        assert_eq!(conflicts[0].severity, Severity::High);
966        assert!(conflicts[0].edge_ids.len() >= 2);
967        assert!(conflicts[0].description.contains("Contradicting"));
968    }
969
970    // -------------------------------------------------------------------------
971    // Test 2: detect_temporal_inconsistencies — duplicate triple
972    // -------------------------------------------------------------------------
973    #[test]
974    fn test_detect_temporal_inconsistency() {
975        let conn = setup_db();
976
977        // Two edges for the exact same (from, to, relation) triple.
978        let id_a = insert_edge(&conn, 10, 20, "works_at", 0.9);
979        let id_b = insert_edge(&conn, 10, 20, "works_at", 0.7);
980
981        let conflicts = ConflictDetector::detect_temporal_inconsistencies(&conn)
982            .expect("detect_temporal_inconsistencies");
983
984        assert_eq!(conflicts.len(), 1);
985        assert_eq!(
986            conflicts[0].conflict_type,
987            ConflictType::TemporalInconsistency
988        );
989        assert_eq!(conflicts[0].severity, Severity::Medium);
990        assert!(conflicts[0].edge_ids.contains(&id_a));
991        assert!(conflicts[0].edge_ids.contains(&id_b));
992    }
993
994    // -------------------------------------------------------------------------
995    // Test 3: detect_cycles — simple A→B→C→A cycle
996    // -------------------------------------------------------------------------
997    #[test]
998    fn test_detect_cycle() {
999        let conn = setup_db();
1000
1001        // A→B, B→C, C→A forms a cycle.
1002        insert_edge(&conn, 1, 2, "depends_on", 0.9);
1003        insert_edge(&conn, 2, 3, "depends_on", 0.9);
1004        insert_edge(&conn, 3, 1, "depends_on", 0.9); // closes the cycle
1005
1006        let conflicts = ConflictDetector::detect_cycles(&conn).expect("detect_cycles");
1007
1008        assert!(
1009            !conflicts.is_empty(),
1010            "expected at least one cycle conflict"
1011        );
1012        assert_eq!(conflicts[0].conflict_type, ConflictType::CyclicDependency);
1013        assert!(conflicts[0].description.contains("Cycle"));
1014    }
1015
1016    // -------------------------------------------------------------------------
1017    // Test 4: detect_orphans — edge references missing memory
1018    // -------------------------------------------------------------------------
1019    #[test]
1020    fn test_detect_orphan() {
1021        let conn = setup_db();
1022
1023        // Only memory 1 exists; edge references memory 99 which doesn't exist.
1024        insert_memory(&conn, 1);
1025        let edge_id = insert_edge(&conn, 1, 99, "related", 0.5); // to_id=99 is orphan
1026
1027        let conflicts = ConflictDetector::detect_orphans(&conn).expect("detect_orphans");
1028
1029        assert_eq!(conflicts.len(), 1);
1030        assert_eq!(conflicts[0].conflict_type, ConflictType::OrphanedReference);
1031        assert_eq!(conflicts[0].severity, Severity::Critical);
1032        assert!(conflicts[0].edge_ids.contains(&edge_id));
1033    }
1034
1035    // -------------------------------------------------------------------------
1036    // Test 5: resolve with KeepNewer removes older edges
1037    // -------------------------------------------------------------------------
1038    #[test]
1039    fn test_resolve_keep_newer() {
1040        let conn = setup_db();
1041
1042        let id_old = insert_edge(&conn, 5, 6, "supports", 0.5);
1043        // Ensure the second edge has a later created_at by updating it.
1044        conn.execute(
1045            "UPDATE cross_references SET created_at = '2099-01-01T00:00:00.000Z' WHERE id = ?1",
1046            params![id_old + 1],
1047        )
1048        .ok();
1049        let id_new = insert_edge(&conn, 5, 6, "supports", 0.5);
1050        // Make the new edge newer.
1051        conn.execute(
1052            "UPDATE cross_references SET created_at = '2099-01-02T00:00:00.000Z' WHERE id = ?1",
1053            params![id_new],
1054        )
1055        .expect("update ts");
1056
1057        // Save a conflict manually.
1058        let conflict = Conflict {
1059            id: 0,
1060            conflict_type: ConflictType::TemporalInconsistency,
1061            edge_ids: vec![id_old, id_new],
1062            description: "duplicate triple".to_string(),
1063            severity: Severity::Medium,
1064            resolved_at: None,
1065            resolution_strategy: None,
1066        };
1067        let cid = ConflictResolver::save_conflict(&conn, &conflict).expect("save");
1068
1069        let result =
1070            ConflictResolver::resolve(&conn, cid, ResolutionStrategy::KeepNewer).expect("resolve");
1071
1072        assert_eq!(result.conflict_id, cid);
1073        assert_eq!(result.strategy, ResolutionStrategy::KeepNewer);
1074        assert_eq!(result.edges_removed.len(), 1);
1075        assert_eq!(result.edges_kept.len(), 1);
1076        assert!(result.edges_kept.contains(&id_new));
1077        assert!(result.edges_removed.contains(&id_old));
1078
1079        // Verify the conflict is marked resolved.
1080        let saved = ConflictResolver::get_conflict(&conn, cid)
1081            .expect("get")
1082            .expect("exists");
1083        assert!(saved.resolved_at.is_some());
1084    }
1085
1086    // -------------------------------------------------------------------------
1087    // Test 6: no conflicts when graph is clean
1088    // -------------------------------------------------------------------------
1089    #[test]
1090    fn test_no_conflicts_clean_graph() {
1091        let conn = setup_db();
1092
1093        // Insert valid memories and non-contradicting edges.
1094        insert_memory(&conn, 1);
1095        insert_memory(&conn, 2);
1096        insert_memory(&conn, 3);
1097        insert_edge(&conn, 1, 2, "supports", 0.9);
1098        insert_edge(&conn, 2, 3, "related", 0.7);
1099
1100        let all = ConflictDetector::detect_all(&conn).expect("detect_all");
1101
1102        // No cycles (1→2→3, no back-edge), no orphans, no contradictions, no temporal.
1103        assert!(all.is_empty(), "expected zero conflicts, got: {:?}", all);
1104    }
1105
1106    // -------------------------------------------------------------------------
1107    // Test 7: save and list conflicts
1108    // -------------------------------------------------------------------------
1109    #[test]
1110    fn test_save_and_list_conflicts() {
1111        let conn = setup_db();
1112
1113        let c1 = Conflict {
1114            id: 0,
1115            conflict_type: ConflictType::DirectContradiction,
1116            edge_ids: vec![1, 2],
1117            description: "supports vs contradicts".to_string(),
1118            severity: Severity::High,
1119            resolved_at: None,
1120            resolution_strategy: None,
1121        };
1122        let c2 = Conflict {
1123            id: 0,
1124            conflict_type: ConflictType::OrphanedReference,
1125            edge_ids: vec![3],
1126            description: "missing node 99".to_string(),
1127            severity: Severity::Critical,
1128            resolved_at: None,
1129            resolution_strategy: None,
1130        };
1131
1132        let id1 = ConflictResolver::save_conflict(&conn, &c1).expect("save c1");
1133        let id2 = ConflictResolver::save_conflict(&conn, &c2).expect("save c2");
1134
1135        let all = ConflictResolver::list_conflicts(&conn, None).expect("list all");
1136        assert_eq!(all.len(), 2);
1137
1138        let unresolved =
1139            ConflictResolver::list_conflicts(&conn, Some(false)).expect("list unresolved");
1140        assert_eq!(unresolved.len(), 2);
1141
1142        let resolved = ConflictResolver::list_conflicts(&conn, Some(true)).expect("list resolved");
1143        assert_eq!(resolved.len(), 0);
1144
1145        // Verify we can retrieve by ID.
1146        let fetched = ConflictResolver::get_conflict(&conn, id1)
1147            .expect("get c1")
1148            .expect("exists");
1149        assert_eq!(fetched.conflict_type, ConflictType::DirectContradiction);
1150        assert_eq!(fetched.severity, Severity::High);
1151
1152        let fetched2 = ConflictResolver::get_conflict(&conn, id2)
1153            .expect("get c2")
1154            .expect("exists");
1155        assert_eq!(fetched2.conflict_type, ConflictType::OrphanedReference);
1156    }
1157
1158    // -------------------------------------------------------------------------
1159    // Test 8: multiple conflict types in one scan
1160    // -------------------------------------------------------------------------
1161    #[test]
1162    fn test_detect_all_multiple_types() {
1163        let conn = setup_db();
1164
1165        // Memory 1 exists, 99 does not → orphan.
1166        insert_memory(&conn, 1);
1167        insert_memory(&conn, 2);
1168
1169        // Contradiction: supports + contradicts between same pair.
1170        insert_edge(&conn, 1, 2, "supports", 0.8);
1171        insert_edge(&conn, 1, 2, "contradicts", 0.6);
1172
1173        // Orphan: edge references non-existent memory 99.
1174        insert_edge(&conn, 1, 99, "related", 0.5);
1175
1176        let all = ConflictDetector::detect_all(&conn).expect("detect_all");
1177
1178        let types: Vec<&ConflictType> = all.iter().map(|c| &c.conflict_type).collect();
1179
1180        assert!(
1181            types.contains(&&ConflictType::DirectContradiction),
1182            "expected DirectContradiction in {:?}",
1183            types
1184        );
1185        assert!(
1186            types.contains(&&ConflictType::OrphanedReference),
1187            "expected OrphanedReference in {:?}",
1188            types
1189        );
1190    }
1191
1192    // -------------------------------------------------------------------------
1193    // Test 9: resolve already-resolved conflict returns error
1194    // -------------------------------------------------------------------------
1195    #[test]
1196    fn test_resolve_already_resolved_returns_error() {
1197        let conn = setup_db();
1198
1199        let id_a = insert_edge(&conn, 5, 6, "rel", 0.5);
1200        let id_b = insert_edge(&conn, 5, 6, "rel", 0.5);
1201
1202        let conflict = Conflict {
1203            id: 0,
1204            conflict_type: ConflictType::TemporalInconsistency,
1205            edge_ids: vec![id_a, id_b],
1206            description: "dup".to_string(),
1207            severity: Severity::Medium,
1208            resolved_at: None,
1209            resolution_strategy: None,
1210        };
1211        let cid = ConflictResolver::save_conflict(&conn, &conflict).expect("save");
1212
1213        // First resolution should succeed.
1214        ConflictResolver::resolve(&conn, cid, ResolutionStrategy::Manual).expect("first resolve");
1215
1216        // Second resolution on the same conflict should fail.
1217        let result = ConflictResolver::resolve(&conn, cid, ResolutionStrategy::Manual);
1218        assert!(result.is_err(), "expected error on double-resolve");
1219    }
1220}