1use rusqlite::{params, Connection};
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, HashSet};
13
14use crate::error::{EngramError, Result};
15
/// DDL batch that creates the `graph_conflicts` table and its lookup indexes.
///
/// Every statement uses `IF NOT EXISTS`, so executing the batch more than once
/// is harmless. `edge_ids` is a TEXT column defaulting to `'[]'` (this module
/// stores a JSON array of edge ids in it). `resolved_at` and
/// `resolution_strategy` are nullable; a NULL `resolved_at` is what this module
/// treats as "unresolved".
pub const CREATE_CONFLICTS_TABLE: &str = r#"
CREATE TABLE IF NOT EXISTS graph_conflicts (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    conflict_type TEXT NOT NULL,
    edge_ids TEXT NOT NULL DEFAULT '[]',
    description TEXT NOT NULL,
    severity TEXT NOT NULL,
    resolved_at TEXT,
    resolution_strategy TEXT,
    created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);
CREATE INDEX IF NOT EXISTS idx_graph_conflicts_type ON graph_conflicts(conflict_type);
CREATE INDEX IF NOT EXISTS idx_graph_conflicts_severity ON graph_conflicts(severity);
CREATE INDEX IF NOT EXISTS idx_graph_conflicts_resolved ON graph_conflicts(resolved_at);
"#;
38
/// Category of inconsistency that a detector can report.
///
/// Serialized by serde using `snake_case` names (e.g. `direct_contradiction`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConflictType {
    /// Edges between the same ordered node pair carry relation types that are
    /// listed as contradicting each other.
    DirectContradiction,
    /// Two edges share the same `(from_id, to_id, relation_type)` triple.
    TemporalInconsistency,
    /// A group of edges forms a directed cycle.
    CyclicDependency,
    /// An edge points at a node id that is missing from the `memories` table.
    OrphanedReference,
}
59
60impl ConflictType {
61 fn as_str(&self) -> &'static str {
62 match self {
63 ConflictType::DirectContradiction => "direct_contradiction",
64 ConflictType::TemporalInconsistency => "temporal_inconsistency",
65 ConflictType::CyclicDependency => "cyclic_dependency",
66 ConflictType::OrphanedReference => "orphaned_reference",
67 }
68 }
69
70 fn from_str(s: &str) -> Option<Self> {
71 match s {
72 "direct_contradiction" => Some(ConflictType::DirectContradiction),
73 "temporal_inconsistency" => Some(ConflictType::TemporalInconsistency),
74 "cyclic_dependency" => Some(ConflictType::CyclicDependency),
75 "orphaned_reference" => Some(ConflictType::OrphanedReference),
76 _ => None,
77 }
78 }
79}
80
/// How serious a detected conflict is.
///
/// `PartialOrd`/`Ord` are derived, so variants compare in declaration order:
/// `Low < Medium < High < Critical`. Serialized by serde in `snake_case`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Severity {
    /// Lowest level; also the fallback used when a stored severity label
    /// cannot be parsed.
    Low,
    /// Assigned by the duplicate-edge and cycle detectors.
    Medium,
    /// Assigned by the contradiction detector.
    High,
    /// Assigned by the orphaned-reference detector.
    Critical,
}
90
91impl Severity {
92 fn as_str(&self) -> &'static str {
93 match self {
94 Severity::Low => "low",
95 Severity::Medium => "medium",
96 Severity::High => "high",
97 Severity::Critical => "critical",
98 }
99 }
100
101 fn from_str(s: &str) -> Option<Self> {
102 match s {
103 "low" => Some(Severity::Low),
104 "medium" => Some(Severity::Medium),
105 "high" => Some(Severity::High),
106 "critical" => Some(Severity::Critical),
107 _ => None,
108 }
109 }
110}
111
/// One detected inconsistency, either freshly produced by a detector or
/// loaded from the `graph_conflicts` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Conflict {
    /// Row id in `graph_conflicts`. The detectors in this file set it to `0`
    /// because their results have not been persisted yet.
    pub id: i64,
    /// Which kind of inconsistency this is.
    pub conflict_type: ConflictType,
    /// Ids of the `cross_references` rows involved in the conflict.
    pub edge_ids: Vec<i64>,
    /// Human-readable explanation of the conflict.
    pub description: String,
    /// How serious the conflict is.
    pub severity: Severity,
    /// Timestamp text recorded when the conflict was resolved; `None` while
    /// it is still open.
    pub resolved_at: Option<String>,
    /// Strategy that was used to resolve the conflict, if any.
    pub resolution_strategy: Option<ResolutionStrategy>,
}
130
/// How a conflict's edges should be reconciled.
///
/// Serialized by serde in `snake_case` (e.g. `keep_higher_confidence`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ResolutionStrategy {
    /// Keep the edge with the greatest `created_at` value and delete the rest.
    KeepNewer,
    /// Keep the edge with the greatest `strength` value and delete the rest.
    KeepHigherConfidence,
    /// Keep the strongest edge, fold the other edges' metadata keys into it,
    /// and delete the rest.
    Merge,
    /// Touch no edges; only mark the conflict as resolved.
    Manual,
}
144
145impl ResolutionStrategy {
146 fn as_str(&self) -> &'static str {
147 match self {
148 ResolutionStrategy::KeepNewer => "keep_newer",
149 ResolutionStrategy::KeepHigherConfidence => "keep_higher_confidence",
150 ResolutionStrategy::Merge => "merge",
151 ResolutionStrategy::Manual => "manual",
152 }
153 }
154
155 fn from_str(s: &str) -> Option<Self> {
156 match s {
157 "keep_newer" => Some(ResolutionStrategy::KeepNewer),
158 "keep_higher_confidence" => Some(ResolutionStrategy::KeepHigherConfidence),
159 "merge" => Some(ResolutionStrategy::Merge),
160 "manual" => Some(ResolutionStrategy::Manual),
161 _ => None,
162 }
163 }
164}
165
/// Summary of what resolving a conflict did to its edges.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResolutionResult {
    /// Id of the `graph_conflicts` row that was resolved.
    pub conflict_id: i64,
    /// Strategy that was applied.
    pub strategy: ResolutionStrategy,
    /// Ids of the `cross_references` rows that were deleted.
    pub edges_removed: Vec<i64>,
    /// Ids of the edges that were left in place.
    pub edges_kept: Vec<i64>,
}
178
/// Stateless namespace for the detection routines; every method is an
/// associated function that takes the database connection as an argument.
pub struct ConflictDetector;
185
/// Relation-type pairs that are treated as mutually contradictory when both
/// appear on edges with the same `from_id` and `to_id`.
///
/// Note that `"prevents"` appears in two pairs, so a node pair carrying
/// `enables`, `causes` and `prevents` matches both of them.
const CONTRADICTING_PAIRS: &[(&str, &str)] = &[
    ("supports", "contradicts"),
    ("agrees_with", "disagrees_with"),
    ("confirms", "refutes"),
    ("approves", "rejects"),
    ("enables", "prevents"),
    ("causes", "prevents"),
];
195
196impl ConflictDetector {
197 pub fn detect_all(conn: &Connection) -> Result<Vec<Conflict>> {
199 let mut conflicts = Vec::new();
200 conflicts.extend(Self::detect_contradictions(conn)?);
201 conflicts.extend(Self::detect_temporal_inconsistencies(conn)?);
202 conflicts.extend(Self::detect_cycles(conn)?);
203 conflicts.extend(Self::detect_orphans(conn)?);
204 Ok(conflicts)
205 }
206
207 pub fn detect_contradictions(conn: &Connection) -> Result<Vec<Conflict>> {
210 let edges = load_all_edges(conn)?;
212
213 let mut by_pair: HashMap<(i64, i64), Vec<EdgeRow>> = HashMap::new();
215 for edge in edges {
216 by_pair
217 .entry((edge.from_id, edge.to_id))
218 .or_default()
219 .push(edge);
220 }
221
222 let mut conflicts = Vec::new();
223
224 for ((from_id, to_id), group) in &by_pair {
225 let relations: Vec<&str> = group.iter().map(|e| e.relation_type.as_str()).collect();
226
227 for &(a, b) in CONTRADICTING_PAIRS {
228 if relations.contains(&a) && relations.contains(&b) {
229 let involved_ids: Vec<i64> = group.iter().map(|e| e.id).collect();
230 conflicts.push(Conflict {
231 id: 0,
232 conflict_type: ConflictType::DirectContradiction,
233 edge_ids: involved_ids,
234 description: format!(
235 "Contradicting relations '{}' and '{}' between nodes {} and {}",
236 a, b, from_id, to_id
237 ),
238 severity: Severity::High,
239 resolved_at: None,
240 resolution_strategy: None,
241 });
242 }
243 }
244 }
245
246 Ok(conflicts)
247 }
248
249 pub fn detect_temporal_inconsistencies(conn: &Connection) -> Result<Vec<Conflict>> {
256 let sql = "
258 SELECT a.id, b.id, a.from_id, a.to_id, a.relation_type
259 FROM cross_references a
260 JOIN cross_references b
261 ON a.from_id = b.from_id
262 AND a.to_id = b.to_id
263 AND a.relation_type = b.relation_type
264 AND a.id < b.id
265 ";
266
267 let table_exists = table_exists(conn, "cross_references")?;
268 if !table_exists {
269 return Ok(Vec::new());
270 }
271
272 let mut stmt = conn.prepare(sql).map_err(EngramError::Database)?;
273
274 let pairs = stmt
275 .query_map([], |row| {
276 Ok((
277 row.get::<_, i64>(0)?,
278 row.get::<_, i64>(1)?,
279 row.get::<_, i64>(2)?,
280 row.get::<_, i64>(3)?,
281 row.get::<_, String>(4)?,
282 ))
283 })
284 .map_err(EngramError::Database)?
285 .collect::<rusqlite::Result<Vec<_>>>()
286 .map_err(EngramError::Database)?;
287
288 let conflicts = pairs
289 .into_iter()
290 .map(|(id_a, id_b, from_id, to_id, rel)| Conflict {
291 id: 0,
292 conflict_type: ConflictType::TemporalInconsistency,
293 edge_ids: vec![id_a, id_b],
294 description: format!(
295 "Duplicate '{}' edges between nodes {} and {} (ids {} and {}); possible temporal overlap",
296 rel, from_id, to_id, id_a, id_b
297 ),
298 severity: Severity::Medium,
299 resolved_at: None,
300 resolution_strategy: None,
301 })
302 .collect();
303
304 Ok(conflicts)
305 }
306
307 pub fn detect_cycles(conn: &Connection) -> Result<Vec<Conflict>> {
312 let table_exists = table_exists(conn, "cross_references")?;
313 if !table_exists {
314 return Ok(Vec::new());
315 }
316
317 let edges = load_all_edges(conn)?;
318
319 let mut adj: HashMap<i64, Vec<(i64, i64)>> = HashMap::new();
321 for edge in &edges {
322 adj.entry(edge.from_id)
323 .or_default()
324 .push((edge.to_id, edge.id));
325 }
326
327 let mut edge_map: HashMap<(i64, i64), i64> = HashMap::new();
329 for edge in &edges {
330 edge_map.insert((edge.from_id, edge.to_id), edge.id);
331 }
332
333 let all_nodes: HashSet<i64> = edges.iter().flat_map(|e| [e.from_id, e.to_id]).collect();
334
335 let mut visited: HashSet<i64> = HashSet::new();
336 let mut rec_stack: HashSet<i64> = HashSet::new();
337 let mut conflicts = Vec::new();
338
339 for &start in &all_nodes {
340 if !visited.contains(&start) {
341 dfs_detect_cycle(
342 start,
343 &adj,
344 &edge_map,
345 &mut visited,
346 &mut rec_stack,
347 &mut conflicts,
348 );
349 }
350 }
351
352 Ok(conflicts)
353 }
354
355 pub fn detect_orphans(conn: &Connection) -> Result<Vec<Conflict>> {
358 let cr_exists = table_exists(conn, "cross_references")?;
359 let mem_exists = table_exists(conn, "memories")?;
360
361 if !cr_exists || !mem_exists {
362 return Ok(Vec::new());
363 }
364
365 let sql = "
366 SELECT cr.id, cr.from_id, cr.to_id
367 FROM cross_references cr
368 WHERE NOT EXISTS (SELECT 1 FROM memories m WHERE m.id = cr.from_id)
369 OR NOT EXISTS (SELECT 1 FROM memories m WHERE m.id = cr.to_id)
370 ";
371
372 let mut stmt = conn.prepare(sql).map_err(EngramError::Database)?;
373
374 let rows = stmt
375 .query_map([], |row| {
376 Ok((
377 row.get::<_, i64>(0)?,
378 row.get::<_, i64>(1)?,
379 row.get::<_, i64>(2)?,
380 ))
381 })
382 .map_err(EngramError::Database)?
383 .collect::<rusqlite::Result<Vec<_>>>()
384 .map_err(EngramError::Database)?;
385
386 let conflicts = rows
387 .into_iter()
388 .map(|(edge_id, from_id, to_id)| Conflict {
389 id: 0,
390 conflict_type: ConflictType::OrphanedReference,
391 edge_ids: vec![edge_id],
392 description: format!(
393 "Edge {} references non-existent memory node(s): from_id={}, to_id={}",
394 edge_id, from_id, to_id
395 ),
396 severity: Severity::Critical,
397 resolved_at: None,
398 resolution_strategy: None,
399 })
400 .collect();
401
402 Ok(conflicts)
403 }
404}
405
/// Stateless namespace for persisting, listing and resolving conflicts; every
/// method is an associated function that takes the database connection.
pub struct ConflictResolver;
412
413impl ConflictResolver {
414 pub fn resolve(
416 conn: &Connection,
417 conflict_id: i64,
418 strategy: ResolutionStrategy,
419 ) -> Result<ResolutionResult> {
420 let conflict = Self::get_conflict(conn, conflict_id)?
421 .ok_or_else(|| EngramError::NotFound(conflict_id))?;
422
423 if conflict.resolved_at.is_some() {
424 return Err(EngramError::InvalidInput(format!(
425 "Conflict {} is already resolved",
426 conflict_id
427 )));
428 }
429
430 let edge_ids = &conflict.edge_ids;
431
432 let (edges_removed, edges_kept) = match strategy {
433 ResolutionStrategy::KeepNewer => resolve_keep_newer(conn, edge_ids)?,
434 ResolutionStrategy::KeepHigherConfidence => {
435 resolve_keep_higher_confidence(conn, edge_ids)?
436 }
437 ResolutionStrategy::Merge => resolve_merge(conn, edge_ids)?,
438 ResolutionStrategy::Manual => {
439 (Vec::new(), edge_ids.clone())
441 }
442 };
443
444 let now = chrono_now();
446 conn.execute(
447 "UPDATE graph_conflicts
448 SET resolved_at = ?1, resolution_strategy = ?2
449 WHERE id = ?3",
450 params![now, strategy.as_str(), conflict_id],
451 )
452 .map_err(EngramError::Database)?;
453
454 Ok(ResolutionResult {
455 conflict_id,
456 strategy,
457 edges_removed,
458 edges_kept,
459 })
460 }
461
462 pub fn save_conflict(conn: &Connection, conflict: &Conflict) -> Result<i64> {
466 let edge_ids_json = serde_json::to_string(&conflict.edge_ids)?;
467 let resolution_strategy = conflict.resolution_strategy.as_ref().map(|s| s.as_str());
468
469 conn.execute(
470 "INSERT INTO graph_conflicts
471 (conflict_type, edge_ids, description, severity, resolved_at, resolution_strategy)
472 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
473 params![
474 conflict.conflict_type.as_str(),
475 edge_ids_json,
476 conflict.description,
477 conflict.severity.as_str(),
478 conflict.resolved_at,
479 resolution_strategy,
480 ],
481 )
482 .map_err(EngramError::Database)?;
483
484 Ok(conn.last_insert_rowid())
485 }
486
487 pub fn list_conflicts(conn: &Connection, resolved: Option<bool>) -> Result<Vec<Conflict>> {
493 let sql = match resolved {
494 Some(true) => {
495 "SELECT id, conflict_type, edge_ids, description, severity,
496 resolved_at, resolution_strategy
497 FROM graph_conflicts
498 WHERE resolved_at IS NOT NULL
499 ORDER BY id ASC"
500 }
501 Some(false) => {
502 "SELECT id, conflict_type, edge_ids, description, severity,
503 resolved_at, resolution_strategy
504 FROM graph_conflicts
505 WHERE resolved_at IS NULL
506 ORDER BY id ASC"
507 }
508 None => {
509 "SELECT id, conflict_type, edge_ids, description, severity,
510 resolved_at, resolution_strategy
511 FROM graph_conflicts
512 ORDER BY id ASC"
513 }
514 };
515
516 let mut stmt = conn.prepare(sql).map_err(EngramError::Database)?;
517
518 let rows = stmt
519 .query_map([], row_to_conflict)
520 .map_err(EngramError::Database)?
521 .collect::<rusqlite::Result<Vec<_>>>()
522 .map_err(EngramError::Database)?;
523
524 Ok(rows)
525 }
526
527 pub fn get_conflict(conn: &Connection, id: i64) -> Result<Option<Conflict>> {
529 let mut stmt = conn
530 .prepare(
531 "SELECT id, conflict_type, edge_ids, description, severity,
532 resolved_at, resolution_strategy
533 FROM graph_conflicts
534 WHERE id = ?1",
535 )
536 .map_err(EngramError::Database)?;
537
538 let mut rows = stmt
539 .query_map(params![id], row_to_conflict)
540 .map_err(EngramError::Database)?;
541
542 match rows.next() {
543 Some(row) => Ok(Some(row.map_err(EngramError::Database)?)),
544 None => Ok(None),
545 }
546 }
547}
548
549fn resolve_keep_newer(conn: &Connection, edge_ids: &[i64]) -> Result<(Vec<i64>, Vec<i64>)> {
556 if edge_ids.is_empty() {
557 return Ok((Vec::new(), Vec::new()));
558 }
559
560 let mut id_times: Vec<(i64, String)> = edge_ids
562 .iter()
563 .filter_map(|&id| {
564 let ts: rusqlite::Result<String> = conn.query_row(
565 "SELECT created_at FROM cross_references WHERE id = ?1",
566 params![id],
567 |r| r.get(0),
568 );
569 ts.ok().map(|t| (id, t))
570 })
571 .collect();
572
573 id_times.sort_by(|a, b| a.1.cmp(&b.1));
575
576 if id_times.is_empty() {
577 return Ok((Vec::new(), edge_ids.to_vec()));
578 }
579
580 let newest_id = id_times.last().unwrap().0;
581 let to_remove: Vec<i64> = id_times
582 .iter()
583 .filter(|(id, _)| *id != newest_id)
584 .map(|(id, _)| *id)
585 .collect();
586
587 for &id in &to_remove {
588 conn.execute("DELETE FROM cross_references WHERE id = ?1", params![id])
589 .map_err(EngramError::Database)?;
590 }
591
592 Ok((to_remove, vec![newest_id]))
593}
594
595fn resolve_keep_higher_confidence(
598 conn: &Connection,
599 edge_ids: &[i64],
600) -> Result<(Vec<i64>, Vec<i64>)> {
601 if edge_ids.is_empty() {
602 return Ok((Vec::new(), Vec::new()));
603 }
604
605 let mut id_strengths: Vec<(i64, f64)> = edge_ids
607 .iter()
608 .filter_map(|&id| {
609 let s: rusqlite::Result<f64> = conn.query_row(
610 "SELECT strength FROM cross_references WHERE id = ?1",
611 params![id],
612 |r| r.get(0),
613 );
614 s.ok().map(|strength| (id, strength))
615 })
616 .collect();
617
618 id_strengths.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
620
621 if id_strengths.is_empty() {
622 return Ok((Vec::new(), edge_ids.to_vec()));
623 }
624
625 let best_id = id_strengths.last().unwrap().0;
626 let to_remove: Vec<i64> = id_strengths
627 .iter()
628 .filter(|(id, _)| *id != best_id)
629 .map(|(id, _)| *id)
630 .collect();
631
632 for &id in &to_remove {
633 conn.execute("DELETE FROM cross_references WHERE id = ?1", params![id])
634 .map_err(EngramError::Database)?;
635 }
636
637 Ok((to_remove, vec![best_id]))
638}
639
640fn resolve_merge(conn: &Connection, edge_ids: &[i64]) -> Result<(Vec<i64>, Vec<i64>)> {
644 if edge_ids.is_empty() {
645 return Ok((Vec::new(), Vec::new()));
646 }
647
648 let mut rows: Vec<(i64, f64, String)> = edge_ids
650 .iter()
651 .filter_map(|&id| {
652 conn.query_row(
653 "SELECT id, strength, metadata FROM cross_references WHERE id = ?1",
654 params![id],
655 |r| {
656 Ok((
657 r.get::<_, i64>(0)?,
658 r.get::<_, f64>(1)?,
659 r.get::<_, String>(2)?,
660 ))
661 },
662 )
663 .ok()
664 })
665 .collect();
666
667 if rows.is_empty() {
668 return Ok((Vec::new(), edge_ids.to_vec()));
669 }
670
671 rows.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
673 let (keep_id, keep_strength, keep_meta_str) = rows.remove(0);
674
675 let mut merged: serde_json::Map<String, serde_json::Value> =
677 serde_json::from_str(&keep_meta_str).unwrap_or_default();
678
679 for (_, _, meta_str) in &rows {
680 if let Ok(serde_json::Value::Object(extra)) = serde_json::from_str(meta_str) {
681 for (k, v) in extra {
682 merged.entry(k).or_insert(v);
683 }
684 }
685 }
686
687 let merged_str = serde_json::to_string(&serde_json::Value::Object(merged))?;
688
689 conn.execute(
690 "UPDATE cross_references SET metadata = ?1, strength = ?2 WHERE id = ?3",
691 params![merged_str, keep_strength, keep_id],
692 )
693 .map_err(EngramError::Database)?;
694
695 let to_remove: Vec<i64> = rows.iter().map(|(id, _, _)| *id).collect();
696
697 for &id in &to_remove {
698 conn.execute("DELETE FROM cross_references WHERE id = ?1", params![id])
699 .map_err(EngramError::Database)?;
700 }
701
702 Ok((to_remove, vec![keep_id]))
703}
704
/// In-memory copy of the `cross_references` columns that the detectors read,
/// as produced by `load_all_edges`.
#[derive(Debug)]
struct EdgeRow {
    /// Primary key of the edge row.
    id: i64,
    /// Id of the node the edge starts from.
    from_id: i64,
    /// Id of the node the edge points to.
    to_id: i64,
    /// Relation label of the edge (e.g. `supports`).
    relation_type: String,
    // Selected by `load_all_edges` but not read by any code visible in this
    // file, hence the lint allowance.
    #[allow(dead_code)]
    /// Creation timestamp text of the edge row.
    created_at: String,
}
721
722fn load_all_edges(conn: &Connection) -> Result<Vec<EdgeRow>> {
725 if !table_exists(conn, "cross_references")? {
726 return Ok(Vec::new());
727 }
728
729 let mut stmt = conn
730 .prepare(
731 "SELECT id, from_id, to_id, relation_type, created_at
732 FROM cross_references
733 ORDER BY id ASC",
734 )
735 .map_err(EngramError::Database)?;
736
737 let rows = stmt
738 .query_map([], |row| {
739 Ok(EdgeRow {
740 id: row.get(0)?,
741 from_id: row.get(1)?,
742 to_id: row.get(2)?,
743 relation_type: row.get(3)?,
744 created_at: row.get(4)?,
745 })
746 })
747 .map_err(EngramError::Database)?
748 .collect::<rusqlite::Result<Vec<_>>>()
749 .map_err(EngramError::Database)?;
750
751 Ok(rows)
752}
753
754fn dfs_detect_cycle(
757 start: i64,
758 adj: &HashMap<i64, Vec<(i64, i64)>>,
759 edge_map: &HashMap<(i64, i64), i64>,
760 visited: &mut HashSet<i64>,
761 rec_stack: &mut HashSet<i64>,
762 conflicts: &mut Vec<Conflict>,
763) {
764 let mut stack: Vec<(i64, usize, Option<i64>)> = vec![(start, 0, None)];
766 let mut path: Vec<i64> = vec![start];
767 let mut path_set: HashSet<i64> = {
768 let mut s = HashSet::new();
769 s.insert(start);
770 s
771 };
772
773 visited.insert(start);
774 rec_stack.insert(start);
775
776 while let Some((node, idx, _parent_edge)) = stack.last_mut() {
777 let node = *node;
778 let neighbors = adj.get(&node).map(|v| v.as_slice()).unwrap_or(&[]);
779
780 if *idx < neighbors.len() {
781 let (neighbor, edge_id) = neighbors[*idx];
782 *idx += 1;
783
784 if !visited.contains(&neighbor) {
785 visited.insert(neighbor);
786 rec_stack.insert(neighbor);
787 path.push(neighbor);
788 path_set.insert(neighbor);
789 stack.push((neighbor, 0, Some(edge_id)));
790 } else if rec_stack.contains(&neighbor) {
791 let cycle_start_pos = path.iter().position(|&n| n == neighbor).unwrap_or(0);
793 let cycle_nodes = &path[cycle_start_pos..];
794 let mut cycle_edge_ids: Vec<i64> = Vec::new();
795 for window in cycle_nodes.windows(2) {
796 if let Some(&eid) = edge_map.get(&(window[0], window[1])) {
797 cycle_edge_ids.push(eid);
798 }
799 }
800 if let Some(&eid) =
802 edge_map.get(&(*cycle_nodes.last().unwrap_or(&neighbor), neighbor))
803 {
804 cycle_edge_ids.push(eid);
805 }
806
807 if !cycle_edge_ids.is_empty() {
808 conflicts.push(Conflict {
809 id: 0,
810 conflict_type: ConflictType::CyclicDependency,
811 edge_ids: cycle_edge_ids.clone(),
812 description: format!("Cycle detected involving nodes: {:?}", cycle_nodes),
813 severity: Severity::Medium,
814 resolved_at: None,
815 resolution_strategy: None,
816 });
817 }
818 }
819 } else {
820 stack.pop();
822 path.pop();
823 path_set.remove(&node);
824 rec_stack.remove(&node);
825 }
826 }
827}
828
829fn table_exists(conn: &Connection, name: &str) -> Result<bool> {
831 let count: i64 = conn
832 .query_row(
833 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
834 params![name],
835 |r| r.get(0),
836 )
837 .map_err(EngramError::Database)?;
838 Ok(count > 0)
839}
840
841fn row_to_conflict(row: &rusqlite::Row<'_>) -> rusqlite::Result<Conflict> {
843 let id: i64 = row.get(0)?;
844 let conflict_type_str: String = row.get(1)?;
845 let edge_ids_str: String = row.get(2)?;
846 let description: String = row.get(3)?;
847 let severity_str: String = row.get(4)?;
848 let resolved_at: Option<String> = row.get(5)?;
849 let resolution_strategy_str: Option<String> = row.get(6)?;
850
851 let conflict_type =
852 ConflictType::from_str(&conflict_type_str).unwrap_or(ConflictType::DirectContradiction);
853 let edge_ids: Vec<i64> = serde_json::from_str(&edge_ids_str).unwrap_or_default();
854 let severity = Severity::from_str(&severity_str).unwrap_or(Severity::Low);
855 let resolution_strategy = resolution_strategy_str
856 .as_deref()
857 .and_then(ResolutionStrategy::from_str);
858
859 Ok(Conflict {
860 id,
861 conflict_type,
862 edge_ids,
863 description,
864 severity,
865 resolved_at,
866 resolution_strategy,
867 })
868}
869
870fn chrono_now() -> String {
872 use std::time::{SystemTime, UNIX_EPOCH};
873 let secs = SystemTime::now()
875 .duration_since(UNIX_EPOCH)
876 .unwrap_or_default()
877 .as_secs();
878 let dt = chrono::DateTime::<chrono::Utc>::from_timestamp(secs as i64, 0)
879 .unwrap_or(chrono::Utc::now());
880 dt.format("%Y-%m-%dT%H:%M:%SZ").to_string()
881}
882
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    // Minimal stand-in for the edge table, with only the columns this module
    // reads or writes.
    const CREATE_CROSS_REFS: &str = "
        CREATE TABLE IF NOT EXISTS cross_references (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            from_id INTEGER NOT NULL,
            to_id INTEGER NOT NULL,
            relation_type TEXT NOT NULL DEFAULT 'related',
            strength REAL NOT NULL DEFAULT 0.5,
            metadata TEXT DEFAULT '{}',
            created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
        );
    ";

    // Minimal stand-in for the node table used by the orphan detector.
    const CREATE_MEMORIES: &str = "
        CREATE TABLE IF NOT EXISTS memories (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            content TEXT NOT NULL,
            created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
        );
    ";

    /// Opens an in-memory database with all three tables created.
    fn setup_db() -> Connection {
        let conn = Connection::open_in_memory().expect("open in-memory DB");
        conn.execute_batch(CREATE_CROSS_REFS)
            .expect("create cross_references");
        conn.execute_batch(CREATE_MEMORIES)
            .expect("create memories");
        conn.execute_batch(CREATE_CONFLICTS_TABLE)
            .expect("create graph_conflicts");
        conn
    }

    /// Inserts one edge and returns its row id.
    fn insert_edge(conn: &Connection, from_id: i64, to_id: i64, rel: &str, strength: f64) -> i64 {
        conn.execute(
            "INSERT INTO cross_references (from_id, to_id, relation_type, strength)
             VALUES (?1, ?2, ?3, ?4)",
            params![from_id, to_id, rel, strength],
        )
        .expect("insert edge");
        conn.last_insert_rowid()
    }

    /// Inserts a memory node with an explicit id.
    fn insert_memory(conn: &Connection, id: i64) {
        conn.execute(
            "INSERT INTO memories (id, content) VALUES (?1, 'test')",
            params![id],
        )
        .expect("insert memory");
    }

    // `supports` and `contradicts` between the same nodes form one conflict.
    #[test]
    fn test_detect_contradiction() {
        let conn = setup_db();

        insert_edge(&conn, 1, 2, "supports", 0.8);
        insert_edge(&conn, 1, 2, "contradicts", 0.8);

        let conflicts =
            ConflictDetector::detect_contradictions(&conn).expect("detect_contradictions");

        assert_eq!(conflicts.len(), 1);
        assert_eq!(
            conflicts[0].conflict_type,
            ConflictType::DirectContradiction
        );
        assert_eq!(conflicts[0].severity, Severity::High);
        assert!(conflicts[0].edge_ids.len() >= 2);
        assert!(conflicts[0].description.contains("Contradicting"));
    }

    // Two edges with an identical triple are reported as one pair.
    #[test]
    fn test_detect_temporal_inconsistency() {
        let conn = setup_db();

        let id_a = insert_edge(&conn, 10, 20, "works_at", 0.9);
        let id_b = insert_edge(&conn, 10, 20, "works_at", 0.7);

        let conflicts = ConflictDetector::detect_temporal_inconsistencies(&conn)
            .expect("detect_temporal_inconsistencies");

        assert_eq!(conflicts.len(), 1);
        assert_eq!(
            conflicts[0].conflict_type,
            ConflictType::TemporalInconsistency
        );
        assert_eq!(conflicts[0].severity, Severity::Medium);
        assert!(conflicts[0].edge_ids.contains(&id_a));
        assert!(conflicts[0].edge_ids.contains(&id_b));
    }

    // 1 -> 2 -> 3 -> 1 is a cycle.
    #[test]
    fn test_detect_cycle() {
        let conn = setup_db();

        insert_edge(&conn, 1, 2, "depends_on", 0.9);
        insert_edge(&conn, 2, 3, "depends_on", 0.9);
        insert_edge(&conn, 3, 1, "depends_on", 0.9);

        let conflicts = ConflictDetector::detect_cycles(&conn).expect("detect_cycles");

        assert!(
            !conflicts.is_empty(),
            "expected at least one cycle conflict"
        );
        assert_eq!(conflicts[0].conflict_type, ConflictType::CyclicDependency);
        assert!(conflicts[0].description.contains("Cycle"));
    }

    // An edge pointing at memory 99, which was never inserted, is an orphan.
    #[test]
    fn test_detect_orphan() {
        let conn = setup_db();

        insert_memory(&conn, 1);
        let edge_id = insert_edge(&conn, 1, 99, "related", 0.5);

        let conflicts = ConflictDetector::detect_orphans(&conn).expect("detect_orphans");

        assert_eq!(conflicts.len(), 1);
        assert_eq!(conflicts[0].conflict_type, ConflictType::OrphanedReference);
        assert_eq!(conflicts[0].severity, Severity::Critical);
        assert!(conflicts[0].edge_ids.contains(&edge_id));
    }

    // KeepNewer deletes the older edge, keeps the newer one, and marks the
    // conflict resolved.
    #[test]
    fn test_resolve_keep_newer() {
        let conn = setup_db();

        // Pin both timestamps explicitly so the ordering does not depend on
        // the wall clock or on how fast the two inserts run.
        let id_old = insert_edge(&conn, 5, 6, "supports", 0.5);
        conn.execute(
            "UPDATE cross_references SET created_at = '2099-01-01T00:00:00.000Z' WHERE id = ?1",
            params![id_old],
        )
        .expect("update old ts");
        let id_new = insert_edge(&conn, 5, 6, "supports", 0.5);
        conn.execute(
            "UPDATE cross_references SET created_at = '2099-01-02T00:00:00.000Z' WHERE id = ?1",
            params![id_new],
        )
        .expect("update ts");

        let conflict = Conflict {
            id: 0,
            conflict_type: ConflictType::TemporalInconsistency,
            edge_ids: vec![id_old, id_new],
            description: "duplicate triple".to_string(),
            severity: Severity::Medium,
            resolved_at: None,
            resolution_strategy: None,
        };
        let cid = ConflictResolver::save_conflict(&conn, &conflict).expect("save");

        let result =
            ConflictResolver::resolve(&conn, cid, ResolutionStrategy::KeepNewer).expect("resolve");

        assert_eq!(result.conflict_id, cid);
        assert_eq!(result.strategy, ResolutionStrategy::KeepNewer);
        assert_eq!(result.edges_removed.len(), 1);
        assert_eq!(result.edges_kept.len(), 1);
        assert!(result.edges_kept.contains(&id_new));
        assert!(result.edges_removed.contains(&id_old));

        let saved = ConflictResolver::get_conflict(&conn, cid)
            .expect("get")
            .expect("exists");
        assert!(saved.resolved_at.is_some());
    }

    // A small consistent graph yields no conflicts from any detector.
    #[test]
    fn test_no_conflicts_clean_graph() {
        let conn = setup_db();

        insert_memory(&conn, 1);
        insert_memory(&conn, 2);
        insert_memory(&conn, 3);
        insert_edge(&conn, 1, 2, "supports", 0.9);
        insert_edge(&conn, 2, 3, "related", 0.7);

        let all = ConflictDetector::detect_all(&conn).expect("detect_all");

        assert!(all.is_empty(), "expected zero conflicts, got: {:?}", all);
    }

    // Saved conflicts can be listed with each filter and fetched by id.
    #[test]
    fn test_save_and_list_conflicts() {
        let conn = setup_db();

        let c1 = Conflict {
            id: 0,
            conflict_type: ConflictType::DirectContradiction,
            edge_ids: vec![1, 2],
            description: "supports vs contradicts".to_string(),
            severity: Severity::High,
            resolved_at: None,
            resolution_strategy: None,
        };
        let c2 = Conflict {
            id: 0,
            conflict_type: ConflictType::OrphanedReference,
            edge_ids: vec![3],
            description: "missing node 99".to_string(),
            severity: Severity::Critical,
            resolved_at: None,
            resolution_strategy: None,
        };

        let id1 = ConflictResolver::save_conflict(&conn, &c1).expect("save c1");
        let id2 = ConflictResolver::save_conflict(&conn, &c2).expect("save c2");

        let all = ConflictResolver::list_conflicts(&conn, None).expect("list all");
        assert_eq!(all.len(), 2);

        let unresolved =
            ConflictResolver::list_conflicts(&conn, Some(false)).expect("list unresolved");
        assert_eq!(unresolved.len(), 2);

        let resolved = ConflictResolver::list_conflicts(&conn, Some(true)).expect("list resolved");
        assert_eq!(resolved.len(), 0);

        let fetched = ConflictResolver::get_conflict(&conn, id1)
            .expect("get c1")
            .expect("exists");
        assert_eq!(fetched.conflict_type, ConflictType::DirectContradiction);
        assert_eq!(fetched.severity, Severity::High);

        let fetched2 = ConflictResolver::get_conflict(&conn, id2)
            .expect("get c2")
            .expect("exists");
        assert_eq!(fetched2.conflict_type, ConflictType::OrphanedReference);
    }

    // detect_all reports findings from more than one detector at once.
    #[test]
    fn test_detect_all_multiple_types() {
        let conn = setup_db();

        insert_memory(&conn, 1);
        insert_memory(&conn, 2);

        // Contradiction between nodes 1 and 2.
        insert_edge(&conn, 1, 2, "supports", 0.8);
        insert_edge(&conn, 1, 2, "contradicts", 0.6);

        // Orphan: memory 99 does not exist.
        insert_edge(&conn, 1, 99, "related", 0.5);

        let all = ConflictDetector::detect_all(&conn).expect("detect_all");

        let types: Vec<&ConflictType> = all.iter().map(|c| &c.conflict_type).collect();

        assert!(
            types.contains(&&ConflictType::DirectContradiction),
            "expected DirectContradiction in {:?}",
            types
        );
        assert!(
            types.contains(&&ConflictType::OrphanedReference),
            "expected OrphanedReference in {:?}",
            types
        );
    }

    // Resolving the same conflict a second time is rejected.
    #[test]
    fn test_resolve_already_resolved_returns_error() {
        let conn = setup_db();

        let id_a = insert_edge(&conn, 5, 6, "rel", 0.5);
        let id_b = insert_edge(&conn, 5, 6, "rel", 0.5);

        let conflict = Conflict {
            id: 0,
            conflict_type: ConflictType::TemporalInconsistency,
            edge_ids: vec![id_a, id_b],
            description: "dup".to_string(),
            severity: Severity::Medium,
            resolved_at: None,
            resolution_strategy: None,
        };
        let cid = ConflictResolver::save_conflict(&conn, &conflict).expect("save");

        ConflictResolver::resolve(&conn, cid, ResolutionStrategy::Manual).expect("first resolve");

        let result = ConflictResolver::resolve(&conn, cid, ResolutionStrategy::Manual);
        assert!(result.is_err(), "expected error on double-resolve");
    }
}