1use anyhow::{Context, Result, bail};
2use rusqlite::{
3 Connection, OpenFlags, OptionalExtension, Row, params_from_iter,
4 types::{Type, Value},
5};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use std::collections::{BTreeMap, BTreeSet};
8use std::cell::Cell;
9use std::ffi::OsString;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14pub use tsift_core::{
15 ConvexEdgeRow, ConvexGraphClient, ConvexGraphStore, ConvexNodeRow, ConvexProjectionRows,
16 ConvexRowsGraphClient, GraphEdge, GraphFreshness, GraphNode, GraphPagedSubgraph, GraphPath,
17 GraphProjection, GraphPropertyFilter, GraphProvenance, GraphQueryOptions, GraphQueryPage,
18 GraphStore, GraphSubgraph, PropertyMode, RankedNeighborhoodOptions, RankedNeighborhoodResult,
19 SQLITE_GRAPH_SCHEMA_VERSION, TerseGraphEdge, TerseGraphNode,
20 apply_graph_edge_query_page,
21 apply_graph_query_page, graph_edge_id, shortest_path_using_outgoing, stable_graph_edge_id,
22};
23
/// Identifies which snapshot-based fallback was selected for a read-only open
/// that failed because the database was locked.
///
/// Values are produced by `read_only_snapshot_recovery` and serialize in
/// `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReadOnlyRecovery {
    /// Chosen when only a rollback journal (`-journal`) exists next to the database.
    SnapshotFallback,
    /// Chosen when a WAL (`-wal`) or shared-memory (`-shm`) sidecar exists next to the database.
    SnapshotFallbackWal,
}
30
31pub fn read_only_snapshot_recovery(
32 db_path: &Path,
33 err: &anyhow::Error,
34) -> Option<ReadOnlyRecovery> {
35 if !error_mentions_locked_db(err) {
36 return None;
37 }
38 if wal_sidecar_path(db_path).exists() || shared_memory_sidecar_path(db_path).exists() {
39 Some(ReadOnlyRecovery::SnapshotFallbackWal)
40 } else if rollback_journal_path(db_path).exists() {
41 Some(ReadOnlyRecovery::SnapshotFallback)
42 } else {
43 None
44 }
45}
46
/// Returns the path of the rollback-journal sidecar for `db_path`.
///
/// The suffix `-journal` is appended to the full path (extension included),
/// e.g. `graph.db` becomes `graph.db-journal`.
pub fn rollback_journal_path(db_path: &Path) -> PathBuf {
    let mut raw = db_path.to_path_buf().into_os_string();
    raw.push("-journal");
    raw.into()
}
52
/// Returns the path of the write-ahead-log sidecar for `db_path`.
///
/// The suffix `-wal` is appended to the full path (extension included),
/// e.g. `graph.db` becomes `graph.db-wal`.
pub fn wal_sidecar_path(db_path: &Path) -> PathBuf {
    let mut raw = db_path.to_path_buf().into_os_string();
    raw.push("-wal");
    raw.into()
}
58
/// Returns the path of the shared-memory sidecar for `db_path`.
///
/// The suffix `-shm` is appended to the full path (extension included),
/// e.g. `graph.db` becomes `graph.db-shm`.
pub fn shared_memory_sidecar_path(db_path: &Path) -> PathBuf {
    let mut raw = db_path.to_path_buf().into_os_string();
    raw.push("-shm");
    raw.into()
}
64
65pub fn copy_read_only_snapshot(
66 db_path: &Path,
67 default_stem: &str,
68) -> Result<(PathBuf, Vec<PathBuf>)> {
69 let snapshot_path = snapshot_copy_path(db_path, default_stem);
70 std::fs::copy(db_path, &snapshot_path).with_context(|| {
71 format!(
72 "copying locked db {} to snapshot {}",
73 db_path.display(),
74 snapshot_path.display()
75 )
76 })?;
77 let mut cleanup_paths = vec![snapshot_path.clone()];
78 copy_optional_snapshot_sidecar(
79 &wal_sidecar_path(db_path),
80 &wal_sidecar_path(&snapshot_path),
81 &mut cleanup_paths,
82 )?;
83 copy_optional_snapshot_sidecar(
84 &shared_memory_sidecar_path(db_path),
85 &shared_memory_sidecar_path(&snapshot_path),
86 &mut cleanup_paths,
87 )?;
88 Ok((snapshot_path, cleanup_paths))
89}
90
91pub fn error_mentions_locked_db(err: &anyhow::Error) -> bool {
92 err.chain()
93 .any(|cause| cause.to_string().contains("database is locked"))
94}
95
96fn copy_optional_snapshot_sidecar(
97 source_path: &Path,
98 snapshot_path: &Path,
99 cleanup_paths: &mut Vec<PathBuf>,
100) -> Result<()> {
101 match std::fs::copy(source_path, snapshot_path) {
102 Ok(_) => {
103 cleanup_paths.push(snapshot_path.to_path_buf());
104 Ok(())
105 }
106 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
107 Err(err) => Err(err).with_context(|| {
108 format!(
109 "copying SQLite sidecar {} to snapshot {}",
110 source_path.display(),
111 snapshot_path.display()
112 )
113 }),
114 }
115}
116
/// Builds the temp-directory path used for a snapshot copy of `db_path`.
///
/// The file name has the form `tsift-<stem>-<pid>-<nanos>.db`, where `<stem>`
/// is the UTF-8 file stem of `db_path` (or `default_stem` when there is none),
/// `<pid>` is the current process id and `<nanos>` is the nanoseconds elapsed
/// since the Unix epoch (0 if the system clock is before the epoch).
fn snapshot_copy_path(db_path: &Path, default_stem: &str) -> PathBuf {
    let stem = match db_path.file_stem().and_then(|raw| raw.to_str()) {
        Some(valid) => valid,
        None => default_stem,
    };
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    let pid = std::process::id();
    std::env::temp_dir().join(format!("tsift-{stem}-{pid}-{nanos}.db"))
}
/// `wal_autocheckpoint` value (in pages) that writable graph connections are
/// configured with and then verified against when they are opened.
const SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES: i64 = 4096;
/// Maximum number of rows bound into a single multi-row staging `INSERT`.
const SQLITE_GRAPH_STAGING_CHUNK_ROWS: usize = 500;
132
/// Graph store backed by a single SQLite connection.
pub struct SqliteGraphStore {
    /// Underlying SQLite connection used for every query.
    conn: Connection,
    /// Present when the store reads from a temporary snapshot copy; dropping
    /// it removes the copied files.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Which snapshot fallback (if any) was used to open this store read-only.
    read_only_recovery: Option<ReadOnlyRecovery>,
    /// Set while a temp-table-using method is running; checked by
    /// `assert_not_in_temp_table_section` to reject overlapping use.
    temp_table_active: Cell<bool>,
}
151
/// A read-only SQLite connection, optionally backed by a temporary snapshot
/// copy of the database.
pub struct SqliteReadOnlyConnection {
    /// The read-only connection itself.
    conn: Connection,
    /// Present when the connection targets a snapshot copy; dropping it
    /// removes the copied files.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Which snapshot fallback (if any) was needed to obtain this connection.
    recovery: Option<ReadOnlyRecovery>,
}
167
/// Process-wide counter starting at zero.
// NOTE(review): no use is visible in this part of the file; presumably it hands
// out a unique id per BFS call — confirm against the traversal code.
static BFS_CALL_ID: AtomicU64 = AtomicU64::new(0);
169
impl SqliteReadOnlyConnection {
    /// Borrows the underlying read-only SQLite connection.
    pub fn conn(&self) -> &Connection {
        &self.conn
    }

    /// Returns the snapshot fallback that was used to open this connection,
    /// or `None` when the database was opened directly.
    pub fn recovery(&self) -> Option<ReadOnlyRecovery> {
        self.recovery
    }
}
179
/// Owns the files created for a snapshot copy and deletes them when dropped.
struct SnapshotCopyGuard {
    /// Every file to remove on drop.
    paths: Vec<PathBuf>,
}

impl Drop for SnapshotCopyGuard {
    /// Removes each tracked file; failures (e.g. a file that is already gone)
    /// are deliberately ignored because cleanup is best-effort.
    fn drop(&mut self) {
        self.paths.iter().for_each(|tracked| {
            std::fs::remove_file(tracked).ok();
        });
    }
}
191
192fn configure_writable_graph_connection(conn: &Connection, db_path: &Path) -> Result<()> {
193 conn.busy_timeout(Duration::from_secs(5))?;
194 conn.pragma_update(None, "journal_mode", "WAL")?;
195 let mode: String = conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
196 if mode.to_lowercase() != "wal" {
197 bail!(
198 "graph substrate db {} requires WAL mode for concurrent reads, got {}",
199 db_path.display(),
200 mode
201 );
202 }
203 conn.pragma_update(
204 None,
205 "wal_autocheckpoint",
206 SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
207 )?;
208 let checkpoint_pages: i64 =
209 conn.query_row("PRAGMA wal_autocheckpoint", [], |row| row.get(0))?;
210 if checkpoint_pages != SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES {
211 bail!(
212 "graph substrate db {} requires wal_autocheckpoint={}, got {}",
213 db_path.display(),
214 SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
215 checkpoint_pages
216 );
217 }
218 Ok(())
219}
220
221fn sqlite_column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
222 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
223 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
224 for row in rows {
225 if row? == column {
226 return Ok(true);
227 }
228 }
229 Ok(false)
230}
231
232fn add_column_if_missing(
233 conn: &Connection,
234 table: &str,
235 column: &str,
236 definition: &str,
237) -> Result<()> {
238 if !sqlite_column_exists(conn, table, column)? {
239 conn.execute(
240 &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
241 [],
242 )?;
243 }
244 Ok(())
245}
246
247fn backfill_graph_edge_keys(conn: &Connection) -> Result<()> {
248 if !sqlite_column_exists(conn, "graph_edges", "edge_key")? {
249 return Ok(());
250 }
251 let rows = {
252 let mut stmt = conn.prepare(
253 r#"
254 SELECT from_id, to_id, kind
255 FROM graph_edges
256 WHERE edge_key IS NULL OR edge_key = ''
257 ORDER BY from_id, kind, to_id
258 "#,
259 )?;
260 collect_rows(stmt.query_map([], |row| {
261 Ok((
262 row.get::<_, String>(0)?,
263 row.get::<_, String>(1)?,
264 row.get::<_, String>(2)?,
265 ))
266 })?)?
267 };
268 let mut update = conn.prepare(
269 r#"
270 UPDATE graph_edges
271 SET edge_key = ?4
272 WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3
273 "#,
274 )?;
275 for (from_id, to_id, kind) in rows {
276 update.execute((
277 &from_id,
278 &to_id,
279 &kind,
280 stable_graph_edge_id(&from_id, &to_id, &kind),
281 ))?;
282 }
283 Ok(())
284}
285
286fn migrate_sqlite_graph_schema(conn: &Connection, old_version: i64) -> Result<()> {
287 if old_version < 2 {
288 add_column_if_missing(conn, "graph_nodes", "row_hash", "TEXT")?;
289 add_column_if_missing(conn, "graph_nodes", "source_watermark", "TEXT")?;
290 add_column_if_missing(conn, "graph_edges", "row_hash", "TEXT")?;
291 add_column_if_missing(conn, "graph_edges", "source_watermark", "TEXT")?;
292 }
293 if old_version < 3 {
294 rebuild_graph_node_properties(conn)?;
295 }
296 if old_version < 4 {
297 ensure_sqlite_graph_operator_stats_schema(conn)?;
298 }
299 if old_version < 5 {
300 add_column_if_missing(conn, "graph_edges", "edge_key", "TEXT")?;
301 backfill_graph_edge_keys(conn)?;
302 ensure_sqlite_graph_edge_properties_schema(conn)?;
303 rebuild_graph_edge_properties(conn)?;
304 }
305 Ok(())
306}
307
308fn ensure_sqlite_graph_operator_stats_schema(conn: &Connection) -> Result<()> {
309 conn.execute_batch(
310 r#"
311 CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
312 ON graph_nodes(kind, label, id);
313 CREATE TABLE IF NOT EXISTS graph_operator_stats (
314 scope TEXT PRIMARY KEY,
315 nodes INTEGER NOT NULL,
316 edges INTEGER NOT NULL,
317 tombstone_nodes INTEGER NOT NULL,
318 tombstone_edges INTEGER NOT NULL,
319 file_size_bytes INTEGER,
320 freelist_bytes INTEGER,
321 observed_at_unix INTEGER NOT NULL
322 );
323 "#,
324 )?;
325 Ok(())
326}
327
328fn ensure_sqlite_graph_edge_properties_schema(conn: &Connection) -> Result<()> {
329 conn.execute_batch(
330 r#"
331 CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_edge_key
332 ON graph_edges(edge_key);
333 CREATE TABLE IF NOT EXISTS graph_edge_properties (
334 edge_key TEXT NOT NULL,
335 key TEXT NOT NULL,
336 value TEXT NOT NULL,
337 PRIMARY KEY (edge_key, key),
338 FOREIGN KEY (edge_key) REFERENCES graph_edges(edge_key) ON DELETE CASCADE
339 );
340 CREATE INDEX IF NOT EXISTS idx_graph_edge_properties_key_value_edge
341 ON graph_edge_properties(key, value, edge_key);
342 "#,
343 )?;
344 Ok(())
345}
346
347fn replace_node_properties(
348 conn: &Connection,
349 node_id: &str,
350 properties: &BTreeMap<String, String>,
351) -> Result<()> {
352 conn.execute(
353 "DELETE FROM graph_node_properties WHERE node_id = ?1",
354 [node_id],
355 )?;
356 let mut insert = conn.prepare(
357 r#"
358 INSERT INTO graph_node_properties (node_id, key, value)
359 VALUES (?1, ?2, ?3)
360 ON CONFLICT(node_id, key) DO UPDATE SET
361 value = excluded.value
362 "#,
363 )?;
364 for (key, value) in properties {
365 insert.execute((node_id, key, value))?;
366 }
367 Ok(())
368}
369
370fn replace_edge_properties(
371 conn: &Connection,
372 edge_key: &str,
373 properties: &BTreeMap<String, String>,
374) -> Result<()> {
375 conn.execute(
376 "DELETE FROM graph_edge_properties WHERE edge_key = ?1",
377 [edge_key],
378 )?;
379 let mut insert = conn.prepare(
380 r#"
381 INSERT INTO graph_edge_properties (edge_key, key, value)
382 VALUES (?1, ?2, ?3)
383 ON CONFLICT(edge_key, key) DO UPDATE SET
384 value = excluded.value
385 "#,
386 )?;
387 for (key, value) in properties {
388 insert.execute((edge_key, key, value))?;
389 }
390 Ok(())
391}
392
393fn rebuild_graph_node_properties(conn: &Connection) -> Result<()> {
394 if !sqlite_column_exists(conn, "graph_nodes", "properties_json")? {
395 return Ok(());
396 }
397 conn.execute_batch(
398 r#"
399 DELETE FROM graph_node_properties;
400 INSERT INTO graph_node_properties (node_id, key, value)
401 SELECT graph_nodes.id, json_each.key, CAST(json_each.value AS TEXT)
402 FROM graph_nodes, json_each(graph_nodes.properties_json)
403 WHERE json_each.key IS NOT NULL
404 AND json_each.value IS NOT NULL
405 "#,
406 )?;
407 Ok(())
408}
409
410fn rebuild_graph_edge_properties(conn: &Connection) -> Result<()> {
411 if !sqlite_column_exists(conn, "graph_edges", "properties_json")?
412 || !sqlite_column_exists(conn, "graph_edges", "edge_key")?
413 {
414 return Ok(());
415 }
416 conn.execute_batch(
417 r#"
418 DELETE FROM graph_edge_properties;
419 INSERT INTO graph_edge_properties (edge_key, key, value)
420 SELECT graph_edges.edge_key, json_each.key, CAST(json_each.value AS TEXT)
421 FROM graph_edges, json_each(graph_edges.properties_json)
422 WHERE graph_edges.edge_key IS NOT NULL
423 AND graph_edges.edge_key <> ''
424 AND json_each.key IS NOT NULL
425 AND json_each.value IS NOT NULL
426 "#,
427 )?;
428 Ok(())
429}
430
431pub fn open_graph_read_only_connection(db_path: &Path) -> Result<SqliteReadOnlyConnection> {
432 let conn = Connection::open_with_flags(
433 db_path,
434 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
435 )
436 .with_context(|| format!("opening graph.db read-only: {}", db_path.display()))?;
437 conn.busy_timeout(Duration::from_secs(5))?;
438 Ok(SqliteReadOnlyConnection {
439 conn,
440 _snapshot_copy: None,
441 recovery: None,
442 })
443}
444
445pub fn open_graph_read_only_connection_resilient(
446 db_path: &Path,
447) -> Result<SqliteReadOnlyConnection> {
448 match open_graph_read_only_connection(db_path).and_then(|connection| {
449 connection
450 .conn
451 .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
452 Ok(connection)
453 }) {
454 Ok(connection) => Ok(connection),
455 Err(err) => match read_only_snapshot_recovery(db_path, &err) {
456 Some(recovery) => open_graph_read_only_snapshot(db_path, recovery),
457 None => Err(err),
458 },
459 }
460}
461
462fn open_graph_read_only_snapshot(
463 db_path: &Path,
464 recovery: ReadOnlyRecovery,
465) -> Result<SqliteReadOnlyConnection> {
466 let (snapshot_path, cleanup_paths) = copy_read_only_snapshot(db_path, "graph")?;
467 let conn = Connection::open_with_flags(
468 &snapshot_path,
469 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
470 )
471 .with_context(|| format!("opening graph.db snapshot {}", snapshot_path.display()))?;
472 conn.busy_timeout(Duration::from_secs(5))?;
473 Ok(SqliteReadOnlyConnection {
474 conn,
475 _snapshot_copy: Some(SnapshotCopyGuard {
476 paths: cleanup_paths,
477 }),
478 recovery: Some(recovery),
479 })
480}
481
/// Timing record for one named phase of a projection refresh.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefreshPhase {
    /// Phase identifier, e.g. `sqlite_temp_table_prepare`.
    pub name: String,
    /// Elapsed time of the phase in microseconds.
    pub duration_micros: u128,
    /// Free-form description of what the phase did.
    pub detail: String,
}
488
/// Summary returned by a projection refresh
/// (`SqliteGraphStore::replace_projection_with_version`).
// NOTE(review): the code that fills these fields lies outside the visible part
// of the file; the field notes below are inferred from names and the staging
// code, so confirm them against the refresh implementation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefresh {
    /// Scope the projection was written under.
    pub scope: String,
    /// Version label of the projection.
    pub projection_version: String,
    /// Source watermark supplied by the caller, if any.
    pub source_watermark: Option<String>,
    /// Presumably ids of nodes that were in the store but absent from the new projection.
    pub tombstoned_nodes: Vec<String>,
    /// Presumably keys of edges that were in the store but absent from the new projection.
    pub tombstoned_edges: Vec<String>,
    pub upserted_nodes: usize,
    pub upserted_edges: usize,
    pub unchanged_nodes: usize,
    pub unchanged_edges: usize,
    pub upserted_properties: usize,
    pub unchanged_properties: usize,
    pub deleted_properties: usize,
    pub deleted_nodes: usize,
    pub deleted_edges: usize,
    pub pruned_tombstones: usize,
    /// Database size before the refresh, when it could be determined.
    pub file_size_bytes_before: Option<u64>,
    /// Database size after the refresh, when it could be determined.
    pub file_size_bytes_after: Option<u64>,
    /// Per-phase timing records collected during the refresh.
    pub phase_timings: Vec<SqliteProjectionRefreshPhase>,
}
510
/// Version metadata of a stored projection.
// NOTE(review): presumably mirrors one row of `graph_projection_versions`
// (same column names) — confirm against the code that reads it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionVersion {
    pub projection_version: String,
    pub content_hash: Option<String>,
    pub source_watermark: Option<String>,
}
517
518fn sqlite_refresh_phase_timing(
519 name: &str,
520 started: Instant,
521 detail: &str,
522) -> SqliteProjectionRefreshPhase {
523 SqliteProjectionRefreshPhase {
524 name: name.to_string(),
525 duration_micros: started.elapsed().as_micros(),
526 detail: detail.to_string(),
527 }
528}
529
/// Renders the `VALUES` placeholder list for a multi-row insert.
///
/// Produces `row_count` comma-separated groups, each holding `column_count`
/// `?` placeholders, e.g. `(?, ?), (?, ?)` for two columns and two rows.
/// A `row_count` of zero yields an empty string.
fn sqlite_graph_staging_placeholders(column_count: usize, row_count: usize) -> String {
    let one_row = format!("({})", vec!["?"; column_count].join(", "));
    vec![one_row.as_str(); row_count].join(", ")
}
543
544fn sqlite_stage_all_ids(
545 tx: &rusqlite::Transaction<'_>,
546 nodes: &[GraphNode],
547 edges: &[GraphEdge],
548) -> Result<()> {
549 for chunk in nodes.iter().map(|n| &n.id).collect::<Vec<_>>().chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
550 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
551 let sql = format!("INSERT OR IGNORE INTO next_graph_all_node_ids (id) VALUES {}", placeholders.join(", "));
552 let values: Vec<Value> = chunk.iter().map(|id| Value::Text((*id).clone())).collect();
553 tx.execute(&sql, params_from_iter(values.iter()))?;
554 }
555 for chunk in edges.iter().map(graph_edge_id).collect::<Vec<_>>().chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
556 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
557 let sql = format!("INSERT OR IGNORE INTO next_graph_all_edge_keys (edge_key) VALUES {}", placeholders.join(", "));
558 let values: Vec<Value> = chunk.iter().map(|ek| Value::Text(ek.to_string())).collect();
559 tx.execute(&sql, params_from_iter(values.iter()))?;
560 }
561 Ok(())
562}
563
564fn sqlite_stage_projection_nodes(
565 tx: &rusqlite::Transaction<'_>,
566 nodes: &[&GraphNode],
567 source_watermark: Option<&str>,
568) -> Result<()> {
569 for chunk in nodes.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
570 let sql = format!(
571 r#"
572 INSERT INTO next_graph_nodes
573 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
574 VALUES {}
575 "#,
576 sqlite_graph_staging_placeholders(8, chunk.len())
577 );
578 let mut values = Vec::with_capacity(chunk.len() * 8);
579 for node in chunk {
580 values.push(Value::Text(node.id.clone()));
581 values.push(Value::Text(node.kind.clone()));
582 values.push(Value::Text(node.label.clone()));
583 values.push(Value::Text(to_json(&node.properties)?));
584 values.push(Value::Text(to_json(&node.provenance)?));
585 values.push(
586 optional_to_json(&node.freshness)?
587 .map(Value::Text)
588 .unwrap_or(Value::Null),
589 );
590 values.push(Value::Text(row_hash(node)?));
591 values.push(
592 source_watermark
593 .map(|watermark| Value::Text(watermark.to_string()))
594 .unwrap_or(Value::Null),
595 );
596 }
597 tx.execute(&sql, params_from_iter(values))?;
598 }
599 Ok(())
600}
601
602fn sqlite_stage_projection_edges(
603 tx: &rusqlite::Transaction<'_>,
604 edges: &[&GraphEdge],
605 source_watermark: Option<&str>,
606) -> Result<()> {
607 for chunk in edges.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
608 let sql = format!(
609 r#"
610 INSERT INTO next_graph_edges
611 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
612 VALUES {}
613 "#,
614 sqlite_graph_staging_placeholders(9, chunk.len())
615 );
616 let mut values = Vec::with_capacity(chunk.len() * 9);
617 for edge in chunk {
618 values.push(Value::Text(graph_edge_id(edge)));
619 values.push(Value::Text(edge.from_id.clone()));
620 values.push(Value::Text(edge.to_id.clone()));
621 values.push(Value::Text(edge.kind.clone()));
622 values.push(Value::Text(to_json(&edge.properties)?));
623 values.push(Value::Text(to_json(&edge.provenance)?));
624 values.push(
625 optional_to_json(&edge.freshness)?
626 .map(Value::Text)
627 .unwrap_or(Value::Null),
628 );
629 values.push(Value::Text(row_hash(edge)?));
630 values.push(
631 source_watermark
632 .map(|watermark| Value::Text(watermark.to_string()))
633 .unwrap_or(Value::Null),
634 );
635 }
636 tx.execute(&sql, params_from_iter(values))?;
637 }
638 Ok(())
639}
640
641impl SqliteGraphStore {
642 fn assert_not_in_temp_table_section(&self) {
643 if self.temp_table_active.get() {
644 panic!("SqliteGraphStore: re-entrant temp-table call detected — only one temp-table-using method may be active at a time per connection");
645 }
646 }
647
648 pub fn open(db_path: &Path) -> Result<Self> {
649 if let Some(parent) = db_path.parent() {
650 std::fs::create_dir_all(parent)
651 .with_context(|| format!("creating graph substrate dir: {}", parent.display()))?;
652 }
653 let conn = Connection::open(db_path)
654 .with_context(|| format!("opening graph substrate db: {}", db_path.display()))?;
655 configure_writable_graph_connection(&conn, db_path)?;
656 Self::from_connection(conn)
657 }
658
659 pub fn in_memory() -> Result<Self> {
660 let conn = Connection::open_in_memory()?;
661 conn.busy_timeout(Duration::from_secs(5))?;
662 Self::from_connection(conn)
663 }
664
665 pub fn open_read_only(db_path: &Path) -> Result<Self> {
666 let connection = open_graph_read_only_connection(db_path)?;
667 Self::from_read_only_connection(connection)
668 }
669
670 pub fn open_read_only_resilient(db_path: &Path) -> Result<Self> {
671 let connection = open_graph_read_only_connection_resilient(db_path)?;
672 Self::from_read_only_connection(connection)
673 }
674
    /// Returns the snapshot fallback used to open this store read-only, or
    /// `None` when no fallback was needed (or the store is writable).
    pub fn read_only_recovery(&self) -> Option<ReadOnlyRecovery> {
        self.read_only_recovery
    }
678
679 pub fn has_user_triggers(&self) -> Result<bool> {
680 self.conn
681 .query_row(
682 r#"
683 SELECT EXISTS(
684 SELECT 1
685 FROM sqlite_master
686 WHERE type = 'trigger'
687 AND name NOT LIKE 'sqlite_%'
688 )
689 "#,
690 [],
691 |row| row.get::<_, bool>(0),
692 )
693 .map_err(Into::into)
694 }
695
    /// Wraps a writable connection, creating or migrating the graph schema.
    ///
    /// Steps, in order: enable foreign keys, reject databases whose
    /// `user_version` is newer than [`SQLITE_GRAPH_SCHEMA_VERSION`], create
    /// any missing tables and indexes, then run migrations and bump
    /// `user_version` when the stored version is older.
    ///
    /// # Errors
    ///
    /// Fails when the schema version is newer than supported or any SQL
    /// statement fails.
    fn from_connection(conn: Connection) -> Result<Self> {
        conn.pragma_update(None, "foreign_keys", "ON")?;
        let user_version: i64 =
            conn.pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
        // Refuse to touch a database written by a newer schema version.
        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
            bail!(
                "graph.db schema version {} is newer than supported version {}",
                user_version,
                SQLITE_GRAPH_SCHEMA_VERSION
            );
        }
        // Every statement is IF NOT EXISTS, so existing tables are left
        // untouched; older layouts are brought up to date by the migration below.
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS graph_nodes (
                id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                label TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT
            );
            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind
                ON graph_nodes(kind);
            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
                ON graph_nodes(kind, label, id);

            CREATE TABLE IF NOT EXISTS graph_edges (
                edge_key TEXT NOT NULL UNIQUE,
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT,
                PRIMARY KEY (from_id, to_id, kind),
                FOREIGN KEY (from_id) REFERENCES graph_nodes(id) ON DELETE CASCADE,
                FOREIGN KEY (to_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_edges_from_kind
                ON graph_edges(from_id, kind);
            CREATE INDEX IF NOT EXISTS idx_graph_edges_to_kind
                ON graph_edges(to_id, kind);

            CREATE TABLE IF NOT EXISTS graph_projection_versions (
                scope TEXT PRIMARY KEY,
                projection_version TEXT NOT NULL,
                content_hash TEXT,
                source_watermark TEXT,
                observed_at_unix INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS graph_tombstones (
                row_key TEXT PRIMARY KEY,
                row_kind TEXT NOT NULL,
                deleted_at_unix INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS graph_node_properties (
                node_id TEXT NOT NULL,
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                PRIMARY KEY (node_id, key),
                FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_node_properties_key_value_node
                ON graph_node_properties(key, value, node_id);

            CREATE TABLE IF NOT EXISTS graph_operator_stats (
                scope TEXT PRIMARY KEY,
                nodes INTEGER NOT NULL,
                edges INTEGER NOT NULL,
                tombstone_nodes INTEGER NOT NULL,
                tombstone_edges INTEGER NOT NULL,
                file_size_bytes INTEGER,
                freelist_bytes INTEGER,
                observed_at_unix INTEGER NOT NULL
            );
            "#,
        )?;
        // The version is bumped only after the migration succeeded, so a
        // failed migration is retried on the next open.
        if user_version < SQLITE_GRAPH_SCHEMA_VERSION {
            migrate_sqlite_graph_schema(&conn, user_version)?;
            conn.pragma_update(None, "user_version", SQLITE_GRAPH_SCHEMA_VERSION)?;
        }
        Ok(Self {
            conn,
            _snapshot_copy: None,
            read_only_recovery: None,
            temp_table_active: Cell::new(false),
        })
    }
790
791 fn from_read_only_connection(connection: SqliteReadOnlyConnection) -> Result<Self> {
792 connection.conn.pragma_update(None, "foreign_keys", "ON")?;
793 let user_version: i64 =
794 connection
795 .conn
796 .pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
797 if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
798 bail!(
799 "graph.db schema version {} is newer than supported version {}",
800 user_version,
801 SQLITE_GRAPH_SCHEMA_VERSION
802 );
803 }
804 connection
805 .conn
806 .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
807 Ok(Self {
808 conn: connection.conn,
809 _snapshot_copy: connection._snapshot_copy,
810 read_only_recovery: connection.recovery,
811 temp_table_active: Cell::new(false),
812 })
813 }
814
815 pub fn replace_projection(&mut self, projection: &GraphProjection) -> Result<()> {
816 self.replace_projection_with_version("root", projection, None, None)
817 .map(|_| ())
818 }
819
820 pub fn replace_projection_with_version(
821 &mut self,
822 scope: impl Into<String>,
823 projection: &GraphProjection,
824 projection_version: Option<&str>,
825 source_watermark: Option<String>,
826 ) -> Result<SqliteProjectionRefresh> {
827 self.assert_not_in_temp_table_section();
828 self.temp_table_active.set(true);
829 let scope = scope.into();
830 let result = self.replace_projection_with_version_fallible(scope, projection, projection_version, source_watermark);
831 self.temp_table_active.set(false);
832 if let Ok(ref refresh) = result {
833 let total_rows = refresh.upserted_nodes + refresh.upserted_edges;
834 let autocheckpoint = if total_rows > 10000 {
835 8192
836 } else if total_rows > 1000 {
837 4096
838 } else {
839 2048
840 };
841 let _ = self.conn.pragma_update(None, "wal_autocheckpoint", autocheckpoint);
842 let _ = self.conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)");
843 }
844 result
845 }
846
847 fn replace_projection_with_version_fallible(
848 &mut self,
849 scope: String,
850 projection: &GraphProjection,
851 projection_version: Option<&str>,
852 source_watermark: Option<String>,
853 ) -> Result<SqliteProjectionRefresh> {
854 let projection_version = projection_version
855 .map(str::to_string)
856 .or_else(|| projection_version_from_nodes(&projection.nodes))
857 .unwrap_or_else(|| "unversioned".to_string());
858 let projection_hash = projection_hash_from_nodes(&projection.nodes);
859 let observed_at_unix = unix_now();
860 let file_size_bytes_before = sqlite_database_size_bytes(&self.conn).ok();
861 let force_refresh_writes = self.has_user_triggers().unwrap_or(true);
862 let mut phase_timings = Vec::new();
863
864 let tx = self.conn.transaction()?;
865 let started = Instant::now();
866 tx.execute_batch(
867 r#"
868 CREATE TEMP TABLE IF NOT EXISTS next_graph_nodes (
869 id TEXT PRIMARY KEY,
870 kind TEXT NOT NULL,
871 label TEXT NOT NULL,
872 properties_json TEXT NOT NULL,
873 provenance_json TEXT NOT NULL,
874 freshness_json TEXT,
875 row_hash TEXT NOT NULL,
876 source_watermark TEXT
877 );
878 CREATE TEMP TABLE IF NOT EXISTS next_graph_edges (
879 edge_key TEXT PRIMARY KEY,
880 from_id TEXT NOT NULL,
881 to_id TEXT NOT NULL,
882 kind TEXT NOT NULL,
883 properties_json TEXT NOT NULL,
884 provenance_json TEXT NOT NULL,
885 freshness_json TEXT,
886 row_hash TEXT NOT NULL,
887 source_watermark TEXT
888 );
889 CREATE INDEX IF NOT EXISTS temp.idx_next_graph_edges_from_to_kind
890 ON next_graph_edges(from_id, to_id, kind);
891 CREATE TEMP TABLE IF NOT EXISTS next_graph_node_properties (
892 node_id TEXT NOT NULL,
893 key TEXT NOT NULL,
894 value TEXT NOT NULL,
895 PRIMARY KEY (node_id, key)
896 );
897 CREATE TEMP TABLE IF NOT EXISTS next_graph_edge_properties (
898 edge_key TEXT NOT NULL,
899 key TEXT NOT NULL,
900 value TEXT NOT NULL,
901 PRIMARY KEY (edge_key, key)
902 );
903 CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_nodes (
904 id TEXT PRIMARY KEY
905 );
906 CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_edges (
907 edge_key TEXT PRIMARY KEY
908 );
909 CREATE TEMP TABLE IF NOT EXISTS next_graph_all_node_ids (
910 id TEXT PRIMARY KEY
911 );
912 CREATE TEMP TABLE IF NOT EXISTS next_graph_all_edge_keys (
913 edge_key TEXT PRIMARY KEY
914 );
915 DELETE FROM next_graph_nodes;
916 DELETE FROM next_graph_edges;
917 DELETE FROM next_graph_node_properties;
918 DELETE FROM next_graph_edge_properties;
919 DELETE FROM next_graph_changed_nodes;
920 DELETE FROM next_graph_changed_edges;
921 DELETE FROM next_graph_all_node_ids;
922 DELETE FROM next_graph_all_edge_keys;
923 "#,
924 )?;
925 phase_timings.push(sqlite_refresh_phase_timing(
926 "sqlite_temp_table_prepare",
927 started,
928 "create and clear refresh staging tables before row loading",
929 ));
930 let (changed_nodes, changed_edges, skipped_nodes, skipped_edges) = if force_refresh_writes {
931 (projection.nodes.iter().collect(), projection.edges.iter().collect(), 0usize, 0usize)
932 } else {
933 let existing_node_hashes: BTreeMap<String, String> = {
934 let mut stmt = tx.prepare("SELECT id, row_hash FROM graph_nodes WHERE row_hash IS NOT NULL")?;
935 let rows = stmt.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)))?;
936 collect_rows(rows)?.into_iter().collect()
937 };
938 let existing_edge_hashes: BTreeMap<String, String> = {
939 let mut stmt = tx.prepare("SELECT edge_key, row_hash FROM graph_edges WHERE row_hash IS NOT NULL")?;
940 let rows = stmt.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)))?;
941 collect_rows(rows)?.into_iter().collect()
942 };
943 let cn: Vec<&GraphNode> = projection.nodes.iter().filter(|n| {
944 let hash = row_hash(*n).ok();
945 hash.as_ref().is_none_or(|h| existing_node_hashes.get(&n.id) != Some(h))
946 }).collect();
947 let ce: Vec<&GraphEdge> = projection.edges.iter().filter(|e| {
948 let hash = row_hash(*e).ok();
949 let ek = graph_edge_id(e);
950 hash.as_ref().is_none_or(|h| existing_edge_hashes.get(&ek) != Some(h))
951 }).collect();
952 let sn = projection.nodes.len() - cn.len();
953 let se = projection.edges.len() - ce.len();
954 (cn, ce, sn, se)
955 };
956 {
957 let started = Instant::now();
958 sqlite_stage_all_ids(&tx, &projection.nodes, &projection.edges)?;
959 sqlite_stage_projection_nodes(&tx, &changed_nodes, source_watermark.as_deref())?;
960 sqlite_stage_projection_edges(&tx, &changed_edges, source_watermark.as_deref())?;
961 phase_timings.push(sqlite_refresh_phase_timing(
962 "sqlite_node_staging",
963 started,
964 &format!(
965 "pre-filtered staging: {} nodes ({} unchanged skipped), {} edges ({} unchanged skipped) into temp tables using multi-row chunks up to {} rows",
966 changed_nodes.len(),
967 skipped_nodes,
968 changed_edges.len(),
969 skipped_edges,
970 SQLITE_GRAPH_STAGING_CHUNK_ROWS
971 ),
972 ));
973 }
974 {
975 let started = Instant::now();
976 let changed_nodes_sql = if force_refresh_writes {
977 r#"
978 INSERT INTO next_graph_changed_nodes (id)
979 SELECT id
980 FROM next_graph_nodes
981 "#
982 } else {
983 r#"
984 INSERT INTO next_graph_changed_nodes (id)
985 SELECT n.id
986 FROM next_graph_nodes n
987 LEFT JOIN graph_nodes g ON g.id = n.id
988 WHERE g.id IS NULL OR g.row_hash IS NOT n.row_hash
989 "#
990 };
991 tx.execute(changed_nodes_sql, [])?;
992 tx.execute_batch(
993 r#"
994 INSERT INTO next_graph_node_properties (node_id, key, value)
995 SELECT n.id, json_each.key, CAST(json_each.value AS TEXT)
996 FROM next_graph_nodes n
997 JOIN next_graph_changed_nodes c ON c.id = n.id,
998 json_each(n.properties_json)
999 WHERE json_each.key IS NOT NULL
1000 AND json_each.value IS NOT NULL
1001 "#,
1002 )?;
1003 phase_timings.push(sqlite_refresh_phase_timing(
1004 "sqlite_property_row_staging",
1005 started,
1006 "derive materialized node property rows only for new/changed node rows; unchanged row-hash owners reuse existing property rows",
1007 ));
1008 }
1009 {
1010 let started = Instant::now();
1011 let changed_edges_sql = if force_refresh_writes {
1012 r#"
1013 INSERT INTO next_graph_changed_edges (edge_key)
1014 SELECT edge_key
1015 FROM next_graph_edges
1016 "#
1017 } else {
1018 r#"
1019 INSERT INTO next_graph_changed_edges (edge_key)
1020 SELECT n.edge_key
1021 FROM next_graph_edges n
1022 LEFT JOIN graph_edges g ON g.edge_key = n.edge_key
1023 WHERE g.edge_key IS NULL OR g.row_hash IS NOT n.row_hash
1024 "#
1025 };
1026 tx.execute(changed_edges_sql, [])?;
1027 tx.execute_batch(
1028 r#"
1029 INSERT INTO next_graph_edge_properties (edge_key, key, value)
1030 SELECT e.edge_key, json_each.key, CAST(json_each.value AS TEXT)
1031 FROM next_graph_edges e
1032 JOIN next_graph_changed_edges c ON c.edge_key = e.edge_key,
1033 json_each(e.properties_json)
1034 WHERE json_each.key IS NOT NULL
1035 AND json_each.value IS NOT NULL
1036 "#,
1037 )?;
1038 phase_timings.push(sqlite_refresh_phase_timing(
1039 "sqlite_edge_property_row_staging",
1040 started,
1041 "derive materialized edge property rows only for new/changed edge rows; unchanged row-hash owners reuse existing property rows",
1042 ));
1043 }
1044
1045 let delta_started = Instant::now();
1046 let tombstoned_nodes = {
1047 let sql = if force_refresh_writes {
1048 r#"
1049 SELECT g.id
1050 FROM graph_nodes g
1051 LEFT JOIN next_graph_nodes n ON n.id = g.id
1052 WHERE n.id IS NULL
1053 ORDER BY g.id
1054 "#
1055 } else {
1056 r#"
1057 SELECT g.id
1058 FROM graph_nodes g
1059 LEFT JOIN next_graph_all_node_ids n ON n.id = g.id
1060 WHERE n.id IS NULL
1061 ORDER BY g.id
1062 "#
1063 };
1064 let mut stmt = tx.prepare(sql)?;
1065 collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1066 };
1067 let tombstoned_edges = {
1068 let sql = if force_refresh_writes {
1069 r#"
1070 SELECT g.edge_key
1071 FROM graph_edges g
1072 LEFT JOIN next_graph_edges n
1073 ON n.edge_key = g.edge_key
1074 WHERE n.edge_key IS NULL
1075 ORDER BY g.edge_key
1076 "#
1077 } else {
1078 r#"
1079 SELECT g.edge_key
1080 FROM graph_edges g
1081 LEFT JOIN next_graph_all_edge_keys n
1082 ON n.edge_key = g.edge_key
1083 WHERE n.edge_key IS NULL
1084 ORDER BY g.edge_key
1085 "#
1086 };
1087 let mut stmt = tx.prepare(sql)?;
1088 collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1089 };
1090 let unchanged_nodes: usize = if force_refresh_writes {
1091 tx.query_row(
1092 r#"
1093 SELECT COUNT(*)
1094 FROM next_graph_nodes n
1095 JOIN graph_nodes g ON g.id = n.id
1096 WHERE g.row_hash = n.row_hash
1097 "#,
1098 [],
1099 |row| row.get(0),
1100 )?
1101 } else {
1102 skipped_nodes
1103 };
1104 let unchanged_edges: usize = if force_refresh_writes {
1105 tx.query_row(
1106 r#"
1107 SELECT COUNT(*)
1108 FROM next_graph_edges n
1109 JOIN graph_edges g
1110 ON g.edge_key = n.edge_key
1111 WHERE g.row_hash = n.row_hash
1112 "#,
1113 [],
1114 |row| row.get(0),
1115 )?
1116 } else {
1117 skipped_edges
1118 };
1119 let reused_owner_node_properties: usize = if force_refresh_writes {
1120 tx.query_row(
1121 r#"
1122 SELECT COUNT(*)
1123 FROM graph_node_properties g
1124 JOIN next_graph_nodes n ON n.id = g.node_id
1125 LEFT JOIN next_graph_changed_nodes c ON c.id = n.id
1126 WHERE c.id IS NULL
1127 "#,
1128 [],
1129 |row| row.get(0),
1130 )?
1131 } else {
1132 tx.query_row(
1133 r#"
1134 SELECT COUNT(*)
1135 FROM graph_node_properties g
1136 JOIN next_graph_all_node_ids a ON a.id = g.node_id
1137 LEFT JOIN next_graph_changed_nodes c ON c.id = a.id
1138 WHERE c.id IS NULL
1139 "#,
1140 [],
1141 |row| row.get(0),
1142 )?
1143 };
1144 let reused_owner_edge_properties: usize = if force_refresh_writes {
1145 tx.query_row(
1146 r#"
1147 SELECT COUNT(*)
1148 FROM graph_edge_properties g
1149 JOIN next_graph_edges n ON n.edge_key = g.edge_key
1150 LEFT JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1151 WHERE c.edge_key IS NULL
1152 "#,
1153 [],
1154 |row| row.get(0),
1155 )?
1156 } else {
1157 tx.query_row(
1158 r#"
1159 SELECT COUNT(*)
1160 FROM graph_edge_properties g
1161 JOIN next_graph_all_edge_keys a ON a.edge_key = g.edge_key
1162 LEFT JOIN next_graph_changed_edges c ON c.edge_key = a.edge_key
1163 WHERE c.edge_key IS NULL
1164 "#,
1165 [],
1166 |row| row.get(0),
1167 )?
1168 };
1169 let unchanged_changed_node_properties: usize = tx.query_row(
1170 r#"
1171 SELECT COUNT(*)
1172 FROM next_graph_node_properties n
1173 JOIN graph_node_properties g
1174 ON g.node_id = n.node_id AND g.key = n.key
1175 WHERE g.value = n.value
1176 "#,
1177 [],
1178 |row| row.get(0),
1179 )?;
1180 let unchanged_changed_edge_properties: usize = tx.query_row(
1181 r#"
1182 SELECT COUNT(*)
1183 FROM next_graph_edge_properties n
1184 JOIN graph_edge_properties g
1185 ON g.edge_key = n.edge_key AND g.key = n.key
1186 WHERE g.value = n.value
1187 "#,
1188 [],
1189 |row| row.get(0),
1190 )?;
1191 let unchanged_properties = reused_owner_node_properties
1192 + reused_owner_edge_properties
1193 + unchanged_changed_node_properties
1194 + unchanged_changed_edge_properties;
1195
1196 let deleted_edges = if force_refresh_writes {
1197 tx.execute(
1198 r#"
1199 DELETE FROM graph_edges
1200 WHERE NOT EXISTS (
1201 SELECT 1
1202 FROM next_graph_edges n
1203 WHERE n.edge_key = graph_edges.edge_key
1204 )
1205 "#,
1206 [],
1207 )?
1208 } else {
1209 tx.execute(
1210 r#"
1211 DELETE FROM graph_edges
1212 WHERE NOT EXISTS (
1213 SELECT 1
1214 FROM next_graph_all_edge_keys n
1215 WHERE n.edge_key = graph_edges.edge_key
1216 )
1217 "#,
1218 [],
1219 )?
1220 };
1221 let deleted_nodes = if force_refresh_writes {
1222 tx.execute(
1223 r#"
1224 DELETE FROM graph_nodes
1225 WHERE NOT EXISTS (
1226 SELECT 1
1227 FROM next_graph_nodes n
1228 WHERE n.id = graph_nodes.id
1229 )
1230 "#,
1231 [],
1232 )?
1233 } else {
1234 tx.execute(
1235 r#"
1236 DELETE FROM graph_nodes
1237 WHERE NOT EXISTS (
1238 SELECT 1
1239 FROM next_graph_all_node_ids n
1240 WHERE n.id = graph_nodes.id
1241 )
1242 "#,
1243 [],
1244 )?
1245 };
1246
1247 let upsert_nodes_sql = r#"
1248 INSERT INTO graph_nodes
1249 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1250 SELECT
1251 n.id,
1252 n.kind,
1253 n.label,
1254 n.properties_json,
1255 n.provenance_json,
1256 n.freshness_json,
1257 n.row_hash,
1258 n.source_watermark
1259 FROM next_graph_nodes n
1260 JOIN next_graph_changed_nodes c ON c.id = n.id
1261 WHERE true
1262 ON CONFLICT(id) DO UPDATE SET
1263 kind = excluded.kind,
1264 label = excluded.label,
1265 properties_json = excluded.properties_json,
1266 provenance_json = excluded.provenance_json,
1267 freshness_json = excluded.freshness_json,
1268 row_hash = excluded.row_hash,
1269 source_watermark = excluded.source_watermark
1270 WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1271 "#;
1272 tx.execute(upsert_nodes_sql, [])?;
1273 let upsert_edges_sql = r#"
1274 INSERT INTO graph_edges
1275 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1276 SELECT
1277 n.edge_key,
1278 n.from_id,
1279 n.to_id,
1280 n.kind,
1281 n.properties_json,
1282 n.provenance_json,
1283 n.freshness_json,
1284 n.row_hash,
1285 n.source_watermark
1286 FROM next_graph_edges n
1287 JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1288 WHERE true
1289 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1290 edge_key = excluded.edge_key,
1291 properties_json = excluded.properties_json,
1292 provenance_json = excluded.provenance_json,
1293 freshness_json = excluded.freshness_json,
1294 row_hash = excluded.row_hash,
1295 source_watermark = excluded.source_watermark
1296 WHERE graph_edges.row_hash IS NOT excluded.row_hash
1297 "#;
1298 tx.execute(upsert_edges_sql, [])?;
1299 let deleted_node_properties = tx.execute(
1300 r#"
1301 DELETE FROM graph_node_properties
1302 WHERE EXISTS (
1303 SELECT 1
1304 FROM next_graph_changed_nodes c
1305 WHERE c.id = graph_node_properties.node_id
1306 )
1307 AND NOT EXISTS (
1308 SELECT 1
1309 FROM next_graph_node_properties n
1310 WHERE n.node_id = graph_node_properties.node_id
1311 AND n.key = graph_node_properties.key
1312 )
1313 "#,
1314 [],
1315 )?;
1316 let deleted_edge_properties = tx.execute(
1317 r#"
1318 DELETE FROM graph_edge_properties
1319 WHERE EXISTS (
1320 SELECT 1
1321 FROM next_graph_changed_edges c
1322 WHERE c.edge_key = graph_edge_properties.edge_key
1323 )
1324 AND NOT EXISTS (
1325 SELECT 1
1326 FROM next_graph_edge_properties n
1327 WHERE n.edge_key = graph_edge_properties.edge_key
1328 AND n.key = graph_edge_properties.key
1329 )
1330 "#,
1331 [],
1332 )?;
1333 let deleted_properties = deleted_node_properties + deleted_edge_properties;
1334 let upsert_properties_sql = r#"
1335 INSERT INTO graph_node_properties (node_id, key, value)
1336 SELECT n.node_id, n.key, n.value
1337 FROM next_graph_node_properties n
1338 LEFT JOIN graph_node_properties g
1339 ON g.node_id = n.node_id AND g.key = n.key
1340 WHERE g.node_id IS NULL OR g.value IS NOT n.value
1341 ON CONFLICT(node_id, key) DO UPDATE SET
1342 value = excluded.value
1343 WHERE graph_node_properties.value IS NOT excluded.value
1344 "#;
1345 let upserted_node_properties = tx.execute(upsert_properties_sql, [])?;
1346 let upsert_edge_properties_sql = r#"
1347 INSERT INTO graph_edge_properties (edge_key, key, value)
1348 SELECT n.edge_key, n.key, n.value
1349 FROM next_graph_edge_properties n
1350 LEFT JOIN graph_edge_properties g
1351 ON g.edge_key = n.edge_key AND g.key = n.key
1352 WHERE g.edge_key IS NULL OR g.value IS NOT n.value
1353 ON CONFLICT(edge_key, key) DO UPDATE SET
1354 value = excluded.value
1355 WHERE graph_edge_properties.value IS NOT excluded.value
1356 "#;
1357 let upserted_edge_properties = tx.execute(upsert_edge_properties_sql, [])?;
1358 let upserted_properties = upserted_node_properties + upserted_edge_properties;
1359 tx.execute(
1360 r#"
1361 INSERT INTO graph_projection_versions
1362 (scope, projection_version, content_hash, source_watermark, observed_at_unix)
1363 VALUES (?1, ?2, ?3, ?4, ?5)
1364 ON CONFLICT(scope) DO UPDATE SET
1365 projection_version = excluded.projection_version,
1366 content_hash = excluded.content_hash,
1367 source_watermark = excluded.source_watermark,
1368 observed_at_unix = excluded.observed_at_unix
1369 "#,
1370 (
1371 &scope,
1372 &projection_version,
1373 &projection_hash,
1374 &source_watermark,
1375 observed_at_unix,
1376 ),
1377 )?;
1378 let pruned_node_tombstones = tx.execute(
1379 r#"
1380 DELETE FROM graph_tombstones
1381 WHERE row_kind = 'node'
1382 AND EXISTS (
1383 SELECT 1
1384 FROM next_graph_nodes n
1385 WHERE n.id = substr(graph_tombstones.row_key, 6)
1386 )
1387 "#,
1388 [],
1389 )?;
1390 let pruned_edge_tombstones = tx.execute(
1391 r#"
1392 DELETE FROM graph_tombstones
1393 WHERE row_kind = 'edge'
1394 AND EXISTS (
1395 SELECT 1
1396 FROM next_graph_edges n
1397 WHERE n.edge_key = substr(graph_tombstones.row_key, 6)
1398 )
1399 "#,
1400 [],
1401 )?;
1402 {
1403 let mut insert_node_tombstone = tx.prepare(
1404 r#"
1405 INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1406 VALUES (?1, 'node', ?2)
1407 ON CONFLICT(row_key) DO UPDATE SET
1408 row_kind = excluded.row_kind,
1409 deleted_at_unix = excluded.deleted_at_unix
1410 "#,
1411 )?;
1412 for id in &tombstoned_nodes {
1413 insert_node_tombstone.execute((format!("node:{id}"), observed_at_unix))?;
1414 }
1415 }
1416 {
1417 let mut insert_edge_tombstone = tx.prepare(
1418 r#"
1419 INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1420 VALUES (?1, 'edge', ?2)
1421 ON CONFLICT(row_key) DO UPDATE SET
1422 row_kind = excluded.row_kind,
1423 deleted_at_unix = excluded.deleted_at_unix
1424 "#,
1425 )?;
1426 for key in &tombstoned_edges {
1427 insert_edge_tombstone.execute((format!("edge:{key}"), observed_at_unix))?;
1428 }
1429 }
1430 let tombstone_node_count: usize = tx.query_row(
1431 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1432 [],
1433 |row| row.get(0),
1434 )?;
1435 let tombstone_edge_count: usize = tx.query_row(
1436 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1437 [],
1438 |row| row.get(0),
1439 )?;
1440 tx.execute(
1441 r#"
1442 INSERT INTO graph_operator_stats
1443 (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1444 VALUES (?1, ?2, ?3, ?4, ?5, NULL, NULL, ?6)
1445 ON CONFLICT(scope) DO UPDATE SET
1446 nodes = excluded.nodes,
1447 edges = excluded.edges,
1448 tombstone_nodes = excluded.tombstone_nodes,
1449 tombstone_edges = excluded.tombstone_edges,
1450 file_size_bytes = excluded.file_size_bytes,
1451 freelist_bytes = excluded.freelist_bytes,
1452 observed_at_unix = excluded.observed_at_unix
1453 "#,
1454 (
1455 &scope,
1456 projection.nodes.len() as i64,
1457 projection.edges.len() as i64,
1458 tombstone_node_count as i64,
1459 tombstone_edge_count as i64,
1460 observed_at_unix,
1461 ),
1462 )?;
1463 phase_timings.push(sqlite_refresh_phase_timing(
1464 "sqlite_delta_write",
1465 delta_started,
1466 "apply row/property deltas, projection metadata, tombstones, and cached operator counts",
1467 ));
1468 let commit_started = Instant::now();
1469 tx.commit()?;
1470 phase_timings.push(sqlite_refresh_phase_timing(
1471 "sqlite_commit",
1472 commit_started,
1473 "commit refresh transaction and publish old-or-new graph visibility",
1474 ));
1475 let file_size_bytes_after = sqlite_database_size_bytes(&self.conn).ok();
1476 let freelist_bytes_after = sqlite_database_freelist_bytes(&self.conn).ok();
1477 let stats_started = Instant::now();
1478 self.conn.execute(
1479 r#"
1480 UPDATE graph_operator_stats
1481 SET file_size_bytes = ?2,
1482 freelist_bytes = ?3,
1483 observed_at_unix = ?4
1484 WHERE scope = ?1
1485 "#,
1486 (
1487 &scope,
1488 file_size_bytes_after.map(|value| value as i64),
1489 freelist_bytes_after.map(|value| value as i64),
1490 unix_now(),
1491 ),
1492 )?;
1493 phase_timings.push(sqlite_refresh_phase_timing(
1494 "sqlite_stats_cache_update",
1495 stats_started,
1496 "persist post-commit file and freelist proof for status/doctor",
1497 ));
1498 Ok(SqliteProjectionRefresh {
1499 scope,
1500 projection_version,
1501 source_watermark,
1502 upserted_nodes: projection.nodes.len().saturating_sub(unchanged_nodes),
1503 upserted_edges: projection.edges.len().saturating_sub(unchanged_edges),
1504 unchanged_nodes,
1505 unchanged_edges,
1506 upserted_properties,
1507 unchanged_properties,
1508 deleted_properties,
1509 deleted_nodes,
1510 deleted_edges,
1511 pruned_tombstones: pruned_node_tombstones + pruned_edge_tombstones,
1512 file_size_bytes_before,
1513 file_size_bytes_after,
1514 tombstoned_nodes,
1515 tombstoned_edges,
1516 phase_timings,
1517 })
1518 }
1519
1520 pub fn upsert_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1521 let tx = self.conn.transaction()?;
1522 {
1523 let mut insert_node = tx.prepare(
1524 r#"
1525 INSERT INTO graph_nodes
1526 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1527 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1528 ON CONFLICT(id) DO UPDATE SET
1529 kind = excluded.kind,
1530 label = excluded.label,
1531 properties_json = excluded.properties_json,
1532 provenance_json = excluded.provenance_json,
1533 freshness_json = excluded.freshness_json,
1534 row_hash = excluded.row_hash,
1535 source_watermark = excluded.source_watermark
1536 "#,
1537 )?;
1538 let mut delete_properties =
1539 tx.prepare("DELETE FROM graph_node_properties WHERE node_id = ?1")?;
1540 let mut insert_property = tx.prepare(
1541 r#"
1542 INSERT INTO graph_node_properties (node_id, key, value)
1543 VALUES (?1, ?2, ?3)
1544 "#,
1545 )?;
1546 for node in &projection.nodes {
1547 insert_node.execute((
1548 &node.id,
1549 &node.kind,
1550 &node.label,
1551 to_json(&node.properties)?,
1552 to_json(&node.provenance)?,
1553 optional_to_json(&node.freshness)?,
1554 row_hash(node)?,
1555 ))?;
1556 delete_properties.execute([&node.id])?;
1557 for (key, value) in &node.properties {
1558 insert_property.execute((&node.id, key, value))?;
1559 }
1560 }
1561 }
1562 {
1563 let mut insert_edge = tx.prepare(
1564 r#"
1565 INSERT INTO graph_edges
1566 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1567 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1568 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1569 edge_key = excluded.edge_key,
1570 properties_json = excluded.properties_json,
1571 provenance_json = excluded.provenance_json,
1572 freshness_json = excluded.freshness_json,
1573 row_hash = excluded.row_hash,
1574 source_watermark = excluded.source_watermark
1575 "#,
1576 )?;
1577 let mut delete_properties =
1578 tx.prepare("DELETE FROM graph_edge_properties WHERE edge_key = ?1")?;
1579 let mut insert_property = tx.prepare(
1580 r#"
1581 INSERT INTO graph_edge_properties (edge_key, key, value)
1582 VALUES (?1, ?2, ?3)
1583 "#,
1584 )?;
1585 for edge in &projection.edges {
1586 let edge_key = graph_edge_id(edge);
1587 insert_edge.execute((
1588 &edge_key,
1589 &edge.from_id,
1590 &edge.to_id,
1591 &edge.kind,
1592 to_json(&edge.properties)?,
1593 to_json(&edge.provenance)?,
1594 optional_to_json(&edge.freshness)?,
1595 row_hash(edge)?,
1596 ))?;
1597 delete_properties.execute([&edge_key])?;
1598 for (key, value) in &edge.properties {
1599 insert_property.execute((&edge_key, key, value))?;
1600 }
1601 }
1602 }
1603 tx.commit()?;
1604 Ok(())
1605 }
1606
1607 pub fn projection_version(&self, scope: &str) -> Result<Option<SqliteProjectionVersion>> {
1608 self.conn
1609 .query_row(
1610 r#"
1611 SELECT projection_version, content_hash, source_watermark
1612 FROM graph_projection_versions
1613 WHERE scope = ?1
1614 "#,
1615 [scope],
1616 |row| {
1617 Ok(SqliteProjectionVersion {
1618 projection_version: row.get(0)?,
1619 content_hash: row.get(1)?,
1620 source_watermark: row.get(2)?,
1621 })
1622 },
1623 )
1624 .optional()
1625 .map_err(Into::into)
1626 }
1627
1628 pub fn update_projection_source_watermark(
1629 &mut self,
1630 scope: &str,
1631 source_watermark: Option<String>,
1632 ) -> Result<()> {
1633 self.conn.execute(
1634 r#"
1635 UPDATE graph_projection_versions
1636 SET source_watermark = ?2
1637 WHERE scope = ?1
1638 "#,
1639 (scope, source_watermark),
1640 )?;
1641 Ok(())
1642 }
1643
1644 pub fn compact_storage(&mut self, scope: &str, prune_tombstones: bool) -> Result<usize> {
1645 let pruned_tombstones = if prune_tombstones {
1646 self.conn.execute("DELETE FROM graph_tombstones", [])?
1647 } else {
1648 0
1649 };
1650 self.conn.execute_batch(
1651 r#"
1652 PRAGMA wal_checkpoint(TRUNCATE);
1653 VACUUM;
1654 "#,
1655 )?;
1656 let nodes = self
1657 .conn
1658 .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
1659 row.get::<_, i64>(0)
1660 })?;
1661 let edges = self
1662 .conn
1663 .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
1664 row.get::<_, i64>(0)
1665 })?;
1666 let tombstone_nodes = self.conn.query_row(
1667 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1668 [],
1669 |row| row.get::<_, i64>(0),
1670 )?;
1671 let tombstone_edges = self.conn.query_row(
1672 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1673 [],
1674 |row| row.get::<_, i64>(0),
1675 )?;
1676 let file_size_bytes = sqlite_database_size_bytes(&self.conn)
1677 .ok()
1678 .map(|value| value as i64);
1679 let freelist_bytes = sqlite_database_freelist_bytes(&self.conn)
1680 .ok()
1681 .map(|value| value as i64);
1682 self.conn.execute(
1683 r#"
1684 INSERT INTO graph_operator_stats
1685 (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1686 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s', 'now'))
1687 ON CONFLICT(scope) DO UPDATE SET
1688 nodes = excluded.nodes,
1689 edges = excluded.edges,
1690 tombstone_nodes = excluded.tombstone_nodes,
1691 tombstone_edges = excluded.tombstone_edges,
1692 file_size_bytes = excluded.file_size_bytes,
1693 freelist_bytes = excluded.freelist_bytes,
1694 observed_at_unix = excluded.observed_at_unix
1695 "#,
1696 (
1697 scope,
1698 nodes,
1699 edges,
1700 tombstone_nodes,
1701 tombstone_edges,
1702 file_size_bytes,
1703 freelist_bytes,
1704 ),
1705 )?;
1706 Ok(pruned_tombstones)
1707 }
1708
1709 fn edges_between_nodes_inline(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
1710 let placeholders: Vec<&str> = node_ids.iter().map(|_| "?").collect();
1711 let in_clause = placeholders.join(", ");
1712 let sql = format!(
1713 "SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json \
1714 FROM graph_edges e \
1715 WHERE e.from_id IN ({in_clause}) \
1716 AND e.to_id IN ({in_clause}) \
1717 ORDER BY e.from_id, e.kind, e.to_id"
1718 );
1719 let values: Vec<Value> = node_ids
1720 .iter()
1721 .chain(node_ids.iter())
1722 .map(|id| Value::Text(id.clone()))
1723 .collect();
1724 let mut stmt = self.conn.prepare(&sql)?;
1725 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
1726 }
1727}
1728
1729fn sqlite_query_plan(conn: &Connection, sql: &str, values: &[Value]) -> Result<Vec<String>> {
1730 let mut stmt = conn.prepare(&format!("EXPLAIN QUERY PLAN {sql}"))?;
1731 collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
1732 row.get::<_, String>(3)
1733 })?)
1734}
1735
/// Turns query-plan detail rows into human-readable diagnostics.
///
/// The first entry always echoes the whole plan, with rows joined by `" | "`. It is
/// followed by one entry per name in `expected_indexes`, in the given order, saying
/// whether any plan row contains that name as a substring.
fn sqlite_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<String> {
    let header = format!("sqlite query pushdown active; plan: {}", plan.join(" | "));
    let per_index = expected_indexes.iter().map(|&index_name| {
        let reported = plan.iter().any(|detail| detail.contains(index_name));
        if reported {
            format!("sqlite query plan uses {index_name}")
        } else {
            format!(
                "sqlite query plan did not report {index_name}; inspect before changing graph property indexes"
            )
        }
    });
    std::iter::once(header).chain(per_index).collect()
}
1752
/// Compact, machine-readable counterpart to the free-text query-plan diagnostics.
///
/// `terse_query_plan_diagnostics` produces these with the codes `"plan_active"`,
/// `"idx_ok"` and `"idx_missing"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TerseDiagnostic {
    /// Short static code identifying the diagnostic.
    pub code: &'static str,
    /// Index name the diagnostic refers to, if any; left out of serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index: Option<String>,
}
1759
1760#[allow(dead_code)]
1761fn terse_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<TerseDiagnostic> {
1762 let mut diagnostics = vec![TerseDiagnostic {
1763 code: "plan_active",
1764 index: None,
1765 }];
1766 for expected_index in expected_indexes {
1767 if plan.iter().any(|row| row.contains(expected_index)) {
1768 diagnostics.push(TerseDiagnostic {
1769 code: "idx_ok",
1770 index: Some(expected_index.to_string()),
1771 });
1772 } else {
1773 diagnostics.push(TerseDiagnostic {
1774 code: "idx_missing",
1775 index: Some(expected_index.to_string()),
1776 });
1777 }
1778 }
1779 diagnostics
1780}
1781
1782fn push_sqlite_property_filter_exists(
1783 sql: &mut String,
1784 values: &mut Vec<Value>,
1785 node_alias: &str,
1786 filters: &[GraphPropertyFilter],
1787) {
1788 for (index, filter) in filters.iter().enumerate() {
1789 sql.push_str(&format!(
1790 r#"
1791 AND EXISTS (
1792 SELECT 1
1793 FROM graph_node_properties p{index} INDEXED BY idx_graph_node_properties_key_value_node
1794 WHERE p{index}.node_id = {node_alias}.id
1795 AND p{index}.key = ?
1796 AND p{index}.value = ?
1797 )
1798 "#
1799 ));
1800 values.push(Value::Text(filter.key.clone()));
1801 values.push(Value::Text(filter.value.clone()));
1802 }
1803}
1804
1805fn push_sqlite_edge_property_filter_exists(
1806 sql: &mut String,
1807 values: &mut Vec<Value>,
1808 edge_alias: &str,
1809 filters: &[GraphPropertyFilter],
1810) {
1811 for (index, filter) in filters.iter().enumerate() {
1812 sql.push_str(&format!(
1813 r#"
1814 AND EXISTS (
1815 SELECT 1
1816 FROM graph_edge_properties ep{index} INDEXED BY idx_graph_edge_properties_key_value_edge
1817 WHERE ep{index}.edge_key = {edge_alias}.edge_key
1818 AND ep{index}.key = ?
1819 AND ep{index}.value = ?
1820 )
1821 "#
1822 ));
1823 values.push(Value::Text(filter.key.clone()));
1824 values.push(Value::Text(filter.value.clone()));
1825 }
1826}
1827
/// Inputs for one `SELECT` branch of the incident-edge query, consumed by
/// `push_sqlite_incident_edge_branch`.
struct SqliteIncidentEdgeBranch<'a> {
    /// Index name interpolated after `INDEXED BY` for `graph_edges`.
    index_name: &'a str,
    /// `graph_edges` column compared against `node_id` (interpolated into the SQL text).
    endpoint_column: &'a str,
    /// Node id bound as the endpoint value.
    node_id: &'a str,
    /// When set, adds an `e.kind = ?` condition.
    kind: Option<&'a str>,
    /// Edge property filters appended as `EXISTS` clauses.
    filters: &'a [GraphPropertyFilter],
    /// When set, adds an `e.edge_key > ?` condition.
    cursor: Option<&'a str>,
}
1836
1837fn push_sqlite_incident_edge_branch(
1838 sql: &mut String,
1839 values: &mut Vec<Value>,
1840 branch: SqliteIncidentEdgeBranch<'_>,
1841) {
1842 sql.push_str(&format!(
1843 r#"
1844 SELECT
1845 e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
1846 FROM graph_edges e INDEXED BY {index_name}
1847 WHERE e.{endpoint_column} = ?
1848 "#,
1849 index_name = branch.index_name,
1850 endpoint_column = branch.endpoint_column,
1851 ));
1852 values.push(Value::Text(branch.node_id.to_string()));
1853 if let Some(kind) = branch.kind {
1854 sql.push_str(" AND e.kind = ?");
1855 values.push(Value::Text(kind.to_string()));
1856 }
1857 push_sqlite_edge_property_filter_exists(sql, values, "e", branch.filters);
1858 if let Some(cursor) = branch.cursor {
1859 sql.push_str(" AND e.edge_key > ?");
1860 values.push(Value::Text(cursor.to_string()));
1861 }
1862}
1863
1864fn sqlite_incident_edges_union_query(
1865 node_id: &str,
1866 kind: Option<&str>,
1867 filters: &[GraphPropertyFilter],
1868 cursor: Option<&str>,
1869 limit: Option<usize>,
1870) -> (String, Vec<Value>) {
1871 let mut sql = String::from(
1872 r#"
1873 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1874 FROM (
1875 "#,
1876 );
1877 let mut values = Vec::new();
1878 push_sqlite_incident_edge_branch(
1879 &mut sql,
1880 &mut values,
1881 SqliteIncidentEdgeBranch {
1882 index_name: "idx_graph_edges_from_kind",
1883 endpoint_column: "from_id",
1884 node_id,
1885 kind,
1886 filters,
1887 cursor,
1888 },
1889 );
1890 sql.push_str(" UNION ");
1891 push_sqlite_incident_edge_branch(
1892 &mut sql,
1893 &mut values,
1894 SqliteIncidentEdgeBranch {
1895 index_name: "idx_graph_edges_to_kind",
1896 endpoint_column: "to_id",
1897 node_id,
1898 kind,
1899 filters,
1900 cursor,
1901 },
1902 );
1903 sql.push_str(
1904 r#"
1905 ) e
1906 ORDER BY e.edge_key
1907 "#,
1908 );
1909 if let Some(limit) = limit {
1910 sql.push_str(" LIMIT ?");
1911 values.push(Value::Integer(limit.saturating_add(1) as i64));
1912 }
1913 (sql, values)
1914}
1915
1916impl GraphStore for SqliteGraphStore {
1917 fn upsert_node(&self, node: &GraphNode) -> Result<()> {
1918 self.conn.execute(
1919 r#"
1920 INSERT INTO graph_nodes
1921 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1922 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1923 ON CONFLICT(id) DO UPDATE SET
1924 kind = excluded.kind,
1925 label = excluded.label,
1926 properties_json = excluded.properties_json,
1927 provenance_json = excluded.provenance_json,
1928 freshness_json = excluded.freshness_json,
1929 row_hash = excluded.row_hash,
1930 source_watermark = excluded.source_watermark
1931 "#,
1932 (
1933 &node.id,
1934 &node.kind,
1935 &node.label,
1936 to_json(&node.properties)?,
1937 to_json(&node.provenance)?,
1938 optional_to_json(&node.freshness)?,
1939 row_hash(node)?,
1940 ),
1941 )?;
1942 replace_node_properties(&self.conn, &node.id, &node.properties)?;
1943 Ok(())
1944 }
1945
1946 fn upsert_edge(&self, edge: &GraphEdge) -> Result<()> {
1947 let edge_key = graph_edge_id(edge);
1948 self.conn.execute(
1949 r#"
1950 INSERT INTO graph_edges
1951 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1952 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1953 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1954 edge_key = excluded.edge_key,
1955 properties_json = excluded.properties_json,
1956 provenance_json = excluded.provenance_json,
1957 freshness_json = excluded.freshness_json,
1958 row_hash = excluded.row_hash,
1959 source_watermark = excluded.source_watermark
1960 "#,
1961 (
1962 &edge_key,
1963 &edge.from_id,
1964 &edge.to_id,
1965 &edge.kind,
1966 to_json(&edge.properties)?,
1967 to_json(&edge.provenance)?,
1968 optional_to_json(&edge.freshness)?,
1969 row_hash(edge)?,
1970 ),
1971 )?;
1972 replace_edge_properties(&self.conn, &edge_key, &edge.properties)?;
1973 Ok(())
1974 }
1975
1976 fn delete_node(&self, id: &str) -> Result<usize> {
1977 self.conn
1978 .execute("DELETE FROM graph_nodes WHERE id = ?1", [id])
1979 .map_err(Into::into)
1980 }
1981
1982 fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize> {
1983 self.conn
1984 .execute(
1985 "DELETE FROM graph_edges WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3",
1986 (from_id, to_id, kind),
1987 )
1988 .map_err(Into::into)
1989 }
1990
1991 fn node(&self, id: &str) -> Result<Option<GraphNode>> {
1992 self.conn
1993 .query_row(
1994 r#"
1995 SELECT id, kind, label, properties_json, provenance_json, freshness_json
1996 FROM graph_nodes
1997 WHERE id = ?1
1998 "#,
1999 [id],
2000 node_from_row,
2001 )
2002 .optional()
2003 .map_err(Into::into)
2004 }
2005
2006 fn all_nodes(&self) -> Result<Vec<GraphNode>> {
2007 let mut stmt = self.conn.prepare(
2008 r#"
2009 SELECT id, kind, label, properties_json, provenance_json, freshness_json
2010 FROM graph_nodes
2011 ORDER BY id
2012 "#,
2013 )?;
2014 collect_rows(stmt.query_map([], node_from_row)?)
2015 }
2016
2017 fn all_edges(&self) -> Result<Vec<GraphEdge>> {
2018 let mut stmt = self.conn.prepare(
2019 r#"
2020 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2021 FROM graph_edges
2022 ORDER BY from_id, kind, to_id
2023 "#,
2024 )?;
2025 collect_rows(stmt.query_map([], edge_from_row)?)
2026 }
2027
2028 fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
2029 self.conn
2030 .query_row(
2031 r#"
2032 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2033 FROM graph_edges INDEXED BY idx_graph_edges_edge_key
2034 WHERE edge_key = ?1
2035 "#,
2036 [edge_id],
2037 edge_from_row,
2038 )
2039 .optional()
2040 .map_err(Into::into)
2041 }
2042
2043 fn graph_counts(&self) -> Result<(usize, usize)> {
2044 let nodes = self
2045 .conn
2046 .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
2047 row.get::<_, usize>(0)
2048 })?;
2049 let edges = self
2050 .conn
2051 .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2052 row.get::<_, usize>(0)
2053 })?;
2054 Ok((nodes, edges))
2055 }
2056
2057 fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
2058 match kind {
2059 Some(kind) => self
2060 .conn
2061 .query_row(
2062 r#"
2063 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2064 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2065 WHERE from_id <> to_id AND kind = ?1
2066 ORDER BY from_id, kind, to_id
2067 LIMIT 1
2068 "#,
2069 [kind],
2070 edge_from_row,
2071 )
2072 .optional()
2073 .map_err(Into::into),
2074 None => self
2075 .conn
2076 .query_row(
2077 r#"
2078 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2079 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2080 WHERE from_id <> to_id
2081 ORDER BY from_id, kind, to_id
2082 LIMIT 1
2083 "#,
2084 [],
2085 edge_from_row,
2086 )
2087 .optional()
2088 .map_err(Into::into),
2089 }
2090 }
2091
2092 fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
2093 self.conn
2094 .query_row(
2095 r#"
2096 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2097 ep.key, ep.value
2098 FROM graph_edge_properties ep INDEXED BY idx_graph_edge_properties_key_value_edge
2099 JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2100 ON e.edge_key = ep.edge_key
2101 WHERE e.from_id <> e.to_id
2102 ORDER BY ep.key, ep.value, ep.edge_key
2103 LIMIT 1
2104 "#,
2105 [],
2106 |row| {
2107 Ok((
2108 edge_from_row(row)?,
2109 GraphPropertyFilter {
2110 key: row.get(7)?,
2111 value: row.get(8)?,
2112 },
2113 ))
2114 },
2115 )
2116 .optional()
2117 .map_err(Into::into)
2118 }
2119
2120 fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>> {
2121 let mut stmt = self.conn.prepare(
2122 r#"
2123 SELECT id, kind, label, properties_json, provenance_json, freshness_json
2124 FROM graph_nodes
2125 WHERE kind = ?1
2126 ORDER BY id
2127 "#,
2128 )?;
2129 collect_rows(stmt.query_map([kind], node_from_row)?)
2130 }
2131
2132 fn paged_nodes_by_kind(
2133 &self,
2134 kind: &str,
2135 options: GraphQueryOptions,
2136 ) -> Result<GraphPagedSubgraph> {
2137 let mut sql = String::from(
2138 r#"
2139 SELECT id, kind, label, properties_json, provenance_json, freshness_json
2140 FROM graph_nodes
2141 WHERE kind = ?
2142 "#,
2143 );
2144 let mut values = vec![Value::Text(kind.to_string())];
2145 push_sqlite_property_filter_exists(
2146 &mut sql,
2147 &mut values,
2148 "graph_nodes",
2149 &options.property_filters,
2150 );
2151 if let Some(cursor) = &options.cursor {
2152 sql.push_str(" AND id > ?");
2153 values.push(Value::Text(cursor.clone()));
2154 }
2155 sql.push_str(" ORDER BY id");
2156 if let Some(limit) = options.limit {
2157 sql.push_str(" LIMIT ?");
2158 values.push(Value::Integer(limit.saturating_add(1) as i64));
2159 }
2160
2161 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2162 let mut stmt = self.conn.prepare(&sql)?;
2163 let mut nodes =
2164 collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)?;
2165 let before_limit = nodes.len();
2166 let mut next_cursor = None;
2167 if let Some(limit) = options.limit
2168 && nodes.len() > limit
2169 {
2170 next_cursor = nodes
2171 .get(limit.saturating_sub(1))
2172 .map(|node| node.id.clone());
2173 nodes.truncate(limit);
2174 }
2175 let expected_indexes = if options.property_filters.is_empty() {
2176 vec!["idx_graph_nodes_kind"]
2177 } else {
2178 vec![
2179 "idx_graph_nodes_kind",
2180 "idx_graph_node_properties_key_value_node",
2181 ]
2182 };
2183 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2184 if !options.property_filters.is_empty() {
2185 diagnostics.push(
2186 "property filters were evaluated by SQLite materialized property rows before paging"
2187 .to_string(),
2188 );
2189 }
2190 if options.cursor.is_some() {
2191 diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
2192 }
2193 if next_cursor.is_some() {
2194 diagnostics.push(
2195 "result was truncated; pass page.next_cursor as --cursor for the next page"
2196 .to_string(),
2197 );
2198 }
2199 Ok(GraphPagedSubgraph {
2200 page: GraphQueryPage {
2201 cursor: options.cursor,
2202 limit: options.limit,
2203 next_cursor,
2204 returned_nodes: nodes.len(),
2205 returned_edges: 0,
2206 truncated: options.limit.is_some_and(|limit| before_limit > limit),
2207 diagnostics,
2208 },
2209 nodes,
2210 edges: Vec::new(),
2211 })
2212 }
2213
2214 fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2215 match kind {
2216 Some(kind) => {
2217 let mut stmt = self.conn.prepare(
2218 r#"
2219 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2220 FROM graph_edges
2221 WHERE from_id = ?1 AND kind = ?2
2222 ORDER BY to_id, kind
2223 "#,
2224 )?;
2225 collect_rows(stmt.query_map((from_id, kind), edge_from_row)?)
2226 }
2227 None => {
2228 let mut stmt = self.conn.prepare(
2229 r#"
2230 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2231 FROM graph_edges
2232 WHERE from_id = ?1
2233 ORDER BY to_id, kind
2234 "#,
2235 )?;
2236 collect_rows(stmt.query_map([from_id], edge_from_row)?)
2237 }
2238 }
2239 }
2240
2241 fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2242 let (sql, values) = sqlite_incident_edges_union_query(node_id, kind, &[], None, None);
2243 let mut stmt = self.conn.prepare(&sql)?;
2244 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2245 }
2246
2247 fn paged_edges(
2248 &self,
2249 kind: Option<&str>,
2250 options: GraphQueryOptions,
2251 ) -> Result<GraphPagedSubgraph> {
2252 let primary_property_filter = options.property_filters.first();
2253 let mut values = Vec::new();
2254 let mut sql = if let Some(filter) = primary_property_filter {
2255 values.push(Value::Text(filter.key.clone()));
2256 values.push(Value::Text(filter.value.clone()));
2257 String::from(
2258 r#"
2259 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2260 FROM graph_edge_properties ep0 INDEXED BY idx_graph_edge_properties_key_value_edge
2261 JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2262 ON e.edge_key = ep0.edge_key
2263 WHERE ep0.key = ?
2264 AND ep0.value = ?
2265 "#,
2266 )
2267 } else {
2268 String::from(
2269 r#"
2270 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2271 FROM graph_edges e
2272 WHERE 1 = 1
2273 "#,
2274 )
2275 };
2276 if let Some(kind) = kind {
2277 sql.push_str(" AND e.kind = ?");
2278 values.push(Value::Text(kind.to_string()));
2279 }
2280 push_sqlite_edge_property_filter_exists(
2281 &mut sql,
2282 &mut values,
2283 "e",
2284 if primary_property_filter.is_some() {
2285 &options.property_filters[1..]
2286 } else {
2287 &options.property_filters
2288 },
2289 );
2290 if let Some(cursor) = &options.cursor {
2291 if primary_property_filter.is_some() {
2292 sql.push_str(" AND ep0.edge_key > ?");
2293 } else {
2294 sql.push_str(" AND e.edge_key > ?");
2295 }
2296 values.push(Value::Text(cursor.clone()));
2297 }
2298 if primary_property_filter.is_some() {
2299 sql.push_str(" ORDER BY ep0.edge_key");
2300 } else {
2301 sql.push_str(" ORDER BY e.edge_key");
2302 }
2303 if let Some(limit) = options.limit {
2304 sql.push_str(" LIMIT ?");
2305 values.push(Value::Integer(limit.saturating_add(1) as i64));
2306 }
2307
2308 let primary_property_row_count = if let Some(filter) = primary_property_filter {
2309 Some(self.conn.query_row(
2310 r#"
2311 SELECT COUNT(*)
2312 FROM graph_edge_properties INDEXED BY idx_graph_edge_properties_key_value_edge
2313 WHERE key = ?1 AND value = ?2
2314 "#,
2315 (&filter.key, &filter.value),
2316 |row| row.get::<_, usize>(0),
2317 )?)
2318 } else {
2319 None
2320 };
2321 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2322 let mut stmt = self.conn.prepare(&sql)?;
2323 let mut edges =
2324 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2325 let before_limit = edges.len();
2326 let mut next_cursor = None;
2327 if let Some(limit) = options.limit
2328 && edges.len() > limit
2329 {
2330 next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2331 edges.truncate(limit);
2332 }
2333 let expected_indexes = if options.property_filters.is_empty() {
2334 vec!["idx_graph_edges_edge_key"]
2335 } else {
2336 vec![
2337 "idx_graph_edge_properties_key_value_edge",
2338 "idx_graph_edges_edge_key",
2339 ]
2340 };
2341 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2342 if !options.property_filters.is_empty() {
2343 if let Some(row_count) = primary_property_row_count {
2344 diagnostics.push(format!(
2345 "edge property primary filter matched {row_count} materialized row(s) before edge-kind/cursor paging"
2346 ));
2347 }
2348 diagnostics.push(
2349 "edge property scan drives from SQLite materialized property rows before joining graph_edges"
2350 .to_string(),
2351 );
2352 }
2353 if options.cursor.is_some() {
2354 diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2355 }
2356 if next_cursor.is_some() {
2357 diagnostics.push(
2358 "result was truncated; pass page.next_cursor as --cursor for the next page"
2359 .to_string(),
2360 );
2361 }
2362 Ok(GraphPagedSubgraph {
2363 page: GraphQueryPage {
2364 cursor: options.cursor,
2365 limit: options.limit,
2366 next_cursor,
2367 returned_nodes: 0,
2368 returned_edges: edges.len(),
2369 truncated: options.limit.is_some_and(|limit| before_limit > limit),
2370 diagnostics,
2371 },
2372 nodes: Vec::new(),
2373 edges,
2374 })
2375 }
2376
2377 fn paged_incident_edges(
2378 &self,
2379 node_id: &str,
2380 kind: Option<&str>,
2381 options: GraphQueryOptions,
2382 ) -> Result<GraphPagedSubgraph> {
2383 let (sql, values) = sqlite_incident_edges_union_query(
2384 node_id,
2385 kind,
2386 &options.property_filters,
2387 options.cursor.as_deref(),
2388 options.limit,
2389 );
2390 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2391 let mut stmt = self.conn.prepare(&sql)?;
2392 let mut edges =
2393 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2394 let before_limit = edges.len();
2395 let mut next_cursor = None;
2396 if let Some(limit) = options.limit
2397 && edges.len() > limit
2398 {
2399 next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2400 edges.truncate(limit);
2401 }
2402 let expected_indexes = if options.property_filters.is_empty() {
2403 vec!["idx_graph_edges_from_kind", "idx_graph_edges_to_kind"]
2404 } else {
2405 vec![
2406 "idx_graph_edges_from_kind",
2407 "idx_graph_edges_to_kind",
2408 "idx_graph_edge_properties_key_value_edge",
2409 ]
2410 };
2411 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2412 diagnostics.push(
2413 "incident edge scan uses UNION over from_id/to_id index probes instead of an OR predicate"
2414 .to_string(),
2415 );
2416 if !options.property_filters.is_empty() {
2417 diagnostics.push(
2418 "edge property filters were evaluated by SQLite materialized property rows before paging"
2419 .to_string(),
2420 );
2421 }
2422 if options.cursor.is_some() {
2423 diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2424 }
2425 if next_cursor.is_some() {
2426 diagnostics.push(
2427 "result was truncated; pass page.next_cursor as --cursor for the next page"
2428 .to_string(),
2429 );
2430 }
2431 Ok(GraphPagedSubgraph {
2432 page: GraphQueryPage {
2433 cursor: options.cursor,
2434 limit: options.limit,
2435 next_cursor,
2436 returned_nodes: 0,
2437 returned_edges: edges.len(),
2438 truncated: options.limit.is_some_and(|limit| before_limit > limit),
2439 diagnostics,
2440 },
2441 nodes: Vec::new(),
2442 edges,
2443 })
2444 }
2445
2446 fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
2447 if node_ids.is_empty() {
2448 return Ok(Vec::new());
2449 }
2450 if node_ids.len() <= 20 {
2451 return self.edges_between_nodes_inline(node_ids);
2452 }
2453 self.assert_not_in_temp_table_section();
2454 self.temp_table_active.set(true);
2455 let result = (|| -> Result<Vec<GraphEdge>> {
2456 let tx = self.conn.unchecked_transaction()?;
2457 tx.execute_batch(
2458 r#"
2459 CREATE TEMP TABLE IF NOT EXISTS _edges_between_ids (id TEXT PRIMARY KEY);
2460 DELETE FROM _edges_between_ids;
2461 "#,
2462 )?;
2463 for chunk in node_ids.iter().collect::<Vec<_>>().chunks(450) {
2464 let row_placeholders: Vec<String> =
2465 chunk.iter().map(|_| "(?)".to_string()).collect();
2466 let placeholders = row_placeholders.join(", ");
2467 let sql = format!(
2468 "INSERT OR IGNORE INTO _edges_between_ids (id) VALUES {placeholders}"
2469 );
2470 let values: Vec<Value> = chunk.iter().map(|id| Value::Text((*id).clone())).collect();
2471 tx.execute(&sql, params_from_iter(values.iter()))?;
2472 }
2473 let edges = {
2474 let mut stmt = tx.prepare(
2475 r#"
2476 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2477 FROM graph_edges e
2478 WHERE EXISTS (SELECT 1 FROM _edges_between_ids f WHERE f.id = e.from_id)
2479 AND EXISTS (SELECT 1 FROM _edges_between_ids t WHERE t.id = e.to_id)
2480 ORDER BY e.from_id, e.kind, e.to_id
2481 "#,
2482 )?;
2483 collect_rows(stmt.query_map([], edge_from_row)?)?
2484 };
2485 tx.finish()?;
2486 Ok(edges)
2487 })();
2488 self.temp_table_active.set(false);
2489 result
2490 }
2491
2492 fn ranked_neighborhood(
2493 &self,
2494 center_id: &str,
2495 options: &RankedNeighborhoodOptions,
2496 ) -> Result<Option<RankedNeighborhoodResult>> {
2497 if self.node(center_id)?.is_none() {
2498 return Ok(None);
2499 }
2500 let center = self.node(center_id)?.unwrap();
2501
2502 let score_expr = match options.scoring {
2503 tsift_core::NeighborhoodScoring::BreadthFirst => {
2504 "MAX(0, 120 - (walk.depth * 18))".to_string()
2505 }
2506 tsift_core::NeighborhoodScoring::EdgeKindWeighted => {
2507 "MAX(0, 120 - (walk.depth * 18)) + CASE walk.edge_kind \
2508 WHEN 'semantic_relation' THEN 34 \
2509 WHEN 'mentions_entity' THEN 28 \
2510 WHEN 'mentions_concept' THEN 28 \
2511 WHEN 'tagged_entity' THEN 28 \
2512 WHEN 'tagged_concept' THEN 28 \
2513 WHEN 'related_concept' THEN 28 \
2514 WHEN 'mentions' THEN 22 \
2515 WHEN 'calls' THEN 20 \
2516 WHEN 'requests_context' THEN 18 \
2517 WHEN 'scopes_context' THEN 18 \
2518 WHEN 'scopes_source' THEN 18 \
2519 WHEN 'explains_result' THEN 18 \
2520 WHEN 'defines' THEN 12 \
2521 WHEN 'contains' THEN 12 \
2522 WHEN 'belongs_to' THEN 12 \
2523 ELSE 8 END".to_string()
2524 }
2525 tsift_core::NeighborhoodScoring::DegreeWeighted => {
2526 "MAX(0, 120 - (walk.depth * 18)) + CASE \
2527 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 3 THEN 20 \
2528 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 10 THEN 10 \
2529 ELSE 0 END"
2530 .to_string()
2531 }
2532 };
2533
2534 let use_degree_cache = matches!(options.scoring, tsift_core::NeighborhoodScoring::DegreeWeighted);
2535 let degree_cte = if use_degree_cache {
2536 "degree_cache AS ( \
2537 SELECT id, (SELECT COUNT(*) FROM graph_edges e WHERE e.from_id = n.id OR e.to_id = n.id) AS degree \
2538 FROM graph_nodes n), "
2539 } else {
2540 ""
2541 };
2542 let mut sql = format!(
2543 r#"
2544 WITH {degree_cte}RECURSIVE walk(id, depth, edge_kind, score) AS (
2545 SELECT ?, 0, '', ?
2546 UNION
2547 SELECT e.to_id, walk.depth + 1, e.kind,
2548 "#,
2549 );
2550 sql.push_str(&format!(" {}\n", score_expr));
2551 sql.push_str(
2552 r#"
2553 FROM walk
2554 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2555 ON e.from_id = walk.id
2556 WHERE walk.depth < ?
2557 "#,
2558 );
2559 let mut values = vec![
2560 Value::Text(center_id.to_string()),
2561 Value::Integer(i64::MAX),
2562 Value::Integer(options.depth as i64),
2563 ];
2564 if let Some(kind) = &options.edge_kind {
2565 sql.push_str(" AND e.kind = ?");
2566 values.push(Value::Text(kind.clone()));
2567 }
2568 sql.push_str(
2569 r#"
2570 ),
2571 scored_nodes AS (
2572 SELECT walk.id, walk.score,
2573 n.kind AS node_kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
2574 FROM walk
2575 JOIN graph_nodes n ON n.id = walk.id
2576 GROUP BY walk.id
2577 ),
2578 ranked AS (
2579 SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
2580 FROM scored_nodes
2581 ORDER BY score DESC, id ASC
2582 ),
2583 kept AS (
2584 SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
2585 FROM ranked
2586 LIMIT ?
2587 ),
2588 total AS (
2589 SELECT COUNT(*) AS cnt FROM scored_nodes
2590 )
2591 SELECT
2592 'meta' AS row_type,
2593 (SELECT cnt FROM total) AS total_discovered,
2594 0 AS node_id, '' AS node_kind, '' AS node_label,
2595 '' AS node_props, '' AS node_prov, '' AS node_fresh,
2596 '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
2597 '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
2598 UNION ALL
2599 SELECT
2600 'node' AS row_type,
2601 0 AS total_discovered,
2602 k.id, k.node_kind, k.label, k.properties_json, k.provenance_json, k.freshness_json,
2603 '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
2604 '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
2605 FROM kept k
2606 UNION ALL
2607 SELECT
2608 'edge' AS row_type,
2609 0 AS total_discovered,
2610 '' AS node_id, '' AS node_kind, '' AS node_label,
2611 '' AS node_props, '' AS node_prov, '' AS node_fresh,
2612 e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2613 FROM graph_edges e
2614 WHERE EXISTS (SELECT 1 FROM kept k WHERE k.id = e.from_id)
2615 AND EXISTS (SELECT 1 FROM kept k2 WHERE k2.id = e.to_id)
2616 "#,
2617 );
2618 values.push(Value::Integer(options.max_nodes as i64));
2619
2620 let mut stmt = self.conn.prepare(&sql)?;
2621 let mut nodes = vec![center.clone()];
2622 let mut edges = Vec::new();
2623 let mut total_discovered = 0usize;
2624
2625 let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
2626 let row_type: String = row.get(0)?;
2627 match row_type.as_str() {
2628 "meta" => Ok(QueryResult::Meta {
2629 total: row.get::<_, i64>(1)? as usize,
2630 }),
2631 "node" => Ok(QueryResult::Node(node_from_row_at(row, 2)?)),
2632 "edge" => Ok(QueryResult::Edge(edge_from_row_at(row, 8)?)),
2633 _ => Err(rusqlite::Error::InvalidQuery),
2634 }
2635 })?;
2636 for row_result in rows {
2637 match row_result? {
2638 QueryResult::Meta { total } => {
2639 total_discovered = total;
2640 }
2641 QueryResult::Node(node) => {
2642 if node.id != center_id {
2643 nodes.push(node);
2644 }
2645 }QueryResult::Edge(edge) => {
2646 edges.push(edge);
2647 }
2648 }
2649 }
2650
2651 let total_discovered = total_discovered.max(nodes.len());
2652 let pruned_count = total_discovered.saturating_sub(nodes.len());
2653
2654 match options.property_mode {
2655 PropertyMode::Full => {}
2656 PropertyMode::Omit => {
2657 for n in &mut nodes {
2658 n.properties.clear();
2659 }
2660 for e in &mut edges {
2661 e.properties.clear();
2662 }
2663 }
2664 PropertyMode::Sample => {
2665 let mut seen_kinds = std::collections::BTreeSet::new();
2666 for n in &mut nodes {
2667 if !seen_kinds.contains(&n.kind) {
2668 seen_kinds.insert(n.kind.clone());
2669 } else {
2670 n.properties.clear();
2671 }
2672 }
2673 for e in &mut edges {
2674 e.properties.clear();
2675 }
2676 }
2677 }
2678
2679 Ok(Some(RankedNeighborhoodResult {
2680 nodes,
2681 edges,
2682 pruned_count,
2683 total_discovered,
2684 }))
2685 }
2686
2687 fn neighborhood(
2688 &self,
2689 center_id: &str,
2690 depth: usize,
2691 kind: Option<&str>,
2692 ) -> Result<Option<GraphSubgraph>> {
2693 self.paged_neighborhood(center_id, depth, kind, GraphQueryOptions::default())
2694 .map(|result| {
2695 result.map(|result| {
2696 GraphSubgraph {
2697 nodes: result.nodes,
2698 edges: result.edges,
2699 }
2700 .sorted()
2701 })
2702 })
2703 }
2704
2705 fn paged_neighborhood(
2706 &self,
2707 center_id: &str,
2708 depth: usize,
2709 kind: Option<&str>,
2710 options: GraphQueryOptions,
2711 ) -> Result<Option<GraphPagedSubgraph>> {
2712 if self.node(center_id)?.is_none() {
2713 return Ok(None);
2714 }
2715 let mut sql = String::from(
2716 r#"
2717 WITH RECURSIVE walk(id, depth) AS (
2718 SELECT ?, 0
2719 UNION
2720 SELECT e.to_id, walk.depth + 1
2721 FROM walk
2722 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2723 ON e.from_id = walk.id
2724 WHERE walk.depth < ?
2725 "#,
2726 );
2727 let mut values = vec![
2728 Value::Text(center_id.to_string()),
2729 Value::Integer(depth as i64),
2730 ];
2731 if let Some(kind) = kind {
2732 sql.push_str(" AND e.kind = ?");
2733 values.push(Value::Text(kind.to_string()));
2734 }
2735 sql.push_str(
2736 r#"
2737 ),
2738 filtered_nodes AS (
2739 SELECT DISTINCT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
2740 FROM walk
2741 JOIN graph_nodes n ON n.id = walk.id
2742 WHERE 1 = 1
2743 "#,
2744 );
2745 push_sqlite_property_filter_exists(&mut sql, &mut values, "n", &options.property_filters);
2746 if let Some(cursor) = &options.cursor {
2747 sql.push_str(" AND n.id > ?");
2748 values.push(Value::Text(cursor.clone()));
2749 }
2750 sql.push_str(
2751 r#"
2752 ),
2753 page_nodes AS (
2754 SELECT id, kind, label, properties_json, provenance_json, freshness_json
2755 FROM filtered_nodes
2756 ORDER BY id
2757 "#,
2758 );
2759 if let Some(limit) = options.limit {
2760 sql.push_str(" LIMIT ?");
2761 values.push(Value::Integer(limit.saturating_add(1) as i64));
2762 }
2763 sql.push_str(
2764 r#"
2765 ),
2766 walk_edges AS (
2767 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2768 FROM walk
2769 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2770 ON e.from_id = walk.id
2771 WHERE walk.depth < ?
2772 "#,
2773 );
2774 values.push(Value::Integer(depth as i64));
2775 if let Some(kind) = kind {
2776 sql.push_str(" AND e.kind = ?");
2777 values.push(Value::Text(kind.to_string()));
2778 }
2779 sql.push_str(
2780 r#"
2781 )
2782 SELECT
2783 'node' AS row_type,
2784 p.id, p.kind, p.label, p.properties_json, p.provenance_json, p.freshness_json,
2785 NULL AS edge_key, NULL AS from_id, NULL AS to_id, NULL AS edge_kind,
2786 NULL AS edge_properties_json, NULL AS edge_provenance_json, NULL AS edge_freshness_json
2787 FROM page_nodes p
2788 UNION ALL
2789 SELECT DISTINCT
2790 'edge' AS row_type,
2791 NULL AS id, NULL AS kind, NULL AS label, NULL AS properties_json,
2792 NULL AS provenance_json, NULL AS freshness_json,
2793 e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2794 FROM walk_edges e
2795 WHERE e.from_id IN (SELECT id FROM page_nodes)
2796 AND e.to_id IN (SELECT id FROM page_nodes)
2797 "#,
2798 );
2799
2800 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2801 let mut stmt = self.conn.prepare(&sql)?;
2802 let mut nodes = Vec::new();
2803 let mut edges = Vec::new();
2804 let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
2805 let row_type: String = row.get(0)?;
2806 match row_type.as_str() {
2807 "node" => Ok((Some(node_from_row_at(row, 1)?), None)),
2808 "edge" => Ok((None, Some(edge_from_row_at(row, 7)?))),
2809 _ => Err(rusqlite::Error::InvalidQuery),
2810 }
2811 })?;
2812 for row in rows {
2813 let (node, edge) = row?;
2814 if let Some(node) = node {
2815 nodes.push(node);
2816 }
2817 if let Some(edge) = edge {
2818 edges.push(edge);
2819 }
2820 }
2821 nodes.sort_by(|left, right| left.id.cmp(&right.id));
2822 let before_limit = nodes.len();
2823 let mut next_cursor = None;
2824 if let Some(limit) = options.limit
2825 && nodes.len() > limit
2826 {
2827 next_cursor = nodes
2828 .get(limit.saturating_sub(1))
2829 .map(|node| node.id.clone());
2830 nodes.truncate(limit);
2831 }
2832 let node_ids = nodes
2833 .iter()
2834 .map(|node| node.id.as_str())
2835 .collect::<BTreeSet<_>>();
2836 edges.retain(|edge| {
2837 node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
2838 });
2839 edges.sort_by(|left, right| {
2840 left.from_id
2841 .cmp(&right.from_id)
2842 .then(left.kind.cmp(&right.kind))
2843 .then(left.to_id.cmp(&right.to_id))
2844 });
2845 let expected_indexes = if options.property_filters.is_empty() {
2846 vec!["idx_graph_edges_from_kind"]
2847 } else {
2848 vec![
2849 "idx_graph_edges_from_kind",
2850 "idx_graph_node_properties_key_value_node",
2851 ]
2852 };
2853 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2854 diagnostics.push(
2855 "neighborhood nodes and page edges share one recursive reachable-set CTE".to_string(),
2856 );
2857 if !options.property_filters.is_empty() {
2858 diagnostics.push(
2859 "property filters were evaluated by SQLite materialized property rows before paging"
2860 .to_string(),
2861 );
2862 }
2863 if options.cursor.is_some() {
2864 diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
2865 }
2866 if next_cursor.is_some() {
2867 diagnostics.push(
2868 "result was truncated; pass page.next_cursor as --cursor for the next page"
2869 .to_string(),
2870 );
2871 }
2872 Ok(Some(GraphPagedSubgraph {
2873 page: GraphQueryPage {
2874 cursor: options.cursor,
2875 limit: options.limit,
2876 next_cursor,
2877 returned_nodes: nodes.len(),
2878 returned_edges: edges.len(),
2879 truncated: options.limit.is_some_and(|limit| before_limit > limit),
2880 diagnostics,
2881 },
2882 nodes,
2883 edges,
2884 }))
2885 }
2886
2887 fn shortest_path(
2888 &self,
2889 from_id: &str,
2890 to_id: &str,
2891 kind: Option<&str>,
2892 ) -> Result<Option<GraphPath>> {
2893 self.shortest_path_with_max_hops(from_id, to_id, kind, None)
2894 }
2895
2896 fn shortest_path_with_max_hops(
2897 &self,
2898 from_id: &str,
2899 to_id: &str,
2900 kind: Option<&str>,
2901 max_hops: Option<usize>,
2902 ) -> Result<Option<GraphPath>> {
2903 if from_id == to_id {
2904 return Ok(Some(GraphPath {
2905 nodes: vec![from_id.to_string()],
2906 hops: 0,
2907 }));
2908 }
2909 let hop_limit = max_hops.unwrap_or(usize::MAX);
2910 if hop_limit == 0 {
2911 return Ok(None);
2912 }
2913
2914 self.assert_not_in_temp_table_section();
2915 self.temp_table_active.set(true);
2916 let result = (|| -> Result<Option<GraphPath>> {
2917 let call_id = BFS_CALL_ID.fetch_add(1, Ordering::Relaxed);
2918 let tbl = format!("_tsift_frontier_{call_id}");
2919
2920 let mut visited = BTreeSet::from([from_id.to_string()]);
2921 let mut parent = BTreeMap::<String, String>::from([(from_id.to_string(), String::new())]);
2922 let mut frontier = vec![from_id.to_string()];
2923 self.conn.execute_batch(&format!(
2924 r#"CREATE TEMP TABLE IF NOT EXISTS "{tbl}" (id TEXT PRIMARY KEY);
2925 DELETE FROM "{tbl}";"#,
2926 ))?;
2927 let select_sql = if kind.is_some() {
2928 format!(
2929 r#"SELECT e.from_id, e.to_id
2930 FROM "{tbl}" f
2931 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2932 ON e.from_id = f.id
2933 WHERE e.kind = ?
2934 ORDER BY e.from_id, e.to_id, e.kind"#,
2935 )
2936 } else {
2937 format!(
2938 r#"SELECT e.from_id, e.to_id
2939 FROM "{tbl}" f
2940 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2941 ON e.from_id = f.id
2942 ORDER BY e.from_id, e.to_id, e.kind"#,
2943 )
2944 };
2945 let insert_sql = format!(r#"INSERT OR IGNORE INTO "{tbl}" (id) VALUES (?)"#);
2946 let delete_sql = format!(r#"DELETE FROM "{tbl}""#);
2947 let drop_sql = format!(r#"DROP TABLE IF EXISTS "{tbl}""#);
2948 let mut frontier_select_stmt = self.conn.prepare(&select_sql)?;
2949 let mut frontier_insert_stmt = self.conn.prepare(&insert_sql)?;
2950 let mut found_path: Option<GraphPath> = None;
2951 for _depth in 0..hop_limit {
2952 if frontier.is_empty() {
2953 break;
2954 }
2955 self.conn.execute(&delete_sql, [])?;
2956 for id in &frontier {
2957 frontier_insert_stmt.execute([id.as_str()])?;
2958 }
2959 let mut next_frontier = BTreeSet::new();
2960 let rows = if let Some(kind) = kind {
2961 collect_rows(frontier_select_stmt.query_map([kind], |row| {
2962 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2963 })?)?
2964 } else {
2965 collect_rows(frontier_select_stmt.query_map([], |row| {
2966 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2967 })?)?
2968 };
2969 for (from, next) in rows {
2970 if !visited.insert(next.clone()) {
2971 continue;
2972 }
2973 parent.insert(next.clone(), from);
2974 if next == to_id {
2975 let mut nodes = vec![to_id.to_string()];
2976 let mut cursor = to_id;
2977 while let Some(previous) = parent.get(cursor) {
2978 if previous.is_empty() {
2979 break;
2980 }
2981 nodes.push(previous.clone());
2982 cursor = previous;
2983 }
2984 nodes.reverse();
2985 found_path = Some(GraphPath {
2986 hops: nodes.len().saturating_sub(1),
2987 nodes,
2988 });
2989 break;
2990 }
2991 next_frontier.insert(next);
2992 }
2993 if found_path.is_some() {
2994 break;
2995 }
2996 frontier = next_frontier.into_iter().collect();
2997 }
2998 let _ = self.conn.execute_batch(&drop_sql);
2999 Ok(found_path)
3000 })();
3001 self.temp_table_active.set(false);
3002 result
3003 }
3004
3005 fn reachable_nodes_by_kind(
3006 &self,
3007 from_id: &str,
3008 kind: &str,
3009 depth: usize,
3010 limit: usize,
3011 ) -> Result<Vec<(GraphNode, GraphPath)>> {
3012 Ok(self
3013 .reachable_nodes_by_kinds(from_id, &[kind], depth, limit)?
3014 .remove(kind)
3015 .unwrap_or_default())
3016 }
3017
3018 fn reachable_nodes_by_kinds(
3019 &self,
3020 from_id: &str,
3021 kinds: &[&str],
3022 depth: usize,
3023 limit: usize,
3024 ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
3025 let mut requested = kinds
3026 .iter()
3027 .map(|kind| (*kind).to_string())
3028 .collect::<BTreeSet<_>>()
3029 .into_iter()
3030 .collect::<Vec<_>>();
3031 let mut results = requested
3032 .iter()
3033 .map(|kind| (kind.clone(), Vec::new()))
3034 .collect::<BTreeMap<_, _>>();
3035 if requested.is_empty() {
3036 return Ok(results);
3037 }
3038 requested.sort();
3039 let placeholders = std::iter::repeat_n("?", requested.len())
3040 .collect::<Vec<_>>()
3041 .join(", ");
3042 let mut sql = format!(
3043 r#"
3044 WITH RECURSIVE walk(id, depth, path) AS (
3045 SELECT ?, 0, char(31) || ? || char(31)
3046 UNION ALL
3047 SELECT e.to_id, walk.depth + 1, walk.path || e.to_id || char(31)
3048 FROM walk
3049 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3050 ON e.from_id = walk.id
3051 WHERE walk.depth < ?
3052 AND instr(walk.path, char(31) || e.to_id || char(31)) = 0
3053 ),
3054 ranked AS (
3055 SELECT
3056 n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
3057 walk.path, walk.depth,
3058 ROW_NUMBER() OVER (PARTITION BY n.kind, n.id ORDER BY walk.depth, n.label, n.id) AS rn
3059 FROM walk
3060 JOIN graph_nodes n ON n.id = walk.id
3061 WHERE n.kind IN ({placeholders}) AND n.id <> ?
3062 ),
3063 kind_ranked AS (
3064 SELECT *,
3065 ROW_NUMBER() OVER (PARTITION BY kind ORDER BY depth, label, id) AS kind_rank
3066 FROM ranked
3067 WHERE rn = 1
3068 )
3069 SELECT id, kind, label, properties_json, provenance_json, freshness_json, path, depth
3070 FROM kind_ranked
3071 "#,
3072 );
3073 let mut values = vec![
3074 Value::Text(from_id.to_string()),
3075 Value::Text(from_id.to_string()),
3076 Value::Integer(depth as i64),
3077 ];
3078 values.extend(requested.iter().cloned().map(Value::Text));
3079 values.push(Value::Text(from_id.to_string()));
3080 if limit > 0 && limit != usize::MAX {
3081 sql.push_str(" WHERE kind_rank <= ?");
3082 values.push(Value::Integer(limit as i64));
3083 }
3084 sql.push_str(" ORDER BY kind, depth, label, id");
3085 let mut stmt = self.conn.prepare(&sql)?;
3086 let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
3087 let node = node_from_row(row)?;
3088 let path: String = row.get(6)?;
3089 let hops: usize = row.get(7)?;
3090 Ok((
3091 node,
3092 GraphPath {
3093 nodes: path
3094 .split('\u{1f}')
3095 .filter(|part| !part.is_empty())
3096 .map(str::to_string)
3097 .collect(),
3098 hops,
3099 },
3100 ))
3101 })?)?;
3102 for (node, path) in rows {
3103 results
3104 .entry(node.kind.clone())
3105 .or_default()
3106 .push((node, path));
3107 }
3108 Ok(results)
3109 }
3110
3111 fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
3112 if let Some(node) = self.node(target)? {
3113 return Ok(Some(node));
3114 }
3115 if kinds.is_empty() {
3116 return Ok(None);
3117 }
3118
3119 let normalized = target.trim().trim_start_matches('#');
3120 let kind_placeholders = std::iter::repeat_n("?", kinds.len())
3121 .collect::<Vec<_>>()
3122 .join(", ");
3123 let kind_rank = kinds
3124 .iter()
3125 .enumerate()
3126 .map(|(rank, _)| format!("WHEN ? THEN {rank}"))
3127 .collect::<Vec<_>>()
3128 .join(" ");
3129 let sql = format!(
3130 r#"
3131 SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3132 FROM graph_nodes n
3133 WHERE n.kind IN ({kind_placeholders})
3134 AND (
3135 EXISTS (
3136 SELECT 1
3137 FROM graph_node_properties p_handle INDEXED BY idx_graph_node_properties_key_value_node
3138 WHERE p_handle.node_id = n.id
3139 AND p_handle.key = 'handle'
3140 AND p_handle.value = ?
3141 )
3142 OR EXISTS (
3143 SELECT 1
3144 FROM graph_node_properties p_ref INDEXED BY idx_graph_node_properties_key_value_node
3145 WHERE p_ref.node_id = n.id
3146 AND p_ref.key = 'ref_id'
3147 AND p_ref.value = ?
3148 )
3149 OR n.label = ?
3150 OR n.label = ?
3151 )
3152 ORDER BY CASE n.kind {kind_rank} ELSE 999 END, n.id
3153 LIMIT 1
3154 "#
3155 );
3156 let mut values = kinds
3157 .iter()
3158 .map(|kind| Value::Text((*kind).to_string()))
3159 .collect::<Vec<_>>();
3160 values.push(Value::Text(target.to_string()));
3161 values.push(Value::Text(normalized.to_string()));
3162 values.push(Value::Text(target.to_string()));
3163 values.push(Value::Text(format!("#{normalized}")));
3164 values.extend(kinds.iter().map(|kind| Value::Text((*kind).to_string())));
3165 self.conn
3166 .query_row(&sql, params_from_iter(values.iter()), node_from_row)
3167 .optional()
3168 .map_err(Into::into)
3169 }
3170}
3171
3172fn to_json<T: Serialize>(value: &T) -> Result<String> {
3173 serde_json::to_string(value).map_err(Into::into)
3174}
3175
3176fn row_hash<T: Serialize>(value: &T) -> Result<String> {
3177 let payload = serde_json::to_vec(value)?;
3178 Ok(blake3::hash(&payload).to_hex().to_string())
3179}
3180
3181fn optional_to_json<T: Serialize>(value: &Option<T>) -> Result<Option<String>> {
3182 value.as_ref().map(to_json).transpose()
3183}
3184
3185fn collect_rows<T>(
3186 rows: impl Iterator<Item = std::result::Result<T, rusqlite::Error>>,
3187) -> Result<Vec<T>> {
3188 rows.collect::<std::result::Result<Vec<_>, _>>()
3189 .map_err(Into::into)
3190}
3191
/// Tagged value that is either count metadata, a graph node, or a graph edge.
// NOTE(review): the code that produces and consumes this enum is outside this
// section — presumably it is the item type of a combined query; confirm there.
enum QueryResult {
    /// Metadata entry carrying a `total` count.
    Meta { total: usize },
    /// A decoded graph node.
    Node(GraphNode),
    /// A decoded graph edge.
    Edge(GraphEdge),
}
3197
3198fn node_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphNode> {
3199 let properties_col = offset + 3;
3200 let provenance_col = offset + 4;
3201 let freshness_col = offset + 5;
3202 let properties_json: String = row.get(properties_col)?;
3203 let provenance_json: String = row.get(provenance_col)?;
3204 let freshness_json: Option<String> = row.get(freshness_col)?;
3205 Ok(GraphNode {
3206 id: row.get(offset)?,
3207 kind: row.get(offset + 1)?,
3208 label: row.get(offset + 2)?,
3209 properties: from_json(properties_col, &properties_json)?,
3210 provenance: from_json(provenance_col, &provenance_json)?,
3211 freshness: optional_from_json(freshness_col, freshness_json)?,
3212 })
3213}
3214
3215fn node_from_row(row: &Row<'_>) -> rusqlite::Result<GraphNode> {
3216 node_from_row_at(row, 0)
3217}
3218
3219fn edge_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphEdge> {
3220 let properties_col = offset + 4;
3221 let provenance_col = offset + 5;
3222 let freshness_col = offset + 6;
3223 let properties_json: String = row.get(properties_col)?;
3224 let provenance_json: String = row.get(provenance_col)?;
3225 let freshness_json: Option<String> = row.get(freshness_col)?;
3226 Ok(GraphEdge {
3227 id: row.get(offset)?,
3228 from_id: row.get(offset + 1)?,
3229 to_id: row.get(offset + 2)?,
3230 kind: row.get(offset + 3)?,
3231 properties: from_json(properties_col, &properties_json)?,
3232 provenance: from_json(provenance_col, &provenance_json)?,
3233 freshness: optional_from_json(freshness_col, freshness_json)?,
3234 })
3235}
3236
3237fn edge_from_row(row: &Row<'_>) -> rusqlite::Result<GraphEdge> {
3238 edge_from_row_at(row, 0)
3239}
3240
3241fn from_json<T: DeserializeOwned>(column: usize, raw: &str) -> rusqlite::Result<T> {
3242 serde_json::from_str(raw)
3243 .map_err(|err| rusqlite::Error::FromSqlConversionFailure(column, Type::Text, Box::new(err)))
3244}
3245
3246fn optional_from_json<T: DeserializeOwned>(
3247 column: usize,
3248 raw: Option<String>,
3249) -> rusqlite::Result<Option<T>> {
3250 raw.map(|value| from_json(column, &value)).transpose()
3251}
3252
3253fn projection_version_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3254 nodes
3255 .iter()
3256 .find(|node| node.kind == "projection_meta")
3257 .and_then(|node| node.properties.get("projection_version").cloned())
3258}
3259
3260fn projection_hash_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3261 nodes
3262 .iter()
3263 .find(|node| node.kind == "projection_meta")
3264 .and_then(|node| node.properties.get("content_hash").cloned())
3265}
3266
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn unix_now() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
3273
3274fn sqlite_database_size_bytes(conn: &Connection) -> Result<u64> {
3275 let page_count: u64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
3276 let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3277 Ok(page_count.saturating_mul(page_size))
3278}
3279
3280fn sqlite_database_freelist_bytes(conn: &Connection) -> Result<u64> {
3281 let freelist_count: u64 = conn.query_row("PRAGMA freelist_count", [], |row| row.get(0))?;
3282 let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3283 Ok(freelist_count.saturating_mul(page_size))
3284}
3285
3286#[cfg(test)]
3287mod tests {
3288 use super::*;
3289
3290 fn sample_provenance() -> GraphProvenance {
3291 GraphProvenance::new("fixture", "src/lib.rs:1").with_content_hash("hash-1")
3292 }
3293
3294 fn sample_projection() -> GraphProjection {
3295 let source = sample_provenance();
3296 GraphProjection {
3297 nodes: vec![
3298 GraphNode::new("doc:livekit", "document", "LiveKit guide")
3299 .with_property("domain", "livekit")
3300 .with_provenance(source.clone())
3301 .with_freshness(GraphFreshness::content_hash("node-hash")),
3302 GraphNode::new("topic:rooms", "topic", "Rooms"),
3303 GraphNode::new("topic:egress", "topic", "Egress"),
3304 ],
3305 edges: vec![
3306 GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
3307 .with_property("confidence", "0.91")
3308 .with_provenance(source.clone())
3309 .with_freshness(GraphFreshness::content_hash("edge-hash")),
3310 GraphEdge::new("topic:rooms", "topic:egress", "related_to").with_provenance(source),
3311 ],
3312 }
3313 }
3314
3315 fn assert_projection_store_contract(store: &impl GraphStore) {
3316 let projection = sample_projection();
3317 projection.upsert_into(store).unwrap();
3318
3319 assert_eq!(
3320 store.node("doc:livekit").unwrap(),
3321 projection
3322 .nodes
3323 .iter()
3324 .find(|node| node.id == "doc:livekit")
3325 .cloned()
3326 );
3327 assert_eq!(
3328 store.nodes_by_kind("topic").unwrap(),
3329 vec![
3330 GraphNode::new("topic:egress", "topic", "Egress"),
3331 GraphNode::new("topic:rooms", "topic", "Rooms"),
3332 ]
3333 );
3334
3335 let mentions = store
3336 .outgoing_edges("doc:livekit", Some("mentions"))
3337 .unwrap();
3338 assert_eq!(mentions.len(), 1);
3339 assert_eq!(mentions[0].to_id, "topic:rooms");
3340 assert_eq!(
3341 mentions[0].properties.get("confidence"),
3342 Some(&"0.91".into())
3343 );
3344
3345 let path = store
3346 .shortest_path("doc:livekit", "topic:egress", None)
3347 .unwrap()
3348 .unwrap();
3349 assert_eq!(
3350 path.nodes,
3351 vec!["doc:livekit", "topic:rooms", "topic:egress"]
3352 );
3353 }
3354
3355 #[test]
3356 fn sqlite_store_round_trips_generic_nodes_edges() {
3357 let store = SqliteGraphStore::in_memory().unwrap();
3358 let source = sample_provenance();
3359 let node = GraphNode::new("doc:livekit", "document", "LiveKit guide")
3360 .with_property("domain", "livekit")
3361 .with_provenance(source.clone())
3362 .with_freshness(GraphFreshness::content_hash("node-hash"));
3363 let topic = GraphNode::new("topic:rooms", "topic", "Rooms");
3364 let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
3365 .with_property("confidence", "0.91")
3366 .with_provenance(source)
3367 .with_freshness(GraphFreshness::content_hash("edge-hash"));
3368
3369 store.upsert_node(&node).unwrap();
3370 store.upsert_node(&topic).unwrap();
3371 store.upsert_edge(&edge).unwrap();
3372
3373 assert_eq!(store.node("doc:livekit").unwrap(), Some(node));
3374 assert_eq!(store.nodes_by_kind("topic").unwrap(), vec![topic]);
3375 assert_eq!(store.all_nodes().unwrap().len(), 2);
3376 assert_eq!(store.all_edges().unwrap().len(), 1);
3377 assert_eq!(
3378 store
3379 .outgoing_edges("doc:livekit", Some("mentions"))
3380 .unwrap(),
3381 vec![edge]
3382 );
3383 }
3384
3385 #[test]
3386 fn sqlite_materializes_edge_properties_and_scans_first_class_edges() {
3387 let store = SqliteGraphStore::in_memory().unwrap();
3388 for node in [
3389 GraphNode::new("doc:livekit", "document", "LiveKit guide"),
3390 GraphNode::new("topic:rooms", "topic", "Rooms"),
3391 GraphNode::new("topic:egress", "topic", "Egress"),
3392 ] {
3393 store.upsert_node(&node).unwrap();
3394 }
3395 let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
3396 .with_property("confidence", "0.91");
3397 let edge_id = edge.id.clone();
3398 store.upsert_edge(&edge).unwrap();
3399 store
3400 .upsert_edge(
3401 &GraphEdge::new("topic:egress", "topic:rooms", "related_to")
3402 .with_property("confidence", "0.42"),
3403 )
3404 .unwrap();
3405
3406 assert_eq!(store.edge(&edge_id).unwrap(), Some(edge));
3407 let mut expected_incident_ids = vec![
3408 GraphEdge::stable_id("doc:livekit", "topic:rooms", "mentions"),
3409 GraphEdge::stable_id("topic:egress", "topic:rooms", "related_to"),
3410 ];
3411 expected_incident_ids.sort();
3412 assert_eq!(
3413 store
3414 .incident_edges("topic:rooms", None)
3415 .unwrap()
3416 .into_iter()
3417 .map(|edge| edge.id)
3418 .collect::<Vec<_>>(),
3419 expected_incident_ids
3420 );
3421
3422 let page = store
3423 .paged_edges(
3424 Some("mentions"),
3425 GraphQueryOptions {
3426 property_filters: vec![GraphPropertyFilter {
3427 key: "confidence".to_string(),
3428 value: "0.91".to_string(),
3429 }],
3430 ..GraphQueryOptions::default()
3431 },
3432 )
3433 .unwrap();
3434 assert_eq!(page.edges.len(), 1);
3435 assert_eq!(page.edges[0].id, edge_id);
3436 assert!(
3437 page.page
3438 .diagnostics
3439 .iter()
3440 .any(|diagnostic| diagnostic.contains("idx_graph_edge_properties_key_value_edge")),
3441 "{:?}",
3442 page.page.diagnostics
3443 );
3444 assert!(
3445 page.page
3446 .diagnostics
3447 .iter()
3448 .any(|diagnostic| diagnostic.contains("idx_graph_edges_edge_key")),
3449 "{:?}",
3450 page.page.diagnostics
3451 );
3452 assert!(
3453 page.page.diagnostics.iter().any(|diagnostic| diagnostic
3454 .contains("edge property primary filter matched 1 materialized row")),
3455 "{:?}",
3456 page.page.diagnostics
3457 );
3458 assert!(
3459 page.page
3460 .diagnostics
3461 .iter()
3462 .any(|diagnostic| diagnostic
3463 .contains("drives from SQLite materialized property rows")),
3464 "{:?}",
3465 page.page.diagnostics
3466 );
3467
3468 let property_rows: usize = store
3469 .conn
3470 .query_row(
3471 "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
3472 [],
3473 |row| row.get(0),
3474 )
3475 .unwrap();
3476 assert_eq!(property_rows, 2);
3477 }
3478
3479 #[test]
3480 fn graph_projection_round_trips_through_backend_agnostic_store_contract() {
3481 let sqlite = SqliteGraphStore::in_memory().unwrap();
3482 assert_projection_store_contract(&sqlite);
3483 }
3484
3485 #[test]
3486 fn graph_store_contract_covers_crud_neighborhood_and_ordering() {
3487 fn assert_crud_contract(store: &impl GraphStore) {
3488 let projection = sample_projection();
3489 projection.upsert_into(store).unwrap();
3490
3491 let neighborhood = store.neighborhood("doc:livekit", 2, None).unwrap().unwrap();
3492 assert_eq!(
3493 neighborhood
3494 .nodes
3495 .iter()
3496 .map(|node| node.id.as_str())
3497 .collect::<Vec<_>>(),
3498 vec!["doc:livekit", "topic:egress", "topic:rooms"]
3499 );
3500 assert_eq!(
3501 neighborhood
3502 .edges
3503 .iter()
3504 .map(|edge| (
3505 edge.from_id.as_str(),
3506 edge.kind.as_str(),
3507 edge.to_id.as_str()
3508 ))
3509 .collect::<Vec<_>>(),
3510 vec![
3511 ("doc:livekit", "mentions", "topic:rooms"),
3512 ("topic:rooms", "related_to", "topic:egress"),
3513 ]
3514 );
3515
3516 assert_eq!(
3517 store
3518 .delete_edge("topic:rooms", "topic:egress", "related_to")
3519 .unwrap(),
3520 1
3521 );
3522 assert!(
3523 store
3524 .shortest_path("doc:livekit", "topic:egress", None)
3525 .unwrap()
3526 .is_none()
3527 );
3528 assert_eq!(store.delete_node("topic:rooms").unwrap(), 1);
3529 assert!(store.node("topic:rooms").unwrap().is_none());
3530 assert!(
3531 store
3532 .outgoing_edges("doc:livekit", None)
3533 .unwrap()
3534 .is_empty()
3535 );
3536 }
3537
3538 assert_crud_contract(&SqliteGraphStore::in_memory().unwrap());
3539 }
3540
3541 #[test]
3542 fn sqlite_upsert_projection_batches_rows_and_properties() {
3543 let mut store = SqliteGraphStore::in_memory().unwrap();
3544 let mut projection = sample_projection();
3545 store.upsert_projection(&projection).unwrap();
3546
3547 let page = store
3548 .paged_nodes_by_kind(
3549 "document",
3550 GraphQueryOptions {
3551 property_filters: vec![GraphPropertyFilter {
3552 key: "domain".to_string(),
3553 value: "livekit".to_string(),
3554 }],
3555 ..GraphQueryOptions::default()
3556 },
3557 )
3558 .unwrap();
3559 assert_eq!(page.nodes[0].id, "doc:livekit");
3560
3561 projection.nodes[0] = GraphNode::new("doc:livekit", "document", "LiveKit guide")
3562 .with_property("domain", "recording");
3563 store.upsert_projection(&projection).unwrap();
3564
3565 let old_property_count: usize = store
3566 .conn
3567 .query_row(
3568 "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain' AND value = 'livekit'",
3569 [],
3570 |row| row.get(0),
3571 )
3572 .unwrap();
3573 let updated_page = store
3574 .paged_nodes_by_kind(
3575 "document",
3576 GraphQueryOptions {
3577 property_filters: vec![GraphPropertyFilter {
3578 key: "domain".to_string(),
3579 value: "recording".to_string(),
3580 }],
3581 ..GraphQueryOptions::default()
3582 },
3583 )
3584 .unwrap();
3585 assert_eq!(old_property_count, 0);
3586 assert_eq!(updated_page.nodes[0].id, "doc:livekit");
3587 let edge_property_count: usize = store
3588 .conn
3589 .query_row(
3590 "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
3591 [],
3592 |row| row.get(0),
3593 )
3594 .unwrap();
3595 assert_eq!(edge_property_count, 1);
3596 }
3597
3598 #[test]
3599 fn sqlite_store_filters_edges_by_kind_and_paths() {
3600 let store = SqliteGraphStore::in_memory().unwrap();
3601 for id in ["a", "b", "c"] {
3602 store
3603 .upsert_node(&GraphNode::new(id, "symbol", id))
3604 .unwrap();
3605 }
3606 store
3607 .upsert_edge(&GraphEdge::new("a", "b", "calls"))
3608 .unwrap();
3609 store
3610 .upsert_edge(&GraphEdge::new("a", "c", "documents"))
3611 .unwrap();
3612 store
3613 .upsert_edge(&GraphEdge::new("b", "c", "calls"))
3614 .unwrap();
3615
3616 let calls = store.outgoing_edges("a", Some("calls")).unwrap();
3617 assert_eq!(calls.len(), 1);
3618 assert_eq!(calls[0].to_id, "b");
3619 assert_eq!(store.graph_counts().unwrap(), (3, 3));
3620 assert_eq!(
3621 store.sample_edge(Some("calls")).unwrap().unwrap().to_id,
3622 "b"
3623 );
3624
3625 let path = store
3626 .shortest_path("a", "c", Some("calls"))
3627 .unwrap()
3628 .unwrap();
3629 assert_eq!(path.nodes, vec!["a", "b", "c"]);
3630 assert_eq!(path.hops, 2);
3631
3632 assert!(
3633 store
3634 .shortest_path("c", "a", Some("calls"))
3635 .unwrap()
3636 .is_none()
3637 );
3638 }
3639
3640 #[test]
3641 fn sqlite_store_batches_edges_between_node_sets() {
3642 let store = SqliteGraphStore::in_memory().unwrap();
3643 for id in ["a", "b", "c", "outside"] {
3644 store
3645 .upsert_node(&GraphNode::new(id, "symbol", id))
3646 .unwrap();
3647 }
3648 for edge in [
3649 GraphEdge::new("a", "b", "calls"),
3650 GraphEdge::new("b", "c", "calls"),
3651 GraphEdge::new("a", "outside", "calls"),
3652 GraphEdge::new("outside", "c", "calls"),
3653 ] {
3654 store.upsert_edge(&edge).unwrap();
3655 }
3656
3657 let scoped = ["a".to_string(), "b".to_string(), "c".to_string()]
3658 .into_iter()
3659 .collect::<BTreeSet<_>>();
3660 let edge_keys = store
3661 .edges_between_nodes(&scoped)
3662 .unwrap()
3663 .into_iter()
3664 .map(|edge| (edge.from_id, edge.kind, edge.to_id))
3665 .collect::<Vec<_>>();
3666
3667 assert_eq!(
3668 edge_keys,
3669 vec![
3670 ("a".to_string(), "calls".to_string(), "b".to_string()),
3671 ("b".to_string(), "calls".to_string(), "c".to_string()),
3672 ]
3673 );
3674 }
3675
    /// Walks one scope (`root`) through four projection versions and checks the
    /// refresh report, the stored version/watermark, tombstone bookkeeping, and the
    /// cached counts in `graph_operator_stats` after each step.
    #[test]
    fn sqlite_projection_refresh_tracks_versions_watermarks_and_tombstones() {
        let mut store = SqliteGraphStore::in_memory().unwrap();
        // v1: the sample projection (3 nodes, 2 edges) plus a `projection_meta` node.
        let mut projection = sample_projection();
        projection.nodes.push(
            GraphNode::new(
                "projection:fixture",
                "projection_meta",
                "fixture projection",
            )
            .with_property("projection_version", "fixture-v1")
            .with_property("content_hash", "hash-a"),
        );
        store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v1"),
                Some("commit-a".to_string()),
            )
            .unwrap();

        // v2: drop `topic:egress` and the edge pointing at it; nothing else changes.
        projection.nodes.retain(|node| node.id != "topic:egress");
        projection.edges.retain(|edge| edge.to_id != "topic:egress");
        let refresh = store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v2"),
                Some("commit-b".to_string()),
            )
            .unwrap();

        assert_eq!(refresh.projection_version, "fixture-v2");
        assert_eq!(refresh.source_watermark.as_deref(), Some("commit-b"));
        assert_eq!(refresh.tombstoned_nodes, vec!["topic:egress".to_string()]);
        assert_eq!(refresh.tombstoned_edges.len(), 1);
        assert_eq!(refresh.deleted_nodes, 1);
        assert_eq!(refresh.deleted_edges, 1);
        assert_eq!(refresh.unchanged_nodes, 3);
        assert_eq!(refresh.upserted_nodes, 0);
        // Presumably 1 node property on the document, 2 on the meta node, and 1 on
        // the surviving edge — confirm how the refresh report counts properties.
        assert_eq!(refresh.unchanged_properties, 4);
        assert_eq!(refresh.upserted_properties, 0);
        assert_eq!(refresh.deleted_properties, 0);
        // Both property-staging phases must be reported, even with nothing to stage.
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_property_row_staging"),
            "{:?}",
            refresh.phase_timings
        );
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_edge_property_row_staging"),
            "{:?}",
            refresh.phase_timings
        );
        let version = store.projection_version("root").unwrap().unwrap();
        assert_eq!(version.projection_version, "fixture-v2");
        assert_eq!(version.source_watermark.as_deref(), Some("commit-b"));
        // Cached stats after v2: 3 nodes, 1 edge, 1 node tombstone, 1 edge tombstone.
        let cached_counts: (usize, usize, usize, usize) = store
            .conn
            .query_row(
                r#"
                SELECT nodes, edges, tombstone_nodes, tombstone_edges
                FROM graph_operator_stats
                WHERE scope = 'root'
                "#,
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )
            .unwrap();
        assert_eq!(cached_counts, (3, 1, 1, 1));

        // v3: bring `topic:egress` back (without its edge); the refresh reports one
        // pruned tombstone and no new node tombstones.
        projection
            .nodes
            .push(GraphNode::new("topic:egress", "topic", "Egress"));
        let refresh = store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v3"),
                Some("commit-c".to_string()),
            )
            .unwrap();
        assert_eq!(refresh.pruned_tombstones, 1);
        assert_eq!(refresh.tombstoned_nodes, Vec::<String>::new());

        // v4: remove the node again, then compact.
        projection.nodes.retain(|node| node.id != "topic:egress");
        store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v4"),
                Some("commit-d".to_string()),
            )
            .unwrap();
        // NOTE(review): the returned 2 is presumably the number of tombstone rows
        // purged (the edge tombstone from v2 and the node tombstone from v4) —
        // confirm against `compact_storage`.
        assert_eq!(store.compact_storage("root", true).unwrap(), 2);
        // After compaction the cached tombstone counters are back to zero.
        let cached_counts: (usize, usize, usize, usize) = store
            .conn
            .query_row(
                r#"
                SELECT nodes, edges, tombstone_nodes, tombstone_edges
                FROM graph_operator_stats
                WHERE scope = 'root'
                "#,
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )
            .unwrap();
        assert_eq!(cached_counts, (3, 1, 0, 0));
    }
3791
3792 #[test]
3793 fn sqlite_shortest_path_uses_bounded_frontier() {
3794 let store = SqliteGraphStore::in_memory().unwrap();
3795 for idx in 0..80 {
3796 store
3797 .upsert_node(&GraphNode::new(
3798 format!("node:{idx:02}"),
3799 "symbol",
3800 format!("node {idx:02}"),
3801 ))
3802 .unwrap();
3803 }
3804 for idx in 0..79 {
3805 store
3806 .upsert_edge(&GraphEdge::new(
3807 format!("node:{idx:02}"),
3808 format!("node:{:02}", idx + 1),
3809 "calls",
3810 ))
3811 .unwrap();
3812 }
3813 store
3814 .upsert_edge(&GraphEdge::new("node:00", "node:79", "mentions"))
3815 .unwrap();
3816
3817 assert!(
3818 store
3819 .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(64))
3820 .unwrap()
3821 .is_none()
3822 );
3823 let path = store
3824 .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(79))
3825 .unwrap()
3826 .unwrap();
3827 assert_eq!(path.hops, 79);
3828 assert_eq!(path.nodes.first().map(String::as_str), Some("node:00"));
3829 assert_eq!(path.nodes.last().map(String::as_str), Some("node:79"));
3830
3831 let direct = store
3832 .shortest_path_with_max_hops("node:00", "node:79", Some("mentions"), Some(1))
3833 .unwrap()
3834 .unwrap();
3835 assert_eq!(direct.nodes, vec!["node:00", "node:79"]);
3836 }
3837
3838 #[test]
3839 fn sqlite_resolves_evidence_targets_with_indexed_properties() {
3840 let store = SqliteGraphStore::in_memory().unwrap();
3841 for node in [
3842 GraphNode::new("gbak-refresh", "backlog", "#refresh")
3843 .with_property("ref_id", "refresh")
3844 .with_property("handle", "backlog-handle"),
3845 GraphNode::new("gjob-refresh", "job_packet", "do #refresh")
3846 .with_property("ref_id", "refresh"),
3847 GraphNode::new("gwres-refresh", "worker_result", "completed #refresh")
3848 .with_property("ref_id", "refresh"),
3849 ] {
3850 store.upsert_node(&node).unwrap();
3851 }
3852
3853 let by_ref = store
3854 .resolve_evidence_target("#refresh", &["backlog", "job_packet", "worker_result"])
3855 .unwrap()
3856 .unwrap();
3857 assert_eq!(by_ref.id, "gbak-refresh");
3858 let by_handle = store
3859 .resolve_evidence_target("backlog-handle", &["backlog"])
3860 .unwrap()
3861 .unwrap();
3862 assert_eq!(by_handle.id, "gbak-refresh");
3863 }
3864
    /// Opens a hand-built database at `user_version = 2` — one that has no
    /// `graph_node_properties` / `graph_edge_properties` tables and whose edges are
    /// keyed by `(from_id, to_id, kind)` — and checks that
    /// `SqliteGraphStore::from_connection` brings it to the current schema version
    /// with property rows backfilled from the JSON columns.
    #[test]
    fn sqlite_schema_migration_backfills_materialized_node_properties() {
        let conn = Connection::open_in_memory().unwrap();
        // Legacy schema plus two nodes and one edge, each with a single JSON property.
        conn.execute_batch(
            r#"
            PRAGMA user_version = 2;
            CREATE TABLE graph_nodes (
                id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                label TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT
            );
            CREATE INDEX idx_graph_nodes_kind ON graph_nodes(kind);
            CREATE TABLE graph_edges (
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT,
                PRIMARY KEY (from_id, to_id, kind)
            );
            CREATE INDEX idx_graph_edges_from_kind ON graph_edges(from_id, kind);
            CREATE INDEX idx_graph_edges_to_kind ON graph_edges(to_id, kind);
            CREATE TABLE graph_projection_versions (
                scope TEXT PRIMARY KEY,
                projection_version TEXT NOT NULL,
                content_hash TEXT,
                source_watermark TEXT,
                observed_at_unix INTEGER NOT NULL
            );
            CREATE TABLE graph_tombstones (
                row_key TEXT PRIMARY KEY,
                row_kind TEXT NOT NULL,
                deleted_at_unix INTEGER NOT NULL
            );
            INSERT INTO graph_nodes
                (id, kind, label, properties_json, provenance_json)
            VALUES
                ('topic:rooms', 'topic', 'Rooms', '{"domain":"livekit"}', '[]'),
                ('topic:egress', 'topic', 'Egress', '{"domain":"recording"}', '[]');
            INSERT INTO graph_edges
                (from_id, to_id, kind, properties_json, provenance_json)
            VALUES
                ('topic:rooms', 'topic:egress', 'mentions', '{"confidence":"0.91"}', '[]');
            "#,
        )
        .unwrap();

        // Wrapping the connection is expected to run the migration.
        let store = SqliteGraphStore::from_connection(conn).unwrap();
        let version: i64 = store
            .conn
            .pragma_query_value(None, "user_version", |row| row.get(0))
            .unwrap();
        assert_eq!(version, SQLITE_GRAPH_SCHEMA_VERSION);
        // One materialized `domain` row per seeded node.
        let property_rows: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(property_rows, 2);
        // One materialized `confidence` row for the seeded edge.
        let edge_property_rows: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(edge_property_rows, 1);
        // The legacy edge had no id column; after migration it must be reachable
        // through the stable id derived from its endpoints and kind.
        let edge = store
            .edge(&GraphEdge::stable_id(
                "topic:rooms",
                "topic:egress",
                "mentions",
            ))
            .unwrap()
            .unwrap();
        assert_eq!(edge.properties.get("confidence"), Some(&"0.91".to_string()));

        // Property-filtered paging works on the migrated data and reports the
        // node-property index in its diagnostics.
        let page = store
            .paged_nodes_by_kind(
                "topic",
                GraphQueryOptions {
                    property_filters: vec![GraphPropertyFilter {
                        key: "domain".to_string(),
                        value: "livekit".to_string(),
                    }],
                    ..GraphQueryOptions::default()
                },
            )
            .unwrap();
        assert_eq!(page.nodes[0].id, "topic:rooms");
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic.contains("idx_graph_node_properties_key_value_node")),
            "{:?}",
            page.page.diagnostics
        );
    }
3976
3977 #[test]
3978 fn sqlite_store_batches_reachable_nodes_by_kinds() {
3979 let store = SqliteGraphStore::in_memory().unwrap();
3980 for node in [
3981 GraphNode::new("start", "backlog", "start"),
3982 GraphNode::new("ctx", "worker_context", "context"),
3983 GraphNode::new("src", "source_handle", "source"),
3984 GraphNode::new("sem", "semantic_concept", "concept"),
3985 ] {
3986 store.upsert_node(&node).unwrap();
3987 }
3988 store
3989 .upsert_edge(&GraphEdge::new("start", "ctx", "has_context"))
3990 .unwrap();
3991 store
3992 .upsert_edge(&GraphEdge::new("ctx", "src", "uses_source"))
3993 .unwrap();
3994 store
3995 .upsert_edge(&GraphEdge::new("start", "sem", "mentions_concept"))
3996 .unwrap();
3997
3998 let rows = store
3999 .reachable_nodes_by_kinds(
4000 "start",
4001 &["worker_context", "source_handle", "semantic_concept"],
4002 2,
4003 8,
4004 )
4005 .unwrap();
4006 assert_eq!(rows["worker_context"][0].0.id, "ctx");
4007 assert_eq!(
4008 rows["source_handle"][0].1.nodes,
4009 vec!["start", "ctx", "src"]
4010 );
4011 assert_eq!(rows["semantic_concept"][0].1.hops, 1);
4012 }
4013
    /// Replaces a 128-node / 127-edge chain projection with a copy missing two
    /// nodes and their incident edges, then checks the refresh counters, the staging
    /// phase details, and that the temp property staging tables end up empty.
    #[test]
    fn sqlite_projection_refresh_handles_bulk_row_replacement() {
        let mut store = SqliteGraphStore::in_memory().unwrap();
        let source = GraphProvenance::new("fixture", "bulk");
        let mut projection = GraphProjection::default();
        // 128 nodes alternating between `symbol` and `file`, one property each.
        for idx in 0..128 {
            projection.nodes.push(
                GraphNode::new(
                    format!("node:{idx:03}"),
                    if idx % 2 == 0 { "symbol" } else { "file" },
                    format!("bulk node {idx:03}"),
                )
                .with_property("ordinal", idx.to_string())
                .with_provenance(source.clone())
                .with_freshness(GraphFreshness::content_hash(format!("node-hash-{idx:03}"))),
            );
        }
        // 127 `next` edges linking consecutive nodes, one property each.
        for idx in 0..127 {
            projection.edges.push(
                GraphEdge::new(
                    format!("node:{idx:03}"),
                    format!("node:{:03}", idx + 1),
                    "next",
                )
                .with_property("ordinal", idx.to_string())
                .with_provenance(source.clone())
                .with_freshness(GraphFreshness::content_hash(format!("edge-hash-{idx:03}"))),
            );
        }

        store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("bulk-v1"),
                Some("commit-a".to_string()),
            )
            .unwrap();

        // Remove node:000 and node:064 plus the three edges that touch them
        // (000->001, 063->064, 064->065).
        projection
            .nodes
            .retain(|node| !node.id.ends_with("000") && !node.id.ends_with("064"));
        projection.edges.retain(|edge| {
            !edge.from_id.ends_with("000")
                && !edge.to_id.ends_with("000")
                && !edge.from_id.ends_with("064")
                && !edge.to_id.ends_with("064")
        });
        let refresh = store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("bulk-v2"),
                Some("commit-b".to_string()),
            )
            .unwrap();

        assert_eq!(store.all_nodes().unwrap().len(), 126);
        assert_eq!(store.all_edges().unwrap().len(), 124);
        assert_eq!(
            refresh.tombstoned_nodes,
            vec!["node:000".to_string(), "node:064".to_string()]
        );
        assert_eq!(refresh.tombstoned_edges.len(), 3);
        assert_eq!(refresh.deleted_nodes, 2);
        assert_eq!(refresh.deleted_edges, 3);
        assert_eq!(refresh.unchanged_nodes, 126);
        assert_eq!(refresh.unchanged_edges, 124);
        assert_eq!(refresh.upserted_nodes, 0);
        assert_eq!(refresh.upserted_edges, 0);
        // 126 node properties + 124 edge properties.
        assert_eq!(refresh.unchanged_properties, 250);
        assert_eq!(refresh.upserted_properties, 0);
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_node_staging"
                    && phase.detail.contains("126 unchanged skipped")
                    && phase.detail.contains("multi-row chunks up to 500 rows")),
            "{:?}",
            refresh.phase_timings
        );
        // NOTE(review): this check also matches on `sqlite_node_staging` but looks
        // for 124, which is the unchanged *edge* count asserted above. It reads as
        // if it was meant to target the edge staging phase — confirm the intended
        // phase name against the refresh implementation.
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_node_staging"
                    && phase.detail.contains("124 unchanged skipped")),
            "{:?}",
            refresh.phase_timings
        );
        // Nothing was new or changed, so both temp property staging tables are empty.
        let staged_node_properties: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM temp.next_graph_node_properties",
                [],
                |row| row.get(0),
            )
            .unwrap();
        let staged_edge_properties: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM temp.next_graph_edge_properties",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(staged_node_properties, 0);
        assert_eq!(staged_edge_properties, 0);
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_property_row_staging"
                    && phase.detail.contains("new/changed node rows")),
            "{:?}",
            refresh.phase_timings
        );
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_edge_property_row_staging"
                    && phase.detail.contains("new/changed edge rows")),
            "{:?}",
            refresh.phase_timings
        );
        // The stored watermark reflects the second refresh.
        assert_eq!(
            store
                .projection_version("root")
                .unwrap()
                .unwrap()
                .source_watermark
                .as_deref(),
            Some("commit-b")
        );
    }
4151
4152 #[test]
4153 fn sqlite_reentrant_temp_table_guard_panics() {
4154 let store = SqliteGraphStore::in_memory().unwrap();
4155 store.temp_table_active.set(true);
4156 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
4157 store.assert_not_in_temp_table_section();
4158 }));
4159 assert!(result.is_err());
4160 }
4161
4162 #[test]
4163 fn sqlite_temp_table_guard_clears_after_method() {
4164 let mut store = SqliteGraphStore::in_memory().unwrap();
4165 let projection = GraphProjection {
4166 nodes: vec![],
4167 edges: vec![],
4168 };
4169 store.replace_projection(&projection).unwrap();
4170 assert!(!store.temp_table_active.get());
4171 }
4172}