1use std::path::Path;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use chrono::{DateTime, Utc};
14use duckdb::Connection;
15use tokio::sync::Mutex;
16use uuid::Uuid;
17
18use crate::model::TemporalEdge;
19use crate::store::{Error, GraphStore, Result};
20
21pub const CREATE_GRAPH_NODES_TABLE: &str = "
24CREATE TABLE IF NOT EXISTS graph_nodes (
25 id VARCHAR PRIMARY KEY,
26 label VARCHAR,
27 metadata JSON,
28 created_at VARCHAR NOT NULL
29);
30";
31
32pub const CREATE_GRAPH_EDGES_TABLE: &str = "
33CREATE TABLE IF NOT EXISTS graph_edges (
34 id VARCHAR PRIMARY KEY,
35 src VARCHAR NOT NULL,
36 dst VARCHAR NOT NULL,
37 relation VARCHAR NOT NULL,
38 valid_from VARCHAR NOT NULL,
39 valid_to VARCHAR,
40 confidence FLOAT NOT NULL DEFAULT 1.0,
41 recorded_at VARCHAR NOT NULL
42);
43CREATE INDEX IF NOT EXISTS idx_graph_edges_src_validfrom
44 ON graph_edges(src, valid_from);
45CREATE INDEX IF NOT EXISTS idx_graph_edges_dst
46 ON graph_edges(dst);
47";
48
49pub struct DuckGraphStore {
50 conn: Arc<Mutex<Connection>>,
51}
52
53impl DuckGraphStore {
54 pub fn open(path: &Path) -> Result<Self> {
55 let conn = Connection::open(path)?;
56 run_migrations(&conn)?;
57 Ok(Self {
58 conn: Arc::new(Mutex::new(conn)),
59 })
60 }
61
62 pub fn open_in_memory() -> Result<Self> {
63 let conn = Connection::open_in_memory()?;
64 run_migrations(&conn)?;
65 Ok(Self {
66 conn: Arc::new(Mutex::new(conn)),
67 })
68 }
69}
70
71pub fn run_migrations(conn: &Connection) -> Result<()> {
72 conn.execute_batch(CREATE_GRAPH_NODES_TABLE)?;
73 conn.execute_batch(CREATE_GRAPH_EDGES_TABLE)?;
74 Ok(())
75}
76
77#[async_trait]
78impl GraphStore for DuckGraphStore {
79 async fn insert_edge(&self, edge: &TemporalEdge) -> Result<()> {
80 let conn = self.conn.lock().await;
81 let valid_to_s: Option<String> = edge.valid_to.map(|v| v.to_rfc3339());
82 conn.execute(
84 "INSERT OR REPLACE INTO graph_edges
85 (id, src, dst, relation, valid_from, valid_to, confidence, recorded_at)
86 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
87 duckdb::params![
88 edge.id.to_string(),
89 edge.src.to_string(),
90 edge.dst.to_string(),
91 edge.relation,
92 edge.valid_from.to_rfc3339(),
93 valid_to_s,
94 edge.confidence,
95 edge.recorded_at.to_rfc3339(),
96 ],
97 )?;
98 Ok(())
99 }
100
101 async fn close_edge(&self, edge_id: Uuid, closed_at: DateTime<Utc>) -> Result<()> {
102 let conn = self.conn.lock().await;
103 conn.execute(
106 "UPDATE graph_edges SET valid_to = ?
107 WHERE id = ? AND valid_to IS NULL",
108 duckdb::params![closed_at.to_rfc3339(), edge_id.to_string()],
109 )?;
110 Ok(())
111 }
112
113 async fn outgoing_at(&self, node: Uuid, as_of: DateTime<Utc>) -> Result<Vec<TemporalEdge>> {
114 let conn = self.conn.lock().await;
115 let as_of_s = as_of.to_rfc3339();
116 let mut stmt = conn.prepare(
117 "SELECT id, src, dst, relation, valid_from, valid_to, confidence, recorded_at
118 FROM graph_edges
119 WHERE src = ?
120 AND valid_from <= ?
121 AND (valid_to IS NULL OR valid_to > ?)
122 ORDER BY confidence DESC, recorded_at DESC",
123 )?;
124 let rows = stmt.query_map(
125 duckdb::params![node.to_string(), as_of_s.clone(), as_of_s],
126 row_to_edge,
127 )?;
128 let mut out = Vec::new();
129 for row in rows {
130 out.push(row?);
131 }
132 Ok(out)
133 }
134
135 async fn all_edges(&self) -> Result<Vec<TemporalEdge>> {
136 let conn = self.conn.lock().await;
137 let mut stmt = conn.prepare(
138 "SELECT id, src, dst, relation, valid_from, valid_to, confidence, recorded_at
139 FROM graph_edges
140 ORDER BY recorded_at ASC",
141 )?;
142 let rows = stmt.query_map([], row_to_edge)?;
143 let mut out = Vec::new();
144 for row in rows {
145 out.push(row?);
146 }
147 Ok(out)
148 }
149}
150
151fn row_to_edge(row: &duckdb::Row<'_>) -> std::result::Result<TemporalEdge, ::duckdb::Error> {
152 let id: String = row.get(0)?;
153 let src: String = row.get(1)?;
154 let dst: String = row.get(2)?;
155 let relation: String = row.get(3)?;
156 let valid_from: String = row.get(4)?;
157 let valid_to: Option<String> = row.get(5)?;
158 let confidence: f32 = row.get(6)?;
159 let recorded_at: String = row.get(7)?;
160 let parse = |s: &str| -> std::result::Result<DateTime<Utc>, ::duckdb::Error> {
161 DateTime::parse_from_rfc3339(s)
162 .map(|dt| dt.with_timezone(&Utc))
163 .map_err(|e| {
164 ::duckdb::Error::FromSqlConversionFailure(
165 0,
166 ::duckdb::types::Type::Text,
167 Box::new(e),
168 )
169 })
170 };
171 Ok(TemporalEdge {
172 id: parse_uuid(&id)?,
173 src: parse_uuid(&src)?,
174 dst: parse_uuid(&dst)?,
175 relation,
176 valid_from: parse(&valid_from)?,
177 valid_to: valid_to.as_deref().map(parse).transpose()?,
178 confidence,
179 recorded_at: parse(&recorded_at)?,
180 })
181}
182
183fn parse_uuid(s: &str) -> std::result::Result<Uuid, ::duckdb::Error> {
184 Uuid::parse_str(s).map_err(|e| {
185 ::duckdb::Error::FromSqlConversionFailure(0, ::duckdb::types::Type::Text, Box::new(e))
186 })
187}
188
189#[allow(dead_code)]
191fn _ensure_error_used(e: ::duckdb::Error) -> Error {
192 Error::from(e)
193}