1use anyhow::{Context, Result, bail};
2use rusqlite::{
3 Connection, OpenFlags, OptionalExtension, Row, params_from_iter,
4 types::{Type, Value},
5};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use std::collections::{BTreeMap, BTreeSet};
8use std::ffi::OsString;
9use std::path::{Path, PathBuf};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12pub use tsift_core::{
13 ConvexEdgeRow, ConvexGraphClient, ConvexGraphStore, ConvexNodeRow, ConvexProjectionRows,
14 ConvexRowsGraphClient, GraphEdge, GraphFreshness, GraphNode, GraphPagedSubgraph, GraphPath,
15 GraphProjection, GraphPropertyFilter, GraphProvenance, GraphQueryOptions, GraphQueryPage,
16 GraphStore, GraphSubgraph, SQLITE_GRAPH_SCHEMA_VERSION, apply_graph_edge_query_page,
17 apply_graph_query_page, graph_edge_id, shortest_path_using_outgoing, stable_graph_edge_id,
18};
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
21#[serde(rename_all = "snake_case")]
22pub enum ReadOnlyRecovery {
23 SnapshotFallback,
24 SnapshotFallbackWal,
25}
26
27pub fn read_only_snapshot_recovery(
28 db_path: &Path,
29 err: &anyhow::Error,
30) -> Option<ReadOnlyRecovery> {
31 if !error_mentions_locked_db(err) {
32 return None;
33 }
34 if wal_sidecar_path(db_path).exists() || shared_memory_sidecar_path(db_path).exists() {
35 Some(ReadOnlyRecovery::SnapshotFallbackWal)
36 } else if rollback_journal_path(db_path).exists() {
37 Some(ReadOnlyRecovery::SnapshotFallback)
38 } else {
39 None
40 }
41}
42
/// Returns the path of SQLite's rollback journal for `db_path`: the full database path with
/// `-journal` appended (not an extension replacement).
pub fn rollback_journal_path(db_path: &Path) -> PathBuf {
    let mut sidecar = OsString::from(db_path.as_os_str());
    sidecar.push("-journal");
    sidecar.into()
}
48
/// Returns the path of SQLite's write-ahead log for `db_path`: the full database path with
/// `-wal` appended.
pub fn wal_sidecar_path(db_path: &Path) -> PathBuf {
    let mut sidecar = OsString::from(db_path.as_os_str());
    sidecar.push("-wal");
    sidecar.into()
}
54
/// Returns the path of SQLite's WAL shared-memory index for `db_path`: the full database path
/// with `-shm` appended.
pub fn shared_memory_sidecar_path(db_path: &Path) -> PathBuf {
    let mut sidecar = OsString::from(db_path.as_os_str());
    sidecar.push("-shm");
    sidecar.into()
}
60
61pub fn copy_read_only_snapshot(
62 db_path: &Path,
63 default_stem: &str,
64) -> Result<(PathBuf, Vec<PathBuf>)> {
65 let snapshot_path = snapshot_copy_path(db_path, default_stem);
66 std::fs::copy(db_path, &snapshot_path).with_context(|| {
67 format!(
68 "copying locked db {} to snapshot {}",
69 db_path.display(),
70 snapshot_path.display()
71 )
72 })?;
73 let mut cleanup_paths = vec![snapshot_path.clone()];
74 copy_optional_snapshot_sidecar(
75 &wal_sidecar_path(db_path),
76 &wal_sidecar_path(&snapshot_path),
77 &mut cleanup_paths,
78 )?;
79 copy_optional_snapshot_sidecar(
80 &shared_memory_sidecar_path(db_path),
81 &shared_memory_sidecar_path(&snapshot_path),
82 &mut cleanup_paths,
83 )?;
84 Ok((snapshot_path, cleanup_paths))
85}
86
87pub fn error_mentions_locked_db(err: &anyhow::Error) -> bool {
88 err.chain()
89 .any(|cause| cause.to_string().contains("database is locked"))
90}
91
92fn copy_optional_snapshot_sidecar(
93 source_path: &Path,
94 snapshot_path: &Path,
95 cleanup_paths: &mut Vec<PathBuf>,
96) -> Result<()> {
97 match std::fs::copy(source_path, snapshot_path) {
98 Ok(_) => {
99 cleanup_paths.push(snapshot_path.to_path_buf());
100 Ok(())
101 }
102 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
103 Err(err) => Err(err).with_context(|| {
104 format!(
105 "copying SQLite sidecar {} to snapshot {}",
106 source_path.display(),
107 snapshot_path.display()
108 )
109 }),
110 }
111}
112
/// Builds a temp-dir path for a database snapshot named `tsift-<stem>-<pid>-<nanos>.db`.
///
/// `<stem>` is the UTF-8 file stem of `db_path`, or `default_stem` when the stem is missing or
/// not valid UTF-8. `<nanos>` is nanoseconds since the Unix epoch, or `0` if the system clock
/// reads earlier than the epoch. Together, pid and nanos make the name unique in practice.
fn snapshot_copy_path(db_path: &Path, default_stem: &str) -> PathBuf {
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    };
    let stem = match db_path.file_stem().and_then(|raw| raw.to_str()) {
        Some(utf8_stem) => utf8_stem,
        None => default_stem,
    };
    let pid = std::process::id();
    std::env::temp_dir().join(format!("tsift-{stem}-{pid}-{nanos}.db"))
}
/// Page threshold applied via `PRAGMA wal_autocheckpoint` to every writable graph connection.
const SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES: i64 = 256;
/// Maximum rows per multi-row staging `INSERT`. Node rows bind 8 values and edge rows bind 9, so
/// one statement binds at most 450 parameters (presumably sized to stay below SQLite's
/// bound-parameter limit).
const SQLITE_GRAPH_STAGING_CHUNK_ROWS: usize = 50;
128
/// SQLite-backed graph store, opened either writable (on disk or in memory) or read-only,
/// optionally on a temporary snapshot copy of a locked database.
pub struct SqliteGraphStore {
    /// The SQLite connection all store operations run on.
    conn: Connection,
    /// Keeps a snapshot copy's files alive for the store's lifetime. Rust drops struct fields in
    /// declaration order, so `conn` is closed before this guard deletes the files; keep this
    /// field after `conn`.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// `Some` when the store was opened on a snapshot after a lock failure; `None` otherwise.
    read_only_recovery: Option<ReadOnlyRecovery>,
}
134
/// A read-only SQLite connection to a graph database, either on the database file itself or on a
/// temporary snapshot copy taken after a lock failure.
pub struct SqliteReadOnlyConnection {
    /// The read-only SQLite connection.
    conn: Connection,
    /// Owns the snapshot files when `conn` was opened on a copy. It is declared after `conn`
    /// (fields drop in declaration order) so the connection closes before the files are deleted.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Which snapshot recovery was used, or `None` for a direct open.
    recovery: Option<ReadOnlyRecovery>,
}
140
141impl SqliteReadOnlyConnection {
142 pub fn conn(&self) -> &Connection {
143 &self.conn
144 }
145
146 pub fn recovery(&self) -> Option<ReadOnlyRecovery> {
147 self.recovery
148 }
149}
150
/// Owns the files of a temporary database snapshot and deletes them when dropped.
///
/// Deletion is best-effort: each path is removed independently, and failures (for example a file
/// that was never created or is already gone) are ignored.
struct SnapshotCopyGuard {
    paths: Vec<PathBuf>,
}

impl Drop for SnapshotCopyGuard {
    fn drop(&mut self) {
        self.paths.iter().for_each(|snapshot_file| {
            // `drop` cannot report errors, so a failed removal is deliberately ignored.
            let _ = std::fs::remove_file(snapshot_file);
        });
    }
}
162
163fn configure_writable_graph_connection(conn: &Connection, db_path: &Path) -> Result<()> {
164 conn.busy_timeout(Duration::from_secs(5))?;
165 conn.pragma_update(None, "journal_mode", "WAL")?;
166 let mode: String = conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
167 if mode.to_lowercase() != "wal" {
168 bail!(
169 "graph substrate db {} requires WAL mode for concurrent reads, got {}",
170 db_path.display(),
171 mode
172 );
173 }
174 conn.pragma_update(
175 None,
176 "wal_autocheckpoint",
177 SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
178 )?;
179 let checkpoint_pages: i64 =
180 conn.query_row("PRAGMA wal_autocheckpoint", [], |row| row.get(0))?;
181 if checkpoint_pages != SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES {
182 bail!(
183 "graph substrate db {} requires wal_autocheckpoint={}, got {}",
184 db_path.display(),
185 SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
186 checkpoint_pages
187 );
188 }
189 Ok(())
190}
191
192fn sqlite_column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
193 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
194 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
195 for row in rows {
196 if row? == column {
197 return Ok(true);
198 }
199 }
200 Ok(false)
201}
202
203fn add_column_if_missing(
204 conn: &Connection,
205 table: &str,
206 column: &str,
207 definition: &str,
208) -> Result<()> {
209 if !sqlite_column_exists(conn, table, column)? {
210 conn.execute(
211 &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
212 [],
213 )?;
214 }
215 Ok(())
216}
217
218fn backfill_graph_edge_keys(conn: &Connection) -> Result<()> {
219 if !sqlite_column_exists(conn, "graph_edges", "edge_key")? {
220 return Ok(());
221 }
222 let rows = {
223 let mut stmt = conn.prepare(
224 r#"
225 SELECT from_id, to_id, kind
226 FROM graph_edges
227 WHERE edge_key IS NULL OR edge_key = ''
228 ORDER BY from_id, kind, to_id
229 "#,
230 )?;
231 collect_rows(stmt.query_map([], |row| {
232 Ok((
233 row.get::<_, String>(0)?,
234 row.get::<_, String>(1)?,
235 row.get::<_, String>(2)?,
236 ))
237 })?)?
238 };
239 let mut update = conn.prepare(
240 r#"
241 UPDATE graph_edges
242 SET edge_key = ?4
243 WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3
244 "#,
245 )?;
246 for (from_id, to_id, kind) in rows {
247 update.execute((
248 &from_id,
249 &to_id,
250 &kind,
251 stable_graph_edge_id(&from_id, &to_id, &kind),
252 ))?;
253 }
254 Ok(())
255}
256
257fn migrate_sqlite_graph_schema(conn: &Connection, old_version: i64) -> Result<()> {
258 if old_version < 2 {
259 add_column_if_missing(conn, "graph_nodes", "row_hash", "TEXT")?;
260 add_column_if_missing(conn, "graph_nodes", "source_watermark", "TEXT")?;
261 add_column_if_missing(conn, "graph_edges", "row_hash", "TEXT")?;
262 add_column_if_missing(conn, "graph_edges", "source_watermark", "TEXT")?;
263 }
264 if old_version < 3 {
265 rebuild_graph_node_properties(conn)?;
266 }
267 if old_version < 4 {
268 ensure_sqlite_graph_operator_stats_schema(conn)?;
269 }
270 if old_version < 5 {
271 add_column_if_missing(conn, "graph_edges", "edge_key", "TEXT")?;
272 backfill_graph_edge_keys(conn)?;
273 ensure_sqlite_graph_edge_properties_schema(conn)?;
274 rebuild_graph_edge_properties(conn)?;
275 }
276 Ok(())
277}
278
279fn ensure_sqlite_graph_operator_stats_schema(conn: &Connection) -> Result<()> {
280 conn.execute_batch(
281 r#"
282 CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
283 ON graph_nodes(kind, label, id);
284 CREATE TABLE IF NOT EXISTS graph_operator_stats (
285 scope TEXT PRIMARY KEY,
286 nodes INTEGER NOT NULL,
287 edges INTEGER NOT NULL,
288 tombstone_nodes INTEGER NOT NULL,
289 tombstone_edges INTEGER NOT NULL,
290 file_size_bytes INTEGER,
291 freelist_bytes INTEGER,
292 observed_at_unix INTEGER NOT NULL
293 );
294 "#,
295 )?;
296 Ok(())
297}
298
299fn ensure_sqlite_graph_edge_properties_schema(conn: &Connection) -> Result<()> {
300 conn.execute_batch(
301 r#"
302 CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_edge_key
303 ON graph_edges(edge_key);
304 CREATE TABLE IF NOT EXISTS graph_edge_properties (
305 edge_key TEXT NOT NULL,
306 key TEXT NOT NULL,
307 value TEXT NOT NULL,
308 PRIMARY KEY (edge_key, key),
309 FOREIGN KEY (edge_key) REFERENCES graph_edges(edge_key) ON DELETE CASCADE
310 );
311 CREATE INDEX IF NOT EXISTS idx_graph_edge_properties_key_value_edge
312 ON graph_edge_properties(key, value, edge_key);
313 "#,
314 )?;
315 Ok(())
316}
317
318fn replace_node_properties(
319 conn: &Connection,
320 node_id: &str,
321 properties: &BTreeMap<String, String>,
322) -> Result<()> {
323 conn.execute(
324 "DELETE FROM graph_node_properties WHERE node_id = ?1",
325 [node_id],
326 )?;
327 let mut insert = conn.prepare(
328 r#"
329 INSERT INTO graph_node_properties (node_id, key, value)
330 VALUES (?1, ?2, ?3)
331 ON CONFLICT(node_id, key) DO UPDATE SET
332 value = excluded.value
333 "#,
334 )?;
335 for (key, value) in properties {
336 insert.execute((node_id, key, value))?;
337 }
338 Ok(())
339}
340
341fn replace_edge_properties(
342 conn: &Connection,
343 edge_key: &str,
344 properties: &BTreeMap<String, String>,
345) -> Result<()> {
346 conn.execute(
347 "DELETE FROM graph_edge_properties WHERE edge_key = ?1",
348 [edge_key],
349 )?;
350 let mut insert = conn.prepare(
351 r#"
352 INSERT INTO graph_edge_properties (edge_key, key, value)
353 VALUES (?1, ?2, ?3)
354 ON CONFLICT(edge_key, key) DO UPDATE SET
355 value = excluded.value
356 "#,
357 )?;
358 for (key, value) in properties {
359 insert.execute((edge_key, key, value))?;
360 }
361 Ok(())
362}
363
364fn rebuild_graph_node_properties(conn: &Connection) -> Result<()> {
365 if !sqlite_column_exists(conn, "graph_nodes", "properties_json")? {
366 return Ok(());
367 }
368 conn.execute_batch(
369 r#"
370 DELETE FROM graph_node_properties;
371 INSERT INTO graph_node_properties (node_id, key, value)
372 SELECT graph_nodes.id, json_each.key, CAST(json_each.value AS TEXT)
373 FROM graph_nodes, json_each(graph_nodes.properties_json)
374 WHERE json_each.key IS NOT NULL
375 AND json_each.value IS NOT NULL
376 "#,
377 )?;
378 Ok(())
379}
380
381fn rebuild_graph_edge_properties(conn: &Connection) -> Result<()> {
382 if !sqlite_column_exists(conn, "graph_edges", "properties_json")?
383 || !sqlite_column_exists(conn, "graph_edges", "edge_key")?
384 {
385 return Ok(());
386 }
387 conn.execute_batch(
388 r#"
389 DELETE FROM graph_edge_properties;
390 INSERT INTO graph_edge_properties (edge_key, key, value)
391 SELECT graph_edges.edge_key, json_each.key, CAST(json_each.value AS TEXT)
392 FROM graph_edges, json_each(graph_edges.properties_json)
393 WHERE graph_edges.edge_key IS NOT NULL
394 AND graph_edges.edge_key <> ''
395 AND json_each.key IS NOT NULL
396 AND json_each.value IS NOT NULL
397 "#,
398 )?;
399 Ok(())
400}
401
402pub fn open_graph_read_only_connection(db_path: &Path) -> Result<SqliteReadOnlyConnection> {
403 let conn = Connection::open_with_flags(
404 db_path,
405 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
406 )
407 .with_context(|| format!("opening graph.db read-only: {}", db_path.display()))?;
408 conn.busy_timeout(Duration::from_secs(5))?;
409 Ok(SqliteReadOnlyConnection {
410 conn,
411 _snapshot_copy: None,
412 recovery: None,
413 })
414}
415
416pub fn open_graph_read_only_connection_resilient(
417 db_path: &Path,
418) -> Result<SqliteReadOnlyConnection> {
419 match open_graph_read_only_connection(db_path).and_then(|connection| {
420 connection
421 .conn
422 .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
423 Ok(connection)
424 }) {
425 Ok(connection) => Ok(connection),
426 Err(err) => match read_only_snapshot_recovery(db_path, &err) {
427 Some(recovery) => open_graph_read_only_snapshot(db_path, recovery),
428 None => Err(err),
429 },
430 }
431}
432
433fn open_graph_read_only_snapshot(
434 db_path: &Path,
435 recovery: ReadOnlyRecovery,
436) -> Result<SqliteReadOnlyConnection> {
437 let (snapshot_path, cleanup_paths) = copy_read_only_snapshot(db_path, "graph")?;
438 let conn = Connection::open_with_flags(
439 &snapshot_path,
440 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
441 )
442 .with_context(|| format!("opening graph.db snapshot {}", snapshot_path.display()))?;
443 conn.busy_timeout(Duration::from_secs(5))?;
444 Ok(SqliteReadOnlyConnection {
445 conn,
446 _snapshot_copy: Some(SnapshotCopyGuard {
447 paths: cleanup_paths,
448 }),
449 recovery: Some(recovery),
450 })
451}
452
/// Timing record for one named phase of a projection refresh.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefreshPhase {
    /// Short machine-friendly phase identifier (e.g. `sqlite_node_staging`).
    pub name: String,
    /// Wall-clock time spent in the phase, in microseconds.
    pub duration_micros: u128,
    /// Human-readable description of what the phase did.
    pub detail: String,
}
459
/// Summary of one projection refresh, returned by
/// `SqliteGraphStore::replace_projection_with_version`.
///
/// NOTE(review): the function that fills this struct extends past the visible code. The
/// per-field notes below reflect the visible computations and should be confirmed against the
/// final struct construction.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefresh {
    /// Scope string the refresh was run for (`"root"` via `replace_projection`).
    pub scope: String,
    /// Projection version in effect: the explicit argument, else one derived from the nodes,
    /// else `"unversioned"` (presumably; see the caller).
    pub projection_version: String,
    /// Watermark attached to every staged row, if any.
    pub source_watermark: Option<String>,
    /// Node ids that existed before the refresh but were absent from the new projection.
    pub tombstoned_nodes: Vec<String>,
    /// Edge keys that existed before the refresh but were absent from the new projection.
    pub tombstoned_edges: Vec<String>,
    /// Count of node rows written (presumably new or row-hash-changed rows).
    pub upserted_nodes: usize,
    /// Count of edge rows written (presumably new or row-hash-changed rows).
    pub upserted_edges: usize,
    /// Count of staged nodes whose stored row hash already matched.
    pub unchanged_nodes: usize,
    /// Count of staged edges whose stored row hash already matched.
    pub unchanged_edges: usize,
    /// Count of node and edge property rows written (presumably).
    pub upserted_properties: usize,
    /// Property rows left as-is: owners with unchanged rows, plus rows that compared equal.
    pub unchanged_properties: usize,
    /// Property rows removed from changed owners that no longer carry that key.
    pub deleted_properties: usize,
    /// Node rows deleted because they were absent from the new projection.
    pub deleted_nodes: usize,
    /// Edge rows deleted because they were absent from the new projection.
    pub deleted_edges: usize,
    /// Count of tombstone records pruned (presumably old entries in `graph_tombstones`).
    pub pruned_tombstones: usize,
    /// Database size before the refresh, if it could be measured.
    pub file_size_bytes_before: Option<u64>,
    /// Database size after the refresh, if it could be measured.
    pub file_size_bytes_after: Option<u64>,
    /// Per-phase timings in the order the phases ran.
    pub phase_timings: Vec<SqliteProjectionRefreshPhase>,
}
481
/// Recorded version information for a projection scope.
///
/// The fields appear to mirror the `graph_projection_versions` table columns of the same names
/// (TODO confirm against the reader that builds this struct).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionVersion {
    /// Version string of the stored projection.
    pub projection_version: String,
    /// Hash of the projection content, when one was recorded.
    pub content_hash: Option<String>,
    /// Source watermark recorded with the projection, if any.
    pub source_watermark: Option<String>,
}
488
489fn sqlite_refresh_phase_timing(
490 name: &str,
491 started: Instant,
492 detail: &str,
493) -> SqliteProjectionRefreshPhase {
494 SqliteProjectionRefreshPhase {
495 name: name.to_string(),
496 duration_micros: started.elapsed().as_micros(),
497 detail: detail.to_string(),
498 }
499}
500
/// Builds the `VALUES` body for a multi-row insert: `row_count` copies of a parenthesized list
/// of `column_count` `?` placeholders, joined by `", "`.
///
/// For example, `(2, 3)` gives `"(?, ?), (?, ?), (?, ?)"`. A `row_count` of 0 gives an empty
/// string.
fn sqlite_graph_staging_placeholders(column_count: usize, row_count: usize) -> String {
    let row_tuple = format!("({})", vec!["?"; column_count].join(", "));
    vec![row_tuple.as_str(); row_count].join(", ")
}
514
515fn sqlite_stage_projection_nodes(
516 tx: &rusqlite::Transaction<'_>,
517 nodes: &[GraphNode],
518 source_watermark: Option<&str>,
519) -> Result<()> {
520 for chunk in nodes.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
521 let sql = format!(
522 r#"
523 INSERT INTO next_graph_nodes
524 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
525 VALUES {}
526 "#,
527 sqlite_graph_staging_placeholders(8, chunk.len())
528 );
529 let mut values = Vec::with_capacity(chunk.len() * 8);
530 for node in chunk {
531 values.push(Value::Text(node.id.clone()));
532 values.push(Value::Text(node.kind.clone()));
533 values.push(Value::Text(node.label.clone()));
534 values.push(Value::Text(to_json(&node.properties)?));
535 values.push(Value::Text(to_json(&node.provenance)?));
536 values.push(
537 optional_to_json(&node.freshness)?
538 .map(Value::Text)
539 .unwrap_or(Value::Null),
540 );
541 values.push(Value::Text(row_hash(node)?));
542 values.push(
543 source_watermark
544 .map(|watermark| Value::Text(watermark.to_string()))
545 .unwrap_or(Value::Null),
546 );
547 }
548 tx.execute(&sql, params_from_iter(values))?;
549 }
550 Ok(())
551}
552
553fn sqlite_stage_projection_edges(
554 tx: &rusqlite::Transaction<'_>,
555 edges: &[GraphEdge],
556 source_watermark: Option<&str>,
557) -> Result<()> {
558 for chunk in edges.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
559 let sql = format!(
560 r#"
561 INSERT INTO next_graph_edges
562 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
563 VALUES {}
564 "#,
565 sqlite_graph_staging_placeholders(9, chunk.len())
566 );
567 let mut values = Vec::with_capacity(chunk.len() * 9);
568 for edge in chunk {
569 values.push(Value::Text(graph_edge_id(edge)));
570 values.push(Value::Text(edge.from_id.clone()));
571 values.push(Value::Text(edge.to_id.clone()));
572 values.push(Value::Text(edge.kind.clone()));
573 values.push(Value::Text(to_json(&edge.properties)?));
574 values.push(Value::Text(to_json(&edge.provenance)?));
575 values.push(
576 optional_to_json(&edge.freshness)?
577 .map(Value::Text)
578 .unwrap_or(Value::Null),
579 );
580 values.push(Value::Text(row_hash(edge)?));
581 values.push(
582 source_watermark
583 .map(|watermark| Value::Text(watermark.to_string()))
584 .unwrap_or(Value::Null),
585 );
586 }
587 tx.execute(&sql, params_from_iter(values))?;
588 }
589 Ok(())
590}
591
592impl SqliteGraphStore {
593 pub fn open(db_path: &Path) -> Result<Self> {
594 if let Some(parent) = db_path.parent() {
595 std::fs::create_dir_all(parent)
596 .with_context(|| format!("creating graph substrate dir: {}", parent.display()))?;
597 }
598 let conn = Connection::open(db_path)
599 .with_context(|| format!("opening graph substrate db: {}", db_path.display()))?;
600 configure_writable_graph_connection(&conn, db_path)?;
601 Self::from_connection(conn)
602 }
603
604 pub fn in_memory() -> Result<Self> {
605 let conn = Connection::open_in_memory()?;
606 conn.busy_timeout(Duration::from_secs(5))?;
607 Self::from_connection(conn)
608 }
609
610 pub fn open_read_only(db_path: &Path) -> Result<Self> {
611 let connection = open_graph_read_only_connection(db_path)?;
612 Self::from_read_only_connection(connection)
613 }
614
615 pub fn open_read_only_resilient(db_path: &Path) -> Result<Self> {
616 let connection = open_graph_read_only_connection_resilient(db_path)?;
617 Self::from_read_only_connection(connection)
618 }
619
620 pub fn read_only_recovery(&self) -> Option<ReadOnlyRecovery> {
621 self.read_only_recovery
622 }
623
624 pub fn has_user_triggers(&self) -> Result<bool> {
625 self.conn
626 .query_row(
627 r#"
628 SELECT EXISTS(
629 SELECT 1
630 FROM sqlite_master
631 WHERE type = 'trigger'
632 AND name NOT LIKE 'sqlite_%'
633 )
634 "#,
635 [],
636 |row| row.get::<_, bool>(0),
637 )
638 .map_err(Into::into)
639 }
640
    /// Wraps an already-opened connection as a writable graph store, creating and migrating the
    /// schema as needed.
    ///
    /// Steps, in order:
    /// 1. Enable foreign-key enforcement.
    /// 2. Reject databases whose `user_version` is newer than [`SQLITE_GRAPH_SCHEMA_VERSION`].
    /// 3. Create any missing base tables and indexes.
    /// 4. For older databases, run [`migrate_sqlite_graph_schema`] and stamp the current version.
    ///
    /// # Errors
    ///
    /// Returns an error if the schema version is newer than supported, or if any pragma, DDL
    /// statement, or migration step fails.
    fn from_connection(conn: Connection) -> Result<Self> {
        // The FOREIGN KEY ... ON DELETE CASCADE clauses below are only enforced with this on.
        conn.pragma_update(None, "foreign_keys", "ON")?;
        let user_version: i64 =
            conn.pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
        // Refuse to touch a schema written by a newer build.
        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
            bail!(
                "graph.db schema version {} is newer than supported version {}",
                user_version,
                SQLITE_GRAPH_SCHEMA_VERSION
            );
        }
        // Idempotent base schema. On an existing older database, `CREATE TABLE IF NOT EXISTS`
        // leaves the old table shape untouched; missing columns are added by the migration below.
        // NOTE: `graph_edge_properties` is not created here. For a database whose version is
        // below 5 (including a brand-new one), the v5 migration step creates it.
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS graph_nodes (
                id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                label TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT
            );
            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind
                ON graph_nodes(kind);
            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
                ON graph_nodes(kind, label, id);

            CREATE TABLE IF NOT EXISTS graph_edges (
                edge_key TEXT NOT NULL UNIQUE,
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT,
                PRIMARY KEY (from_id, to_id, kind),
                FOREIGN KEY (from_id) REFERENCES graph_nodes(id) ON DELETE CASCADE,
                FOREIGN KEY (to_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_edges_from_kind
                ON graph_edges(from_id, kind);
            CREATE INDEX IF NOT EXISTS idx_graph_edges_to_kind
                ON graph_edges(to_id, kind);

            CREATE TABLE IF NOT EXISTS graph_projection_versions (
                scope TEXT PRIMARY KEY,
                projection_version TEXT NOT NULL,
                content_hash TEXT,
                source_watermark TEXT,
                observed_at_unix INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS graph_tombstones (
                row_key TEXT PRIMARY KEY,
                row_kind TEXT NOT NULL,
                deleted_at_unix INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS graph_node_properties (
                node_id TEXT NOT NULL,
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                PRIMARY KEY (node_id, key),
                FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_node_properties_key_value_node
                ON graph_node_properties(key, value, node_id);

            CREATE TABLE IF NOT EXISTS graph_operator_stats (
                scope TEXT PRIMARY KEY,
                nodes INTEGER NOT NULL,
                edges INTEGER NOT NULL,
                tombstone_nodes INTEGER NOT NULL,
                tombstone_edges INTEGER NOT NULL,
                file_size_bytes INTEGER,
                freelist_bytes INTEGER,
                observed_at_unix INTEGER NOT NULL
            );
            "#,
        )?;
        // The migration runs outside an explicit transaction. Its steps use IF NOT EXISTS,
        // column-existence checks, or delete-then-rebuild, so a rerun after an interruption
        // presumably converges. The version is stamped only after every step succeeds.
        if user_version < SQLITE_GRAPH_SCHEMA_VERSION {
            migrate_sqlite_graph_schema(&conn, user_version)?;
            conn.pragma_update(None, "user_version", SQLITE_GRAPH_SCHEMA_VERSION)?;
        }
        Ok(Self {
            conn,
            _snapshot_copy: None,
            read_only_recovery: None,
        })
    }
734
735 fn from_read_only_connection(connection: SqliteReadOnlyConnection) -> Result<Self> {
736 connection.conn.pragma_update(None, "foreign_keys", "ON")?;
737 let user_version: i64 =
738 connection
739 .conn
740 .pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
741 if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
742 bail!(
743 "graph.db schema version {} is newer than supported version {}",
744 user_version,
745 SQLITE_GRAPH_SCHEMA_VERSION
746 );
747 }
748 connection
749 .conn
750 .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
751 Ok(Self {
752 conn: connection.conn,
753 _snapshot_copy: connection._snapshot_copy,
754 read_only_recovery: connection.recovery,
755 })
756 }
757
758 pub fn replace_projection(&mut self, projection: &GraphProjection) -> Result<()> {
759 self.replace_projection_with_version("root", projection, None, None)
760 .map(|_| ())
761 }
762
763 pub fn replace_projection_with_version(
764 &mut self,
765 scope: impl Into<String>,
766 projection: &GraphProjection,
767 projection_version: Option<&str>,
768 source_watermark: Option<String>,
769 ) -> Result<SqliteProjectionRefresh> {
770 let scope = scope.into();
771 let projection_version = projection_version
772 .map(str::to_string)
773 .or_else(|| projection_version_from_nodes(&projection.nodes))
774 .unwrap_or_else(|| "unversioned".to_string());
775 let projection_hash = projection_hash_from_nodes(&projection.nodes);
776 let observed_at_unix = unix_now();
777 let file_size_bytes_before = sqlite_database_size_bytes(&self.conn).ok();
778 let force_refresh_writes = self.has_user_triggers().unwrap_or(true);
779 let mut phase_timings = Vec::new();
780
781 let tx = self.conn.transaction()?;
782 let started = Instant::now();
783 tx.execute_batch(
784 r#"
785 CREATE TEMP TABLE IF NOT EXISTS next_graph_nodes (
786 id TEXT PRIMARY KEY,
787 kind TEXT NOT NULL,
788 label TEXT NOT NULL,
789 properties_json TEXT NOT NULL,
790 provenance_json TEXT NOT NULL,
791 freshness_json TEXT,
792 row_hash TEXT NOT NULL,
793 source_watermark TEXT
794 );
795 CREATE TEMP TABLE IF NOT EXISTS next_graph_edges (
796 edge_key TEXT PRIMARY KEY,
797 from_id TEXT NOT NULL,
798 to_id TEXT NOT NULL,
799 kind TEXT NOT NULL,
800 properties_json TEXT NOT NULL,
801 provenance_json TEXT NOT NULL,
802 freshness_json TEXT,
803 row_hash TEXT NOT NULL,
804 source_watermark TEXT
805 );
806 CREATE INDEX IF NOT EXISTS temp.idx_next_graph_edges_from_to_kind
807 ON next_graph_edges(from_id, to_id, kind);
808 CREATE TEMP TABLE IF NOT EXISTS next_graph_node_properties (
809 node_id TEXT NOT NULL,
810 key TEXT NOT NULL,
811 value TEXT NOT NULL,
812 PRIMARY KEY (node_id, key)
813 );
814 CREATE TEMP TABLE IF NOT EXISTS next_graph_edge_properties (
815 edge_key TEXT NOT NULL,
816 key TEXT NOT NULL,
817 value TEXT NOT NULL,
818 PRIMARY KEY (edge_key, key)
819 );
820 CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_nodes (
821 id TEXT PRIMARY KEY
822 );
823 CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_edges (
824 edge_key TEXT PRIMARY KEY
825 );
826 DELETE FROM next_graph_nodes;
827 DELETE FROM next_graph_edges;
828 DELETE FROM next_graph_node_properties;
829 DELETE FROM next_graph_edge_properties;
830 DELETE FROM next_graph_changed_nodes;
831 DELETE FROM next_graph_changed_edges;
832 "#,
833 )?;
834 phase_timings.push(sqlite_refresh_phase_timing(
835 "sqlite_temp_table_prepare",
836 started,
837 "create and clear refresh staging tables before row loading",
838 ));
839 {
840 let started = Instant::now();
841 sqlite_stage_projection_nodes(&tx, &projection.nodes, source_watermark.as_deref())?;
842 phase_timings.push(sqlite_refresh_phase_timing(
843 "sqlite_node_staging",
844 started,
845 &format!(
846 "bulk stage {} graph_nodes rows into temp table using multi-row chunks up to {} rows before delta comparison",
847 projection.nodes.len(),
848 SQLITE_GRAPH_STAGING_CHUNK_ROWS
849 ),
850 ));
851 }
852 {
853 let started = Instant::now();
854 sqlite_stage_projection_edges(&tx, &projection.edges, source_watermark.as_deref())?;
855 phase_timings.push(sqlite_refresh_phase_timing(
856 "sqlite_edge_staging",
857 started,
858 &format!(
859 "bulk stage {} graph_edges rows into temp table using multi-row chunks up to {} rows before delta comparison",
860 projection.edges.len(),
861 SQLITE_GRAPH_STAGING_CHUNK_ROWS
862 ),
863 ));
864 }
865 {
866 let started = Instant::now();
867 let changed_nodes_sql = if force_refresh_writes {
868 r#"
869 INSERT INTO next_graph_changed_nodes (id)
870 SELECT id
871 FROM next_graph_nodes
872 "#
873 } else {
874 r#"
875 INSERT INTO next_graph_changed_nodes (id)
876 SELECT n.id
877 FROM next_graph_nodes n
878 LEFT JOIN graph_nodes g ON g.id = n.id
879 WHERE g.id IS NULL OR g.row_hash IS NOT n.row_hash
880 "#
881 };
882 tx.execute(changed_nodes_sql, [])?;
883 tx.execute_batch(
884 r#"
885 INSERT INTO next_graph_node_properties (node_id, key, value)
886 SELECT n.id, json_each.key, CAST(json_each.value AS TEXT)
887 FROM next_graph_nodes n
888 JOIN next_graph_changed_nodes c ON c.id = n.id,
889 json_each(n.properties_json)
890 WHERE json_each.key IS NOT NULL
891 AND json_each.value IS NOT NULL
892 "#,
893 )?;
894 phase_timings.push(sqlite_refresh_phase_timing(
895 "sqlite_property_row_staging",
896 started,
897 "derive materialized node property rows only for new/changed node rows; unchanged row-hash owners reuse existing property rows",
898 ));
899 }
900 {
901 let started = Instant::now();
902 let changed_edges_sql = if force_refresh_writes {
903 r#"
904 INSERT INTO next_graph_changed_edges (edge_key)
905 SELECT edge_key
906 FROM next_graph_edges
907 "#
908 } else {
909 r#"
910 INSERT INTO next_graph_changed_edges (edge_key)
911 SELECT n.edge_key
912 FROM next_graph_edges n
913 LEFT JOIN graph_edges g ON g.edge_key = n.edge_key
914 WHERE g.edge_key IS NULL OR g.row_hash IS NOT n.row_hash
915 "#
916 };
917 tx.execute(changed_edges_sql, [])?;
918 tx.execute_batch(
919 r#"
920 INSERT INTO next_graph_edge_properties (edge_key, key, value)
921 SELECT e.edge_key, json_each.key, CAST(json_each.value AS TEXT)
922 FROM next_graph_edges e
923 JOIN next_graph_changed_edges c ON c.edge_key = e.edge_key,
924 json_each(e.properties_json)
925 WHERE json_each.key IS NOT NULL
926 AND json_each.value IS NOT NULL
927 "#,
928 )?;
929 phase_timings.push(sqlite_refresh_phase_timing(
930 "sqlite_edge_property_row_staging",
931 started,
932 "derive materialized edge property rows only for new/changed edge rows; unchanged row-hash owners reuse existing property rows",
933 ));
934 }
935
936 let delta_started = Instant::now();
937 let tombstoned_nodes = {
938 let mut stmt = tx.prepare(
939 r#"
940 SELECT g.id
941 FROM graph_nodes g
942 LEFT JOIN next_graph_nodes n ON n.id = g.id
943 WHERE n.id IS NULL
944 ORDER BY g.id
945 "#,
946 )?;
947 collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
948 };
949 let tombstoned_edges = {
950 let mut stmt = tx.prepare(
951 r#"
952 SELECT g.edge_key
953 FROM graph_edges g
954 LEFT JOIN next_graph_edges n
955 ON n.edge_key = g.edge_key
956 WHERE n.edge_key IS NULL
957 ORDER BY g.edge_key
958 "#,
959 )?;
960 collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
961 };
962 let unchanged_nodes: usize = tx.query_row(
963 r#"
964 SELECT COUNT(*)
965 FROM next_graph_nodes n
966 JOIN graph_nodes g ON g.id = n.id
967 WHERE g.row_hash = n.row_hash
968 "#,
969 [],
970 |row| row.get(0),
971 )?;
972 let unchanged_edges: usize = tx.query_row(
973 r#"
974 SELECT COUNT(*)
975 FROM next_graph_edges n
976 JOIN graph_edges g
977 ON g.edge_key = n.edge_key
978 WHERE g.row_hash = n.row_hash
979 "#,
980 [],
981 |row| row.get(0),
982 )?;
983 let reused_owner_node_properties: usize = tx.query_row(
984 r#"
985 SELECT COUNT(*)
986 FROM graph_node_properties g
987 JOIN next_graph_nodes n ON n.id = g.node_id
988 LEFT JOIN next_graph_changed_nodes c ON c.id = n.id
989 WHERE c.id IS NULL
990 "#,
991 [],
992 |row| row.get(0),
993 )?;
994 let reused_owner_edge_properties: usize = tx.query_row(
995 r#"
996 SELECT COUNT(*)
997 FROM graph_edge_properties g
998 JOIN next_graph_edges n ON n.edge_key = g.edge_key
999 LEFT JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1000 WHERE c.edge_key IS NULL
1001 "#,
1002 [],
1003 |row| row.get(0),
1004 )?;
1005 let unchanged_changed_node_properties: usize = tx.query_row(
1006 r#"
1007 SELECT COUNT(*)
1008 FROM next_graph_node_properties n
1009 JOIN graph_node_properties g
1010 ON g.node_id = n.node_id AND g.key = n.key
1011 WHERE g.value = n.value
1012 "#,
1013 [],
1014 |row| row.get(0),
1015 )?;
1016 let unchanged_changed_edge_properties: usize = tx.query_row(
1017 r#"
1018 SELECT COUNT(*)
1019 FROM next_graph_edge_properties n
1020 JOIN graph_edge_properties g
1021 ON g.edge_key = n.edge_key AND g.key = n.key
1022 WHERE g.value = n.value
1023 "#,
1024 [],
1025 |row| row.get(0),
1026 )?;
1027 let unchanged_properties = reused_owner_node_properties
1028 + reused_owner_edge_properties
1029 + unchanged_changed_node_properties
1030 + unchanged_changed_edge_properties;
1031
1032 let deleted_edges = tx.execute(
1033 r#"
1034 DELETE FROM graph_edges
1035 WHERE NOT EXISTS (
1036 SELECT 1
1037 FROM next_graph_edges n
1038 WHERE n.edge_key = graph_edges.edge_key
1039 )
1040 "#,
1041 [],
1042 )?;
1043 let deleted_nodes = tx.execute(
1044 r#"
1045 DELETE FROM graph_nodes
1046 WHERE NOT EXISTS (
1047 SELECT 1
1048 FROM next_graph_nodes n
1049 WHERE n.id = graph_nodes.id
1050 )
1051 "#,
1052 [],
1053 )?;
1054
1055 let upsert_nodes_sql = r#"
1056 INSERT INTO graph_nodes
1057 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1058 SELECT
1059 n.id,
1060 n.kind,
1061 n.label,
1062 n.properties_json,
1063 n.provenance_json,
1064 n.freshness_json,
1065 n.row_hash,
1066 n.source_watermark
1067 FROM next_graph_nodes n
1068 JOIN next_graph_changed_nodes c ON c.id = n.id
1069 WHERE true
1070 ON CONFLICT(id) DO UPDATE SET
1071 kind = excluded.kind,
1072 label = excluded.label,
1073 properties_json = excluded.properties_json,
1074 provenance_json = excluded.provenance_json,
1075 freshness_json = excluded.freshness_json,
1076 row_hash = excluded.row_hash,
1077 source_watermark = excluded.source_watermark
1078 WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1079 "#;
1080 tx.execute(upsert_nodes_sql, [])?;
1081 let upsert_edges_sql = r#"
1082 INSERT INTO graph_edges
1083 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1084 SELECT
1085 n.edge_key,
1086 n.from_id,
1087 n.to_id,
1088 n.kind,
1089 n.properties_json,
1090 n.provenance_json,
1091 n.freshness_json,
1092 n.row_hash,
1093 n.source_watermark
1094 FROM next_graph_edges n
1095 JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1096 WHERE true
1097 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1098 edge_key = excluded.edge_key,
1099 properties_json = excluded.properties_json,
1100 provenance_json = excluded.provenance_json,
1101 freshness_json = excluded.freshness_json,
1102 row_hash = excluded.row_hash,
1103 source_watermark = excluded.source_watermark
1104 WHERE graph_edges.row_hash IS NOT excluded.row_hash
1105 "#;
1106 tx.execute(upsert_edges_sql, [])?;
1107 let deleted_node_properties = tx.execute(
1108 r#"
1109 DELETE FROM graph_node_properties
1110 WHERE EXISTS (
1111 SELECT 1
1112 FROM next_graph_changed_nodes c
1113 WHERE c.id = graph_node_properties.node_id
1114 )
1115 AND NOT EXISTS (
1116 SELECT 1
1117 FROM next_graph_node_properties n
1118 WHERE n.node_id = graph_node_properties.node_id
1119 AND n.key = graph_node_properties.key
1120 )
1121 "#,
1122 [],
1123 )?;
1124 let deleted_edge_properties = tx.execute(
1125 r#"
1126 DELETE FROM graph_edge_properties
1127 WHERE EXISTS (
1128 SELECT 1
1129 FROM next_graph_changed_edges c
1130 WHERE c.edge_key = graph_edge_properties.edge_key
1131 )
1132 AND NOT EXISTS (
1133 SELECT 1
1134 FROM next_graph_edge_properties n
1135 WHERE n.edge_key = graph_edge_properties.edge_key
1136 AND n.key = graph_edge_properties.key
1137 )
1138 "#,
1139 [],
1140 )?;
1141 let deleted_properties = deleted_node_properties + deleted_edge_properties;
1142 let upsert_properties_sql = r#"
1143 INSERT INTO graph_node_properties (node_id, key, value)
1144 SELECT n.node_id, n.key, n.value
1145 FROM next_graph_node_properties n
1146 LEFT JOIN graph_node_properties g
1147 ON g.node_id = n.node_id AND g.key = n.key
1148 WHERE g.node_id IS NULL OR g.value IS NOT n.value
1149 ON CONFLICT(node_id, key) DO UPDATE SET
1150 value = excluded.value
1151 WHERE graph_node_properties.value IS NOT excluded.value
1152 "#;
1153 let upserted_node_properties = tx.execute(upsert_properties_sql, [])?;
1154 let upsert_edge_properties_sql = r#"
1155 INSERT INTO graph_edge_properties (edge_key, key, value)
1156 SELECT n.edge_key, n.key, n.value
1157 FROM next_graph_edge_properties n
1158 LEFT JOIN graph_edge_properties g
1159 ON g.edge_key = n.edge_key AND g.key = n.key
1160 WHERE g.edge_key IS NULL OR g.value IS NOT n.value
1161 ON CONFLICT(edge_key, key) DO UPDATE SET
1162 value = excluded.value
1163 WHERE graph_edge_properties.value IS NOT excluded.value
1164 "#;
1165 let upserted_edge_properties = tx.execute(upsert_edge_properties_sql, [])?;
1166 let upserted_properties = upserted_node_properties + upserted_edge_properties;
1167 tx.execute(
1168 r#"
1169 INSERT INTO graph_projection_versions
1170 (scope, projection_version, content_hash, source_watermark, observed_at_unix)
1171 VALUES (?1, ?2, ?3, ?4, ?5)
1172 ON CONFLICT(scope) DO UPDATE SET
1173 projection_version = excluded.projection_version,
1174 content_hash = excluded.content_hash,
1175 source_watermark = excluded.source_watermark,
1176 observed_at_unix = excluded.observed_at_unix
1177 "#,
1178 (
1179 &scope,
1180 &projection_version,
1181 &projection_hash,
1182 &source_watermark,
1183 observed_at_unix,
1184 ),
1185 )?;
1186 let pruned_node_tombstones = tx.execute(
1187 r#"
1188 DELETE FROM graph_tombstones
1189 WHERE row_kind = 'node'
1190 AND EXISTS (
1191 SELECT 1
1192 FROM next_graph_nodes n
1193 WHERE n.id = substr(graph_tombstones.row_key, 6)
1194 )
1195 "#,
1196 [],
1197 )?;
1198 let pruned_edge_tombstones = tx.execute(
1199 r#"
1200 DELETE FROM graph_tombstones
1201 WHERE row_kind = 'edge'
1202 AND EXISTS (
1203 SELECT 1
1204 FROM next_graph_edges n
1205 WHERE n.edge_key = substr(graph_tombstones.row_key, 6)
1206 )
1207 "#,
1208 [],
1209 )?;
1210 {
1211 let mut insert_node_tombstone = tx.prepare(
1212 r#"
1213 INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1214 VALUES (?1, 'node', ?2)
1215 ON CONFLICT(row_key) DO UPDATE SET
1216 row_kind = excluded.row_kind,
1217 deleted_at_unix = excluded.deleted_at_unix
1218 "#,
1219 )?;
1220 for id in &tombstoned_nodes {
1221 insert_node_tombstone.execute((format!("node:{id}"), observed_at_unix))?;
1222 }
1223 }
1224 {
1225 let mut insert_edge_tombstone = tx.prepare(
1226 r#"
1227 INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1228 VALUES (?1, 'edge', ?2)
1229 ON CONFLICT(row_key) DO UPDATE SET
1230 row_kind = excluded.row_kind,
1231 deleted_at_unix = excluded.deleted_at_unix
1232 "#,
1233 )?;
1234 for key in &tombstoned_edges {
1235 insert_edge_tombstone.execute((format!("edge:{key}"), observed_at_unix))?;
1236 }
1237 }
1238 let tombstone_node_count: usize = tx.query_row(
1239 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1240 [],
1241 |row| row.get(0),
1242 )?;
1243 let tombstone_edge_count: usize = tx.query_row(
1244 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1245 [],
1246 |row| row.get(0),
1247 )?;
1248 tx.execute(
1249 r#"
1250 INSERT INTO graph_operator_stats
1251 (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1252 VALUES (?1, ?2, ?3, ?4, ?5, NULL, NULL, ?6)
1253 ON CONFLICT(scope) DO UPDATE SET
1254 nodes = excluded.nodes,
1255 edges = excluded.edges,
1256 tombstone_nodes = excluded.tombstone_nodes,
1257 tombstone_edges = excluded.tombstone_edges,
1258 file_size_bytes = excluded.file_size_bytes,
1259 freelist_bytes = excluded.freelist_bytes,
1260 observed_at_unix = excluded.observed_at_unix
1261 "#,
1262 (
1263 &scope,
1264 projection.nodes.len() as i64,
1265 projection.edges.len() as i64,
1266 tombstone_node_count as i64,
1267 tombstone_edge_count as i64,
1268 observed_at_unix,
1269 ),
1270 )?;
1271 phase_timings.push(sqlite_refresh_phase_timing(
1272 "sqlite_delta_write",
1273 delta_started,
1274 "apply row/property deltas, projection metadata, tombstones, and cached operator counts",
1275 ));
1276 let commit_started = Instant::now();
1277 tx.commit()?;
1278 phase_timings.push(sqlite_refresh_phase_timing(
1279 "sqlite_commit",
1280 commit_started,
1281 "commit refresh transaction and publish old-or-new graph visibility",
1282 ));
1283 let file_size_bytes_after = sqlite_database_size_bytes(&self.conn).ok();
1284 let freelist_bytes_after = sqlite_database_freelist_bytes(&self.conn).ok();
1285 let stats_started = Instant::now();
1286 self.conn.execute(
1287 r#"
1288 UPDATE graph_operator_stats
1289 SET file_size_bytes = ?2,
1290 freelist_bytes = ?3,
1291 observed_at_unix = ?4
1292 WHERE scope = ?1
1293 "#,
1294 (
1295 &scope,
1296 file_size_bytes_after.map(|value| value as i64),
1297 freelist_bytes_after.map(|value| value as i64),
1298 unix_now(),
1299 ),
1300 )?;
1301 phase_timings.push(sqlite_refresh_phase_timing(
1302 "sqlite_stats_cache_update",
1303 stats_started,
1304 "persist post-commit file and freelist proof for status/doctor",
1305 ));
1306 Ok(SqliteProjectionRefresh {
1307 scope,
1308 projection_version,
1309 source_watermark,
1310 upserted_nodes: projection.nodes.len().saturating_sub(unchanged_nodes),
1311 upserted_edges: projection.edges.len().saturating_sub(unchanged_edges),
1312 unchanged_nodes,
1313 unchanged_edges,
1314 upserted_properties,
1315 unchanged_properties,
1316 deleted_properties,
1317 deleted_nodes,
1318 deleted_edges,
1319 pruned_tombstones: pruned_node_tombstones + pruned_edge_tombstones,
1320 file_size_bytes_before,
1321 file_size_bytes_after,
1322 tombstoned_nodes,
1323 tombstoned_edges,
1324 phase_timings,
1325 })
1326 }
1327
1328 pub fn upsert_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1329 let tx = self.conn.transaction()?;
1330 {
1331 let mut insert_node = tx.prepare(
1332 r#"
1333 INSERT INTO graph_nodes
1334 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1335 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1336 ON CONFLICT(id) DO UPDATE SET
1337 kind = excluded.kind,
1338 label = excluded.label,
1339 properties_json = excluded.properties_json,
1340 provenance_json = excluded.provenance_json,
1341 freshness_json = excluded.freshness_json,
1342 row_hash = excluded.row_hash,
1343 source_watermark = excluded.source_watermark
1344 "#,
1345 )?;
1346 let mut delete_properties =
1347 tx.prepare("DELETE FROM graph_node_properties WHERE node_id = ?1")?;
1348 let mut insert_property = tx.prepare(
1349 r#"
1350 INSERT INTO graph_node_properties (node_id, key, value)
1351 VALUES (?1, ?2, ?3)
1352 "#,
1353 )?;
1354 for node in &projection.nodes {
1355 insert_node.execute((
1356 &node.id,
1357 &node.kind,
1358 &node.label,
1359 to_json(&node.properties)?,
1360 to_json(&node.provenance)?,
1361 optional_to_json(&node.freshness)?,
1362 row_hash(node)?,
1363 ))?;
1364 delete_properties.execute([&node.id])?;
1365 for (key, value) in &node.properties {
1366 insert_property.execute((&node.id, key, value))?;
1367 }
1368 }
1369 }
1370 {
1371 let mut insert_edge = tx.prepare(
1372 r#"
1373 INSERT INTO graph_edges
1374 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1375 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1376 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1377 edge_key = excluded.edge_key,
1378 properties_json = excluded.properties_json,
1379 provenance_json = excluded.provenance_json,
1380 freshness_json = excluded.freshness_json,
1381 row_hash = excluded.row_hash,
1382 source_watermark = excluded.source_watermark
1383 "#,
1384 )?;
1385 let mut delete_properties =
1386 tx.prepare("DELETE FROM graph_edge_properties WHERE edge_key = ?1")?;
1387 let mut insert_property = tx.prepare(
1388 r#"
1389 INSERT INTO graph_edge_properties (edge_key, key, value)
1390 VALUES (?1, ?2, ?3)
1391 "#,
1392 )?;
1393 for edge in &projection.edges {
1394 let edge_key = graph_edge_id(edge);
1395 insert_edge.execute((
1396 &edge_key,
1397 &edge.from_id,
1398 &edge.to_id,
1399 &edge.kind,
1400 to_json(&edge.properties)?,
1401 to_json(&edge.provenance)?,
1402 optional_to_json(&edge.freshness)?,
1403 row_hash(edge)?,
1404 ))?;
1405 delete_properties.execute([&edge_key])?;
1406 for (key, value) in &edge.properties {
1407 insert_property.execute((&edge_key, key, value))?;
1408 }
1409 }
1410 }
1411 tx.commit()?;
1412 Ok(())
1413 }
1414
1415 pub fn projection_version(&self, scope: &str) -> Result<Option<SqliteProjectionVersion>> {
1416 self.conn
1417 .query_row(
1418 r#"
1419 SELECT projection_version, content_hash, source_watermark
1420 FROM graph_projection_versions
1421 WHERE scope = ?1
1422 "#,
1423 [scope],
1424 |row| {
1425 Ok(SqliteProjectionVersion {
1426 projection_version: row.get(0)?,
1427 content_hash: row.get(1)?,
1428 source_watermark: row.get(2)?,
1429 })
1430 },
1431 )
1432 .optional()
1433 .map_err(Into::into)
1434 }
1435
1436 pub fn update_projection_source_watermark(
1437 &mut self,
1438 scope: &str,
1439 source_watermark: Option<String>,
1440 ) -> Result<()> {
1441 self.conn.execute(
1442 r#"
1443 UPDATE graph_projection_versions
1444 SET source_watermark = ?2
1445 WHERE scope = ?1
1446 "#,
1447 (scope, source_watermark),
1448 )?;
1449 Ok(())
1450 }
1451
1452 pub fn compact_storage(&mut self, scope: &str, prune_tombstones: bool) -> Result<usize> {
1453 let pruned_tombstones = if prune_tombstones {
1454 self.conn.execute("DELETE FROM graph_tombstones", [])?
1455 } else {
1456 0
1457 };
1458 self.conn.execute_batch(
1459 r#"
1460 PRAGMA wal_checkpoint(TRUNCATE);
1461 VACUUM;
1462 "#,
1463 )?;
1464 let nodes = self
1465 .conn
1466 .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
1467 row.get::<_, i64>(0)
1468 })?;
1469 let edges = self
1470 .conn
1471 .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
1472 row.get::<_, i64>(0)
1473 })?;
1474 let tombstone_nodes = self.conn.query_row(
1475 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1476 [],
1477 |row| row.get::<_, i64>(0),
1478 )?;
1479 let tombstone_edges = self.conn.query_row(
1480 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1481 [],
1482 |row| row.get::<_, i64>(0),
1483 )?;
1484 let file_size_bytes = sqlite_database_size_bytes(&self.conn)
1485 .ok()
1486 .map(|value| value as i64);
1487 let freelist_bytes = sqlite_database_freelist_bytes(&self.conn)
1488 .ok()
1489 .map(|value| value as i64);
1490 self.conn.execute(
1491 r#"
1492 INSERT INTO graph_operator_stats
1493 (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1494 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s', 'now'))
1495 ON CONFLICT(scope) DO UPDATE SET
1496 nodes = excluded.nodes,
1497 edges = excluded.edges,
1498 tombstone_nodes = excluded.tombstone_nodes,
1499 tombstone_edges = excluded.tombstone_edges,
1500 file_size_bytes = excluded.file_size_bytes,
1501 freelist_bytes = excluded.freelist_bytes,
1502 observed_at_unix = excluded.observed_at_unix
1503 "#,
1504 (
1505 scope,
1506 nodes,
1507 edges,
1508 tombstone_nodes,
1509 tombstone_edges,
1510 file_size_bytes,
1511 freelist_bytes,
1512 ),
1513 )?;
1514 Ok(pruned_tombstones)
1515 }
1516}
1517
1518fn sqlite_query_plan(conn: &Connection, sql: &str, values: &[Value]) -> Result<Vec<String>> {
1519 let mut stmt = conn.prepare(&format!("EXPLAIN QUERY PLAN {sql}"))?;
1520 collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
1521 row.get::<_, String>(3)
1522 })?)
1523}
1524
/// Turn a query plan into human-readable diagnostics.
///
/// The first entry echoes the whole plan joined by `" | "`. It is followed by
/// one entry per `expected_indexes` name, in order, saying whether any plan
/// line mentions that index as a substring.
fn sqlite_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<String> {
    let summary = format!("sqlite query pushdown active; plan: {}", plan.join(" | "));
    let per_index = expected_indexes.iter().map(|index_name| {
        let reported = plan.iter().any(|plan_line| plan_line.contains(*index_name));
        if reported {
            format!("sqlite query plan uses {index_name}")
        } else {
            format!(
                "sqlite query plan did not report {index_name}; inspect before changing graph property indexes"
            )
        }
    });
    std::iter::once(summary).chain(per_index).collect()
}
1541
1542fn push_sqlite_property_filter_exists(
1543 sql: &mut String,
1544 values: &mut Vec<Value>,
1545 node_alias: &str,
1546 filters: &[GraphPropertyFilter],
1547) {
1548 for (index, filter) in filters.iter().enumerate() {
1549 sql.push_str(&format!(
1550 r#"
1551 AND EXISTS (
1552 SELECT 1
1553 FROM graph_node_properties p{index} INDEXED BY idx_graph_node_properties_key_value_node
1554 WHERE p{index}.node_id = {node_alias}.id
1555 AND p{index}.key = ?
1556 AND p{index}.value = ?
1557 )
1558 "#
1559 ));
1560 values.push(Value::Text(filter.key.clone()));
1561 values.push(Value::Text(filter.value.clone()));
1562 }
1563}
1564
1565fn push_sqlite_edge_property_filter_exists(
1566 sql: &mut String,
1567 values: &mut Vec<Value>,
1568 edge_alias: &str,
1569 filters: &[GraphPropertyFilter],
1570) {
1571 for (index, filter) in filters.iter().enumerate() {
1572 sql.push_str(&format!(
1573 r#"
1574 AND EXISTS (
1575 SELECT 1
1576 FROM graph_edge_properties ep{index} INDEXED BY idx_graph_edge_properties_key_value_edge
1577 WHERE ep{index}.edge_key = {edge_alias}.edge_key
1578 AND ep{index}.key = ?
1579 AND ep{index}.value = ?
1580 )
1581 "#
1582 ));
1583 values.push(Value::Text(filter.key.clone()));
1584 values.push(Value::Text(filter.value.clone()));
1585 }
1586}
1587
1588struct SqliteIncidentEdgeBranch<'a> {
1589 index_name: &'a str,
1590 endpoint_column: &'a str,
1591 node_id: &'a str,
1592 kind: Option<&'a str>,
1593 filters: &'a [GraphPropertyFilter],
1594 cursor: Option<&'a str>,
1595}
1596
1597fn push_sqlite_incident_edge_branch(
1598 sql: &mut String,
1599 values: &mut Vec<Value>,
1600 branch: SqliteIncidentEdgeBranch<'_>,
1601) {
1602 sql.push_str(&format!(
1603 r#"
1604 SELECT
1605 e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
1606 FROM graph_edges e INDEXED BY {index_name}
1607 WHERE e.{endpoint_column} = ?
1608 "#,
1609 index_name = branch.index_name,
1610 endpoint_column = branch.endpoint_column,
1611 ));
1612 values.push(Value::Text(branch.node_id.to_string()));
1613 if let Some(kind) = branch.kind {
1614 sql.push_str(" AND e.kind = ?");
1615 values.push(Value::Text(kind.to_string()));
1616 }
1617 push_sqlite_edge_property_filter_exists(sql, values, "e", branch.filters);
1618 if let Some(cursor) = branch.cursor {
1619 sql.push_str(" AND e.edge_key > ?");
1620 values.push(Value::Text(cursor.to_string()));
1621 }
1622}
1623
1624fn sqlite_incident_edges_union_query(
1625 node_id: &str,
1626 kind: Option<&str>,
1627 filters: &[GraphPropertyFilter],
1628 cursor: Option<&str>,
1629 limit: Option<usize>,
1630) -> (String, Vec<Value>) {
1631 let mut sql = String::from(
1632 r#"
1633 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1634 FROM (
1635 "#,
1636 );
1637 let mut values = Vec::new();
1638 push_sqlite_incident_edge_branch(
1639 &mut sql,
1640 &mut values,
1641 SqliteIncidentEdgeBranch {
1642 index_name: "idx_graph_edges_from_kind",
1643 endpoint_column: "from_id",
1644 node_id,
1645 kind,
1646 filters,
1647 cursor,
1648 },
1649 );
1650 sql.push_str(" UNION ");
1651 push_sqlite_incident_edge_branch(
1652 &mut sql,
1653 &mut values,
1654 SqliteIncidentEdgeBranch {
1655 index_name: "idx_graph_edges_to_kind",
1656 endpoint_column: "to_id",
1657 node_id,
1658 kind,
1659 filters,
1660 cursor,
1661 },
1662 );
1663 sql.push_str(
1664 r#"
1665 ) e
1666 ORDER BY e.edge_key
1667 "#,
1668 );
1669 if let Some(limit) = limit {
1670 sql.push_str(" LIMIT ?");
1671 values.push(Value::Integer(limit.saturating_add(1) as i64));
1672 }
1673 (sql, values)
1674}
1675
1676impl GraphStore for SqliteGraphStore {
1677 fn upsert_node(&self, node: &GraphNode) -> Result<()> {
1678 self.conn.execute(
1679 r#"
1680 INSERT INTO graph_nodes
1681 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1682 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1683 ON CONFLICT(id) DO UPDATE SET
1684 kind = excluded.kind,
1685 label = excluded.label,
1686 properties_json = excluded.properties_json,
1687 provenance_json = excluded.provenance_json,
1688 freshness_json = excluded.freshness_json,
1689 row_hash = excluded.row_hash,
1690 source_watermark = excluded.source_watermark
1691 "#,
1692 (
1693 &node.id,
1694 &node.kind,
1695 &node.label,
1696 to_json(&node.properties)?,
1697 to_json(&node.provenance)?,
1698 optional_to_json(&node.freshness)?,
1699 row_hash(node)?,
1700 ),
1701 )?;
1702 replace_node_properties(&self.conn, &node.id, &node.properties)?;
1703 Ok(())
1704 }
1705
1706 fn upsert_edge(&self, edge: &GraphEdge) -> Result<()> {
1707 let edge_key = graph_edge_id(edge);
1708 self.conn.execute(
1709 r#"
1710 INSERT INTO graph_edges
1711 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1712 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1713 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1714 edge_key = excluded.edge_key,
1715 properties_json = excluded.properties_json,
1716 provenance_json = excluded.provenance_json,
1717 freshness_json = excluded.freshness_json,
1718 row_hash = excluded.row_hash,
1719 source_watermark = excluded.source_watermark
1720 "#,
1721 (
1722 &edge_key,
1723 &edge.from_id,
1724 &edge.to_id,
1725 &edge.kind,
1726 to_json(&edge.properties)?,
1727 to_json(&edge.provenance)?,
1728 optional_to_json(&edge.freshness)?,
1729 row_hash(edge)?,
1730 ),
1731 )?;
1732 replace_edge_properties(&self.conn, &edge_key, &edge.properties)?;
1733 Ok(())
1734 }
1735
1736 fn delete_node(&self, id: &str) -> Result<usize> {
1737 self.conn
1738 .execute("DELETE FROM graph_nodes WHERE id = ?1", [id])
1739 .map_err(Into::into)
1740 }
1741
1742 fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize> {
1743 self.conn
1744 .execute(
1745 "DELETE FROM graph_edges WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3",
1746 (from_id, to_id, kind),
1747 )
1748 .map_err(Into::into)
1749 }
1750
1751 fn node(&self, id: &str) -> Result<Option<GraphNode>> {
1752 self.conn
1753 .query_row(
1754 r#"
1755 SELECT id, kind, label, properties_json, provenance_json, freshness_json
1756 FROM graph_nodes
1757 WHERE id = ?1
1758 "#,
1759 [id],
1760 node_from_row,
1761 )
1762 .optional()
1763 .map_err(Into::into)
1764 }
1765
1766 fn all_nodes(&self) -> Result<Vec<GraphNode>> {
1767 let mut stmt = self.conn.prepare(
1768 r#"
1769 SELECT id, kind, label, properties_json, provenance_json, freshness_json
1770 FROM graph_nodes
1771 ORDER BY id
1772 "#,
1773 )?;
1774 collect_rows(stmt.query_map([], node_from_row)?)
1775 }
1776
1777 fn all_edges(&self) -> Result<Vec<GraphEdge>> {
1778 let mut stmt = self.conn.prepare(
1779 r#"
1780 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1781 FROM graph_edges
1782 ORDER BY from_id, kind, to_id
1783 "#,
1784 )?;
1785 collect_rows(stmt.query_map([], edge_from_row)?)
1786 }
1787
1788 fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
1789 self.conn
1790 .query_row(
1791 r#"
1792 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1793 FROM graph_edges INDEXED BY idx_graph_edges_edge_key
1794 WHERE edge_key = ?1
1795 "#,
1796 [edge_id],
1797 edge_from_row,
1798 )
1799 .optional()
1800 .map_err(Into::into)
1801 }
1802
1803 fn graph_counts(&self) -> Result<(usize, usize)> {
1804 let nodes = self
1805 .conn
1806 .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
1807 row.get::<_, usize>(0)
1808 })?;
1809 let edges = self
1810 .conn
1811 .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
1812 row.get::<_, usize>(0)
1813 })?;
1814 Ok((nodes, edges))
1815 }
1816
1817 fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
1818 match kind {
1819 Some(kind) => self
1820 .conn
1821 .query_row(
1822 r#"
1823 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1824 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
1825 WHERE from_id <> to_id AND kind = ?1
1826 ORDER BY from_id, kind, to_id
1827 LIMIT 1
1828 "#,
1829 [kind],
1830 edge_from_row,
1831 )
1832 .optional()
1833 .map_err(Into::into),
1834 None => self
1835 .conn
1836 .query_row(
1837 r#"
1838 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1839 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
1840 WHERE from_id <> to_id
1841 ORDER BY from_id, kind, to_id
1842 LIMIT 1
1843 "#,
1844 [],
1845 edge_from_row,
1846 )
1847 .optional()
1848 .map_err(Into::into),
1849 }
1850 }
1851
1852 fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
1853 self.conn
1854 .query_row(
1855 r#"
1856 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
1857 ep.key, ep.value
1858 FROM graph_edge_properties ep INDEXED BY idx_graph_edge_properties_key_value_edge
1859 JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
1860 ON e.edge_key = ep.edge_key
1861 WHERE e.from_id <> e.to_id
1862 ORDER BY ep.key, ep.value, ep.edge_key
1863 LIMIT 1
1864 "#,
1865 [],
1866 |row| {
1867 Ok((
1868 edge_from_row(row)?,
1869 GraphPropertyFilter {
1870 key: row.get(7)?,
1871 value: row.get(8)?,
1872 },
1873 ))
1874 },
1875 )
1876 .optional()
1877 .map_err(Into::into)
1878 }
1879
1880 fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>> {
1881 let mut stmt = self.conn.prepare(
1882 r#"
1883 SELECT id, kind, label, properties_json, provenance_json, freshness_json
1884 FROM graph_nodes
1885 WHERE kind = ?1
1886 ORDER BY id
1887 "#,
1888 )?;
1889 collect_rows(stmt.query_map([kind], node_from_row)?)
1890 }
1891
1892 fn paged_nodes_by_kind(
1893 &self,
1894 kind: &str,
1895 options: GraphQueryOptions,
1896 ) -> Result<GraphPagedSubgraph> {
1897 let mut sql = String::from(
1898 r#"
1899 SELECT id, kind, label, properties_json, provenance_json, freshness_json
1900 FROM graph_nodes
1901 WHERE kind = ?
1902 "#,
1903 );
1904 let mut values = vec![Value::Text(kind.to_string())];
1905 push_sqlite_property_filter_exists(
1906 &mut sql,
1907 &mut values,
1908 "graph_nodes",
1909 &options.property_filters,
1910 );
1911 if let Some(cursor) = &options.cursor {
1912 sql.push_str(" AND id > ?");
1913 values.push(Value::Text(cursor.clone()));
1914 }
1915 sql.push_str(" ORDER BY id");
1916 if let Some(limit) = options.limit {
1917 sql.push_str(" LIMIT ?");
1918 values.push(Value::Integer(limit.saturating_add(1) as i64));
1919 }
1920
1921 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
1922 let mut stmt = self.conn.prepare(&sql)?;
1923 let mut nodes =
1924 collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)?;
1925 let before_limit = nodes.len();
1926 let mut next_cursor = None;
1927 if let Some(limit) = options.limit
1928 && nodes.len() > limit
1929 {
1930 next_cursor = nodes
1931 .get(limit.saturating_sub(1))
1932 .map(|node| node.id.clone());
1933 nodes.truncate(limit);
1934 }
1935 let expected_indexes = if options.property_filters.is_empty() {
1936 vec!["idx_graph_nodes_kind"]
1937 } else {
1938 vec![
1939 "idx_graph_nodes_kind",
1940 "idx_graph_node_properties_key_value_node",
1941 ]
1942 };
1943 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
1944 if !options.property_filters.is_empty() {
1945 diagnostics.push(
1946 "property filters were evaluated by SQLite materialized property rows before paging"
1947 .to_string(),
1948 );
1949 }
1950 if options.cursor.is_some() {
1951 diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
1952 }
1953 if next_cursor.is_some() {
1954 diagnostics.push(
1955 "result was truncated; pass page.next_cursor as --cursor for the next page"
1956 .to_string(),
1957 );
1958 }
1959 Ok(GraphPagedSubgraph {
1960 page: GraphQueryPage {
1961 cursor: options.cursor,
1962 limit: options.limit,
1963 next_cursor,
1964 returned_nodes: nodes.len(),
1965 returned_edges: 0,
1966 truncated: options.limit.is_some_and(|limit| before_limit > limit),
1967 diagnostics,
1968 },
1969 nodes,
1970 edges: Vec::new(),
1971 })
1972 }
1973
1974 fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
1975 match kind {
1976 Some(kind) => {
1977 let mut stmt = self.conn.prepare(
1978 r#"
1979 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1980 FROM graph_edges
1981 WHERE from_id = ?1 AND kind = ?2
1982 ORDER BY to_id, kind
1983 "#,
1984 )?;
1985 collect_rows(stmt.query_map((from_id, kind), edge_from_row)?)
1986 }
1987 None => {
1988 let mut stmt = self.conn.prepare(
1989 r#"
1990 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1991 FROM graph_edges
1992 WHERE from_id = ?1
1993 ORDER BY to_id, kind
1994 "#,
1995 )?;
1996 collect_rows(stmt.query_map([from_id], edge_from_row)?)
1997 }
1998 }
1999 }
2000
2001 fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2002 let (sql, values) = sqlite_incident_edges_union_query(node_id, kind, &[], None, None);
2003 let mut stmt = self.conn.prepare(&sql)?;
2004 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2005 }
2006
    /// Page through edges, optionally restricted to `kind`, using SQLite keyset
    /// paging on `edge_key`.
    ///
    /// Query shape depends on whether property filters are present:
    /// - With filters, the first filter drives the scan from
    ///   `graph_edge_properties` (alias `ep0`, forced onto
    ///   `idx_graph_edge_properties_key_value_edge`) and joins `graph_edges`. Any
    ///   remaining filters become correlated `EXISTS` clauses.
    /// - Without filters, `graph_edges` is scanned directly.
    ///
    /// `options.cursor` is an exclusive lower bound on `edge_key`. `options.limit`
    /// requests `limit + 1` rows so truncation can be detected. The returned page
    /// carries the `EXPLAIN QUERY PLAN` text plus diagnostics.
    ///
    /// # Errors
    /// Returns any SQLite error from the primary-filter count, the plan probe,
    /// statement preparation, or row mapping.
    fn paged_edges(
        &self,
        kind: Option<&str>,
        options: GraphQueryOptions,
    ) -> Result<GraphPagedSubgraph> {
        let primary_property_filter = options.property_filters.first();
        // Entries in `values` must be pushed in the same order as the `?`
        // placeholders appended to `sql` below.
        let mut values = Vec::new();
        let mut sql = if let Some(filter) = primary_property_filter {
            values.push(Value::Text(filter.key.clone()));
            values.push(Value::Text(filter.value.clone()));
            String::from(
                r#"
                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
                FROM graph_edge_properties ep0 INDEXED BY idx_graph_edge_properties_key_value_edge
                JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
                  ON e.edge_key = ep0.edge_key
                WHERE ep0.key = ?
                  AND ep0.value = ?
                "#,
            )
        } else {
            // `WHERE 1 = 1` lets every later clause be appended uniformly as `AND ...`.
            String::from(
                r#"
                SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
                FROM graph_edges e
                WHERE 1 = 1
                "#,
            )
        };
        if let Some(kind) = kind {
            sql.push_str(" AND e.kind = ?");
            values.push(Value::Text(kind.to_string()));
        }
        // The primary filter is already handled by the `ep0` join, so only the
        // remaining filters become EXISTS clauses.
        push_sqlite_edge_property_filter_exists(
            &mut sql,
            &mut values,
            "e",
            if primary_property_filter.is_some() {
                &options.property_filters[1..]
            } else {
                &options.property_filters
            },
        );
        // When driving from `ep0`, the cursor and ORDER BY use `ep0.edge_key`,
        // which the join condition equates with `e.edge_key`.
        if let Some(cursor) = &options.cursor {
            if primary_property_filter.is_some() {
                sql.push_str(" AND ep0.edge_key > ?");
            } else {
                sql.push_str(" AND e.edge_key > ?");
            }
            values.push(Value::Text(cursor.clone()));
        }
        if primary_property_filter.is_some() {
            sql.push_str(" ORDER BY ep0.edge_key");
        } else {
            sql.push_str(" ORDER BY e.edge_key");
        }
        // Fetch one extra row so truncation can be detected without a COUNT.
        if let Some(limit) = options.limit {
            sql.push_str(" LIMIT ?");
            values.push(Value::Integer(limit.saturating_add(1) as i64));
        }

        // Diagnostic-only probe: how many materialized rows match the primary
        // filter before kind/cursor/limit are applied.
        let primary_property_row_count = if let Some(filter) = primary_property_filter {
            Some(self.conn.query_row(
                r#"
                SELECT COUNT(*)
                FROM graph_edge_properties INDEXED BY idx_graph_edge_properties_key_value_edge
                WHERE key = ?1 AND value = ?2
                "#,
                (&filter.key, &filter.value),
                |row| row.get::<_, usize>(0),
            )?)
        } else {
            None
        };
        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
        let mut stmt = self.conn.prepare(&sql)?;
        let mut edges =
            collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
        let before_limit = edges.len();
        let mut next_cursor = None;
        // The next page resumes after the id of the last edge kept on this page.
        if let Some(limit) = options.limit
            && edges.len() > limit
        {
            next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
            edges.truncate(limit);
        }
        let expected_indexes = if options.property_filters.is_empty() {
            vec!["idx_graph_edges_edge_key"]
        } else {
            vec![
                "idx_graph_edge_properties_key_value_edge",
                "idx_graph_edges_edge_key",
            ]
        };
        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
        if !options.property_filters.is_empty() {
            if let Some(row_count) = primary_property_row_count {
                diagnostics.push(format!(
                    "edge property primary filter matched {row_count} materialized row(s) before edge-kind/cursor paging"
                ));
            }
            diagnostics.push(
                "edge property scan drives from SQLite materialized property rows before joining graph_edges"
                    .to_string(),
            );
        }
        if options.cursor.is_some() {
            diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
        }
        if next_cursor.is_some() {
            diagnostics.push(
                "result was truncated; pass page.next_cursor as --cursor for the next page"
                    .to_string(),
            );
        }
        Ok(GraphPagedSubgraph {
            page: GraphQueryPage {
                cursor: options.cursor,
                limit: options.limit,
                next_cursor,
                returned_nodes: 0,
                returned_edges: edges.len(),
                truncated: options.limit.is_some_and(|limit| before_limit > limit),
                diagnostics,
            },
            nodes: Vec::new(),
            edges,
        })
    }
2136
2137 fn paged_incident_edges(
2138 &self,
2139 node_id: &str,
2140 kind: Option<&str>,
2141 options: GraphQueryOptions,
2142 ) -> Result<GraphPagedSubgraph> {
2143 let (sql, values) = sqlite_incident_edges_union_query(
2144 node_id,
2145 kind,
2146 &options.property_filters,
2147 options.cursor.as_deref(),
2148 options.limit,
2149 );
2150 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2151 let mut stmt = self.conn.prepare(&sql)?;
2152 let mut edges =
2153 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2154 let before_limit = edges.len();
2155 let mut next_cursor = None;
2156 if let Some(limit) = options.limit
2157 && edges.len() > limit
2158 {
2159 next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2160 edges.truncate(limit);
2161 }
2162 let expected_indexes = if options.property_filters.is_empty() {
2163 vec!["idx_graph_edges_from_kind", "idx_graph_edges_to_kind"]
2164 } else {
2165 vec![
2166 "idx_graph_edges_from_kind",
2167 "idx_graph_edges_to_kind",
2168 "idx_graph_edge_properties_key_value_edge",
2169 ]
2170 };
2171 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2172 diagnostics.push(
2173 "incident edge scan uses UNION over from_id/to_id index probes instead of an OR predicate"
2174 .to_string(),
2175 );
2176 if !options.property_filters.is_empty() {
2177 diagnostics.push(
2178 "edge property filters were evaluated by SQLite materialized property rows before paging"
2179 .to_string(),
2180 );
2181 }
2182 if options.cursor.is_some() {
2183 diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2184 }
2185 if next_cursor.is_some() {
2186 diagnostics.push(
2187 "result was truncated; pass page.next_cursor as --cursor for the next page"
2188 .to_string(),
2189 );
2190 }
2191 Ok(GraphPagedSubgraph {
2192 page: GraphQueryPage {
2193 cursor: options.cursor,
2194 limit: options.limit,
2195 next_cursor,
2196 returned_nodes: 0,
2197 returned_edges: edges.len(),
2198 truncated: options.limit.is_some_and(|limit| before_limit > limit),
2199 diagnostics,
2200 },
2201 nodes: Vec::new(),
2202 edges,
2203 })
2204 }
2205
2206 fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
2207 if node_ids.is_empty() {
2208 return Ok(Vec::new());
2209 }
2210 let node_ids = node_ids.iter().cloned().collect::<Vec<_>>();
2211 let mut edges = BTreeMap::<(String, String, String), GraphEdge>::new();
2212 for from_chunk in node_ids.chunks(450) {
2213 let from_placeholders = std::iter::repeat_n("?", from_chunk.len())
2214 .collect::<Vec<_>>()
2215 .join(", ");
2216 for to_chunk in node_ids.chunks(450) {
2217 let to_placeholders = std::iter::repeat_n("?", to_chunk.len())
2218 .collect::<Vec<_>>()
2219 .join(", ");
2220 let sql = format!(
2221 r#"
2222 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2223 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2224 WHERE from_id IN ({from_placeholders})
2225 AND to_id IN ({to_placeholders})
2226 ORDER BY from_id, kind, to_id
2227 "#
2228 );
2229 let mut values = from_chunk
2230 .iter()
2231 .cloned()
2232 .map(Value::Text)
2233 .collect::<Vec<_>>();
2234 values.extend(to_chunk.iter().cloned().map(Value::Text));
2235 let mut stmt = self.conn.prepare(&sql)?;
2236 for edge in
2237 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?
2238 {
2239 edges
2240 .entry((edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone()))
2241 .or_insert(edge);
2242 }
2243 }
2244 }
2245 Ok(edges.into_values().collect())
2246 }
2247
2248 fn neighborhood(
2249 &self,
2250 center_id: &str,
2251 depth: usize,
2252 kind: Option<&str>,
2253 ) -> Result<Option<GraphSubgraph>> {
2254 self.paged_neighborhood(center_id, depth, kind, GraphQueryOptions::default())
2255 .map(|result| {
2256 result.map(|result| {
2257 GraphSubgraph {
2258 nodes: result.nodes,
2259 edges: result.edges,
2260 }
2261 .sorted()
2262 })
2263 })
2264 }
2265
    /// Returns one page of the directed neighborhood around `center_id`, or `Ok(None)`
    /// when no node with that id exists.
    ///
    /// One SQL statement does the work:
    /// - `walk` follows outgoing edges (`e.from_id = walk.id`) for at most `depth`
    ///   hops, restricted to edge `kind` when one is given;
    /// - `filtered_nodes` keeps the distinct reached nodes that pass
    ///   `options.property_filters` and sort after the exclusive `options.cursor`;
    /// - `page_nodes` orders them by id; with a limit it fetches `limit + 1` rows so
    ///   truncation can be detected;
    /// - `walk_edges` collects edges leaving walked nodes reached at `depth < depth`;
    /// - the final `UNION ALL` emits node rows plus distinct edge rows whose two
    ///   endpoints are both in `page_nodes`, tagged by the `row_type` column.
    ///
    /// On the Rust side:
    /// - nodes are sorted by id and trimmed to `limit`;
    /// - `page.next_cursor` is set to the last kept node id;
    /// - edges touching trimmed nodes are dropped;
    /// - the remaining edges are sorted by `(from_id, kind, to_id)`.
    ///
    /// NOTE(review): edges are only taken from nodes reached at `walk.depth < depth`.
    /// An edge leaving a node that is reached only at the final hop is therefore
    /// omitted, even when both endpoints are on the page. Confirm this is intended.
    ///
    /// NOTE(review): `depth as i64` wraps for values above `i64::MAX`. For example,
    /// `usize::MAX` becomes -1, which leaves at most the center node. Saturating the
    /// conversion is not a drop-in fix: `walk` is bounded only by `depth`, because
    /// `UNION` deduplicates `(id, depth)` pairs rather than ids. A huge bound on a
    /// cyclic graph would therefore recurse for a very long time.
    ///
    /// NOTE(review): with `limit == Some(0)` and a non-empty result, `saturating_sub`
    /// makes `next_cursor` the id of the first (discarded) row. Following that cursor
    /// would skip the row.
    fn paged_neighborhood(
        &self,
        center_id: &str,
        depth: usize,
        kind: Option<&str>,
        options: GraphQueryOptions,
    ) -> Result<Option<GraphPagedSubgraph>> {
        if self.node(center_id)?.is_none() {
            return Ok(None);
        }
        // `values` is bound positionally: every push below must happen in the same
        // order as the corresponding `?` is appended to `sql`.
        let mut sql = String::from(
            r#"
            WITH RECURSIVE walk(id, depth) AS (
                SELECT ?, 0
                UNION
                SELECT e.to_id, walk.depth + 1
                FROM walk
                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
                    ON e.from_id = walk.id
                WHERE walk.depth < ?
            "#,
        );
        let mut values = vec![
            Value::Text(center_id.to_string()),
            Value::Integer(depth as i64),
        ];
        if let Some(kind) = kind {
            sql.push_str(" AND e.kind = ?");
            values.push(Value::Text(kind.to_string()));
        }
        sql.push_str(
            r#"
            ),
            filtered_nodes AS (
                SELECT DISTINCT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
                FROM walk
                JOIN graph_nodes n ON n.id = walk.id
                WHERE 1 = 1
            "#,
        );
        // Presumably appends one correlated EXISTS per property filter against the
        // node alias `n` (helper defined elsewhere) — TODO confirm.
        push_sqlite_property_filter_exists(&mut sql, &mut values, "n", &options.property_filters);
        if let Some(cursor) = &options.cursor {
            sql.push_str(" AND n.id > ?");
            values.push(Value::Text(cursor.clone()));
        }
        sql.push_str(
            r#"
            ),
            page_nodes AS (
                SELECT id, kind, label, properties_json, provenance_json, freshness_json
                FROM filtered_nodes
                ORDER BY id
            "#,
        );
        if let Some(limit) = options.limit {
            sql.push_str(" LIMIT ?");
            // Fetch one row past the limit so truncation can be detected below.
            values.push(Value::Integer(limit.saturating_add(1) as i64));
        }
        sql.push_str(
            r#"
            ),
            walk_edges AS (
                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
                FROM walk
                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
                    ON e.from_id = walk.id
                WHERE walk.depth < ?
            "#,
        );
        // walk_edges repeats the depth bound and kind filter with its own placeholders.
        values.push(Value::Integer(depth as i64));
        if let Some(kind) = kind {
            sql.push_str(" AND e.kind = ?");
            values.push(Value::Text(kind.to_string()));
        }
        // Column layout: 0 = row_type, 1..=6 = node columns, 7..=13 = edge columns.
        sql.push_str(
            r#"
            )
            SELECT
                'node' AS row_type,
                p.id, p.kind, p.label, p.properties_json, p.provenance_json, p.freshness_json,
                NULL AS edge_key, NULL AS from_id, NULL AS to_id, NULL AS edge_kind,
                NULL AS edge_properties_json, NULL AS edge_provenance_json, NULL AS edge_freshness_json
            FROM page_nodes p
            UNION ALL
            SELECT DISTINCT
                'edge' AS row_type,
                NULL AS id, NULL AS kind, NULL AS label, NULL AS properties_json,
                NULL AS provenance_json, NULL AS freshness_json,
                e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
            FROM walk_edges e
            WHERE e.from_id IN (SELECT id FROM page_nodes)
              AND e.to_id IN (SELECT id FROM page_nodes)
            "#,
        );

        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
        let mut stmt = self.conn.prepare(&sql)?;
        let mut nodes = Vec::new();
        let mut edges = Vec::new();
        // Each row is either a node or an edge; decode according to `row_type`.
        let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
            let row_type: String = row.get(0)?;
            match row_type.as_str() {
                "node" => Ok((Some(node_from_row_at(row, 1)?), None)),
                "edge" => Ok((None, Some(edge_from_row_at(row, 7)?))),
                _ => Err(rusqlite::Error::InvalidQuery),
            }
        })?;
        for row in rows {
            let (node, edge) = row?;
            if let Some(node) = node {
                nodes.push(node);
            }
            if let Some(edge) = edge {
                edges.push(edge);
            }
        }
        // The outer UNION ALL has no ORDER BY, so restore id order before paging.
        nodes.sort_by(|left, right| left.id.cmp(&right.id));
        let before_limit = nodes.len();
        let mut next_cursor = None;
        if let Some(limit) = options.limit
            && nodes.len() > limit
        {
            next_cursor = nodes
                .get(limit.saturating_sub(1))
                .map(|node| node.id.clone());
            nodes.truncate(limit);
        }
        // SQL filtered edges against the limit+1 page; re-filter against the kept nodes.
        let node_ids = nodes
            .iter()
            .map(|node| node.id.as_str())
            .collect::<BTreeSet<_>>();
        edges.retain(|edge| {
            node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
        });
        edges.sort_by(|left, right| {
            left.from_id
                .cmp(&right.from_id)
                .then(left.kind.cmp(&right.kind))
                .then(left.to_id.cmp(&right.to_id))
        });
        let expected_indexes = if options.property_filters.is_empty() {
            vec!["idx_graph_edges_from_kind"]
        } else {
            vec![
                "idx_graph_edges_from_kind",
                "idx_graph_node_properties_key_value_node",
            ]
        };
        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
        diagnostics.push(
            "neighborhood nodes and page edges share one recursive reachable-set CTE".to_string(),
        );
        if !options.property_filters.is_empty() {
            diagnostics.push(
                "property filters were evaluated by SQLite materialized property rows before paging"
                    .to_string(),
            );
        }
        if options.cursor.is_some() {
            diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
        }
        if next_cursor.is_some() {
            diagnostics.push(
                "result was truncated; pass page.next_cursor as --cursor for the next page"
                    .to_string(),
            );
        }
        Ok(Some(GraphPagedSubgraph {
            page: GraphQueryPage {
                cursor: options.cursor,
                limit: options.limit,
                next_cursor,
                returned_nodes: nodes.len(),
                returned_edges: edges.len(),
                truncated: options.limit.is_some_and(|limit| before_limit > limit),
                diagnostics,
            },
            nodes,
            edges,
        }))
    }
2447
2448 fn shortest_path(
2449 &self,
2450 from_id: &str,
2451 to_id: &str,
2452 kind: Option<&str>,
2453 ) -> Result<Option<GraphPath>> {
2454 self.shortest_path_with_max_hops(from_id, to_id, kind, None)
2455 }
2456
2457 fn shortest_path_with_max_hops(
2458 &self,
2459 from_id: &str,
2460 to_id: &str,
2461 kind: Option<&str>,
2462 max_hops: Option<usize>,
2463 ) -> Result<Option<GraphPath>> {
2464 if from_id == to_id {
2465 return Ok(Some(GraphPath {
2466 nodes: vec![from_id.to_string()],
2467 hops: 0,
2468 }));
2469 }
2470 let hop_limit = max_hops.unwrap_or(usize::MAX);
2471 if hop_limit == 0 {
2472 return Ok(None);
2473 }
2474
2475 let mut visited = BTreeSet::from([from_id.to_string()]);
2476 let mut parent = BTreeMap::<String, String>::from([(from_id.to_string(), String::new())]);
2477 let mut frontier = vec![from_id.to_string()];
2478 let mut single_frontier_stmt = if kind.is_none() {
2479 Some(self.conn.prepare(
2480 r#"
2481 SELECT from_id, to_id
2482 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2483 WHERE from_id = ?1
2484 ORDER BY from_id, to_id, kind
2485 "#,
2486 )?)
2487 } else {
2488 None
2489 };
2490 let mut single_frontier_kind_stmt = if kind.is_some() {
2491 Some(self.conn.prepare(
2492 r#"
2493 SELECT from_id, to_id
2494 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2495 WHERE from_id = ?1 AND kind = ?2
2496 ORDER BY from_id, to_id, kind
2497 "#,
2498 )?)
2499 } else {
2500 None
2501 };
2502 for _depth in 0..hop_limit {
2503 if frontier.is_empty() {
2504 break;
2505 }
2506 let mut next_frontier = BTreeSet::new();
2507 for chunk in frontier.chunks(256) {
2508 let edges = if chunk.len() == 1 {
2509 match kind {
2510 Some(kind) => {
2511 let stmt = single_frontier_kind_stmt
2512 .as_mut()
2513 .context("single-frontier kind statement missing")?;
2514 collect_rows(stmt.query_map((chunk[0].as_str(), kind), |row| {
2515 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2516 })?)?
2517 }
2518 None => {
2519 let stmt = single_frontier_stmt
2520 .as_mut()
2521 .context("single-frontier statement missing")?;
2522 collect_rows(stmt.query_map([chunk[0].as_str()], |row| {
2523 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2524 })?)?
2525 }
2526 }
2527 } else {
2528 let placeholders = std::iter::repeat_n("?", chunk.len())
2529 .collect::<Vec<_>>()
2530 .join(", ");
2531 let mut sql = format!(
2532 r#"
2533 SELECT from_id, to_id
2534 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2535 WHERE from_id IN ({placeholders})
2536 "#
2537 );
2538 let mut values = chunk.iter().cloned().map(Value::Text).collect::<Vec<_>>();
2539 if let Some(kind) = kind {
2540 sql.push_str(" AND kind = ?");
2541 values.push(Value::Text(kind.to_string()));
2542 }
2543 sql.push_str(" ORDER BY from_id, to_id, kind");
2544 let mut stmt = self.conn.prepare(&sql)?;
2545 collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2546 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2547 })?)?
2548 };
2549 for (from, next) in edges {
2550 if !visited.insert(next.clone()) {
2551 continue;
2552 }
2553 parent.insert(next.clone(), from);
2554 if next == to_id {
2555 let mut nodes = vec![to_id.to_string()];
2556 let mut cursor = to_id;
2557 while let Some(previous) = parent.get(cursor) {
2558 if previous.is_empty() {
2559 break;
2560 }
2561 nodes.push(previous.clone());
2562 cursor = previous;
2563 }
2564 nodes.reverse();
2565 return Ok(Some(GraphPath {
2566 hops: nodes.len().saturating_sub(1),
2567 nodes,
2568 }));
2569 }
2570 next_frontier.insert(next);
2571 }
2572 }
2573 frontier = next_frontier.into_iter().collect();
2574 }
2575 Ok(None)
2576 }
2577
2578 fn reachable_nodes_by_kind(
2579 &self,
2580 from_id: &str,
2581 kind: &str,
2582 depth: usize,
2583 limit: usize,
2584 ) -> Result<Vec<(GraphNode, GraphPath)>> {
2585 Ok(self
2586 .reachable_nodes_by_kinds(from_id, &[kind], depth, limit)?
2587 .remove(kind)
2588 .unwrap_or_default())
2589 }
2590
2591 fn reachable_nodes_by_kinds(
2592 &self,
2593 from_id: &str,
2594 kinds: &[&str],
2595 depth: usize,
2596 limit: usize,
2597 ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
2598 let mut requested = kinds
2599 .iter()
2600 .map(|kind| (*kind).to_string())
2601 .collect::<BTreeSet<_>>()
2602 .into_iter()
2603 .collect::<Vec<_>>();
2604 let mut results = requested
2605 .iter()
2606 .map(|kind| (kind.clone(), Vec::new()))
2607 .collect::<BTreeMap<_, _>>();
2608 if requested.is_empty() {
2609 return Ok(results);
2610 }
2611 requested.sort();
2612 let placeholders = std::iter::repeat_n("?", requested.len())
2613 .collect::<Vec<_>>()
2614 .join(", ");
2615 let mut sql = format!(
2616 r#"
2617 WITH RECURSIVE walk(id, depth, path) AS (
2618 SELECT ?, 0, char(31) || ? || char(31)
2619 UNION ALL
2620 SELECT e.to_id, walk.depth + 1, walk.path || e.to_id || char(31)
2621 FROM walk
2622 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2623 ON e.from_id = walk.id
2624 WHERE walk.depth < ?
2625 AND instr(walk.path, char(31) || e.to_id || char(31)) = 0
2626 ),
2627 ranked AS (
2628 SELECT
2629 n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
2630 walk.path, walk.depth,
2631 ROW_NUMBER() OVER (PARTITION BY n.kind, n.id ORDER BY walk.depth, n.label, n.id) AS rn
2632 FROM walk
2633 JOIN graph_nodes n ON n.id = walk.id
2634 WHERE n.kind IN ({placeholders}) AND n.id <> ?
2635 ),
2636 kind_ranked AS (
2637 SELECT *,
2638 ROW_NUMBER() OVER (PARTITION BY kind ORDER BY depth, label, id) AS kind_rank
2639 FROM ranked
2640 WHERE rn = 1
2641 )
2642 SELECT id, kind, label, properties_json, provenance_json, freshness_json, path, depth
2643 FROM kind_ranked
2644 "#,
2645 );
2646 let mut values = vec![
2647 Value::Text(from_id.to_string()),
2648 Value::Text(from_id.to_string()),
2649 Value::Integer(depth as i64),
2650 ];
2651 values.extend(requested.iter().cloned().map(Value::Text));
2652 values.push(Value::Text(from_id.to_string()));
2653 if limit > 0 && limit != usize::MAX {
2654 sql.push_str(" WHERE kind_rank <= ?");
2655 values.push(Value::Integer(limit as i64));
2656 }
2657 sql.push_str(" ORDER BY kind, depth, label, id");
2658 let mut stmt = self.conn.prepare(&sql)?;
2659 let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2660 let node = node_from_row(row)?;
2661 let path: String = row.get(6)?;
2662 let hops: usize = row.get(7)?;
2663 Ok((
2664 node,
2665 GraphPath {
2666 nodes: path
2667 .split('\u{1f}')
2668 .filter(|part| !part.is_empty())
2669 .map(str::to_string)
2670 .collect(),
2671 hops,
2672 },
2673 ))
2674 })?)?;
2675 for (node, path) in rows {
2676 results
2677 .entry(node.kind.clone())
2678 .or_default()
2679 .push((node, path));
2680 }
2681 Ok(results)
2682 }
2683
2684 fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
2685 if let Some(node) = self.node(target)? {
2686 return Ok(Some(node));
2687 }
2688 if kinds.is_empty() {
2689 return Ok(None);
2690 }
2691
2692 let normalized = target.trim().trim_start_matches('#');
2693 let kind_placeholders = std::iter::repeat_n("?", kinds.len())
2694 .collect::<Vec<_>>()
2695 .join(", ");
2696 let kind_rank = kinds
2697 .iter()
2698 .enumerate()
2699 .map(|(rank, _)| format!("WHEN ? THEN {rank}"))
2700 .collect::<Vec<_>>()
2701 .join(" ");
2702 let sql = format!(
2703 r#"
2704 SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
2705 FROM graph_nodes n
2706 WHERE n.kind IN ({kind_placeholders})
2707 AND (
2708 EXISTS (
2709 SELECT 1
2710 FROM graph_node_properties p_handle INDEXED BY idx_graph_node_properties_key_value_node
2711 WHERE p_handle.node_id = n.id
2712 AND p_handle.key = 'handle'
2713 AND p_handle.value = ?
2714 )
2715 OR EXISTS (
2716 SELECT 1
2717 FROM graph_node_properties p_ref INDEXED BY idx_graph_node_properties_key_value_node
2718 WHERE p_ref.node_id = n.id
2719 AND p_ref.key = 'ref_id'
2720 AND p_ref.value = ?
2721 )
2722 OR n.label = ?
2723 OR n.label = ?
2724 )
2725 ORDER BY CASE n.kind {kind_rank} ELSE 999 END, n.id
2726 LIMIT 1
2727 "#
2728 );
2729 let mut values = kinds
2730 .iter()
2731 .map(|kind| Value::Text((*kind).to_string()))
2732 .collect::<Vec<_>>();
2733 values.push(Value::Text(target.to_string()));
2734 values.push(Value::Text(normalized.to_string()));
2735 values.push(Value::Text(target.to_string()));
2736 values.push(Value::Text(format!("#{normalized}")));
2737 values.extend(kinds.iter().map(|kind| Value::Text((*kind).to_string())));
2738 self.conn
2739 .query_row(&sql, params_from_iter(values.iter()), node_from_row)
2740 .optional()
2741 .map_err(Into::into)
2742 }
2743}
2744
2745fn to_json<T: Serialize>(value: &T) -> Result<String> {
2746 serde_json::to_string(value).map_err(Into::into)
2747}
2748
2749fn row_hash<T: Serialize>(value: &T) -> Result<String> {
2750 let payload = serde_json::to_vec(value)?;
2751 Ok(blake3::hash(&payload).to_hex().to_string())
2752}
2753
2754fn optional_to_json<T: Serialize>(value: &Option<T>) -> Result<Option<String>> {
2755 value.as_ref().map(to_json).transpose()
2756}
2757
2758fn collect_rows<T>(
2759 rows: impl Iterator<Item = std::result::Result<T, rusqlite::Error>>,
2760) -> Result<Vec<T>> {
2761 rows.collect::<std::result::Result<Vec<_>, _>>()
2762 .map_err(Into::into)
2763}
2764
2765fn node_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphNode> {
2766 let properties_col = offset + 3;
2767 let provenance_col = offset + 4;
2768 let freshness_col = offset + 5;
2769 let properties_json: String = row.get(properties_col)?;
2770 let provenance_json: String = row.get(provenance_col)?;
2771 let freshness_json: Option<String> = row.get(freshness_col)?;
2772 Ok(GraphNode {
2773 id: row.get(offset)?,
2774 kind: row.get(offset + 1)?,
2775 label: row.get(offset + 2)?,
2776 properties: from_json(properties_col, &properties_json)?,
2777 provenance: from_json(provenance_col, &provenance_json)?,
2778 freshness: optional_from_json(freshness_col, freshness_json)?,
2779 })
2780}
2781
2782fn node_from_row(row: &Row<'_>) -> rusqlite::Result<GraphNode> {
2783 node_from_row_at(row, 0)
2784}
2785
2786fn edge_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphEdge> {
2787 let properties_col = offset + 4;
2788 let provenance_col = offset + 5;
2789 let freshness_col = offset + 6;
2790 let properties_json: String = row.get(properties_col)?;
2791 let provenance_json: String = row.get(provenance_col)?;
2792 let freshness_json: Option<String> = row.get(freshness_col)?;
2793 Ok(GraphEdge {
2794 id: row.get(offset)?,
2795 from_id: row.get(offset + 1)?,
2796 to_id: row.get(offset + 2)?,
2797 kind: row.get(offset + 3)?,
2798 properties: from_json(properties_col, &properties_json)?,
2799 provenance: from_json(provenance_col, &provenance_json)?,
2800 freshness: optional_from_json(freshness_col, freshness_json)?,
2801 })
2802}
2803
2804fn edge_from_row(row: &Row<'_>) -> rusqlite::Result<GraphEdge> {
2805 edge_from_row_at(row, 0)
2806}
2807
2808fn from_json<T: DeserializeOwned>(column: usize, raw: &str) -> rusqlite::Result<T> {
2809 serde_json::from_str(raw)
2810 .map_err(|err| rusqlite::Error::FromSqlConversionFailure(column, Type::Text, Box::new(err)))
2811}
2812
2813fn optional_from_json<T: DeserializeOwned>(
2814 column: usize,
2815 raw: Option<String>,
2816) -> rusqlite::Result<Option<T>> {
2817 raw.map(|value| from_json(column, &value)).transpose()
2818}
2819
2820fn projection_version_from_nodes(nodes: &[GraphNode]) -> Option<String> {
2821 nodes
2822 .iter()
2823 .find(|node| node.kind == "projection_meta")
2824 .and_then(|node| node.properties.get("projection_version").cloned())
2825}
2826
2827fn projection_hash_from_nodes(nodes: &[GraphNode]) -> Option<String> {
2828 nodes
2829 .iter()
2830 .find(|node| node.kind == "projection_meta")
2831 .and_then(|node| node.properties.get("content_hash").cloned())
2832}
2833
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn unix_now() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
2840
2841fn sqlite_database_size_bytes(conn: &Connection) -> Result<u64> {
2842 let page_count: u64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
2843 let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
2844 Ok(page_count.saturating_mul(page_size))
2845}
2846
2847fn sqlite_database_freelist_bytes(conn: &Connection) -> Result<u64> {
2848 let freelist_count: u64 = conn.query_row("PRAGMA freelist_count", [], |row| row.get(0))?;
2849 let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
2850 Ok(freelist_count.saturating_mul(page_size))
2851}
2852
2853#[cfg(test)]
2854mod tests {
2855 use super::*;
2856
2857 fn sample_provenance() -> GraphProvenance {
2858 GraphProvenance::new("fixture", "src/lib.rs:1").with_content_hash("hash-1")
2859 }
2860
2861 fn sample_projection() -> GraphProjection {
2862 let source = sample_provenance();
2863 GraphProjection {
2864 nodes: vec![
2865 GraphNode::new("doc:livekit", "document", "LiveKit guide")
2866 .with_property("domain", "livekit")
2867 .with_provenance(source.clone())
2868 .with_freshness(GraphFreshness::content_hash("node-hash")),
2869 GraphNode::new("topic:rooms", "topic", "Rooms"),
2870 GraphNode::new("topic:egress", "topic", "Egress"),
2871 ],
2872 edges: vec![
2873 GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
2874 .with_property("confidence", "0.91")
2875 .with_provenance(source.clone())
2876 .with_freshness(GraphFreshness::content_hash("edge-hash")),
2877 GraphEdge::new("topic:rooms", "topic:egress", "related_to").with_provenance(source),
2878 ],
2879 }
2880 }
2881
    /// Backend-agnostic contract exercised after upserting `sample_projection()`.
    ///
    /// The store must:
    /// - return nodes by id, unchanged;
    /// - return nodes by kind, in id order;
    /// - expose edge properties on kind-filtered outgoing edges;
    /// - find the path `doc:livekit -> topic:rooms -> topic:egress`.
    fn assert_projection_store_contract(store: &impl GraphStore) {
        let projection = sample_projection();
        projection.upsert_into(store).unwrap();

        // Point lookup returns the fixture node, including its property, provenance
        // and freshness.
        assert_eq!(
            store.node("doc:livekit").unwrap(),
            projection
                .nodes
                .iter()
                .find(|node| node.id == "doc:livekit")
                .cloned()
        );
        // The fixture lists rooms before egress, but the expected order is by id.
        assert_eq!(
            store.nodes_by_kind("topic").unwrap(),
            vec![
                GraphNode::new("topic:egress", "topic", "Egress"),
                GraphNode::new("topic:rooms", "topic", "Rooms"),
            ]
        );

        let mentions = store
            .outgoing_edges("doc:livekit", Some("mentions"))
            .unwrap();
        assert_eq!(mentions.len(), 1);
        assert_eq!(mentions[0].to_id, "topic:rooms");
        assert_eq!(
            mentions[0].properties.get("confidence"),
            Some(&"0.91".into())
        );

        // Two hops across edges of different kinds (no kind filter).
        let path = store
            .shortest_path("doc:livekit", "topic:egress", None)
            .unwrap()
            .unwrap();
        assert_eq!(
            path.nodes,
            vec!["doc:livekit", "topic:rooms", "topic:egress"]
        );
    }
2921
    /// Individual upserts of a node pair and an edge are read back unchanged: by id,
    /// by kind, in the full scans, and as kind-filtered outgoing edges.
    #[test]
    fn sqlite_store_round_trips_generic_nodes_edges() {
        let store = SqliteGraphStore::in_memory().unwrap();
        let source = sample_provenance();
        let node = GraphNode::new("doc:livekit", "document", "LiveKit guide")
            .with_property("domain", "livekit")
            .with_provenance(source.clone())
            .with_freshness(GraphFreshness::content_hash("node-hash"));
        let topic = GraphNode::new("topic:rooms", "topic", "Rooms");
        let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
            .with_property("confidence", "0.91")
            .with_provenance(source)
            .with_freshness(GraphFreshness::content_hash("edge-hash"));

        store.upsert_node(&node).unwrap();
        store.upsert_node(&topic).unwrap();
        store.upsert_edge(&edge).unwrap();

        // Full equality checks that properties, provenance and freshness survive
        // the JSON columns.
        assert_eq!(store.node("doc:livekit").unwrap(), Some(node));
        assert_eq!(store.nodes_by_kind("topic").unwrap(), vec![topic]);
        assert_eq!(store.all_nodes().unwrap().len(), 2);
        assert_eq!(store.all_edges().unwrap().len(), 1);
        assert_eq!(
            store
                .outgoing_edges("doc:livekit", Some("mentions"))
                .unwrap(),
            vec![edge]
        );
    }
2951
    /// Edges are addressable by id, incident-edge scans see both directions, and
    /// `paged_edges` property filters run against the materialized
    /// `graph_edge_properties` rows. The expected indexes and scan notes must appear
    /// in the diagnostics.
    #[test]
    fn sqlite_materializes_edge_properties_and_scans_first_class_edges() {
        let store = SqliteGraphStore::in_memory().unwrap();
        for node in [
            GraphNode::new("doc:livekit", "document", "LiveKit guide"),
            GraphNode::new("topic:rooms", "topic", "Rooms"),
            GraphNode::new("topic:egress", "topic", "Egress"),
        ] {
            store.upsert_node(&node).unwrap();
        }
        let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
            .with_property("confidence", "0.91");
        let edge_id = edge.id.clone();
        store.upsert_edge(&edge).unwrap();
        // A second edge shares the `confidence` key with a different value, so the
        // value filter below has something to exclude.
        store
            .upsert_edge(
                &GraphEdge::new("topic:egress", "topic:rooms", "related_to")
                    .with_property("confidence", "0.42"),
            )
            .unwrap();

        assert_eq!(store.edge(&edge_id).unwrap(), Some(edge));
        // `topic:rooms` is the target of both edges; incident scans cover inbound
        // edges and are compared in sorted id order.
        let mut expected_incident_ids = vec![
            GraphEdge::stable_id("doc:livekit", "topic:rooms", "mentions"),
            GraphEdge::stable_id("topic:egress", "topic:rooms", "related_to"),
        ];
        expected_incident_ids.sort();
        assert_eq!(
            store
                .incident_edges("topic:rooms", None)
                .unwrap()
                .into_iter()
                .map(|edge| edge.id)
                .collect::<Vec<_>>(),
            expected_incident_ids
        );

        let page = store
            .paged_edges(
                Some("mentions"),
                GraphQueryOptions {
                    property_filters: vec![GraphPropertyFilter {
                        key: "confidence".to_string(),
                        value: "0.91".to_string(),
                    }],
                    ..GraphQueryOptions::default()
                },
            )
            .unwrap();
        assert_eq!(page.edges.len(), 1);
        assert_eq!(page.edges[0].id, edge_id);
        // Diagnostics must name both indexes and describe the property-driven scan.
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic.contains("idx_graph_edge_properties_key_value_edge")),
            "{:?}",
            page.page.diagnostics
        );
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic.contains("idx_graph_edges_edge_key")),
            "{:?}",
            page.page.diagnostics
        );
        assert!(
            page.page.diagnostics.iter().any(|diagnostic| diagnostic
                .contains("edge property primary filter matched 1 materialized row")),
            "{:?}",
            page.page.diagnostics
        );
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic
                    .contains("drives from SQLite materialized property rows")),
            "{:?}",
            page.page.diagnostics
        );

        // One materialized property row per edge that carries `confidence`.
        let property_rows: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(property_rows, 2);
    }
3045
3046 #[test]
3047 fn graph_projection_round_trips_through_backend_agnostic_store_contract() {
3048 let sqlite = SqliteGraphStore::in_memory().unwrap();
3049 assert_projection_store_contract(&sqlite);
3050 }
3051
    /// Neighborhood ordering plus delete semantics.
    ///
    /// Nodes come back sorted by id and edges by `(from, kind, to)`. Deleting an edge
    /// breaks the path that used it. Deleting a node removes it together with its
    /// edges.
    #[test]
    fn graph_store_contract_covers_crud_neighborhood_and_ordering() {
        fn assert_crud_contract(store: &impl GraphStore) {
            let projection = sample_projection();
            projection.upsert_into(store).unwrap();

            // Depth 2 from the document reaches both topics.
            let neighborhood = store.neighborhood("doc:livekit", 2, None).unwrap().unwrap();
            assert_eq!(
                neighborhood
                    .nodes
                    .iter()
                    .map(|node| node.id.as_str())
                    .collect::<Vec<_>>(),
                vec!["doc:livekit", "topic:egress", "topic:rooms"]
            );
            assert_eq!(
                neighborhood
                    .edges
                    .iter()
                    .map(|edge| (
                        edge.from_id.as_str(),
                        edge.kind.as_str(),
                        edge.to_id.as_str()
                    ))
                    .collect::<Vec<_>>(),
                vec![
                    ("doc:livekit", "mentions", "topic:rooms"),
                    ("topic:rooms", "related_to", "topic:egress"),
                ]
            );

            // Removing the only rooms -> egress edge disconnects egress.
            assert_eq!(
                store
                    .delete_edge("topic:rooms", "topic:egress", "related_to")
                    .unwrap(),
                1
            );
            assert!(
                store
                    .shortest_path("doc:livekit", "topic:egress", None)
                    .unwrap()
                    .is_none()
            );
            // Deleting a node must also drop edges pointing at it.
            assert_eq!(store.delete_node("topic:rooms").unwrap(), 1);
            assert!(store.node("topic:rooms").unwrap().is_none());
            assert!(
                store
                    .outgoing_edges("doc:livekit", None)
                    .unwrap()
                    .is_empty()
            );
        }

        assert_crud_contract(&SqliteGraphStore::in_memory().unwrap());
    }
3107
    /// Re-upserting a projection replaces materialized node properties rather than
    /// accumulating them. Edge properties stay at one row per edge.
    #[test]
    fn sqlite_upsert_projection_batches_rows_and_properties() {
        let mut store = SqliteGraphStore::in_memory().unwrap();
        let mut projection = sample_projection();
        store.upsert_projection(&projection).unwrap();

        let page = store
            .paged_nodes_by_kind(
                "document",
                GraphQueryOptions {
                    property_filters: vec![GraphPropertyFilter {
                        key: "domain".to_string(),
                        value: "livekit".to_string(),
                    }],
                    ..GraphQueryOptions::default()
                },
            )
            .unwrap();
        assert_eq!(page.nodes[0].id, "doc:livekit");

        // nodes[0] is the document in `sample_projection`; change its `domain` value.
        projection.nodes[0] = GraphNode::new("doc:livekit", "document", "LiveKit guide")
            .with_property("domain", "recording");
        store.upsert_projection(&projection).unwrap();

        // The stale `domain = livekit` row must be gone after the second upsert.
        let old_property_count: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain' AND value = 'livekit'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        let updated_page = store
            .paged_nodes_by_kind(
                "document",
                GraphQueryOptions {
                    property_filters: vec![GraphPropertyFilter {
                        key: "domain".to_string(),
                        value: "recording".to_string(),
                    }],
                    ..GraphQueryOptions::default()
                },
            )
            .unwrap();
        assert_eq!(old_property_count, 0);
        assert_eq!(updated_page.nodes[0].id, "doc:livekit");
        // Two upserts of the same `mentions` edge leave a single `confidence` row.
        let edge_property_count: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(edge_property_count, 1);
    }
3164
    /// Kind filters apply to outgoing-edge scans, edge sampling and shortest paths.
    ///
    /// With only `calls` edges, the direct `a -> c` `documents` edge is ignored, so
    /// the path goes through `b`. Paths follow edge direction only.
    #[test]
    fn sqlite_store_filters_edges_by_kind_and_paths() {
        let store = SqliteGraphStore::in_memory().unwrap();
        for id in ["a", "b", "c"] {
            store
                .upsert_node(&GraphNode::new(id, "symbol", id))
                .unwrap();
        }
        store
            .upsert_edge(&GraphEdge::new("a", "b", "calls"))
            .unwrap();
        store
            .upsert_edge(&GraphEdge::new("a", "c", "documents"))
            .unwrap();
        store
            .upsert_edge(&GraphEdge::new("b", "c", "calls"))
            .unwrap();

        let calls = store.outgoing_edges("a", Some("calls")).unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].to_id, "b");
        // (node count, edge count)
        assert_eq!(store.graph_counts().unwrap(), (3, 3));
        assert_eq!(
            store.sample_edge(Some("calls")).unwrap().unwrap().to_id,
            "b"
        );

        let path = store
            .shortest_path("a", "c", Some("calls"))
            .unwrap()
            .unwrap();
        assert_eq!(path.nodes, vec!["a", "b", "c"]);
        assert_eq!(path.hops, 2);

        // No reverse traversal: c has no outgoing `calls` edges.
        assert!(
            store
                .shortest_path("c", "a", Some("calls"))
                .unwrap()
                .is_none()
        );
    }
3206
3207 #[test]
3208 fn sqlite_store_batches_edges_between_node_sets() {
3209 let store = SqliteGraphStore::in_memory().unwrap();
3210 for id in ["a", "b", "c", "outside"] {
3211 store
3212 .upsert_node(&GraphNode::new(id, "symbol", id))
3213 .unwrap();
3214 }
3215 for edge in [
3216 GraphEdge::new("a", "b", "calls"),
3217 GraphEdge::new("b", "c", "calls"),
3218 GraphEdge::new("a", "outside", "calls"),
3219 GraphEdge::new("outside", "c", "calls"),
3220 ] {
3221 store.upsert_edge(&edge).unwrap();
3222 }
3223
3224 let scoped = ["a".to_string(), "b".to_string(), "c".to_string()]
3225 .into_iter()
3226 .collect::<BTreeSet<_>>();
3227 let edge_keys = store
3228 .edges_between_nodes(&scoped)
3229 .unwrap()
3230 .into_iter()
3231 .map(|edge| (edge.from_id, edge.kind, edge.to_id))
3232 .collect::<Vec<_>>();
3233
3234 assert_eq!(
3235 edge_keys,
3236 vec![
3237 ("a".to_string(), "calls".to_string(), "b".to_string()),
3238 ("b".to_string(), "calls".to_string(), "c".to_string()),
3239 ]
3240 );
3241 }
3242
3243 #[test]
3244 fn sqlite_projection_refresh_tracks_versions_watermarks_and_tombstones() {
3245 let mut store = SqliteGraphStore::in_memory().unwrap();
3246 let mut projection = sample_projection();
3247 projection.nodes.push(
3248 GraphNode::new(
3249 "projection:fixture",
3250 "projection_meta",
3251 "fixture projection",
3252 )
3253 .with_property("projection_version", "fixture-v1")
3254 .with_property("content_hash", "hash-a"),
3255 );
3256 store
3257 .replace_projection_with_version(
3258 "root",
3259 &projection,
3260 Some("fixture-v1"),
3261 Some("commit-a".to_string()),
3262 )
3263 .unwrap();
3264
3265 projection.nodes.retain(|node| node.id != "topic:egress");
3266 projection.edges.retain(|edge| edge.to_id != "topic:egress");
3267 let refresh = store
3268 .replace_projection_with_version(
3269 "root",
3270 &projection,
3271 Some("fixture-v2"),
3272 Some("commit-b".to_string()),
3273 )
3274 .unwrap();
3275
3276 assert_eq!(refresh.projection_version, "fixture-v2");
3277 assert_eq!(refresh.source_watermark.as_deref(), Some("commit-b"));
3278 assert_eq!(refresh.tombstoned_nodes, vec!["topic:egress".to_string()]);
3279 assert_eq!(refresh.tombstoned_edges.len(), 1);
3280 assert_eq!(refresh.deleted_nodes, 1);
3281 assert_eq!(refresh.deleted_edges, 1);
3282 assert_eq!(refresh.unchanged_nodes, 3);
3283 assert_eq!(refresh.upserted_nodes, 0);
3284 assert_eq!(refresh.unchanged_properties, 4);
3285 assert_eq!(refresh.upserted_properties, 0);
3286 assert_eq!(refresh.deleted_properties, 0);
3287 assert!(
3288 refresh
3289 .phase_timings
3290 .iter()
3291 .any(|phase| phase.name == "sqlite_property_row_staging"),
3292 "{:?}",
3293 refresh.phase_timings
3294 );
3295 assert!(
3296 refresh
3297 .phase_timings
3298 .iter()
3299 .any(|phase| phase.name == "sqlite_edge_property_row_staging"),
3300 "{:?}",
3301 refresh.phase_timings
3302 );
3303 let version = store.projection_version("root").unwrap().unwrap();
3304 assert_eq!(version.projection_version, "fixture-v2");
3305 assert_eq!(version.source_watermark.as_deref(), Some("commit-b"));
3306 let cached_counts: (usize, usize, usize, usize) = store
3307 .conn
3308 .query_row(
3309 r#"
3310 SELECT nodes, edges, tombstone_nodes, tombstone_edges
3311 FROM graph_operator_stats
3312 WHERE scope = 'root'
3313 "#,
3314 [],
3315 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3316 )
3317 .unwrap();
3318 assert_eq!(cached_counts, (3, 1, 1, 1));
3319
3320 projection
3321 .nodes
3322 .push(GraphNode::new("topic:egress", "topic", "Egress"));
3323 let refresh = store
3324 .replace_projection_with_version(
3325 "root",
3326 &projection,
3327 Some("fixture-v3"),
3328 Some("commit-c".to_string()),
3329 )
3330 .unwrap();
3331 assert_eq!(refresh.pruned_tombstones, 1);
3332 assert_eq!(refresh.tombstoned_nodes, Vec::<String>::new());
3333
3334 projection.nodes.retain(|node| node.id != "topic:egress");
3335 store
3336 .replace_projection_with_version(
3337 "root",
3338 &projection,
3339 Some("fixture-v4"),
3340 Some("commit-d".to_string()),
3341 )
3342 .unwrap();
3343 assert_eq!(store.compact_storage("root", true).unwrap(), 2);
3344 let cached_counts: (usize, usize, usize, usize) = store
3345 .conn
3346 .query_row(
3347 r#"
3348 SELECT nodes, edges, tombstone_nodes, tombstone_edges
3349 FROM graph_operator_stats
3350 WHERE scope = 'root'
3351 "#,
3352 [],
3353 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3354 )
3355 .unwrap();
3356 assert_eq!(cached_counts, (3, 1, 0, 0));
3357 }
3358
3359 #[test]
3360 fn sqlite_shortest_path_uses_bounded_frontier() {
3361 let store = SqliteGraphStore::in_memory().unwrap();
3362 for idx in 0..80 {
3363 store
3364 .upsert_node(&GraphNode::new(
3365 format!("node:{idx:02}"),
3366 "symbol",
3367 format!("node {idx:02}"),
3368 ))
3369 .unwrap();
3370 }
3371 for idx in 0..79 {
3372 store
3373 .upsert_edge(&GraphEdge::new(
3374 format!("node:{idx:02}"),
3375 format!("node:{:02}", idx + 1),
3376 "calls",
3377 ))
3378 .unwrap();
3379 }
3380 store
3381 .upsert_edge(&GraphEdge::new("node:00", "node:79", "mentions"))
3382 .unwrap();
3383
3384 assert!(
3385 store
3386 .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(64))
3387 .unwrap()
3388 .is_none()
3389 );
3390 let path = store
3391 .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(79))
3392 .unwrap()
3393 .unwrap();
3394 assert_eq!(path.hops, 79);
3395 assert_eq!(path.nodes.first().map(String::as_str), Some("node:00"));
3396 assert_eq!(path.nodes.last().map(String::as_str), Some("node:79"));
3397
3398 let direct = store
3399 .shortest_path_with_max_hops("node:00", "node:79", Some("mentions"), Some(1))
3400 .unwrap()
3401 .unwrap();
3402 assert_eq!(direct.nodes, vec!["node:00", "node:79"]);
3403 }
3404
3405 #[test]
3406 fn sqlite_resolves_evidence_targets_with_indexed_properties() {
3407 let store = SqliteGraphStore::in_memory().unwrap();
3408 for node in [
3409 GraphNode::new("gbak-refresh", "backlog", "#refresh")
3410 .with_property("ref_id", "refresh")
3411 .with_property("handle", "backlog-handle"),
3412 GraphNode::new("gjob-refresh", "job_packet", "do #refresh")
3413 .with_property("ref_id", "refresh"),
3414 GraphNode::new("gwres-refresh", "worker_result", "completed #refresh")
3415 .with_property("ref_id", "refresh"),
3416 ] {
3417 store.upsert_node(&node).unwrap();
3418 }
3419
3420 let by_ref = store
3421 .resolve_evidence_target("#refresh", &["backlog", "job_packet", "worker_result"])
3422 .unwrap()
3423 .unwrap();
3424 assert_eq!(by_ref.id, "gbak-refresh");
3425 let by_handle = store
3426 .resolve_evidence_target("backlog-handle", &["backlog"])
3427 .unwrap()
3428 .unwrap();
3429 assert_eq!(by_handle.id, "gbak-refresh");
3430 }
3431
3432 #[test]
3433 fn sqlite_schema_migration_backfills_materialized_node_properties() {
3434 let conn = Connection::open_in_memory().unwrap();
3435 conn.execute_batch(
3436 r#"
3437 PRAGMA user_version = 2;
3438 CREATE TABLE graph_nodes (
3439 id TEXT PRIMARY KEY,
3440 kind TEXT NOT NULL,
3441 label TEXT NOT NULL,
3442 properties_json TEXT NOT NULL DEFAULT '{}',
3443 provenance_json TEXT NOT NULL DEFAULT '[]',
3444 freshness_json TEXT,
3445 row_hash TEXT,
3446 source_watermark TEXT
3447 );
3448 CREATE INDEX idx_graph_nodes_kind ON graph_nodes(kind);
3449 CREATE TABLE graph_edges (
3450 from_id TEXT NOT NULL,
3451 to_id TEXT NOT NULL,
3452 kind TEXT NOT NULL,
3453 properties_json TEXT NOT NULL DEFAULT '{}',
3454 provenance_json TEXT NOT NULL DEFAULT '[]',
3455 freshness_json TEXT,
3456 row_hash TEXT,
3457 source_watermark TEXT,
3458 PRIMARY KEY (from_id, to_id, kind)
3459 );
3460 CREATE INDEX idx_graph_edges_from_kind ON graph_edges(from_id, kind);
3461 CREATE INDEX idx_graph_edges_to_kind ON graph_edges(to_id, kind);
3462 CREATE TABLE graph_projection_versions (
3463 scope TEXT PRIMARY KEY,
3464 projection_version TEXT NOT NULL,
3465 content_hash TEXT,
3466 source_watermark TEXT,
3467 observed_at_unix INTEGER NOT NULL
3468 );
3469 CREATE TABLE graph_tombstones (
3470 row_key TEXT PRIMARY KEY,
3471 row_kind TEXT NOT NULL,
3472 deleted_at_unix INTEGER NOT NULL
3473 );
3474 INSERT INTO graph_nodes
3475 (id, kind, label, properties_json, provenance_json)
3476 VALUES
3477 ('topic:rooms', 'topic', 'Rooms', '{"domain":"livekit"}', '[]'),
3478 ('topic:egress', 'topic', 'Egress', '{"domain":"recording"}', '[]');
3479 INSERT INTO graph_edges
3480 (from_id, to_id, kind, properties_json, provenance_json)
3481 VALUES
3482 ('topic:rooms', 'topic:egress', 'mentions', '{"confidence":"0.91"}', '[]');
3483 "#,
3484 )
3485 .unwrap();
3486
3487 let store = SqliteGraphStore::from_connection(conn).unwrap();
3488 let version: i64 = store
3489 .conn
3490 .pragma_query_value(None, "user_version", |row| row.get(0))
3491 .unwrap();
3492 assert_eq!(version, SQLITE_GRAPH_SCHEMA_VERSION);
3493 let property_rows: usize = store
3494 .conn
3495 .query_row(
3496 "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain'",
3497 [],
3498 |row| row.get(0),
3499 )
3500 .unwrap();
3501 assert_eq!(property_rows, 2);
3502 let edge_property_rows: usize = store
3503 .conn
3504 .query_row(
3505 "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
3506 [],
3507 |row| row.get(0),
3508 )
3509 .unwrap();
3510 assert_eq!(edge_property_rows, 1);
3511 let edge = store
3512 .edge(&GraphEdge::stable_id(
3513 "topic:rooms",
3514 "topic:egress",
3515 "mentions",
3516 ))
3517 .unwrap()
3518 .unwrap();
3519 assert_eq!(edge.properties.get("confidence"), Some(&"0.91".to_string()));
3520
3521 let page = store
3522 .paged_nodes_by_kind(
3523 "topic",
3524 GraphQueryOptions {
3525 property_filters: vec![GraphPropertyFilter {
3526 key: "domain".to_string(),
3527 value: "livekit".to_string(),
3528 }],
3529 ..GraphQueryOptions::default()
3530 },
3531 )
3532 .unwrap();
3533 assert_eq!(page.nodes[0].id, "topic:rooms");
3534 assert!(
3535 page.page
3536 .diagnostics
3537 .iter()
3538 .any(|diagnostic| diagnostic.contains("idx_graph_node_properties_key_value_node")),
3539 "{:?}",
3540 page.page.diagnostics
3541 );
3542 }
3543
3544 #[test]
3545 fn sqlite_store_batches_reachable_nodes_by_kinds() {
3546 let store = SqliteGraphStore::in_memory().unwrap();
3547 for node in [
3548 GraphNode::new("start", "backlog", "start"),
3549 GraphNode::new("ctx", "worker_context", "context"),
3550 GraphNode::new("src", "source_handle", "source"),
3551 GraphNode::new("sem", "semantic_concept", "concept"),
3552 ] {
3553 store.upsert_node(&node).unwrap();
3554 }
3555 store
3556 .upsert_edge(&GraphEdge::new("start", "ctx", "has_context"))
3557 .unwrap();
3558 store
3559 .upsert_edge(&GraphEdge::new("ctx", "src", "uses_source"))
3560 .unwrap();
3561 store
3562 .upsert_edge(&GraphEdge::new("start", "sem", "mentions_concept"))
3563 .unwrap();
3564
3565 let rows = store
3566 .reachable_nodes_by_kinds(
3567 "start",
3568 &["worker_context", "source_handle", "semantic_concept"],
3569 2,
3570 8,
3571 )
3572 .unwrap();
3573 assert_eq!(rows["worker_context"][0].0.id, "ctx");
3574 assert_eq!(
3575 rows["source_handle"][0].1.nodes,
3576 vec!["start", "ctx", "src"]
3577 );
3578 assert_eq!(rows["semantic_concept"][0].1.hops, 1);
3579 }
3580
3581 #[test]
3582 fn sqlite_projection_refresh_handles_bulk_row_replacement() {
3583 let mut store = SqliteGraphStore::in_memory().unwrap();
3584 let source = GraphProvenance::new("fixture", "bulk");
3585 let mut projection = GraphProjection::default();
3586 for idx in 0..128 {
3587 projection.nodes.push(
3588 GraphNode::new(
3589 format!("node:{idx:03}"),
3590 if idx % 2 == 0 { "symbol" } else { "file" },
3591 format!("bulk node {idx:03}"),
3592 )
3593 .with_property("ordinal", idx.to_string())
3594 .with_provenance(source.clone())
3595 .with_freshness(GraphFreshness::content_hash(format!("node-hash-{idx:03}"))),
3596 );
3597 }
3598 for idx in 0..127 {
3599 projection.edges.push(
3600 GraphEdge::new(
3601 format!("node:{idx:03}"),
3602 format!("node:{:03}", idx + 1),
3603 "next",
3604 )
3605 .with_property("ordinal", idx.to_string())
3606 .with_provenance(source.clone())
3607 .with_freshness(GraphFreshness::content_hash(format!("edge-hash-{idx:03}"))),
3608 );
3609 }
3610
3611 store
3612 .replace_projection_with_version(
3613 "root",
3614 &projection,
3615 Some("bulk-v1"),
3616 Some("commit-a".to_string()),
3617 )
3618 .unwrap();
3619
3620 projection
3621 .nodes
3622 .retain(|node| !node.id.ends_with("000") && !node.id.ends_with("064"));
3623 projection.edges.retain(|edge| {
3624 !edge.from_id.ends_with("000")
3625 && !edge.to_id.ends_with("000")
3626 && !edge.from_id.ends_with("064")
3627 && !edge.to_id.ends_with("064")
3628 });
3629 let refresh = store
3630 .replace_projection_with_version(
3631 "root",
3632 &projection,
3633 Some("bulk-v2"),
3634 Some("commit-b".to_string()),
3635 )
3636 .unwrap();
3637
3638 assert_eq!(store.all_nodes().unwrap().len(), 126);
3639 assert_eq!(store.all_edges().unwrap().len(), 124);
3640 assert_eq!(
3641 refresh.tombstoned_nodes,
3642 vec!["node:000".to_string(), "node:064".to_string()]
3643 );
3644 assert_eq!(refresh.tombstoned_edges.len(), 3);
3645 assert_eq!(refresh.deleted_nodes, 2);
3646 assert_eq!(refresh.deleted_edges, 3);
3647 assert_eq!(refresh.unchanged_nodes, 126);
3648 assert_eq!(refresh.unchanged_edges, 124);
3649 assert_eq!(refresh.upserted_nodes, 0);
3650 assert_eq!(refresh.upserted_edges, 0);
3651 assert_eq!(refresh.unchanged_properties, 250);
3652 assert_eq!(refresh.upserted_properties, 0);
3653 assert!(
3654 refresh
3655 .phase_timings
3656 .iter()
3657 .any(|phase| phase.name == "sqlite_node_staging"
3658 && phase.detail.contains("bulk stage 126 graph_nodes rows")
3659 && phase.detail.contains("multi-row chunks up to 50 rows")),
3660 "{:?}",
3661 refresh.phase_timings
3662 );
3663 assert!(
3664 refresh
3665 .phase_timings
3666 .iter()
3667 .any(|phase| phase.name == "sqlite_edge_staging"
3668 && phase.detail.contains("bulk stage 124 graph_edges rows")
3669 && phase.detail.contains("multi-row chunks up to 50 rows")),
3670 "{:?}",
3671 refresh.phase_timings
3672 );
3673 let staged_node_properties: usize = store
3674 .conn
3675 .query_row(
3676 "SELECT COUNT(*) FROM temp.next_graph_node_properties",
3677 [],
3678 |row| row.get(0),
3679 )
3680 .unwrap();
3681 let staged_edge_properties: usize = store
3682 .conn
3683 .query_row(
3684 "SELECT COUNT(*) FROM temp.next_graph_edge_properties",
3685 [],
3686 |row| row.get(0),
3687 )
3688 .unwrap();
3689 assert_eq!(staged_node_properties, 0);
3690 assert_eq!(staged_edge_properties, 0);
3691 assert!(
3692 refresh
3693 .phase_timings
3694 .iter()
3695 .any(|phase| phase.name == "sqlite_property_row_staging"
3696 && phase.detail.contains("new/changed node rows")),
3697 "{:?}",
3698 refresh.phase_timings
3699 );
3700 assert!(
3701 refresh
3702 .phase_timings
3703 .iter()
3704 .any(|phase| phase.name == "sqlite_edge_property_row_staging"
3705 && phase.detail.contains("new/changed edge rows")),
3706 "{:?}",
3707 refresh.phase_timings
3708 );
3709 assert_eq!(
3710 store
3711 .projection_version("root")
3712 .unwrap()
3713 .unwrap()
3714 .source_watermark
3715 .as_deref(),
3716 Some("commit-b")
3717 );
3718 }
3719}