1use rusqlite::{params, Connection};
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11
12use crate::error::{EngramError, Result};
13
14pub const CREATE_TEMPORAL_EDGES_TABLE: &str = r#"
22CREATE TABLE IF NOT EXISTS temporal_edges (
23 id INTEGER PRIMARY KEY AUTOINCREMENT,
24 from_id INTEGER NOT NULL,
25 to_id INTEGER NOT NULL,
26 relation TEXT NOT NULL,
27 properties TEXT NOT NULL DEFAULT '{}',
28 valid_from TEXT NOT NULL,
29 valid_to TEXT,
30 confidence REAL NOT NULL DEFAULT 1.0,
31 source TEXT NOT NULL DEFAULT '',
32 created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
33);
34CREATE INDEX IF NOT EXISTS idx_temporal_edges_from ON temporal_edges(from_id);
35CREATE INDEX IF NOT EXISTS idx_temporal_edges_to ON temporal_edges(to_id);
36CREATE INDEX IF NOT EXISTS idx_temporal_edges_valid ON temporal_edges(valid_from, valid_to);
37"#;
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct TemporalEdge {
46 pub id: i64,
48 pub from_id: i64,
50 pub to_id: i64,
52 pub relation: String,
54 pub properties: Value,
56 pub valid_from: String,
58 pub valid_to: Option<String>,
60 pub confidence: f32,
62 pub source: String,
64 pub created_at: String,
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct GraphDiff {
71 pub added: Vec<TemporalEdge>,
73 pub removed: Vec<TemporalEdge>,
75 pub changed: Vec<(TemporalEdge, TemporalEdge)>,
79}
80
81fn row_to_edge(row: &rusqlite::Row<'_>) -> rusqlite::Result<TemporalEdge> {
87 let props_str: String = row.get(3)?;
88 let properties: Value =
89 serde_json::from_str(&props_str).unwrap_or(Value::Object(Default::default()));
90
91 Ok(TemporalEdge {
92 id: row.get(0)?,
93 from_id: row.get(1)?,
94 to_id: row.get(2)?,
95 relation: row.get(8)?,
96 properties,
97 valid_from: row.get(4)?,
98 valid_to: row.get(5)?,
99 confidence: row.get(6)?,
100 source: row.get(7)?,
101 created_at: row.get(9)?,
102 })
103}
104
105pub fn add_edge(
117 conn: &Connection,
118 from_id: i64,
119 to_id: i64,
120 relation: &str,
121 properties: &Value,
122 valid_from: &str,
123 confidence: f32,
124 source: &str,
125) -> Result<TemporalEdge> {
126 let props_str = serde_json::to_string(properties)?;
127
128 conn.execute(
130 "UPDATE temporal_edges
131 SET valid_to = ?1
132 WHERE from_id = ?2
133 AND to_id = ?3
134 AND relation = ?4
135 AND valid_to IS NULL",
136 params![valid_from, from_id, to_id, relation],
137 )
138 .map_err(EngramError::Database)?;
139
140 conn.execute(
142 "INSERT INTO temporal_edges
143 (from_id, to_id, relation, properties, valid_from, confidence, source)
144 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
145 params![from_id, to_id, relation, props_str, valid_from, confidence, source],
146 )
147 .map_err(EngramError::Database)?;
148
149 let id = conn.last_insert_rowid();
150 get_edge_by_id(conn, id)?
151 .ok_or_else(|| EngramError::Internal(format!("Edge {} disappeared after insert", id)))
152}
153
154pub fn invalidate_edge(conn: &Connection, edge_id: i64, valid_to: &str) -> Result<()> {
156 let affected = conn
157 .execute(
158 "UPDATE temporal_edges SET valid_to = ?1 WHERE id = ?2",
159 params![valid_to, edge_id],
160 )
161 .map_err(EngramError::Database)?;
162
163 if affected == 0 {
164 return Err(EngramError::NotFound(edge_id));
165 }
166 Ok(())
167}
168
169pub fn snapshot_at(conn: &Connection, timestamp: &str) -> Result<Vec<TemporalEdge>> {
174 let mut stmt = conn
175 .prepare(
176 "SELECT id, from_id, to_id, properties, valid_from, valid_to,
177 confidence, source, relation, created_at
178 FROM temporal_edges
179 WHERE valid_from <= ?1
180 AND (valid_to IS NULL OR valid_to > ?1)
181 ORDER BY from_id, to_id, relation",
182 )
183 .map_err(EngramError::Database)?;
184
185 let edges = stmt
186 .query_map(params![timestamp], row_to_edge)
187 .map_err(EngramError::Database)?
188 .collect::<rusqlite::Result<Vec<_>>>()
189 .map_err(EngramError::Database)?;
190
191 Ok(edges)
192}
193
194pub fn relationship_timeline(
197 conn: &Connection,
198 from_id: i64,
199 to_id: i64,
200) -> Result<Vec<TemporalEdge>> {
201 let mut stmt = conn
202 .prepare(
203 "SELECT id, from_id, to_id, properties, valid_from, valid_to,
204 confidence, source, relation, created_at
205 FROM temporal_edges
206 WHERE from_id = ?1 AND to_id = ?2
207 ORDER BY valid_from ASC, created_at ASC",
208 )
209 .map_err(EngramError::Database)?;
210
211 let edges = stmt
212 .query_map(params![from_id, to_id], row_to_edge)
213 .map_err(EngramError::Database)?
214 .collect::<rusqlite::Result<Vec<_>>>()
215 .map_err(EngramError::Database)?;
216
217 Ok(edges)
218}
219
220pub fn detect_contradictions(conn: &Connection) -> Result<Vec<(TemporalEdge, TemporalEdge)>> {
226 let mut stmt = conn
229 .prepare(
230 "SELECT a.id, a.from_id, a.to_id, a.properties, a.valid_from, a.valid_to,
231 a.confidence, a.source, a.relation, a.created_at,
232 b.id, b.from_id, b.to_id, b.properties, b.valid_from, b.valid_to,
233 b.confidence, b.source, b.relation, b.created_at
234 FROM temporal_edges a
235 JOIN temporal_edges b
236 ON a.from_id = b.from_id
237 AND a.to_id = b.to_id
238 AND a.relation = b.relation
239 AND a.id < b.id
240 WHERE a.valid_from < COALESCE(b.valid_to, '9999-12-31T23:59:59Z')
241 AND b.valid_from < COALESCE(a.valid_to, '9999-12-31T23:59:59Z')",
242 )
243 .map_err(EngramError::Database)?;
244
245 let pairs = stmt
246 .query_map([], |row| {
247 let props_a: String = row.get(3)?;
249 let props_b: String = row.get(13)?;
250
251 let edge_a = TemporalEdge {
252 id: row.get(0)?,
253 from_id: row.get(1)?,
254 to_id: row.get(2)?,
255 properties: serde_json::from_str(&props_a)
256 .unwrap_or(Value::Object(Default::default())),
257 valid_from: row.get(4)?,
258 valid_to: row.get(5)?,
259 confidence: row.get(6)?,
260 source: row.get(7)?,
261 relation: row.get(8)?,
262 created_at: row.get(9)?,
263 };
264
265 let edge_b = TemporalEdge {
266 id: row.get(10)?,
267 from_id: row.get(11)?,
268 to_id: row.get(12)?,
269 properties: serde_json::from_str(&props_b)
270 .unwrap_or(Value::Object(Default::default())),
271 valid_from: row.get(14)?,
272 valid_to: row.get(15)?,
273 confidence: row.get(16)?,
274 source: row.get(17)?,
275 relation: row.get(18)?,
276 created_at: row.get(19)?,
277 };
278
279 Ok((edge_a, edge_b))
280 })
281 .map_err(EngramError::Database)?
282 .collect::<rusqlite::Result<Vec<_>>>()
283 .map_err(EngramError::Database)?;
284
285 Ok(pairs)
286}
287
288pub fn diff(conn: &Connection, t1: &str, t2: &str) -> Result<GraphDiff> {
297 let snap1 = snapshot_at(conn, t1)?;
298 let snap2 = snapshot_at(conn, t2)?;
299
300 type Key = (i64, i64, String);
302
303 let map1: std::collections::HashMap<Key, TemporalEdge> = snap1
304 .into_iter()
305 .map(|e| ((e.from_id, e.to_id, e.relation.clone()), e))
306 .collect();
307
308 let map2: std::collections::HashMap<Key, TemporalEdge> = snap2
309 .into_iter()
310 .map(|e| ((e.from_id, e.to_id, e.relation.clone()), e))
311 .collect();
312
313 let mut added = Vec::new();
314 let mut removed = Vec::new();
315 let mut changed = Vec::new();
316
317 for (key, edge2) in &map2 {
318 match map1.get(key) {
319 None => added.push(edge2.clone()),
320 Some(edge1) if edge1.id != edge2.id => {
321 changed.push((edge1.clone(), edge2.clone()));
322 }
323 _ => {} }
325 }
326
327 for (key, edge1) in &map1 {
328 if !map2.contains_key(key) {
329 removed.push(edge1.clone());
330 }
331 }
332
333 Ok(GraphDiff {
334 added,
335 removed,
336 changed,
337 })
338}
339
340fn get_edge_by_id(conn: &Connection, id: i64) -> Result<Option<TemporalEdge>> {
345 let mut stmt = conn
346 .prepare(
347 "SELECT id, from_id, to_id, properties, valid_from, valid_to,
348 confidence, source, relation, created_at
349 FROM temporal_edges
350 WHERE id = ?1",
351 )
352 .map_err(EngramError::Database)?;
353
354 let mut rows = stmt
355 .query_map(params![id], row_to_edge)
356 .map_err(EngramError::Database)?;
357
358 match rows.next() {
359 Some(row) => Ok(Some(row.map_err(EngramError::Database)?)),
360 None => Ok(None),
361 }
362}
363
364#[cfg(test)]
369mod tests {
370 use super::*;
371 use rusqlite::Connection;
372 use serde_json::json;
373
374 fn setup_db() -> Connection {
376 let conn = Connection::open_in_memory().expect("open in-memory DB");
377 conn.execute_batch(CREATE_TEMPORAL_EDGES_TABLE)
378 .expect("create table");
379 conn
380 }
381
382 #[test]
386 fn test_add_edge_and_retrieve() {
387 let conn = setup_db();
388
389 let edge = add_edge(
390 &conn,
391 1,
392 2,
393 "works_at",
394 &json!({}),
395 "2024-01-01T00:00:00Z",
396 0.9,
397 "test",
398 )
399 .expect("add_edge");
400
401 assert_eq!(edge.from_id, 1);
402 assert_eq!(edge.to_id, 2);
403 assert_eq!(edge.relation, "works_at");
404 assert!(edge.valid_to.is_none());
405 assert_eq!(edge.confidence, 0.9);
406 assert_eq!(edge.source, "test");
407 }
408
409 #[test]
413 fn test_auto_invalidation_on_new_edge() {
414 let conn = setup_db();
415
416 let first = add_edge(
417 &conn,
418 1,
419 2,
420 "works_at",
421 &json!({"role": "engineer"}),
422 "2023-01-01T00:00:00Z",
423 1.0,
424 "hr",
425 )
426 .expect("first edge");
427
428 assert!(first.valid_to.is_none(), "first edge should be open");
429
430 let _second = add_edge(
432 &conn,
433 1,
434 2,
435 "works_at",
436 &json!({"role": "manager"}),
437 "2024-06-01T00:00:00Z",
438 1.0,
439 "hr",
440 )
441 .expect("second edge");
442
443 let updated = get_edge_by_id(&conn, first.id)
445 .expect("query")
446 .expect("edge still exists");
447
448 assert_eq!(
449 updated.valid_to.as_deref(),
450 Some("2024-06-01T00:00:00Z"),
451 "first edge should have been closed at the second edge's valid_from"
452 );
453 }
454
455 #[test]
459 fn test_snapshot_at() {
460 let conn = setup_db();
461
462 add_edge(
464 &conn,
465 1,
466 2,
467 "rel",
468 &json!({}),
469 "2023-01-01T00:00:00Z",
470 1.0,
471 "",
472 )
473 .unwrap();
474 add_edge(
476 &conn,
477 1,
478 2,
479 "rel",
480 &json!({}),
481 "2024-01-01T00:00:00Z",
482 1.0,
483 "",
484 )
485 .unwrap();
486
487 let snap = snapshot_at(&conn, "2023-07-01T00:00:00Z").expect("snapshot");
489 assert_eq!(snap.len(), 1);
490 assert_eq!(snap[0].valid_from, "2023-01-01T00:00:00Z");
491
492 let snap2 = snapshot_at(&conn, "2024-07-01T00:00:00Z").expect("snapshot");
494 assert_eq!(snap2.len(), 1);
495 assert_eq!(snap2[0].valid_from, "2024-01-01T00:00:00Z");
496 }
497
498 #[test]
502 fn test_relationship_timeline_chronological() {
503 let conn = setup_db();
504
505 add_edge(
506 &conn,
507 10,
508 20,
509 "partner",
510 &json!({}),
511 "2020-01-01T00:00:00Z",
512 1.0,
513 "",
514 )
515 .unwrap();
516 add_edge(
517 &conn,
518 10,
519 20,
520 "partner",
521 &json!({}),
522 "2021-06-01T00:00:00Z",
523 1.0,
524 "",
525 )
526 .unwrap();
527 add_edge(
528 &conn,
529 10,
530 20,
531 "partner",
532 &json!({}),
533 "2022-09-01T00:00:00Z",
534 1.0,
535 "",
536 )
537 .unwrap();
538
539 let timeline = relationship_timeline(&conn, 10, 20).expect("timeline");
540 assert_eq!(timeline.len(), 3);
541
542 assert!(timeline[0].valid_from <= timeline[1].valid_from);
544 assert!(timeline[1].valid_from <= timeline[2].valid_from);
545 }
546
547 #[test]
551 fn test_detect_contradictions() {
552 let conn = setup_db();
553
554 conn.execute(
557 "INSERT INTO temporal_edges
558 (from_id, to_id, relation, properties, valid_from, valid_to, confidence, source)
559 VALUES (1, 2, 'rel', '{}', '2023-01-01T00:00:00Z', NULL, 1.0, '')",
560 [],
561 )
562 .unwrap();
563 conn.execute(
564 "INSERT INTO temporal_edges
565 (from_id, to_id, relation, properties, valid_from, valid_to, confidence, source)
566 VALUES (1, 2, 'rel', '{}', '2023-06-01T00:00:00Z', NULL, 1.0, '')",
567 [],
568 )
569 .unwrap();
570
571 let contradictions = detect_contradictions(&conn).expect("detect");
572 assert_eq!(contradictions.len(), 1);
573
574 let (a, b) = &contradictions[0];
575 assert!(a.id < b.id);
576 }
577
578 #[test]
582 fn test_diff_between_timestamps() {
583 let conn = setup_db();
584
585 add_edge(
587 &conn,
588 1,
589 2,
590 "knows",
591 &json!({}),
592 "2022-01-01T00:00:00Z",
593 1.0,
594 "",
595 )
596 .unwrap();
597
598 add_edge(
600 &conn,
601 3,
602 4,
603 "likes",
604 &json!({}),
605 "2024-01-01T00:00:00Z",
606 1.0,
607 "",
608 )
609 .unwrap();
610
611 let d = diff(&conn, "2023-01-01T00:00:00Z", "2025-01-01T00:00:00Z").expect("diff");
612
613 assert_eq!(d.added.len(), 1);
615 assert_eq!(d.added[0].relation, "likes");
616 assert_eq!(d.removed.len(), 0);
617 assert_eq!(d.changed.len(), 0);
619 }
620
621 #[test]
625 fn test_empty_graph_operations() {
626 let conn = setup_db();
627
628 let snap = snapshot_at(&conn, "2024-01-01T00:00:00Z").expect("snapshot");
629 assert!(snap.is_empty());
630
631 let timeline = relationship_timeline(&conn, 99, 100).expect("timeline");
632 assert!(timeline.is_empty());
633
634 let contradictions = detect_contradictions(&conn).expect("detect");
635 assert!(contradictions.is_empty());
636
637 let d = diff(&conn, "2024-01-01T00:00:00Z", "2025-01-01T00:00:00Z").expect("diff");
638 assert!(d.added.is_empty());
639 assert!(d.removed.is_empty());
640 assert!(d.changed.is_empty());
641 }
642
643 #[test]
647 fn test_edge_with_json_properties() {
648 let conn = setup_db();
649
650 let props = json!({
651 "title": "Senior Engineer",
652 "department": "R&D",
653 "salary": 120_000,
654 "remote": true,
655 "skills": ["Rust", "Python"]
656 });
657
658 let edge = add_edge(
659 &conn,
660 5,
661 6,
662 "employed_by",
663 &props,
664 "2024-03-01T00:00:00Z",
665 0.95,
666 "payroll",
667 )
668 .expect("add");
669
670 assert_eq!(edge.properties["title"], "Senior Engineer");
671 assert_eq!(edge.properties["salary"], 120_000);
672 assert_eq!(edge.properties["remote"], true);
673 assert_eq!(edge.properties["skills"][0], "Rust");
674 }
675
676 #[test]
680 fn test_invalidate_edge_manually() {
681 let conn = setup_db();
682
683 let edge = add_edge(
684 &conn,
685 7,
686 8,
687 "owns",
688 &json!({}),
689 "2024-01-01T00:00:00Z",
690 1.0,
691 "legal",
692 )
693 .expect("add");
694
695 assert!(edge.valid_to.is_none());
696
697 invalidate_edge(&conn, edge.id, "2024-12-31T23:59:59Z").expect("invalidate");
698
699 let updated = get_edge_by_id(&conn, edge.id)
700 .expect("query")
701 .expect("still exists");
702
703 assert_eq!(updated.valid_to.as_deref(), Some("2024-12-31T23:59:59Z"));
704 }
705
706 #[test]
710 fn test_invalidate_nonexistent_edge_returns_not_found() {
711 let conn = setup_db();
712
713 let result = invalidate_edge(&conn, 99999, "2025-01-01T00:00:00Z");
714 assert!(
715 matches!(result, Err(EngramError::NotFound(99999))),
716 "expected NotFound(99999), got {:?}",
717 result
718 );
719 }
720
721 #[test]
725 fn test_diff_detects_changed_edge() {
726 let conn = setup_db();
727
728 add_edge(
730 &conn,
731 1,
732 2,
733 "role",
734 &json!({"level": "junior"}),
735 "2022-01-01T00:00:00Z",
736 1.0,
737 "",
738 )
739 .unwrap();
740
741 add_edge(
743 &conn,
744 1,
745 2,
746 "role",
747 &json!({"level": "senior"}),
748 "2023-06-01T00:00:00Z",
749 1.0,
750 "",
751 )
752 .unwrap();
753
754 let d = diff(&conn, "2022-07-01T00:00:00Z", "2024-01-01T00:00:00Z").expect("diff");
755
756 assert_eq!(d.changed.len(), 1);
758 let (old, new) = &d.changed[0];
759 assert_eq!(old.properties["level"], "junior");
760 assert_eq!(new.properties["level"], "senior");
761 }
762}