Skip to main content

engram/graph/
temporal.rs

1//! Temporal knowledge graph — edges with validity periods.
2//!
3//! Provides bi-temporal edge tracking: each edge carries a `valid_from` /
4//! `valid_to` validity interval. Adding a new edge for the same
5//! `(from_id, to_id, relation)` triple automatically closes the previous open
6//! interval so the graph stays consistent.
7
8use rusqlite::{params, Connection};
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11
12use crate::error::{EngramError, Result};
13
14// =============================================================================
15// DDL
16// =============================================================================
17
18/// SQL that creates the `temporal_edges` table and its supporting indexes.
19///
20/// Every statement uses `IF NOT EXISTS`, so re-running it is a no-op once the
21/// schema below is in place. NOTE(review): on a pre-v33 table that lacks
22/// `scope_path`, the scope index statement presumably fails — confirm the
23/// migration runner (which adds the column in v33) runs first. Timestamps are
24/// stored as text; a NULL `valid_to` marks an edge that is still open.
25pub const CREATE_TEMPORAL_EDGES_TABLE: &str = r#"
26CREATE TABLE IF NOT EXISTS temporal_edges (
27    id          INTEGER PRIMARY KEY AUTOINCREMENT,
28    from_id     INTEGER NOT NULL,
29    to_id       INTEGER NOT NULL,
30    relation    TEXT    NOT NULL,
31    properties  TEXT    NOT NULL DEFAULT '{}',
32    valid_from  TEXT    NOT NULL,
33    valid_to    TEXT,
34    confidence  REAL    NOT NULL DEFAULT 1.0,
35    source      TEXT    NOT NULL DEFAULT '',
36    created_at  TEXT    NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
37    scope_path  TEXT    NOT NULL DEFAULT 'global'
38);
39CREATE INDEX IF NOT EXISTS idx_temporal_edges_from       ON temporal_edges(from_id);
40CREATE INDEX IF NOT EXISTS idx_temporal_edges_to         ON temporal_edges(to_id);
41CREATE INDEX IF NOT EXISTS idx_temporal_edges_valid      ON temporal_edges(valid_from, valid_to);
42CREATE INDEX IF NOT EXISTS idx_temporal_edges_scope_path ON temporal_edges(scope_path);
43"#;
44
45// =============================================================================
46// Types
47// =============================================================================
48
49/// A directed edge in the temporal knowledge graph.
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct TemporalEdge {
52    /// Row identifier.
53    pub id: i64,
54    /// Source memory / node.
55    pub from_id: i64,
56    /// Target memory / node.
57    pub to_id: i64,
58    /// Semantic label for the relationship (e.g. `"works_at"`, `"reports_to"`).
59    pub relation: String,
60    /// Arbitrary key-value metadata stored as JSON.
61    pub properties: Value,
62    /// Start of validity period (RFC3339 UTC).
63    pub valid_from: String,
64    /// End of validity period (RFC3339 UTC), `None` means still valid.
65    pub valid_to: Option<String>,
66    /// Confidence in this edge (0.0–1.0).
67    pub confidence: f32,
68    /// Provenance string (e.g. document name, agent ID).
69    pub source: String,
70    /// Wall-clock creation time (RFC3339 UTC).
71    pub created_at: String,
72    /// Hierarchical scope path (e.g. `"global"`, `"global/org:acme/user:alice"`).
73    /// Added in schema v33. Defaults to `"global"` for backward compatibility.
74    pub scope_path: String,
75}
76
77/// Summary of how the graph changed between two timestamps.
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct GraphDiff {
80    /// Edges valid at `t2` whose triple had no valid edge at `t1`.
81    pub added: Vec<TemporalEdge>,
82    /// Edges valid at `t1` whose triple had no valid edge at `t2`.
83    pub removed: Vec<TemporalEdge>,
84    /// Triples valid at both timestamps but carried by a different edge row
85    /// (the edge was superseded); properties are not compared field by field.
86    /// Each tuple is `(old_edge_at_t1, new_edge_at_t2)`.
87    pub changed: Vec<(TemporalEdge, TemporalEdge)>,
88}
89
90// =============================================================================
91// Row mapper helpers
92// =============================================================================
93
94/// Build a `TemporalEdge` from a rusqlite row.
95///
96/// Expected column order:
97/// 0: id, 1: from_id, 2: to_id, 3: properties, 4: valid_from, 5: valid_to,
98/// 6: confidence, 7: source, 8: relation, 9: created_at, 10: scope_path
99fn row_to_edge(row: &rusqlite::Row<'_>) -> rusqlite::Result<TemporalEdge> {
100    let props_str: String = row.get(3)?;
101    let properties: Value =
102        serde_json::from_str(&props_str).unwrap_or(Value::Object(Default::default()));
103
104    Ok(TemporalEdge {
105        id: row.get(0)?,
106        from_id: row.get(1)?,
107        to_id: row.get(2)?,
108        relation: row.get(8)?,
109        properties,
110        valid_from: row.get(4)?,
111        valid_to: row.get(5)?,
112        confidence: row.get(6)?,
113        source: row.get(7)?,
114        created_at: row.get(9)?,
115        scope_path: row.get(10)?,
116    })
117}
118
119// =============================================================================
120// Public API
121// =============================================================================
122
123/// Add a new temporal edge.
124///
125/// If an open edge (`valid_to IS NULL`) already exists for the same
126/// `(from_id, to_id, relation)` triple **within the same scope**, it is
127/// automatically closed by setting its `valid_to` to the `valid_from` of the
128/// new edge before inserting.
129///
130/// `scope_path` defaults to `"global"` when `None`.
131///
132/// Returns the newly inserted edge with its generated `id` and `created_at`.
133pub fn add_edge(
134    conn: &Connection,
135    from_id: i64,
136    to_id: i64,
137    relation: &str,
138    properties: &Value,
139    valid_from: &str,
140    confidence: f32,
141    source: &str,
142    scope_path: Option<&str>,
143) -> Result<TemporalEdge> {
144    let scope = scope_path.unwrap_or("global");
145    let props_str = serde_json::to_string(properties)?;
146
147    // Auto-invalidate any currently-open edges for the same triple within
148    // the same scope.
149    conn.execute(
150        "UPDATE temporal_edges
151         SET valid_to = ?1
152         WHERE from_id    = ?2
153           AND to_id      = ?3
154           AND relation   = ?4
155           AND scope_path = ?5
156           AND valid_to IS NULL",
157        params![valid_from, from_id, to_id, relation, scope],
158    )
159    .map_err(EngramError::Database)?;
160
161    // Insert the new edge.
162    conn.execute(
163        "INSERT INTO temporal_edges
164             (from_id, to_id, relation, properties, valid_from, confidence, source, scope_path)
165         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
166        params![from_id, to_id, relation, props_str, valid_from, confidence, source, scope],
167    )
168    .map_err(EngramError::Database)?;
169
170    let id = conn.last_insert_rowid();
171    get_edge_by_id(conn, id)?
172        .ok_or_else(|| EngramError::Internal(format!("Edge {} disappeared after insert", id)))
173}
174
175/// Set the `valid_to` timestamp on an existing edge, effectively closing it.
176pub fn invalidate_edge(conn: &Connection, edge_id: i64, valid_to: &str) -> Result<()> {
177    let affected = conn
178        .execute(
179            "UPDATE temporal_edges SET valid_to = ?1 WHERE id = ?2",
180            params![valid_to, edge_id],
181        )
182        .map_err(EngramError::Database)?;
183
184    if affected == 0 {
185        return Err(EngramError::NotFound(edge_id));
186    }
187    Ok(())
188}
189
190/// Return all edges that were valid at `timestamp`.
191///
192/// An edge is valid at `t` when `valid_from <= t` AND (`valid_to IS NULL` OR
193/// `valid_to > t`).
194///
195/// When `scope_path` is `Some(prefix)`, only edges whose `scope_path` equals
196/// `prefix` or starts with `prefix/` are returned (hierarchical prefix
197/// matching). When `None`, edges from all scopes are returned (backward
198/// compatible).
199pub fn snapshot_at(
200    conn: &Connection,
201    timestamp: &str,
202    scope_path: Option<&str>,
203) -> Result<Vec<TemporalEdge>> {
204    match scope_path {
205        None => {
206            let mut stmt = conn
207                .prepare(
208                    "SELECT id, from_id, to_id, properties, valid_from, valid_to,
209                            confidence, source, relation, created_at, scope_path
210                     FROM   temporal_edges
211                     WHERE  valid_from <= ?1
212                       AND  (valid_to IS NULL OR valid_to > ?1)
213                     ORDER  BY from_id, to_id, relation",
214                )
215                .map_err(EngramError::Database)?;
216
217            let edges = stmt
218                .query_map(params![timestamp], row_to_edge)
219                .map_err(EngramError::Database)?
220                .collect::<rusqlite::Result<Vec<_>>>()
221                .map_err(EngramError::Database)?;
222
223            Ok(edges)
224        }
225        Some(scope) => {
226            let pattern = format!("{}/%", scope);
227            let mut stmt = conn
228                .prepare(
229                    "SELECT id, from_id, to_id, properties, valid_from, valid_to,
230                            confidence, source, relation, created_at, scope_path
231                     FROM   temporal_edges
232                     WHERE  valid_from <= ?1
233                       AND  (valid_to IS NULL OR valid_to > ?1)
234                       AND  (scope_path = ?2 OR scope_path LIKE ?3)
235                     ORDER  BY from_id, to_id, relation",
236                )
237                .map_err(EngramError::Database)?;
238
239            let edges = stmt
240                .query_map(params![timestamp, scope, pattern], row_to_edge)
241                .map_err(EngramError::Database)?
242                .collect::<rusqlite::Result<Vec<_>>>()
243                .map_err(EngramError::Database)?;
244
245            Ok(edges)
246        }
247    }
248}
249
250/// Return the complete edit history for a `(from_id, to_id)` pair, ordered
251/// chronologically (`valid_from ASC`, then `created_at ASC`).
252///
253/// When `scope_path` is `Some(prefix)`, only edges whose `scope_path` equals
254/// `prefix` or starts with `prefix/` are returned. When `None`, all scopes
255/// are included (backward compatible).
256pub fn relationship_timeline(
257    conn: &Connection,
258    from_id: i64,
259    to_id: i64,
260    scope_path: Option<&str>,
261) -> Result<Vec<TemporalEdge>> {
262    match scope_path {
263        None => {
264            let mut stmt = conn
265                .prepare(
266                    "SELECT id, from_id, to_id, properties, valid_from, valid_to,
267                            confidence, source, relation, created_at, scope_path
268                     FROM   temporal_edges
269                     WHERE  from_id = ?1 AND to_id = ?2
270                     ORDER  BY valid_from ASC, created_at ASC",
271                )
272                .map_err(EngramError::Database)?;
273
274            let edges = stmt
275                .query_map(params![from_id, to_id], row_to_edge)
276                .map_err(EngramError::Database)?
277                .collect::<rusqlite::Result<Vec<_>>>()
278                .map_err(EngramError::Database)?;
279
280            Ok(edges)
281        }
282        Some(scope) => {
283            let pattern = format!("{}/%", scope);
284            let mut stmt = conn
285                .prepare(
286                    "SELECT id, from_id, to_id, properties, valid_from, valid_to,
287                            confidence, source, relation, created_at, scope_path
288                     FROM   temporal_edges
289                     WHERE  from_id    = ?1
290                       AND  to_id      = ?2
291                       AND  (scope_path = ?3 OR scope_path LIKE ?4)
292                     ORDER  BY valid_from ASC, created_at ASC",
293                )
294                .map_err(EngramError::Database)?;
295
296            let edges = stmt
297                .query_map(params![from_id, to_id, scope, pattern], row_to_edge)
298                .map_err(EngramError::Database)?
299                .collect::<rusqlite::Result<Vec<_>>>()
300                .map_err(EngramError::Database)?;
301
302            Ok(edges)
303        }
304    }
305}
306
307/// Detect edges that share the same `(from_id, to_id, relation)` triple and
308/// have **overlapping** validity periods — which should not exist under normal
309/// operation.
310///
311/// Returns pairs `(edge_a, edge_b)` where `edge_a.id < edge_b.id`.
312pub fn detect_contradictions(conn: &Connection) -> Result<Vec<(TemporalEdge, TemporalEdge)>> {
313    // Self-join: find pairs that share the triple and overlap.
314    // Overlap condition: a.valid_from < b.valid_to_or_max AND b.valid_from < a.valid_to_or_max
315    let mut stmt = conn
316        .prepare(
317            "SELECT a.id, a.from_id, a.to_id, a.properties, a.valid_from, a.valid_to,
318                    a.confidence, a.source, a.relation, a.created_at, a.scope_path,
319                    b.id, b.from_id, b.to_id, b.properties, b.valid_from, b.valid_to,
320                    b.confidence, b.source, b.relation, b.created_at, b.scope_path
321             FROM   temporal_edges a
322             JOIN   temporal_edges b
323               ON   a.from_id  = b.from_id
324              AND   a.to_id    = b.to_id
325              AND   a.relation = b.relation
326              AND   a.id < b.id
327             WHERE  a.valid_from < COALESCE(b.valid_to, '9999-12-31T23:59:59Z')
328               AND  b.valid_from < COALESCE(a.valid_to, '9999-12-31T23:59:59Z')",
329        )
330        .map_err(EngramError::Database)?;
331
332    let pairs = stmt
333        .query_map([], |row| {
334            // First edge columns: 0..10
335            let props_a: String = row.get(3)?;
336            // Second edge columns: 11..21
337            let props_b: String = row.get(14)?;
338
339            let edge_a = TemporalEdge {
340                id: row.get(0)?,
341                from_id: row.get(1)?,
342                to_id: row.get(2)?,
343                properties: serde_json::from_str(&props_a)
344                    .unwrap_or(Value::Object(Default::default())),
345                valid_from: row.get(4)?,
346                valid_to: row.get(5)?,
347                confidence: row.get(6)?,
348                source: row.get(7)?,
349                relation: row.get(8)?,
350                created_at: row.get(9)?,
351                scope_path: row.get(10)?,
352            };
353
354            let edge_b = TemporalEdge {
355                id: row.get(11)?,
356                from_id: row.get(12)?,
357                to_id: row.get(13)?,
358                properties: serde_json::from_str(&props_b)
359                    .unwrap_or(Value::Object(Default::default())),
360                valid_from: row.get(15)?,
361                valid_to: row.get(16)?,
362                confidence: row.get(17)?,
363                source: row.get(18)?,
364                relation: row.get(19)?,
365                created_at: row.get(20)?,
366                scope_path: row.get(21)?,
367            };
368
369            Ok((edge_a, edge_b))
370        })
371        .map_err(EngramError::Database)?
372        .collect::<rusqlite::Result<Vec<_>>>()
373        .map_err(EngramError::Database)?;
374
375    Ok(pairs)
376}
377
378/// Compare the graph state at two different timestamps.
379///
380/// - `added`   — edges valid at `t2` whose `(from_id, to_id, relation)` triple
381///   was not present at `t1`.
382/// - `removed` — edges valid at `t1` whose triple was not present at `t2`.
383/// - `changed` — triples present at both `t1` and `t2` but with a different
384///   `id` (i.e. the edge was superseded), implying the properties
385///   or confidence changed.
386///
387/// When `scope_path` is `Some(prefix)`, the diff is limited to edges within
388/// that scope hierarchy. When `None`, all scopes are compared (backward
389/// compatible).
390pub fn diff(
391    conn: &Connection,
392    t1: &str,
393    t2: &str,
394    scope_path: Option<&str>,
395) -> Result<GraphDiff> {
396    let snap1 = snapshot_at(conn, t1, scope_path)?;
397    let snap2 = snapshot_at(conn, t2, scope_path)?;
398
399    // Key: (from_id, to_id, relation)
400    type Key = (i64, i64, String);
401
402    let map1: std::collections::HashMap<Key, TemporalEdge> = snap1
403        .into_iter()
404        .map(|e| ((e.from_id, e.to_id, e.relation.clone()), e))
405        .collect();
406
407    let map2: std::collections::HashMap<Key, TemporalEdge> = snap2
408        .into_iter()
409        .map(|e| ((e.from_id, e.to_id, e.relation.clone()), e))
410        .collect();
411
412    let mut added = Vec::new();
413    let mut removed = Vec::new();
414    let mut changed = Vec::new();
415
416    for (key, edge2) in &map2 {
417        match map1.get(key) {
418            None => added.push(edge2.clone()),
419            Some(edge1) if edge1.id != edge2.id => {
420                changed.push((edge1.clone(), edge2.clone()));
421            }
422            _ => {} // same edge, no change
423        }
424    }
425
426    for (key, edge1) in &map1 {
427        if !map2.contains_key(key) {
428            removed.push(edge1.clone());
429        }
430    }
431
432    Ok(GraphDiff {
433        added,
434        removed,
435        changed,
436    })
437}
438
439// =============================================================================
440// Private helpers
441// =============================================================================
442
443fn get_edge_by_id(conn: &Connection, id: i64) -> Result<Option<TemporalEdge>> {
444    let mut stmt = conn
445        .prepare(
446            "SELECT id, from_id, to_id, properties, valid_from, valid_to,
447                    confidence, source, relation, created_at, scope_path
448             FROM   temporal_edges
449             WHERE  id = ?1",
450        )
451        .map_err(EngramError::Database)?;
452
453    let mut rows = stmt
454        .query_map(params![id], row_to_edge)
455        .map_err(EngramError::Database)?;
456
457    match rows.next() {
458        Some(row) => Ok(Some(row.map_err(EngramError::Database)?)),
459        None => Ok(None),
460    }
461}
462
463// =============================================================================
464// Tests
465// =============================================================================
466
467#[cfg(test)]
468mod tests {
469    use super::*;
470    use rusqlite::Connection;
471    use serde_json::json;
472
473    /// Open an in-memory SQLite database and create the temporal_edges table.
474    fn setup_db() -> Connection {
475        let conn = Connection::open_in_memory().expect("open in-memory DB");
476        conn.execute_batch(CREATE_TEMPORAL_EDGES_TABLE)
477            .expect("create table");
478        conn
479    }
480
481    // -------------------------------------------------------------------------
482    // Test 1: Add edge and retrieve it
483    // -------------------------------------------------------------------------
484    #[test]
485    fn test_add_edge_and_retrieve() {
486        let conn = setup_db();
487
488        let edge = add_edge(
489            &conn,
490            1,
491            2,
492            "works_at",
493            &json!({}),
494            "2024-01-01T00:00:00Z",
495            0.9,
496            "test",
497            None,
498        )
499        .expect("add_edge");
500
501        assert_eq!(edge.from_id, 1);
502        assert_eq!(edge.to_id, 2);
503        assert_eq!(edge.relation, "works_at");
504        assert!(edge.valid_to.is_none());
505        assert_eq!(edge.confidence, 0.9);
506        assert_eq!(edge.source, "test");
507        assert_eq!(edge.scope_path, "global");
508    }
509
510    // -------------------------------------------------------------------------
511    // Test 2: Auto-invalidation of conflicting edges
512    // -------------------------------------------------------------------------
513    #[test]
514    fn test_auto_invalidation_on_new_edge() {
515        let conn = setup_db();
516
517        let first = add_edge(
518            &conn,
519            1,
520            2,
521            "works_at",
522            &json!({"role": "engineer"}),
523            "2023-01-01T00:00:00Z",
524            1.0,
525            "hr",
526            None,
527        )
528        .expect("first edge");
529
530        assert!(first.valid_to.is_none(), "first edge should be open");
531
532        // Adding a new edge for the same triple must close the first one.
533        let _second = add_edge(
534            &conn,
535            1,
536            2,
537            "works_at",
538            &json!({"role": "manager"}),
539            "2024-06-01T00:00:00Z",
540            1.0,
541            "hr",
542            None,
543        )
544        .expect("second edge");
545
546        // Re-fetch first edge to confirm it was closed.
547        let updated = get_edge_by_id(&conn, first.id)
548            .expect("query")
549            .expect("edge still exists");
550
551        assert_eq!(
552            updated.valid_to.as_deref(),
553            Some("2024-06-01T00:00:00Z"),
554            "first edge should have been closed at the second edge's valid_from"
555        );
556    }
557
558    // -------------------------------------------------------------------------
559    // Test 3: Snapshot at a specific timestamp
560    // -------------------------------------------------------------------------
561    #[test]
562    fn test_snapshot_at() {
563        let conn = setup_db();
564
565        // Edge valid in 2023 only.
566        add_edge(
567            &conn,
568            1,
569            2,
570            "rel",
571            &json!({}),
572            "2023-01-01T00:00:00Z",
573            1.0,
574            "",
575            None,
576        )
577        .unwrap();
578        // Manually close it via a second edge (auto-invalidation).
579        add_edge(
580            &conn,
581            1,
582            2,
583            "rel",
584            &json!({}),
585            "2024-01-01T00:00:00Z",
586            1.0,
587            "",
588            None,
589        )
590        .unwrap();
591
592        // Snapshot mid-2023 should return exactly 1 edge.
593        let snap = snapshot_at(&conn, "2023-07-01T00:00:00Z", None).expect("snapshot");
594        assert_eq!(snap.len(), 1);
595        assert_eq!(snap[0].valid_from, "2023-01-01T00:00:00Z");
596
597        // Snapshot mid-2024 should return the second edge.
598        let snap2 = snapshot_at(&conn, "2024-07-01T00:00:00Z", None).expect("snapshot");
599        assert_eq!(snap2.len(), 1);
600        assert_eq!(snap2[0].valid_from, "2024-01-01T00:00:00Z");
601    }
602
603    // -------------------------------------------------------------------------
604    // Test 4: Timeline shows chronological history
605    // -------------------------------------------------------------------------
606    #[test]
607    fn test_relationship_timeline_chronological() {
608        let conn = setup_db();
609
610        add_edge(
611            &conn,
612            10,
613            20,
614            "partner",
615            &json!({}),
616            "2020-01-01T00:00:00Z",
617            1.0,
618            "",
619            None,
620        )
621        .unwrap();
622        add_edge(
623            &conn,
624            10,
625            20,
626            "partner",
627            &json!({}),
628            "2021-06-01T00:00:00Z",
629            1.0,
630            "",
631            None,
632        )
633        .unwrap();
634        add_edge(
635            &conn,
636            10,
637            20,
638            "partner",
639            &json!({}),
640            "2022-09-01T00:00:00Z",
641            1.0,
642            "",
643            None,
644        )
645        .unwrap();
646
647        let timeline = relationship_timeline(&conn, 10, 20, None).expect("timeline");
648        assert_eq!(timeline.len(), 3);
649
650        // Verify ascending order.
651        assert!(timeline[0].valid_from <= timeline[1].valid_from);
652        assert!(timeline[1].valid_from <= timeline[2].valid_from);
653    }
654
655    // -------------------------------------------------------------------------
656    // Test 5: Detect contradictions (manually injected overlap)
657    // -------------------------------------------------------------------------
658    #[test]
659    fn test_detect_contradictions() {
660        let conn = setup_db();
661
662        // Insert two edges with overlapping validity directly (bypassing
663        // the auto-invalidation logic that `add_edge` provides).
664        conn.execute(
665            "INSERT INTO temporal_edges
666                 (from_id, to_id, relation, properties, valid_from, valid_to, confidence, source)
667             VALUES (1, 2, 'rel', '{}', '2023-01-01T00:00:00Z', NULL, 1.0, '')",
668            [],
669        )
670        .unwrap();
671        conn.execute(
672            "INSERT INTO temporal_edges
673                 (from_id, to_id, relation, properties, valid_from, valid_to, confidence, source)
674             VALUES (1, 2, 'rel', '{}', '2023-06-01T00:00:00Z', NULL, 1.0, '')",
675            [],
676        )
677        .unwrap();
678
679        let contradictions = detect_contradictions(&conn).expect("detect");
680        assert_eq!(contradictions.len(), 1);
681
682        let (a, b) = &contradictions[0];
683        assert!(a.id < b.id);
684    }
685
686    // -------------------------------------------------------------------------
687    // Test 6: Diff between two timestamps
688    // -------------------------------------------------------------------------
689    #[test]
690    fn test_diff_between_timestamps() {
691        let conn = setup_db();
692
693        // Edge A: exists in 2023 and 2024.
694        add_edge(
695            &conn,
696            1,
697            2,
698            "knows",
699            &json!({}),
700            "2022-01-01T00:00:00Z",
701            1.0,
702            "",
703            None,
704        )
705        .unwrap();
706
707        // Edge B: appears in 2024 only.
708        add_edge(
709            &conn,
710            3,
711            4,
712            "likes",
713            &json!({}),
714            "2024-01-01T00:00:00Z",
715            1.0,
716            "",
717            None,
718        )
719        .unwrap();
720
721        let d = diff(&conn, "2023-01-01T00:00:00Z", "2025-01-01T00:00:00Z", None).expect("diff");
722
723        // "knows" was present at both; "likes" was added.
724        assert_eq!(d.added.len(), 1);
725        assert_eq!(d.added[0].relation, "likes");
726        assert_eq!(d.removed.len(), 0);
727        // "knows" same edge, not changed.
728        assert_eq!(d.changed.len(), 0);
729    }
730
731    // -------------------------------------------------------------------------
732    // Test 7: Empty graph operations
733    // -------------------------------------------------------------------------
734    #[test]
735    fn test_empty_graph_operations() {
736        let conn = setup_db();
737
738        let snap = snapshot_at(&conn, "2024-01-01T00:00:00Z", None).expect("snapshot");
739        assert!(snap.is_empty());
740
741        let timeline = relationship_timeline(&conn, 99, 100, None).expect("timeline");
742        assert!(timeline.is_empty());
743
744        let contradictions = detect_contradictions(&conn).expect("detect");
745        assert!(contradictions.is_empty());
746
747        let d = diff(&conn, "2024-01-01T00:00:00Z", "2025-01-01T00:00:00Z", None).expect("diff");
748        assert!(d.added.is_empty());
749        assert!(d.removed.is_empty());
750        assert!(d.changed.is_empty());
751    }
752
753    // -------------------------------------------------------------------------
754    // Test 8: Edge with rich JSON properties
755    // -------------------------------------------------------------------------
756    #[test]
757    fn test_edge_with_json_properties() {
758        let conn = setup_db();
759
760        let props = json!({
761            "title": "Senior Engineer",
762            "department": "R&D",
763            "salary": 120_000,
764            "remote": true,
765            "skills": ["Rust", "Python"]
766        });
767
768        let edge = add_edge(
769            &conn,
770            5,
771            6,
772            "employed_by",
773            &props,
774            "2024-03-01T00:00:00Z",
775            0.95,
776            "payroll",
777            None,
778        )
779        .expect("add");
780
781        assert_eq!(edge.properties["title"], "Senior Engineer");
782        assert_eq!(edge.properties["salary"], 120_000);
783        assert_eq!(edge.properties["remote"], true);
784        assert_eq!(edge.properties["skills"][0], "Rust");
785    }
786
787    // -------------------------------------------------------------------------
788    // Test 9: Invalidate edge manually
789    // -------------------------------------------------------------------------
790    #[test]
791    fn test_invalidate_edge_manually() {
792        let conn = setup_db();
793
794        let edge = add_edge(
795            &conn,
796            7,
797            8,
798            "owns",
799            &json!({}),
800            "2024-01-01T00:00:00Z",
801            1.0,
802            "legal",
803            None,
804        )
805        .expect("add");
806
807        assert!(edge.valid_to.is_none());
808
809        invalidate_edge(&conn, edge.id, "2024-12-31T23:59:59Z").expect("invalidate");
810
811        let updated = get_edge_by_id(&conn, edge.id)
812            .expect("query")
813            .expect("still exists");
814
815        assert_eq!(updated.valid_to.as_deref(), Some("2024-12-31T23:59:59Z"));
816    }
817
818    // -------------------------------------------------------------------------
819    // Test 10: Invalidating a non-existent edge returns NotFound
820    // -------------------------------------------------------------------------
821    #[test]
822    fn test_invalidate_nonexistent_edge_returns_not_found() {
823        let conn = setup_db();
824
825        let result = invalidate_edge(&conn, 99999, "2025-01-01T00:00:00Z");
826        assert!(
827            matches!(result, Err(EngramError::NotFound(99999))),
828            "expected NotFound(99999), got {:?}",
829            result
830        );
831    }
832
833    // -------------------------------------------------------------------------
834    // Test 11: Diff detects edge supersession as "changed"
835    // -------------------------------------------------------------------------
836    #[test]
837    fn test_diff_detects_changed_edge() {
838        let conn = setup_db();
839
840        // First version of the edge.
841        add_edge(
842            &conn,
843            1,
844            2,
845            "role",
846            &json!({"level": "junior"}),
847            "2022-01-01T00:00:00Z",
848            1.0,
849            "",
850            None,
851        )
852        .unwrap();
853
854        // Supersede it (auto-invalidation closes the first).
855        add_edge(
856            &conn,
857            1,
858            2,
859            "role",
860            &json!({"level": "senior"}),
861            "2023-06-01T00:00:00Z",
862            1.0,
863            "",
864            None,
865        )
866        .unwrap();
867
868        let d =
869            diff(&conn, "2022-07-01T00:00:00Z", "2024-01-01T00:00:00Z", None).expect("diff");
870
871        // The triple is present at both timestamps, but via a different edge id.
872        assert_eq!(d.changed.len(), 1);
873        let (old, new) = &d.changed[0];
874        assert_eq!(old.properties["level"], "junior");
875        assert_eq!(new.properties["level"], "senior");
876    }
877
878    // -------------------------------------------------------------------------
879    // Test 12: Add edge with explicit scope, verify scope is stored
880    // -------------------------------------------------------------------------
881    #[test]
882    fn test_add_edge_with_scope() {
883        let conn = setup_db();
884
885        // Edge in the default (global) scope.
886        let global_edge = add_edge(
887            &conn,
888            1,
889            2,
890            "knows",
891            &json!({}),
892            "2024-01-01T00:00:00Z",
893            1.0,
894            "",
895            None,
896        )
897        .expect("global edge");
898        assert_eq!(global_edge.scope_path, "global");
899
900        // Edge in a tenant-specific scope.
901        let tenant_edge = add_edge(
902            &conn,
903            3,
904            4,
905            "manages",
906            &json!({}),
907            "2024-01-01T00:00:00Z",
908            1.0,
909            "",
910            Some("global/org:acme"),
911        )
912        .expect("tenant edge");
913        assert_eq!(tenant_edge.scope_path, "global/org:acme");
914
915        // Edge in a deeper scope.
916        let user_edge = add_edge(
917            &conn,
918            5,
919            6,
920            "reports_to",
921            &json!({}),
922            "2024-01-01T00:00:00Z",
923            1.0,
924            "",
925            Some("global/org:acme/user:alice"),
926        )
927        .expect("user edge");
928        assert_eq!(user_edge.scope_path, "global/org:acme/user:alice");
929
930        // Auto-invalidation is scope-aware: adding another edge for the same
931        // triple in a DIFFERENT scope must NOT close the first-scope edge.
932        let acme_edge_1 = add_edge(
933            &conn,
934            10,
935            20,
936            "partner",
937            &json!({}),
938            "2024-01-01T00:00:00Z",
939            1.0,
940            "",
941            Some("global/org:acme"),
942        )
943        .expect("acme edge 1");
944
945        let _acme_edge_2 = add_edge(
946            &conn,
947            10,
948            20,
949            "partner",
950            &json!({}),
951            "2024-01-01T00:00:00Z",
952            1.0,
953            "",
954            Some("global/org:beta"), // different scope — must not close acme_edge_1
955        )
956        .expect("beta edge");
957
958        let refetched = get_edge_by_id(&conn, acme_edge_1.id)
959            .expect("query")
960            .expect("still exists");
961        assert!(
962            refetched.valid_to.is_none(),
963            "edge in org:acme must not be closed by edge in org:beta"
964        );
965    }
966
967    // -------------------------------------------------------------------------
968    // Test 13: snapshot_at with scope_path filter
969    // -------------------------------------------------------------------------
970    #[test]
971    fn test_snapshot_at_with_scope_filter() {
972        let conn = setup_db();
973
974        // Add one edge in "global" scope and one in "global/org:acme".
975        add_edge(
976            &conn,
977            1,
978            2,
979            "rel",
980            &json!({}),
981            "2024-01-01T00:00:00Z",
982            1.0,
983            "",
984            None, // defaults to "global"
985        )
986        .unwrap();
987
988        add_edge(
989            &conn,
990            3,
991            4,
992            "rel",
993            &json!({}),
994            "2024-01-01T00:00:00Z",
995            1.0,
996            "",
997            Some("global/org:acme"),
998        )
999        .unwrap();
1000
1001        // No scope filter → both edges visible.
1002        let all = snapshot_at(&conn, "2025-01-01T00:00:00Z", None).unwrap();
1003        assert_eq!(all.len(), 2);
1004
1005        // Filter to "global" includes all descendants (hierarchical prefix matching).
1006        // "global" matches exactly, and "global/org:acme" matches via LIKE 'global/%'.
1007        let global_tree = snapshot_at(&conn, "2025-01-01T00:00:00Z", Some("global")).unwrap();
1008        assert_eq!(global_tree.len(), 2, "global scope tree should include its child org:acme");
1009
1010        // Filter to "global/org:acme" → only the acme edge (no further children here).
1011        let acme_only =
1012            snapshot_at(&conn, "2025-01-01T00:00:00Z", Some("global/org:acme")).unwrap();
1013        assert_eq!(acme_only.len(), 1);
1014        assert_eq!(acme_only[0].from_id, 3);
1015
1016        // Demonstrate that "global" exact match can be queried by adding a non-child scope.
1017        add_edge(
1018            &conn,
1019            7,
1020            8,
1021            "rel",
1022            &json!({}),
1023            "2024-01-01T00:00:00Z",
1024            1.0,
1025            "",
1026            Some("global/org:beta"),
1027        )
1028        .unwrap();
1029
1030        // "global/org:acme" filter should still only return the one acme edge.
1031        let acme_only2 =
1032            snapshot_at(&conn, "2025-01-01T00:00:00Z", Some("global/org:acme")).unwrap();
1033        assert_eq!(acme_only2.len(), 1);
1034        assert_eq!(acme_only2[0].from_id, 3);
1035    }
1036
1037    // -------------------------------------------------------------------------
1038    // Test 14: scope prefix matching — hierarchy traversal
1039    // -------------------------------------------------------------------------
1040    #[test]
1041    fn test_scope_prefix_matching() {
1042        let conn = setup_db();
1043
1044        // Three edges at different scope depths.
1045        add_edge(
1046            &conn,
1047            1,
1048            2,
1049            "a",
1050            &json!({}),
1051            "2024-01-01T00:00:00Z",
1052            1.0,
1053            "",
1054            Some("global/mbras"),
1055        )
1056        .unwrap();
1057
1058        add_edge(
1059            &conn,
1060            3,
1061            4,
1062            "b",
1063            &json!({}),
1064            "2024-01-01T00:00:00Z",
1065            1.0,
1066            "",
1067            Some("global/mbras/broker_alice"),
1068        )
1069        .unwrap();
1070
1071        add_edge(
1072            &conn,
1073            5,
1074            6,
1075            "c",
1076            &json!({}),
1077            "2024-01-01T00:00:00Z",
1078            1.0,
1079            "",
1080            Some("global/other"),
1081        )
1082        .unwrap();
1083
1084        // Filtering on "global/mbras" should return:
1085        //   - the exact "global/mbras" edge
1086        //   - "global/mbras/broker_alice" (child)
1087        // but NOT "global/other".
1088        let mbras_snap =
1089            snapshot_at(&conn, "2025-01-01T00:00:00Z", Some("global/mbras")).unwrap();
1090        assert_eq!(
1091            mbras_snap.len(),
1092            2,
1093            "expected 2 edges under global/mbras, got: {:?}",
1094            mbras_snap
1095                .iter()
1096                .map(|e| &e.scope_path)
1097                .collect::<Vec<_>>()
1098        );
1099
1100        let scope_paths: Vec<&str> = mbras_snap.iter().map(|e| e.scope_path.as_str()).collect();
1101        assert!(scope_paths.contains(&"global/mbras"));
1102        assert!(scope_paths.contains(&"global/mbras/broker_alice"));
1103    }
1104}