1use std::{cell::RefCell, path::Path};
2
3use chrono::Utc;
4use rusqlite::{params, Connection};
5use ucm_core::{BlockId, PortableDocument};
6
7use super::{
8 GraphNodeRecord, GraphStore, GraphStoreError, GraphStoreObservability, GraphStoreStats,
9};
10use crate::types::GraphEdgeSummary;
11
12#[derive(Debug)]
13pub struct SqliteGraphStore {
14 graph_key: String,
15 connection: RefCell<Connection>,
16 stats: GraphStoreStats,
17}
18
19impl SqliteGraphStore {
20 pub fn import_document(
21 path: impl AsRef<Path>,
22 graph_key: impl Into<String>,
23 portable: &PortableDocument,
24 ) -> Result<Self, GraphStoreError> {
25 let graph_key = graph_key.into();
26 let connection = Connection::open(path)?;
27 init_schema(&connection)?;
28 let payload = serde_json::to_string(portable)?;
29 let document = portable.to_document()?;
30 let parent_by_child = document
31 .structure
32 .iter()
33 .flat_map(|(parent, children)| children.iter().map(move |child| (*child, *parent)))
34 .collect::<std::collections::HashMap<_, _>>();
35 let explicit_edge_count = document
36 .blocks
37 .values()
38 .map(|block| block.edges.len())
39 .sum::<usize>();
40 let structural_edge_count = parent_by_child.len();
41
42 connection.execute("BEGIN IMMEDIATE TRANSACTION", [])?;
43 connection.execute(
44 "DELETE FROM edges WHERE graph_key = ?1",
45 params![graph_key.as_str()],
46 )?;
47 connection.execute(
48 "DELETE FROM structure WHERE graph_key = ?1",
49 params![graph_key.as_str()],
50 )?;
51 connection.execute(
52 "DELETE FROM nodes WHERE graph_key = ?1",
53 params![graph_key.as_str()],
54 )?;
55 connection.execute(
56 "DELETE FROM graphs WHERE graph_key = ?1",
57 params![graph_key.as_str()],
58 )?;
59
60 connection.execute(
61 "INSERT INTO graphs (graph_key, document_json, document_id, root_block_id, node_count, explicit_edge_count, structural_edge_count, captured_at)
62 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
63 params![
64 graph_key.as_str(),
65 payload,
66 document.id.0.as_str(),
67 document.root.to_string(),
68 document.blocks.len() as i64,
69 explicit_edge_count as i64,
70 structural_edge_count as i64,
71 Utc::now().to_rfc3339(),
72 ],
73 )?;
74
75 for (parent, children) in &document.structure {
76 for (ordinal, child) in children.iter().enumerate() {
77 connection.execute(
78 "INSERT INTO structure (graph_key, parent_block_id, child_block_id, ordinal) VALUES (?1, ?2, ?3, ?4)",
79 params![graph_key.as_str(), parent.to_string(), child.to_string(), ordinal as i64],
80 )?;
81 }
82 }
83
84 for block in document.blocks.values() {
85 let parent = parent_by_child.get(&block.id).map(ToString::to_string);
86 connection.execute(
87 "INSERT INTO nodes (graph_key, block_id, label, content_type, semantic_role, tags_json, parent_block_id, child_count, outgoing_count, incoming_count)
88 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
89 params![
90 graph_key.as_str(),
91 block.id.to_string(),
92 block.metadata.label.clone(),
93 block.content_type(),
94 block.metadata.semantic_role.as_ref().map(ToString::to_string),
95 serde_json::to_string(&block.metadata.tags)?,
96 parent,
97 document.children(&block.id).len() as i64,
98 document.edge_index.outgoing_from(&block.id).len() as i64,
99 document.edge_index.incoming_to(&block.id).len() as i64,
100 ],
101 )?;
102
103 for edge in &block.edges {
104 connection.execute(
105 "INSERT INTO edges (graph_key, source_block_id, target_block_id, relation) VALUES (?1, ?2, ?3, ?4)",
106 params![
107 graph_key.as_str(),
108 block.id.to_string(),
109 edge.target.to_string(),
110 edge_relation(&edge.edge_type),
111 ],
112 )?;
113 }
114 }
115
116 connection.execute("COMMIT", [])?;
117 Self::open_with_connection(graph_key, connection)
118 }
119
120 pub fn open(
121 path: impl AsRef<Path>,
122 graph_key: impl Into<String>,
123 ) -> Result<Self, GraphStoreError> {
124 let connection = Connection::open(path)?;
125 init_schema(&connection)?;
126 Self::open_with_connection(graph_key.into(), connection)
127 }
128
129 fn open_with_connection(
130 graph_key: String,
131 connection: Connection,
132 ) -> Result<Self, GraphStoreError> {
133 let stats = load_stats(&connection, &graph_key)?;
134 Ok(Self {
135 graph_key,
136 connection: RefCell::new(connection),
137 stats,
138 })
139 }
140}
141
142impl GraphStore for SqliteGraphStore {
143 fn stats(&self) -> GraphStoreStats {
144 self.stats.clone()
145 }
146
147 fn observability(&self) -> GraphStoreObservability {
148 GraphStoreObservability {
149 stats: self.stats(),
150 indexed_fields: vec![
151 "block_id".to_string(),
152 "label".to_string(),
153 "content_type".to_string(),
154 "semantic_role".to_string(),
155 "parent_block_id".to_string(),
156 "source_block_id".to_string(),
157 "target_block_id".to_string(),
158 ],
159 }
160 }
161
162 fn root_id(&self) -> BlockId {
163 self.stats.root_block_id
164 }
165
166 fn node_ids(&self) -> Vec<BlockId> {
167 let conn = self.connection.borrow();
168 let mut stmt = conn
169 .prepare("SELECT block_id FROM nodes WHERE graph_key = ?1 ORDER BY block_id")
170 .expect("prepare node id query");
171 stmt.query_map(params![self.graph_key.as_str()], |row| {
172 row.get::<_, String>(0)
173 })
174 .expect("query node ids")
175 .filter_map(|value| value.ok())
176 .filter_map(|value| value.parse().ok())
177 .collect()
178 }
179
180 fn node(&self, block_id: BlockId) -> Option<GraphNodeRecord> {
181 let conn = self.connection.borrow();
182 let mut stmt = conn
183 .prepare(
184 "SELECT n.label,
185 n.content_type,
186 n.semantic_role,
187 n.tags_json,
188 n.parent_block_id,
189 (SELECT COUNT(*) FROM structure s WHERE s.graph_key = n.graph_key AND s.parent_block_id = n.block_id) AS child_count,
190 (SELECT COUNT(*) FROM edges e WHERE e.graph_key = n.graph_key AND e.source_block_id = n.block_id) AS outgoing_count,
191 (SELECT COUNT(*) FROM edges e WHERE e.graph_key = n.graph_key AND e.target_block_id = n.block_id) AS incoming_count
192 FROM nodes n WHERE n.graph_key = ?1 AND n.block_id = ?2",
193 )
194 .ok()?;
195 stmt.query_row(
196 params![self.graph_key.as_str(), block_id.to_string()],
197 |row| {
198 let tags_json: String = row.get(3)?;
199 Ok(GraphNodeRecord {
200 block_id,
201 label: row.get(0)?,
202 content_type: row.get(1)?,
203 semantic_role: row.get(2)?,
204 tags: serde_json::from_str(&tags_json).unwrap_or_default(),
205 parent: row
206 .get::<_, Option<String>>(4)?
207 .and_then(|value| value.parse().ok()),
208 children: row.get::<_, i64>(5)? as usize,
209 outgoing_edges: row.get::<_, i64>(6)? as usize,
210 incoming_edges: row.get::<_, i64>(7)? as usize,
211 })
212 },
213 )
214 .ok()
215 }
216
217 fn children(&self, block_id: BlockId) -> Vec<BlockId> {
218 let conn = self.connection.borrow();
219 let mut stmt = conn
220 .prepare(
221 "SELECT child_block_id FROM structure WHERE graph_key = ?1 AND parent_block_id = ?2 ORDER BY ordinal",
222 )
223 .expect("prepare child query");
224 stmt.query_map(
225 params![self.graph_key.as_str(), block_id.to_string()],
226 |row| row.get::<_, String>(0),
227 )
228 .expect("query children")
229 .filter_map(|value| value.ok())
230 .filter_map(|value| value.parse().ok())
231 .collect()
232 }
233
234 fn parent(&self, block_id: BlockId) -> Option<BlockId> {
235 let conn = self.connection.borrow();
236 let mut stmt = conn
237 .prepare("SELECT parent_block_id FROM nodes WHERE graph_key = ?1 AND block_id = ?2")
238 .ok()?;
239 stmt.query_row(
240 params![self.graph_key.as_str(), block_id.to_string()],
241 |row| row.get::<_, Option<String>>(0),
242 )
243 .ok()
244 .flatten()
245 .and_then(|value| value.parse().ok())
246 }
247
248 fn outgoing_edges(&self, block_id: BlockId) -> Vec<GraphEdgeSummary> {
249 let conn = self.connection.borrow();
250 let mut stmt = conn
251 .prepare(
252 "SELECT target_block_id, relation FROM edges WHERE graph_key = ?1 AND source_block_id = ?2 ORDER BY relation, target_block_id",
253 )
254 .expect("prepare outgoing edge query");
255 stmt.query_map(
256 params![self.graph_key.as_str(), block_id.to_string()],
257 |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
258 )
259 .expect("query outgoing edges")
260 .filter_map(|value| value.ok())
261 .filter_map(|(target, relation)| {
262 Some(GraphEdgeSummary {
263 source: block_id,
264 target: target.parse().ok()?,
265 relation,
266 direction: "outgoing".to_string(),
267 })
268 })
269 .collect()
270 }
271
272 fn incoming_edges(&self, block_id: BlockId) -> Vec<GraphEdgeSummary> {
273 let conn = self.connection.borrow();
274 let mut stmt = conn
275 .prepare(
276 "SELECT source_block_id, relation FROM edges WHERE graph_key = ?1 AND target_block_id = ?2 ORDER BY relation, source_block_id",
277 )
278 .expect("prepare incoming edge query");
279 stmt.query_map(
280 params![self.graph_key.as_str(), block_id.to_string()],
281 |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
282 )
283 .expect("query incoming edges")
284 .filter_map(|value| value.ok())
285 .filter_map(|(source, relation)| {
286 Some(GraphEdgeSummary {
287 source: source.parse().ok()?,
288 target: block_id,
289 relation,
290 direction: "incoming".to_string(),
291 })
292 })
293 .collect()
294 }
295
296 fn resolve_selector(&self, selector: &str) -> Option<BlockId> {
297 if selector == "root" {
298 return Some(self.root_id());
299 }
300 if let Ok(block_id) = selector.parse::<BlockId>() {
301 if self.node(block_id).is_some() {
302 return Some(block_id);
303 }
304 }
305
306 let conn = self.connection.borrow();
307 let mut stmt = conn
308 .prepare("SELECT block_id FROM nodes WHERE graph_key = ?1 AND label = ?2 LIMIT 1")
309 .ok()?;
310 stmt.query_row(params![self.graph_key.as_str(), selector], |row| {
311 row.get::<_, String>(0)
312 })
313 .ok()
314 .and_then(|value| value.parse().ok())
315 }
316
317 fn to_portable_document(&self) -> Result<PortableDocument, GraphStoreError> {
318 let conn = self.connection.borrow();
319 let payload = conn.query_row(
320 "SELECT document_json FROM graphs WHERE graph_key = ?1",
321 params![self.graph_key.as_str()],
322 |row| row.get::<_, String>(0),
323 )?;
324 Ok(serde_json::from_str(&payload)?)
325 }
326}
327
328fn init_schema(connection: &Connection) -> Result<(), GraphStoreError> {
329 connection.execute_batch(
330 "CREATE TABLE IF NOT EXISTS graphs (
331 graph_key TEXT PRIMARY KEY,
332 document_json TEXT NOT NULL,
333 document_id TEXT NOT NULL,
334 root_block_id TEXT NOT NULL,
335 node_count INTEGER NOT NULL,
336 explicit_edge_count INTEGER NOT NULL,
337 structural_edge_count INTEGER NOT NULL,
338 captured_at TEXT NOT NULL
339 );
340 CREATE TABLE IF NOT EXISTS nodes (
341 graph_key TEXT NOT NULL,
342 block_id TEXT NOT NULL,
343 label TEXT,
344 content_type TEXT NOT NULL,
345 semantic_role TEXT,
346 tags_json TEXT NOT NULL,
347 parent_block_id TEXT,
348 child_count INTEGER NOT NULL,
349 outgoing_count INTEGER NOT NULL,
350 incoming_count INTEGER NOT NULL,
351 PRIMARY KEY (graph_key, block_id)
352 );
353 CREATE TABLE IF NOT EXISTS structure (
354 graph_key TEXT NOT NULL,
355 parent_block_id TEXT NOT NULL,
356 child_block_id TEXT NOT NULL,
357 ordinal INTEGER NOT NULL,
358 PRIMARY KEY (graph_key, parent_block_id, child_block_id)
359 );
360 CREATE TABLE IF NOT EXISTS edges (
361 graph_key TEXT NOT NULL,
362 source_block_id TEXT NOT NULL,
363 target_block_id TEXT NOT NULL,
364 relation TEXT NOT NULL
365 );
366 CREATE INDEX IF NOT EXISTS idx_nodes_graph_label ON nodes(graph_key, label);
367 CREATE INDEX IF NOT EXISTS idx_nodes_graph_content_type ON nodes(graph_key, content_type);
368 CREATE INDEX IF NOT EXISTS idx_nodes_graph_parent ON nodes(graph_key, parent_block_id);
369 CREATE INDEX IF NOT EXISTS idx_structure_graph_parent ON structure(graph_key, parent_block_id, ordinal);
370 CREATE INDEX IF NOT EXISTS idx_edges_graph_source ON edges(graph_key, source_block_id, relation);
371 CREATE INDEX IF NOT EXISTS idx_edges_graph_target ON edges(graph_key, target_block_id, relation);",
372 )?;
373 Ok(())
374}
375
376fn load_stats(
377 connection: &Connection,
378 graph_key: &str,
379) -> Result<GraphStoreStats, GraphStoreError> {
380 connection
381 .query_row(
382 "SELECT document_id, root_block_id, node_count, explicit_edge_count, structural_edge_count, captured_at FROM graphs WHERE graph_key = ?1",
383 params![graph_key],
384 |row| {
385 Ok(GraphStoreStats {
386 backend: "sqlite".to_string(),
387 document_id: row.get(0)?,
388 root_block_id: row
389 .get::<_, String>(1)?
390 .parse()
391 .map_err(|_| rusqlite::Error::InvalidQuery)?,
392 node_count: row.get::<_, i64>(2)? as usize,
393 explicit_edge_count: row.get::<_, i64>(3)? as usize,
394 structural_edge_count: row.get::<_, i64>(4)? as usize,
395 captured_at: chrono::DateTime::parse_from_rfc3339(&row.get::<_, String>(5)?)
396 .map(|value| value.with_timezone(&Utc))
397 .unwrap_or_else(|_| Utc::now()),
398 graph_key: Some(graph_key.to_string()),
399 })
400 },
401 )
402 .map_err(|error| match error {
403 rusqlite::Error::QueryReturnedNoRows => GraphStoreError::GraphNotFound(graph_key.to_string()),
404 other => GraphStoreError::Sqlite(other),
405 })
406}
407
408fn edge_relation(edge_type: &ucm_core::EdgeType) -> String {
409 match edge_type {
410 ucm_core::EdgeType::Custom(value) => value.clone(),
411 _ => serde_json::to_value(edge_type)
412 .ok()
413 .and_then(|value| value.as_str().map(ToOwned::to_owned))
414 .unwrap_or_else(|| format!("{:?}", edge_type).to_lowercase()),
415 }
416}