#![cfg(feature = "duckdb-graph")]
use duckdb::{params, Connection as DuckdbConnection};
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use crate::error::{EngramError, Result};
impl From<duckdb::Error> for EngramError {
fn from(e: duckdb::Error) -> Self {
EngramError::Storage(format!("DuckDB error: {}", e))
}
}
/// One path found by a recursive traversal (`find_connection` / `find_neighbors`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PathStep {
    /// Human-readable rendering of the walk, e.g. `1 -[works_at]-> 2 -[located_in]-> 3`.
    pub path: String,
    /// Number of edges in the path (1 for a direct edge).
    pub depth: i32,
}
/// One row of `engram.temporal_edges` as returned by the snapshot and timeline queries.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DuckDbTemporalEdge {
    /// Row id of the edge.
    pub id: i64,
    /// Source node id.
    pub from_id: i64,
    /// Destination node id.
    pub to_id: i64,
    /// Relation label, e.g. `knows`.
    pub relation: String,
    /// Start of validity. Compared as text in SQL filters; presumably ISO-8601 so
    /// lexical order matches chronological order — TODO confirm.
    pub valid_from: String,
    /// End of validity; `None` means the edge is still valid (open-ended).
    pub valid_to: Option<String>,
    /// Confidence score (stored as a double, narrowed to `f32` on read).
    pub confidence: f32,
    /// Scope the edge belongs to; queries select edges by scope prefix.
    pub scope_path: String,
}
/// Difference between two graph snapshots, with edges keyed by `(from_id, to_id, relation)`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DuckDbGraphDiff {
    /// Edges present at the later time but not at the earlier one.
    pub added: Vec<DuckDbTemporalEdge>,
    /// Edges present at the earlier time but not at the later one.
    pub removed: Vec<DuckDbTemporalEdge>,
    /// `(earlier, later)` pairs with the same key whose confidence or `valid_to` differ.
    pub changed: Vec<(DuckDbTemporalEdge, DuckDbTemporalEdge)>,
}
/// Read-only temporal/graph query layer over an engram SQLite database.
///
/// Holds an in-memory DuckDB connection with the SQLite file attached read-only
/// (through DuckDB's `sqlite` extension) under the catalog name `engram`.
pub struct TemporalGraph {
    // In-memory DuckDB connection that every query runs on.
    conn: DuckdbConnection,
    // Whether duckpgq loaded and the `knowledge_graph` property graph was created.
    has_pgq: bool,
    // Path of the attached SQLite file, kept so `refresh` can re-attach it.
    sqlite_path: String,
}
impl TemporalGraph {
    /// Columns selected for every [`DuckDbTemporalEdge`] query. The order must match
    /// the indices read in [`Self::edge_from_row`].
    const EDGE_COLUMNS: &'static str =
        "id, from_id, to_id, relation, valid_from, valid_to, confidence, scope_path";

    /// Recursive CTE `paths` shared by [`Self::find_connection`] and
    /// [`Self::find_neighbors`]. It walks only currently-valid edges (`valid_to IS NULL`).
    ///
    /// Bind parameters:
    /// - `$1` start node id
    /// - `$2` scope pattern for the first hop
    /// - `$3` maximum depth
    /// - `$4` scope pattern for later hops
    ///
    /// Both scope patterns come from [`Self::scope_like_pattern`] and are matched with
    /// `ESCAPE '\'`.
    ///
    /// `visited` holds the node ids on the path as `,a,b,c,`. Cycle detection searches
    /// for the delimited `,id,`. A plain substring search over the rendered `path` text
    /// would wrongly reject a hop whenever the id occurs inside another id (`1` in `12`)
    /// or inside a relation name.
    const PATHS_CTE: &'static str = r"
        WITH RECURSIVE paths AS (
            SELECT
                from_id,
                to_id,
                relation,
                1 AS depth,
                CAST(from_id AS VARCHAR) || ' -[' || relation || ']-> '
                    || CAST(to_id AS VARCHAR) AS path,
                ',' || CAST(from_id AS VARCHAR) || ',' || CAST(to_id AS VARCHAR) || ','
                    AS visited
            FROM engram.temporal_edges
            WHERE from_id = $1
              AND scope_path LIKE $2 ESCAPE '\'
              AND valid_to IS NULL
            UNION ALL
            SELECT
                p.from_id,
                e.to_id,
                e.relation,
                p.depth + 1,
                p.path || ' -[' || e.relation || ']-> ' || CAST(e.to_id AS VARCHAR),
                p.visited || CAST(e.to_id AS VARCHAR) || ','
            FROM paths p
            JOIN engram.temporal_edges e ON p.to_id = e.from_id
            WHERE p.depth < $3
              AND e.scope_path LIKE $4 ESCAPE '\'
              AND e.valid_to IS NULL
              AND POSITION(',' || CAST(e.to_id AS VARCHAR) || ',' IN p.visited) = 0
        )
    ";

    /// Opens an in-memory DuckDB instance and attaches the SQLite database at
    /// `sqlite_path` read-only as `engram`.
    ///
    /// Also tries to enable duckpgq graph pattern matching. A failure there is logged
    /// and reported through [`Self::has_pgq`]; it is not returned as an error.
    ///
    /// # Errors
    ///
    /// Returns [`EngramError::Storage`] if DuckDB cannot be opened, the `sqlite`
    /// extension cannot be installed or loaded, or the attach fails.
    pub fn new(sqlite_path: &str) -> Result<Self> {
        let conn = DuckdbConnection::open_in_memory()?;
        conn.execute_batch("INSTALL sqlite; LOAD sqlite;")?;
        conn.execute_batch(&Self::attach_sql(sqlite_path))?;
        let has_pgq = Self::try_load_pgq(&conn);
        Ok(Self {
            conn,
            has_pgq,
            sqlite_path: sqlite_path.to_string(),
        })
    }

    /// Builds the `ATTACH` statement for `sqlite_path`.
    ///
    /// Single quotes are doubled so a path such as `/home/o'brien/engram.db` neither
    /// breaks nor injects into the SQL string literal.
    fn attach_sql(sqlite_path: &str) -> String {
        format!(
            "ATTACH '{path}' AS engram (TYPE SQLITE, READ_ONLY);",
            path = sqlite_path.replace('\'', "''")
        )
    }

    /// Best-effort setup of duckpgq and the `knowledge_graph` property graph.
    ///
    /// Returns `true` only if install, load and DDL all succeed. Each failure is
    /// logged with `warn!` and yields `false`.
    fn try_load_pgq(conn: &DuckdbConnection) -> bool {
        if let Err(e) = conn.execute_batch("INSTALL duckpgq FROM community;") {
            warn!("duckpgq install failed (graph pattern matching unavailable): {}", e);
            return false;
        }
        if let Err(e) = conn.execute_batch("LOAD duckpgq;") {
            warn!("duckpgq load failed (graph pattern matching unavailable): {}", e);
            return false;
        }
        // NOTE(review): `LABEL relation` probably assigns the literal label "relation"
        // to every edge rather than using the `relation` column — confirm against the
        // duckpgq docs before relying on label-based patterns.
        let pgq_ddl = r#"
            CREATE OR REPLACE PROPERTY GRAPH knowledge_graph
            VERTEX TABLES (engram.graph_entities)
            EDGE TABLES (
                engram.temporal_edges
                    SOURCE KEY (from_id) REFERENCES graph_entities(id)
                    DESTINATION KEY (to_id) REFERENCES graph_entities(id)
                    LABEL relation
            );
        "#;
        if let Err(e) = conn.execute_batch(pgq_ddl) {
            warn!("duckpgq property graph creation failed: {}", e);
            return false;
        }
        debug!("duckpgq property graph 'knowledge_graph' created successfully");
        true
    }

    /// Whether duckpgq graph pattern matching was enabled by [`Self::new`].
    pub fn has_pgq(&self) -> bool {
        self.has_pgq
    }

    /// Detaches and re-attaches the SQLite database, presumably so DuckDB observes
    /// writes made to the file since it was attached.
    ///
    /// # Errors
    ///
    /// Returns [`EngramError::Storage`] if the detach or the re-attach fails.
    pub fn refresh(&self) -> Result<()> {
        // `IF EXISTS` lets a later refresh recover when an earlier one detached the
        // database but then failed to re-attach it.
        self.conn.execute_batch("DETACH DATABASE IF EXISTS engram;")?;
        // NOTE(review): the duckpgq `knowledge_graph` was defined over the previous
        // attachment and is not recreated here — confirm it still resolves.
        self.conn.execute_batch(&Self::attach_sql(&self.sqlite_path))?;
        debug!("TemporalGraph: re-attached SQLite at '{}'", self.sqlite_path);
        Ok(())
    }

    /// Builds a `LIKE` pattern (for use with `ESCAPE '\'`) that matches every
    /// `scope_path` starting with `scope`.
    ///
    /// `\`, `%` and `_` in `scope` are escaped so they match literally instead of
    /// acting as wildcards (e.g. scope `team_a` must not match `teamXa`).
    fn scope_like_pattern(scope: &str) -> String {
        let mut pattern = String::with_capacity(scope.len() + 1);
        for ch in scope.chars() {
            if matches!(ch, '\\' | '%' | '_') {
                pattern.push('\\');
            }
            pattern.push(ch);
        }
        pattern.push('%');
        pattern
    }

    /// Maps a row selected with [`Self::EDGE_COLUMNS`] to a [`DuckDbTemporalEdge`].
    fn edge_from_row(row: &duckdb::Row<'_>) -> duckdb::Result<DuckDbTemporalEdge> {
        Ok(DuckDbTemporalEdge {
            id: row.get(0)?,
            from_id: row.get(1)?,
            to_id: row.get(2)?,
            relation: row.get(3)?,
            valid_from: row.get(4)?,
            valid_to: row.get(5)?,
            confidence: row.get::<_, f64>(6)? as f32,
            scope_path: row.get(7)?,
        })
    }

    /// Runs an edge query whose select list is [`Self::EDGE_COLUMNS`] and collects the rows.
    fn query_edges<P: duckdb::Params>(
        &self,
        sql: &str,
        params: P,
    ) -> Result<Vec<DuckDbTemporalEdge>> {
        let mut stmt = self.conn.prepare(sql)?;
        let edges = stmt
            .query_map(params, Self::edge_from_row)?
            .collect::<duckdb::Result<Vec<_>>>()?;
        Ok(edges)
    }

    /// Returns the edges under `scope` that are valid at `timestamp`, ordered by id.
    ///
    /// An edge is valid when `valid_from <= timestamp` and either it has no `valid_to`
    /// or `valid_to >= timestamp`. Both bounds are inclusive and compared as text.
    ///
    /// # Errors
    ///
    /// Returns [`EngramError::Storage`] on any DuckDB failure.
    pub fn snapshot_at(
        &self,
        scope: &str,
        timestamp: &str,
    ) -> Result<Vec<DuckDbTemporalEdge>> {
        let scope_pattern = Self::scope_like_pattern(scope);
        let sql = format!(
            r"
            SELECT {columns}
            FROM engram.temporal_edges
            WHERE scope_path LIKE ? ESCAPE '\'
              AND valid_from <= ?
              AND (valid_to IS NULL OR valid_to >= ?)
            ORDER BY id ASC
            ",
            columns = Self::EDGE_COLUMNS
        );
        self.query_edges(&sql, params![scope_pattern, timestamp, timestamp])
    }

    /// Key that identifies "the same relationship" across two snapshots.
    fn diff_key(edge: &DuckDbTemporalEdge) -> (i64, i64, &str) {
        (edge.from_id, edge.to_id, edge.relation.as_str())
    }

    /// Compares the snapshot of `scope` at `t1` with the snapshot at `t2`.
    ///
    /// Edges are matched by `(from_id, to_id, relation)`. If one snapshot holds several
    /// edges with the same key, the one with the highest id is used, because snapshots
    /// are id-ordered and later map inserts overwrite earlier ones. Each output list is
    /// sorted by edge id (by the `t2` edge's id for `changed`), so results are
    /// deterministic.
    ///
    /// # Errors
    ///
    /// Returns [`EngramError::Storage`] if either snapshot query fails.
    pub fn graph_diff(
        &self,
        scope: &str,
        t1: &str,
        t2: &str,
    ) -> Result<DuckDbGraphDiff> {
        use std::collections::HashMap;

        let snap1 = self.snapshot_at(scope, t1)?;
        let snap2 = self.snapshot_at(scope, t2)?;
        let map1: HashMap<_, _> = snap1.iter().map(|e| (Self::diff_key(e), e)).collect();
        let map2: HashMap<_, _> = snap2.iter().map(|e| (Self::diff_key(e), e)).collect();

        let mut added = Vec::new();
        let mut changed = Vec::new();
        for (key, &e2) in &map2 {
            match map1.get(key) {
                None => added.push(e2.clone()),
                Some(&e1) => {
                    let conf_changed = (e1.confidence - e2.confidence).abs() > f32::EPSILON;
                    let valid_to_changed = e1.valid_to != e2.valid_to;
                    if conf_changed || valid_to_changed {
                        changed.push((e1.clone(), e2.clone()));
                    }
                }
            }
        }
        let mut removed: Vec<DuckDbTemporalEdge> = map1
            .iter()
            .filter(|(key, _)| !map2.contains_key(*key))
            .map(|(_, &e1)| e1.clone())
            .collect();

        // HashMap iteration order is unspecified; sort so callers see a stable order.
        added.sort_unstable_by_key(|e| e.id);
        removed.sort_unstable_by_key(|e| e.id);
        changed.sort_unstable_by_key(|(_, e2)| e2.id);

        Ok(DuckDbGraphDiff {
            added,
            removed,
            changed,
        })
    }

    /// Returns every version of the `from_id -> to_id` edges under `scope`, newest
    /// `valid_from` first.
    ///
    /// # Errors
    ///
    /// Returns [`EngramError::Storage`] on any DuckDB failure.
    pub fn relationship_timeline(
        &self,
        scope: &str,
        from_id: i64,
        to_id: i64,
    ) -> Result<Vec<DuckDbTemporalEdge>> {
        let scope_pattern = Self::scope_like_pattern(scope);
        let sql = format!(
            r"
            SELECT {columns}
            FROM engram.temporal_edges
            WHERE scope_path LIKE ? ESCAPE '\'
              AND from_id = ?
              AND to_id = ?
            ORDER BY valid_from DESC
            ",
            columns = Self::EDGE_COLUMNS
        );
        self.query_edges(&sql, params![scope_pattern, from_id, to_id])
    }

    /// Runs a `SELECT path, depth ...` query over [`Self::PATHS_CTE`] and collects the rows.
    fn query_paths<P: duckdb::Params>(&self, sql: &str, params: P) -> Result<Vec<PathStep>> {
        let mut stmt = self.conn.prepare(sql)?;
        let rows = stmt.query_map(params, |row| {
            Ok(PathStep {
                path: row.get(0)?,
                depth: row.get(1)?,
            })
        })?;
        let steps = rows
            .map(|row| row.map_err(|e| EngramError::Storage(format!("DuckDB row error: {}", e))))
            .collect::<Result<Vec<_>>>()?;
        Ok(steps)
    }

    /// Finds up to 10 cycle-free paths of currently-valid edges from `start_id` to
    /// `end_id`, using at most `max_hops` edges.
    ///
    /// Shortest paths come first; ties are broken by the rendered path text.
    /// `max_hops == 0` yields no paths.
    ///
    /// # Errors
    ///
    /// Returns [`EngramError::Storage`] on any DuckDB failure.
    pub fn find_connection(
        &self,
        scope: &str,
        start_id: i64,
        end_id: i64,
        max_hops: u8,
    ) -> Result<Vec<PathStep>> {
        // The CTE anchor always emits 1-hop rows, so a zero budget must be handled here.
        if max_hops == 0 {
            return Ok(Vec::new());
        }
        let scope_pattern = Self::scope_like_pattern(scope);
        let sql = format!(
            "{cte}
            SELECT path, depth
            FROM paths
            WHERE to_id = $5
            ORDER BY depth, path
            LIMIT 10",
            cte = Self::PATHS_CTE
        );
        let steps = self.query_paths(
            &sql,
            params![start_id, scope_pattern, i32::from(max_hops), scope_pattern, end_id],
        )?;
        debug!(
            "find_connection({} -> {}, max_hops={}): {} paths found",
            start_id,
            end_id,
            max_hops,
            steps.len()
        );
        Ok(steps)
    }

    /// Lists every cycle-free path of currently-valid edges starting at `node_id` with
    /// at most `max_depth` edges.
    ///
    /// Shortest paths come first; ties are broken by the rendered path text. A node
    /// reachable by several paths appears once per path. `max_depth == 0` yields no
    /// paths.
    ///
    /// # Errors
    ///
    /// Returns [`EngramError::Storage`] on any DuckDB failure.
    pub fn find_neighbors(
        &self,
        scope: &str,
        node_id: i64,
        max_depth: u8,
    ) -> Result<Vec<PathStep>> {
        // The CTE anchor always emits 1-hop rows, so a zero budget must be handled here.
        if max_depth == 0 {
            return Ok(Vec::new());
        }
        let scope_pattern = Self::scope_like_pattern(scope);
        let sql = format!(
            "{cte}
            SELECT path, depth
            FROM paths
            ORDER BY depth, path",
            cte = Self::PATHS_CTE
        );
        let steps = self.query_paths(
            &sql,
            params![node_id, scope_pattern, i32::from(max_depth), scope_pattern],
        )?;
        debug!(
            "find_neighbors(node={}, max_depth={}): {} reachable nodes",
            node_id,
            max_depth,
            steps.len()
        );
        Ok(steps)
    }
}
#[cfg(test)]
#[cfg(feature = "duckdb-graph")]
mod tests {
    use super::*;

    /// Temporary SQLite database path that is deleted on drop, so the file is cleaned
    /// up even when an assertion panics.
    ///
    /// The process id is part of the file name, so concurrent test processes on one
    /// machine do not clobber each other's fixtures. Declare it before the
    /// `TemporalGraph` in a test: locals drop in reverse order, so the DuckDB
    /// connection closes before the file is removed.
    struct TempDb {
        path: std::path::PathBuf,
    }

    impl TempDb {
        fn new(name: &str) -> Self {
            let path = std::env::temp_dir().join(format!(
                "engram_test_{}_{}.sqlite",
                name,
                std::process::id()
            ));
            // A previously killed run may have left a stale file behind.
            let _ = std::fs::remove_file(&path);
            Self { path }
        }

        fn path_str(&self) -> &str {
            self.path.to_str().expect("temp dir path is valid UTF-8")
        }
    }

    impl Drop for TempDb {
        fn drop(&mut self) {
            // Best effort: the file may never have been created.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Creates the `graph_entities` and `temporal_edges` tables in the SQLite file at `path`.
    fn setup_sqlite(path: &str) -> rusqlite::Connection {
        let conn = rusqlite::Connection::open(path).expect("open sqlite");
        conn.execute_batch(r#"
            CREATE TABLE IF NOT EXISTS graph_entities (
                id TEXT PRIMARY KEY,
                scope_path TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                metadata TEXT,
                created_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS temporal_edges (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                from_id INTEGER NOT NULL,
                to_id INTEGER NOT NULL,
                relation TEXT NOT NULL,
                properties TEXT,
                valid_from TEXT NOT NULL,
                valid_to TEXT,
                confidence REAL NOT NULL DEFAULT 1.0,
                source TEXT,
                scope_path TEXT NOT NULL DEFAULT 'global',
                created_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
        "#).expect("create tables");
        conn
    }

    /// Inserts one row into `temporal_edges`.
    // One argument per column keeps fixtures readable at the call sites.
    #[allow(clippy::too_many_arguments)]
    fn insert_edge(
        conn: &rusqlite::Connection,
        from_id: i64,
        to_id: i64,
        relation: &str,
        valid_from: &str,
        valid_to: Option<&str>,
        confidence: f64,
        scope_path: &str,
    ) {
        conn.execute(
            "INSERT INTO temporal_edges
                 (from_id, to_id, relation, valid_from, valid_to, confidence, scope_path)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
            rusqlite::params![from_id, to_id, relation, valid_from, valid_to, confidence, scope_path],
        )
        .expect("insert edge");
    }

    #[test]
    fn test_temporal_graph_new() {
        let db = TempDb::new("new");
        setup_sqlite(db.path_str());
        let graph = TemporalGraph::new(db.path_str());
        assert!(
            graph.is_ok(),
            "TemporalGraph::new should succeed: {:?}",
            graph.err()
        );
    }

    #[test]
    fn test_temporal_graph_refresh() {
        let db = TempDb::new("refresh");
        setup_sqlite(db.path_str());
        let graph = TemporalGraph::new(db.path_str()).expect("new");
        let r1 = graph.refresh();
        assert!(r1.is_ok(), "first refresh failed: {:?}", r1.err());
        let r2 = graph.refresh();
        assert!(r2.is_ok(), "second refresh failed: {:?}", r2.err());
    }

    /// duckpgq availability presumably depends on network access and the local
    /// extension cache. This test therefore only checks that construction succeeds
    /// and the accessor is callable; it does not assert a particular value.
    #[test]
    fn test_has_pgq_false_without_extension() {
        let db = TempDb::new("pgq");
        setup_sqlite(db.path_str());
        let graph = TemporalGraph::new(db.path_str()).expect("new");
        let _ = graph.has_pgq();
    }

    #[test]
    fn test_snapshot_at() {
        let db = TempDb::new("snapshot_at");
        let sqlite = setup_sqlite(db.path_str());
        insert_edge(&sqlite, 1, 2, "knows", "2024-01-01", Some("2024-06-30"), 0.9, "global");
        insert_edge(&sqlite, 1, 3, "follows", "2024-01-01", None, 0.8, "global");
        insert_edge(&sqlite, 2, 3, "linked", "2025-01-01", None, 0.7, "global");
        drop(sqlite);
        let graph = TemporalGraph::new(db.path_str()).expect("new");

        let snap = graph.snapshot_at("global", "2024-03-01").expect("snapshot_at");
        assert_eq!(snap.len(), 2, "expected 2 edges active at 2024-03-01");
        let relations: Vec<&str> = snap.iter().map(|e| e.relation.as_str()).collect();
        assert!(relations.contains(&"knows"), "edge A should be included");
        assert!(relations.contains(&"follows"), "edge B should be included");

        let snap2 = graph.snapshot_at("global", "2024-08-01").expect("snapshot_at late");
        assert_eq!(snap2.len(), 1, "expected 1 edge active at 2024-08-01");
        assert_eq!(snap2[0].relation, "follows");
    }

    #[test]
    fn test_graph_diff() {
        let db = TempDb::new("graph_diff");
        let sqlite = setup_sqlite(db.path_str());
        insert_edge(&sqlite, 1, 2, "knows", "2024-01-01", None, 1.0, "global");
        insert_edge(&sqlite, 1, 3, "follows", "2024-01-01", Some("2024-03-31"), 1.0, "global");
        insert_edge(&sqlite, 2, 3, "linked", "2024-06-01", None, 0.5, "global");
        drop(sqlite);
        let graph = TemporalGraph::new(db.path_str()).expect("new");

        let diff = graph
            .graph_diff("global", "2024-02-01", "2024-07-01")
            .expect("graph_diff");
        assert_eq!(diff.added.len(), 1, "one edge added between t1 and t2");
        assert_eq!(diff.added[0].relation, "linked");
        assert_eq!(diff.removed.len(), 1, "one edge removed between t1 and t2");
        assert_eq!(diff.removed[0].relation, "follows");
        assert_eq!(diff.changed.len(), 0, "no edges changed");
    }

    #[test]
    fn test_relationship_timeline() {
        let db = TempDb::new("timeline");
        let sqlite = setup_sqlite(db.path_str());
        insert_edge(&sqlite, 1, 2, "knows", "2022-01-01", Some("2022-12-31"), 0.5, "global");
        insert_edge(&sqlite, 1, 2, "knows", "2023-01-01", Some("2023-12-31"), 0.75, "global");
        insert_edge(&sqlite, 1, 2, "knows", "2024-01-01", None, 0.9, "global");
        insert_edge(&sqlite, 3, 4, "linked", "2024-01-01", None, 1.0, "global");
        drop(sqlite);
        let graph = TemporalGraph::new(db.path_str()).expect("new");

        let timeline = graph
            .relationship_timeline("global", 1, 2)
            .expect("timeline");
        assert_eq!(timeline.len(), 3, "three versions of the 1→2 relationship");
        assert_eq!(timeline[0].valid_from, "2024-01-01", "most recent first");
        assert_eq!(timeline[1].valid_from, "2023-01-01");
        assert_eq!(timeline[2].valid_from, "2022-01-01", "oldest last");
    }

    #[test]
    fn test_scope_filtering_in_snapshot() {
        let db = TempDb::new("scope_filter");
        let sqlite = setup_sqlite(db.path_str());
        insert_edge(&sqlite, 1, 2, "depends", "2024-01-01", None, 1.0, "project/alpha");
        insert_edge(&sqlite, 2, 3, "depends", "2024-01-01", None, 1.0, "project/alpha/sub");
        insert_edge(&sqlite, 3, 4, "depends", "2024-01-01", None, 1.0, "project/beta");
        insert_edge(&sqlite, 4, 5, "depends", "2024-01-01", None, 1.0, "project");
        drop(sqlite);
        let graph = TemporalGraph::new(db.path_str()).expect("new");

        let snap = graph
            .snapshot_at("project/alpha", "2024-06-01")
            .expect("snapshot_at scoped");
        assert_eq!(snap.len(), 2, "only edges under project/alpha scope");
        for edge in &snap {
            assert!(
                edge.scope_path.starts_with("project/alpha"),
                "unexpected scope: {}",
                edge.scope_path
            );
        }
    }

    /// Fixture graph (all edges currently valid, scope `global`):
    /// `1 -works_at-> 2 -located_in-> 3` and `1 -knows-> 4 -works_at-> 5`.
    fn setup_pathfinding_db(path: &str) {
        let conn = setup_sqlite(path);
        let scope = "global";
        let vf = "2024-01-01";
        insert_edge(&conn, 1, 2, "works_at", vf, None, 1.0, scope);
        insert_edge(&conn, 2, 3, "located_in", vf, None, 1.0, scope);
        insert_edge(&conn, 1, 4, "knows", vf, None, 1.0, scope);
        insert_edge(&conn, 4, 5, "works_at", vf, None, 1.0, scope);
    }

    #[test]
    fn test_find_connection_direct() {
        let db = TempDb::new("pathfind_direct");
        setup_pathfinding_db(db.path_str());
        let graph = TemporalGraph::new(db.path_str()).expect("new");

        let paths = graph
            .find_connection("global", 1, 2, 3)
            .expect("find_connection direct");
        assert!(!paths.is_empty(), "should find a direct path 1->2");
        assert_eq!(paths[0].depth, 1, "direct connection has depth 1");
        assert!(
            paths[0].path.contains("-[works_at]->"),
            "path should traverse works_at edge"
        );
    }

    #[test]
    fn test_find_connection_two_hops() {
        let db = TempDb::new("pathfind_twohop");
        setup_pathfinding_db(db.path_str());
        let graph = TemporalGraph::new(db.path_str()).expect("new");

        let paths = graph
            .find_connection("global", 1, 3, 5)
            .expect("find_connection two hops");
        assert!(!paths.is_empty(), "should find a 2-hop path 1->2->3");
        let best = &paths[0];
        assert_eq!(best.depth, 2, "two-hop path has depth 2");
        assert!(
            best.path.contains("-[works_at]->") && best.path.contains("-[located_in]->"),
            "path should contain both edge labels"
        );
    }

    #[test]
    fn test_find_connection_no_path() {
        let db = TempDb::new("pathfind_nopath");
        setup_pathfinding_db(db.path_str());
        let graph = TemporalGraph::new(db.path_str()).expect("new");

        let paths = graph
            .find_connection("global", 3, 1, 5)
            .expect("find_connection no path");
        assert!(paths.is_empty(), "no path from sink node 3 back to 1");
    }

    #[test]
    fn test_find_neighbors() {
        let db = TempDb::new("neighbors");
        setup_pathfinding_db(db.path_str());
        let graph = TemporalGraph::new(db.path_str()).expect("new");

        let neighbors = graph
            .find_neighbors("global", 1, 2)
            .expect("find_neighbors");
        assert_eq!(neighbors.len(), 4, "4 nodes reachable within 2 hops from node 1");
        let depth1: Vec<_> = neighbors.iter().filter(|n| n.depth == 1).collect();
        let depth2: Vec<_> = neighbors.iter().filter(|n| n.depth == 2).collect();
        assert_eq!(depth1.len(), 2, "2 direct neighbours");
        assert_eq!(depth2.len(), 2, "2 two-hop neighbours");
    }
}