Skip to main content

ucp_graph/store/
sqlite.rs

1use std::{cell::RefCell, path::Path};
2
3use chrono::Utc;
4use rusqlite::{params, Connection};
5use ucm_core::{BlockId, PortableDocument};
6
7use super::{
8    GraphNodeRecord, GraphStore, GraphStoreError, GraphStoreObservability, GraphStoreStats,
9};
10use crate::types::GraphEdgeSummary;
11
12#[derive(Debug)]
13pub struct SqliteGraphStore {
14    graph_key: String,
15    connection: RefCell<Connection>,
16    stats: GraphStoreStats,
17}
18
19impl SqliteGraphStore {
20    pub fn import_document(
21        path: impl AsRef<Path>,
22        graph_key: impl Into<String>,
23        portable: &PortableDocument,
24    ) -> Result<Self, GraphStoreError> {
25        let graph_key = graph_key.into();
26        let connection = Connection::open(path)?;
27        init_schema(&connection)?;
28        let payload = serde_json::to_string(portable)?;
29        let document = portable.to_document()?;
30        let parent_by_child = document
31            .structure
32            .iter()
33            .flat_map(|(parent, children)| children.iter().map(move |child| (*child, *parent)))
34            .collect::<std::collections::HashMap<_, _>>();
35        let explicit_edge_count = document
36            .blocks
37            .values()
38            .map(|block| block.edges.len())
39            .sum::<usize>();
40        let structural_edge_count = parent_by_child.len();
41
42        connection.execute("BEGIN IMMEDIATE TRANSACTION", [])?;
43        connection.execute(
44            "DELETE FROM edges WHERE graph_key = ?1",
45            params![graph_key.as_str()],
46        )?;
47        connection.execute(
48            "DELETE FROM structure WHERE graph_key = ?1",
49            params![graph_key.as_str()],
50        )?;
51        connection.execute(
52            "DELETE FROM nodes WHERE graph_key = ?1",
53            params![graph_key.as_str()],
54        )?;
55        connection.execute(
56            "DELETE FROM graphs WHERE graph_key = ?1",
57            params![graph_key.as_str()],
58        )?;
59
60        connection.execute(
61            "INSERT INTO graphs (graph_key, document_json, document_id, root_block_id, node_count, explicit_edge_count, structural_edge_count, captured_at)
62             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
63            params![
64                graph_key.as_str(),
65                payload,
66                document.id.0.as_str(),
67                document.root.to_string(),
68                document.blocks.len() as i64,
69                explicit_edge_count as i64,
70                structural_edge_count as i64,
71                Utc::now().to_rfc3339(),
72            ],
73        )?;
74
75        for (parent, children) in &document.structure {
76            for (ordinal, child) in children.iter().enumerate() {
77                connection.execute(
78                    "INSERT INTO structure (graph_key, parent_block_id, child_block_id, ordinal) VALUES (?1, ?2, ?3, ?4)",
79                    params![graph_key.as_str(), parent.to_string(), child.to_string(), ordinal as i64],
80                )?;
81            }
82        }
83
84        for block in document.blocks.values() {
85            let parent = parent_by_child.get(&block.id).map(ToString::to_string);
86            connection.execute(
87                "INSERT INTO nodes (graph_key, block_id, label, content_type, semantic_role, tags_json, parent_block_id, child_count, outgoing_count, incoming_count)
88                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
89                params![
90                    graph_key.as_str(),
91                    block.id.to_string(),
92                    block.metadata.label.clone(),
93                    block.content_type(),
94                    block.metadata.semantic_role.as_ref().map(ToString::to_string),
95                    serde_json::to_string(&block.metadata.tags)?,
96                    parent,
97                    document.children(&block.id).len() as i64,
98                    document.edge_index.outgoing_from(&block.id).len() as i64,
99                    document.edge_index.incoming_to(&block.id).len() as i64,
100                ],
101            )?;
102
103            for edge in &block.edges {
104                connection.execute(
105                    "INSERT INTO edges (graph_key, source_block_id, target_block_id, relation) VALUES (?1, ?2, ?3, ?4)",
106                    params![
107                        graph_key.as_str(),
108                        block.id.to_string(),
109                        edge.target.to_string(),
110                        edge_relation(&edge.edge_type),
111                    ],
112                )?;
113            }
114        }
115
116        connection.execute("COMMIT", [])?;
117        Self::open_with_connection(graph_key, connection)
118    }
119
120    pub fn open(
121        path: impl AsRef<Path>,
122        graph_key: impl Into<String>,
123    ) -> Result<Self, GraphStoreError> {
124        let connection = Connection::open(path)?;
125        init_schema(&connection)?;
126        Self::open_with_connection(graph_key.into(), connection)
127    }
128
129    fn open_with_connection(
130        graph_key: String,
131        connection: Connection,
132    ) -> Result<Self, GraphStoreError> {
133        let stats = load_stats(&connection, &graph_key)?;
134        Ok(Self {
135            graph_key,
136            connection: RefCell::new(connection),
137            stats,
138        })
139    }
140}
141
142impl GraphStore for SqliteGraphStore {
143    fn stats(&self) -> GraphStoreStats {
144        self.stats.clone()
145    }
146
147    fn observability(&self) -> GraphStoreObservability {
148        GraphStoreObservability {
149            stats: self.stats(),
150            indexed_fields: vec![
151                "block_id".to_string(),
152                "label".to_string(),
153                "content_type".to_string(),
154                "semantic_role".to_string(),
155                "parent_block_id".to_string(),
156                "source_block_id".to_string(),
157                "target_block_id".to_string(),
158            ],
159        }
160    }
161
162    fn root_id(&self) -> BlockId {
163        self.stats.root_block_id
164    }
165
166    fn node_ids(&self) -> Vec<BlockId> {
167        let conn = self.connection.borrow();
168        let mut stmt = conn
169            .prepare("SELECT block_id FROM nodes WHERE graph_key = ?1 ORDER BY block_id")
170            .expect("prepare node id query");
171        stmt.query_map(params![self.graph_key.as_str()], |row| {
172            row.get::<_, String>(0)
173        })
174        .expect("query node ids")
175        .filter_map(|value| value.ok())
176        .filter_map(|value| value.parse().ok())
177        .collect()
178    }
179
180    fn node(&self, block_id: BlockId) -> Option<GraphNodeRecord> {
181        let conn = self.connection.borrow();
182        let mut stmt = conn
183            .prepare(
184                "SELECT n.label,
185                        n.content_type,
186                        n.semantic_role,
187                        n.tags_json,
188                        n.parent_block_id,
189                        (SELECT COUNT(*) FROM structure s WHERE s.graph_key = n.graph_key AND s.parent_block_id = n.block_id) AS child_count,
190                        (SELECT COUNT(*) FROM edges e WHERE e.graph_key = n.graph_key AND e.source_block_id = n.block_id) AS outgoing_count,
191                        (SELECT COUNT(*) FROM edges e WHERE e.graph_key = n.graph_key AND e.target_block_id = n.block_id) AS incoming_count
192                 FROM nodes n WHERE n.graph_key = ?1 AND n.block_id = ?2",
193            )
194            .ok()?;
195        stmt.query_row(
196            params![self.graph_key.as_str(), block_id.to_string()],
197            |row| {
198                let tags_json: String = row.get(3)?;
199                Ok(GraphNodeRecord {
200                    block_id,
201                    label: row.get(0)?,
202                    content_type: row.get(1)?,
203                    semantic_role: row.get(2)?,
204                    tags: serde_json::from_str(&tags_json).unwrap_or_default(),
205                    parent: row
206                        .get::<_, Option<String>>(4)?
207                        .and_then(|value| value.parse().ok()),
208                    children: row.get::<_, i64>(5)? as usize,
209                    outgoing_edges: row.get::<_, i64>(6)? as usize,
210                    incoming_edges: row.get::<_, i64>(7)? as usize,
211                })
212            },
213        )
214        .ok()
215    }
216
217    fn children(&self, block_id: BlockId) -> Vec<BlockId> {
218        let conn = self.connection.borrow();
219        let mut stmt = conn
220            .prepare(
221                "SELECT child_block_id FROM structure WHERE graph_key = ?1 AND parent_block_id = ?2 ORDER BY ordinal",
222            )
223            .expect("prepare child query");
224        stmt.query_map(
225            params![self.graph_key.as_str(), block_id.to_string()],
226            |row| row.get::<_, String>(0),
227        )
228        .expect("query children")
229        .filter_map(|value| value.ok())
230        .filter_map(|value| value.parse().ok())
231        .collect()
232    }
233
234    fn parent(&self, block_id: BlockId) -> Option<BlockId> {
235        let conn = self.connection.borrow();
236        let mut stmt = conn
237            .prepare("SELECT parent_block_id FROM nodes WHERE graph_key = ?1 AND block_id = ?2")
238            .ok()?;
239        stmt.query_row(
240            params![self.graph_key.as_str(), block_id.to_string()],
241            |row| row.get::<_, Option<String>>(0),
242        )
243        .ok()
244        .flatten()
245        .and_then(|value| value.parse().ok())
246    }
247
248    fn outgoing_edges(&self, block_id: BlockId) -> Vec<GraphEdgeSummary> {
249        let conn = self.connection.borrow();
250        let mut stmt = conn
251            .prepare(
252                "SELECT target_block_id, relation FROM edges WHERE graph_key = ?1 AND source_block_id = ?2 ORDER BY relation, target_block_id",
253            )
254            .expect("prepare outgoing edge query");
255        stmt.query_map(
256            params![self.graph_key.as_str(), block_id.to_string()],
257            |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
258        )
259        .expect("query outgoing edges")
260        .filter_map(|value| value.ok())
261        .filter_map(|(target, relation)| {
262            Some(GraphEdgeSummary {
263                source: block_id,
264                target: target.parse().ok()?,
265                relation,
266                direction: "outgoing".to_string(),
267            })
268        })
269        .collect()
270    }
271
272    fn incoming_edges(&self, block_id: BlockId) -> Vec<GraphEdgeSummary> {
273        let conn = self.connection.borrow();
274        let mut stmt = conn
275            .prepare(
276                "SELECT source_block_id, relation FROM edges WHERE graph_key = ?1 AND target_block_id = ?2 ORDER BY relation, source_block_id",
277            )
278            .expect("prepare incoming edge query");
279        stmt.query_map(
280            params![self.graph_key.as_str(), block_id.to_string()],
281            |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
282        )
283        .expect("query incoming edges")
284        .filter_map(|value| value.ok())
285        .filter_map(|(source, relation)| {
286            Some(GraphEdgeSummary {
287                source: source.parse().ok()?,
288                target: block_id,
289                relation,
290                direction: "incoming".to_string(),
291            })
292        })
293        .collect()
294    }
295
296    fn resolve_selector(&self, selector: &str) -> Option<BlockId> {
297        if selector == "root" {
298            return Some(self.root_id());
299        }
300        if let Ok(block_id) = selector.parse::<BlockId>() {
301            if self.node(block_id).is_some() {
302                return Some(block_id);
303            }
304        }
305
306        let conn = self.connection.borrow();
307        let mut stmt = conn
308            .prepare("SELECT block_id FROM nodes WHERE graph_key = ?1 AND label = ?2 LIMIT 1")
309            .ok()?;
310        stmt.query_row(params![self.graph_key.as_str(), selector], |row| {
311            row.get::<_, String>(0)
312        })
313        .ok()
314        .and_then(|value| value.parse().ok())
315    }
316
317    fn to_portable_document(&self) -> Result<PortableDocument, GraphStoreError> {
318        let conn = self.connection.borrow();
319        let payload = conn.query_row(
320            "SELECT document_json FROM graphs WHERE graph_key = ?1",
321            params![self.graph_key.as_str()],
322            |row| row.get::<_, String>(0),
323        )?;
324        Ok(serde_json::from_str(&payload)?)
325    }
326}
327
328fn init_schema(connection: &Connection) -> Result<(), GraphStoreError> {
329    connection.execute_batch(
330        "CREATE TABLE IF NOT EXISTS graphs (
331            graph_key TEXT PRIMARY KEY,
332            document_json TEXT NOT NULL,
333            document_id TEXT NOT NULL,
334            root_block_id TEXT NOT NULL,
335            node_count INTEGER NOT NULL,
336            explicit_edge_count INTEGER NOT NULL,
337            structural_edge_count INTEGER NOT NULL,
338            captured_at TEXT NOT NULL
339        );
340        CREATE TABLE IF NOT EXISTS nodes (
341            graph_key TEXT NOT NULL,
342            block_id TEXT NOT NULL,
343            label TEXT,
344            content_type TEXT NOT NULL,
345            semantic_role TEXT,
346            tags_json TEXT NOT NULL,
347            parent_block_id TEXT,
348            child_count INTEGER NOT NULL,
349            outgoing_count INTEGER NOT NULL,
350            incoming_count INTEGER NOT NULL,
351            PRIMARY KEY (graph_key, block_id)
352        );
353        CREATE TABLE IF NOT EXISTS structure (
354            graph_key TEXT NOT NULL,
355            parent_block_id TEXT NOT NULL,
356            child_block_id TEXT NOT NULL,
357            ordinal INTEGER NOT NULL,
358            PRIMARY KEY (graph_key, parent_block_id, child_block_id)
359        );
360        CREATE TABLE IF NOT EXISTS edges (
361            graph_key TEXT NOT NULL,
362            source_block_id TEXT NOT NULL,
363            target_block_id TEXT NOT NULL,
364            relation TEXT NOT NULL
365        );
366        CREATE INDEX IF NOT EXISTS idx_nodes_graph_label ON nodes(graph_key, label);
367        CREATE INDEX IF NOT EXISTS idx_nodes_graph_content_type ON nodes(graph_key, content_type);
368        CREATE INDEX IF NOT EXISTS idx_nodes_graph_parent ON nodes(graph_key, parent_block_id);
369        CREATE INDEX IF NOT EXISTS idx_structure_graph_parent ON structure(graph_key, parent_block_id, ordinal);
370        CREATE INDEX IF NOT EXISTS idx_edges_graph_source ON edges(graph_key, source_block_id, relation);
371        CREATE INDEX IF NOT EXISTS idx_edges_graph_target ON edges(graph_key, target_block_id, relation);",
372    )?;
373    Ok(())
374}
375
376fn load_stats(
377    connection: &Connection,
378    graph_key: &str,
379) -> Result<GraphStoreStats, GraphStoreError> {
380    connection
381        .query_row(
382            "SELECT document_id, root_block_id, node_count, explicit_edge_count, structural_edge_count, captured_at FROM graphs WHERE graph_key = ?1",
383            params![graph_key],
384            |row| {
385                Ok(GraphStoreStats {
386                    backend: "sqlite".to_string(),
387                    document_id: row.get(0)?,
388                    root_block_id: row
389                        .get::<_, String>(1)?
390                        .parse()
391                        .map_err(|_| rusqlite::Error::InvalidQuery)?,
392                    node_count: row.get::<_, i64>(2)? as usize,
393                    explicit_edge_count: row.get::<_, i64>(3)? as usize,
394                    structural_edge_count: row.get::<_, i64>(4)? as usize,
395                    captured_at: chrono::DateTime::parse_from_rfc3339(&row.get::<_, String>(5)?)
396                        .map(|value| value.with_timezone(&Utc))
397                        .unwrap_or_else(|_| Utc::now()),
398                    graph_key: Some(graph_key.to_string()),
399                })
400            },
401        )
402        .map_err(|error| match error {
403            rusqlite::Error::QueryReturnedNoRows => GraphStoreError::GraphNotFound(graph_key.to_string()),
404            other => GraphStoreError::Sqlite(other),
405        })
406}
407
408fn edge_relation(edge_type: &ucm_core::EdgeType) -> String {
409    match edge_type {
410        ucm_core::EdgeType::Custom(value) => value.clone(),
411        _ => serde_json::to_value(edge_type)
412            .ok()
413            .and_then(|value| value.as_str().map(ToOwned::to_owned))
414            .unwrap_or_else(|| format!("{:?}", edge_type).to_lowercase()),
415    }
416}