Skip to main content

mnemo_graph/store/
duckdb.rs

//! DuckDB-backed [`GraphStore`].
//!
//! Tables match the v0.4.0-rc1 migration shape — one row per edge,
//! `valid_to` stored as nullable RFC3339 string. Like
//! `mnemo-core::storage::duckdb`, the connection is kept in an
//! `Arc<Mutex<duckdb::Connection>>` (a `tokio::sync::Mutex`) so every trait
//! method gets exclusive access to it. Unlike the `spawn_blocking` pattern that
//! module uses, the queries here currently run synchronously on the calling
//! async task while the lock is held.
8
9use std::path::Path;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use chrono::{DateTime, Utc};
14use duckdb::Connection;
15use tokio::sync::Mutex;
16use uuid::Uuid;
17
18use crate::model::TemporalEdge;
19use crate::store::{Error, GraphStore, Result};
20
/// SQL — kept as one string each so a future migrator can diff them
/// against a stored schema version.
///
/// DDL for `graph_nodes`: one row per node, keyed by a textual `id`, with the
/// creation time stored as text.
pub const CREATE_GRAPH_NODES_TABLE: &str = concat!(
    "\n",
    "CREATE TABLE IF NOT EXISTS graph_nodes (\n",
    "    id VARCHAR PRIMARY KEY,\n",
    "    label VARCHAR,\n",
    "    metadata JSON,\n",
    "    created_at VARCHAR NOT NULL\n",
    ");\n",
);

/// DDL for `graph_edges` plus its two lookup indexes: `(src, valid_from)` and
/// `dst`. Timestamps are stored as text; a NULL `valid_to` marks an edge that
/// has not been closed yet.
pub const CREATE_GRAPH_EDGES_TABLE: &str = concat!(
    "\n",
    "CREATE TABLE IF NOT EXISTS graph_edges (\n",
    "    id VARCHAR PRIMARY KEY,\n",
    "    src VARCHAR NOT NULL,\n",
    "    dst VARCHAR NOT NULL,\n",
    "    relation VARCHAR NOT NULL,\n",
    "    valid_from VARCHAR NOT NULL,\n",
    "    valid_to VARCHAR,\n",
    "    confidence FLOAT NOT NULL DEFAULT 1.0,\n",
    "    recorded_at VARCHAR NOT NULL\n",
    ");\n",
    "CREATE INDEX IF NOT EXISTS idx_graph_edges_src_validfrom\n",
    "    ON graph_edges(src, valid_from);\n",
    "CREATE INDEX IF NOT EXISTS idx_graph_edges_dst\n",
    "    ON graph_edges(dst);\n",
);
48
49pub struct DuckGraphStore {
50    conn: Arc<Mutex<Connection>>,
51}
52
53impl DuckGraphStore {
54    pub fn open(path: &Path) -> Result<Self> {
55        let conn = Connection::open(path)?;
56        run_migrations(&conn)?;
57        Ok(Self {
58            conn: Arc::new(Mutex::new(conn)),
59        })
60    }
61
62    pub fn open_in_memory() -> Result<Self> {
63        let conn = Connection::open_in_memory()?;
64        run_migrations(&conn)?;
65        Ok(Self {
66            conn: Arc::new(Mutex::new(conn)),
67        })
68    }
69}
70
71pub fn run_migrations(conn: &Connection) -> Result<()> {
72    conn.execute_batch(CREATE_GRAPH_NODES_TABLE)?;
73    conn.execute_batch(CREATE_GRAPH_EDGES_TABLE)?;
74    Ok(())
75}
76
77#[async_trait]
78impl GraphStore for DuckGraphStore {
79    async fn insert_edge(&self, edge: &TemporalEdge) -> Result<()> {
80        let conn = self.conn.lock().await;
81        let valid_to_s: Option<String> = edge.valid_to.map(|v| v.to_rfc3339());
82        // UPSERT — DuckDB supports ON CONFLICT for primary keys.
83        conn.execute(
84            "INSERT OR REPLACE INTO graph_edges
85             (id, src, dst, relation, valid_from, valid_to, confidence, recorded_at)
86             VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
87            duckdb::params![
88                edge.id.to_string(),
89                edge.src.to_string(),
90                edge.dst.to_string(),
91                edge.relation,
92                edge.valid_from.to_rfc3339(),
93                valid_to_s,
94                edge.confidence,
95                edge.recorded_at.to_rfc3339(),
96            ],
97        )?;
98        Ok(())
99    }
100
101    async fn close_edge(&self, edge_id: Uuid, closed_at: DateTime<Utc>) -> Result<()> {
102        let conn = self.conn.lock().await;
103        // Only update rows whose valid_to is currently NULL — closing
104        // an already-closed edge is a no-op.
105        conn.execute(
106            "UPDATE graph_edges SET valid_to = ?
107             WHERE id = ? AND valid_to IS NULL",
108            duckdb::params![closed_at.to_rfc3339(), edge_id.to_string()],
109        )?;
110        Ok(())
111    }
112
113    async fn outgoing_at(&self, node: Uuid, as_of: DateTime<Utc>) -> Result<Vec<TemporalEdge>> {
114        let conn = self.conn.lock().await;
115        let as_of_s = as_of.to_rfc3339();
116        let mut stmt = conn.prepare(
117            "SELECT id, src, dst, relation, valid_from, valid_to, confidence, recorded_at
118             FROM graph_edges
119             WHERE src = ?
120               AND valid_from <= ?
121               AND (valid_to IS NULL OR valid_to > ?)
122             ORDER BY confidence DESC, recorded_at DESC",
123        )?;
124        let rows = stmt.query_map(
125            duckdb::params![node.to_string(), as_of_s.clone(), as_of_s],
126            row_to_edge,
127        )?;
128        let mut out = Vec::new();
129        for row in rows {
130            out.push(row?);
131        }
132        Ok(out)
133    }
134
135    async fn all_edges(&self) -> Result<Vec<TemporalEdge>> {
136        let conn = self.conn.lock().await;
137        let mut stmt = conn.prepare(
138            "SELECT id, src, dst, relation, valid_from, valid_to, confidence, recorded_at
139             FROM graph_edges
140             ORDER BY recorded_at ASC",
141        )?;
142        let rows = stmt.query_map([], row_to_edge)?;
143        let mut out = Vec::new();
144        for row in rows {
145            out.push(row?);
146        }
147        Ok(out)
148    }
149}
150
151fn row_to_edge(row: &duckdb::Row<'_>) -> std::result::Result<TemporalEdge, ::duckdb::Error> {
152    let id: String = row.get(0)?;
153    let src: String = row.get(1)?;
154    let dst: String = row.get(2)?;
155    let relation: String = row.get(3)?;
156    let valid_from: String = row.get(4)?;
157    let valid_to: Option<String> = row.get(5)?;
158    let confidence: f32 = row.get(6)?;
159    let recorded_at: String = row.get(7)?;
160    let parse = |s: &str| -> std::result::Result<DateTime<Utc>, ::duckdb::Error> {
161        DateTime::parse_from_rfc3339(s)
162            .map(|dt| dt.with_timezone(&Utc))
163            .map_err(|e| {
164                ::duckdb::Error::FromSqlConversionFailure(
165                    0,
166                    ::duckdb::types::Type::Text,
167                    Box::new(e),
168                )
169            })
170    };
171    Ok(TemporalEdge {
172        id: parse_uuid(&id)?,
173        src: parse_uuid(&src)?,
174        dst: parse_uuid(&dst)?,
175        relation,
176        valid_from: parse(&valid_from)?,
177        valid_to: valid_to.as_deref().map(parse).transpose()?,
178        confidence,
179        recorded_at: parse(&recorded_at)?,
180    })
181}
182
183fn parse_uuid(s: &str) -> std::result::Result<Uuid, ::duckdb::Error> {
184    Uuid::parse_str(s).map_err(|e| {
185        ::duckdb::Error::FromSqlConversionFailure(0, ::duckdb::types::Type::Text, Box::new(e))
186    })
187}
188
189// Drop hint: keep `Error` import warm so `cargo check` doesn't elide it.
190#[allow(dead_code)]
191fn _ensure_error_used(e: ::duckdb::Error) -> Error {
192    Error::from(e)
193}