1use anyhow::{Context, Result, bail};
2use rusqlite::{
3 Connection, OpenFlags, OptionalExtension, Row, params_from_iter,
4 types::{Type, Value},
5};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use std::collections::{BTreeMap, BTreeSet};
8use std::cell::Cell;
9use std::ffi::OsString;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14pub use tsift_core::{
15 ConvexEdgeRow, ConvexGraphClient, ConvexGraphStore, ConvexNodeRow, ConvexProjectionRows,
16 ConvexRowsGraphClient, GraphEdge, GraphFreshness, GraphNode, GraphPagedSubgraph, GraphPath,
17 GraphProjection, GraphPropertyFilter, GraphProvenance, GraphQueryOptions, GraphQueryPage,
18 GraphStore, GraphSubgraph, PropertyMode, RankedNeighborhoodOptions, RankedNeighborhoodResult,
19 SQLITE_GRAPH_SCHEMA_VERSION, TerseGraphEdge, TerseGraphNode,
20 apply_graph_edge_query_page,
21 apply_graph_query_page, graph_edge_id, shortest_path_using_outgoing, stable_graph_edge_id,
22};
23
/// How a read-only open recovered after the live database reported `database is locked`.
///
/// Produced by `read_only_snapshot_recovery`; serialized in snake_case
/// (`snapshot_fallback` / `snapshot_fallback_wal`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReadOnlyRecovery {
    /// A snapshot copy was used while a rollback journal (`-journal`) sidecar existed.
    SnapshotFallback,
    /// A snapshot copy was used while a WAL (`-wal`) or shared-memory (`-shm`) sidecar existed.
    SnapshotFallbackWal,
}
30
31pub fn read_only_snapshot_recovery(
32 db_path: &Path,
33 err: &anyhow::Error,
34) -> Option<ReadOnlyRecovery> {
35 if !error_mentions_locked_db(err) {
36 return None;
37 }
38 if wal_sidecar_path(db_path).exists() || shared_memory_sidecar_path(db_path).exists() {
39 Some(ReadOnlyRecovery::SnapshotFallbackWal)
40 } else if rollback_journal_path(db_path).exists() {
41 Some(ReadOnlyRecovery::SnapshotFallback)
42 } else {
43 None
44 }
45}
46
/// Path of SQLite's rollback-journal sidecar: `db_path` with `-journal` appended.
pub fn rollback_journal_path(db_path: &Path) -> PathBuf {
    let mut raw = OsString::from(db_path.as_os_str());
    raw.push("-journal");
    PathBuf::from(raw)
}
52
/// Path of SQLite's write-ahead-log sidecar: `db_path` with `-wal` appended.
pub fn wal_sidecar_path(db_path: &Path) -> PathBuf {
    const SUFFIX: &str = "-wal";
    let mut raw = OsString::with_capacity(db_path.as_os_str().len() + SUFFIX.len());
    raw.push(db_path);
    raw.push(SUFFIX);
    raw.into()
}
58
/// Path of SQLite's WAL shared-memory index sidecar: `db_path` with `-shm` appended.
pub fn shared_memory_sidecar_path(db_path: &Path) -> PathBuf {
    let mut raw: OsString = db_path.into();
    raw.push("-shm");
    raw.into()
}
64
65pub fn copy_read_only_snapshot(
66 db_path: &Path,
67 default_stem: &str,
68) -> Result<(PathBuf, Vec<PathBuf>)> {
69 let snapshot_path = snapshot_copy_path(db_path, default_stem);
70 std::fs::copy(db_path, &snapshot_path).with_context(|| {
71 format!(
72 "copying locked db {} to snapshot {}",
73 db_path.display(),
74 snapshot_path.display()
75 )
76 })?;
77 let mut cleanup_paths = vec![snapshot_path.clone()];
78 copy_optional_snapshot_sidecar(
79 &wal_sidecar_path(db_path),
80 &wal_sidecar_path(&snapshot_path),
81 &mut cleanup_paths,
82 )?;
83 copy_optional_snapshot_sidecar(
84 &shared_memory_sidecar_path(db_path),
85 &shared_memory_sidecar_path(&snapshot_path),
86 &mut cleanup_paths,
87 )?;
88 Ok((snapshot_path, cleanup_paths))
89}
90
91pub fn error_mentions_locked_db(err: &anyhow::Error) -> bool {
92 err.chain()
93 .any(|cause| cause.to_string().contains("database is locked"))
94}
95
96fn copy_optional_snapshot_sidecar(
97 source_path: &Path,
98 snapshot_path: &Path,
99 cleanup_paths: &mut Vec<PathBuf>,
100) -> Result<()> {
101 match std::fs::copy(source_path, snapshot_path) {
102 Ok(_) => {
103 cleanup_paths.push(snapshot_path.to_path_buf());
104 Ok(())
105 }
106 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
107 Err(err) => Err(err).with_context(|| {
108 format!(
109 "copying SQLite sidecar {} to snapshot {}",
110 source_path.display(),
111 snapshot_path.display()
112 )
113 }),
114 }
115}
116
/// Builds a unique temp-dir path for a snapshot copy of `db_path`.
///
/// The file name is `tsift-{stem}-{pid}-{nanos}-{seq}.db`, where:
/// - `stem` is the UTF-8 file stem of `db_path`, or `default_stem` when there is none or
///   it is not UTF-8;
/// - `nanos` is the current Unix time in nanoseconds, or 0 if the clock reads before the
///   epoch;
/// - `seq` is a per-process counter.
///
/// The counter keeps names distinct even when two snapshots are requested within the
/// clock's resolution. Without it, `fs::copy` would overwrite the first snapshot and one
/// cleanup guard would delete the other caller's files.
fn snapshot_copy_path(db_path: &Path, default_stem: &str) -> PathBuf {
    // Relaxed ordering suffices: only the uniqueness of each returned value matters.
    static SNAPSHOT_COPY_SEQ: AtomicU64 = AtomicU64::new(0);
    let seq = SNAPSHOT_COPY_SEQ.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO)
        .as_nanos();
    let stem = db_path
        .file_stem()
        .and_then(|stem| stem.to_str())
        .unwrap_or(default_stem);
    let mut file_name = OsString::from(format!(
        "tsift-{stem}-{}-{nanos}-{seq}",
        std::process::id()
    ));
    file_name.push(".db");
    std::env::temp_dir().join(file_name)
}
/// `wal_autocheckpoint` page count that `configure_writable_graph_connection` applies and
/// then verifies on every writable graph connection.
const SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES: i64 = 4096;
/// Maximum rows per multi-row `INSERT ... VALUES` statement when loading the refresh
/// staging tables. The number of bound parameters per statement is this value times the
/// column count (up to 9), which must stay within SQLite's host-parameter limit.
const SQLITE_GRAPH_STAGING_CHUNK_ROWS: usize = 500;
132
/// SQLite-backed graph store.
///
/// Writable stores come from `open` / `in_memory`. Read-only stores come from
/// `open_read_only` / `open_read_only_resilient`; the resilient variant may be backed by a
/// temp-dir snapshot copy.
pub struct SqliteGraphStore {
    // NOTE: keep `conn` declared before `_snapshot_copy`. Rust drops fields in declaration
    // order, so the connection is closed before the guard deletes the snapshot files it
    // reads.
    conn: Connection,
    // Owns the snapshot copy files, if any; `None` for connections on the live database.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    // How a read-only open recovered from a locked database; `None` otherwise.
    read_only_recovery: Option<ReadOnlyRecovery>,
    // Set while a temp-table-using method (e.g. `replace_projection_with_version`) runs;
    // checked by `assert_not_in_temp_table_section`.
    temp_table_active: Cell<bool>,
}
151
/// Read-only SQLite connection to a graph database, possibly opened on a temp-dir
/// snapshot copy.
pub struct SqliteReadOnlyConnection {
    // NOTE: keep `conn` declared before `_snapshot_copy`. Fields drop in declaration order,
    // so the connection is closed before the guard deletes the snapshot files.
    conn: Connection,
    // Present only when `conn` was opened on a snapshot copy.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    // Which fallback produced the snapshot, if any.
    recovery: Option<ReadOnlyRecovery>,
}
167
// NOTE(review): not referenced in this part of the file. Presumably a process-wide counter
// that gives each BFS call a distinct id (e.g. for naming per-call temp objects); confirm
// against its users.
static BFS_CALL_ID: AtomicU64 = AtomicU64::new(0);
169
170impl SqliteReadOnlyConnection {
171 pub fn conn(&self) -> &Connection {
172 &self.conn
173 }
174
175 pub fn recovery(&self) -> Option<ReadOnlyRecovery> {
176 self.recovery
177 }
178}
179
/// Owns the temp files of a snapshot copy and deletes them when dropped.
struct SnapshotCopyGuard {
    // Snapshot database plus any copied `-wal` / `-shm` sidecars, as returned by
    // `copy_read_only_snapshot`.
    paths: Vec<PathBuf>,
}
183
184impl Drop for SnapshotCopyGuard {
185 fn drop(&mut self) {
186 for path in &self.paths {
187 let _ = std::fs::remove_file(path);
188 }
189 }
190}
191
192fn configure_writable_graph_connection(conn: &Connection, db_path: &Path) -> Result<()> {
193 conn.busy_timeout(Duration::from_secs(5))?;
194 conn.pragma_update(None, "journal_mode", "WAL")?;
195 let mode: String = conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
196 if mode.to_lowercase() != "wal" {
197 bail!(
198 "graph substrate db {} requires WAL mode for concurrent reads, got {}",
199 db_path.display(),
200 mode
201 );
202 }
203 conn.pragma_update(
204 None,
205 "wal_autocheckpoint",
206 SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
207 )?;
208 let checkpoint_pages: i64 =
209 conn.query_row("PRAGMA wal_autocheckpoint", [], |row| row.get(0))?;
210 if checkpoint_pages != SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES {
211 bail!(
212 "graph substrate db {} requires wal_autocheckpoint={}, got {}",
213 db_path.display(),
214 SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
215 checkpoint_pages
216 );
217 }
218 Ok(())
219}
220
221fn sqlite_column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
222 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
223 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
224 for row in rows {
225 if row? == column {
226 return Ok(true);
227 }
228 }
229 Ok(false)
230}
231
232fn add_column_if_missing(
233 conn: &Connection,
234 table: &str,
235 column: &str,
236 definition: &str,
237) -> Result<()> {
238 if !sqlite_column_exists(conn, table, column)? {
239 conn.execute(
240 &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
241 [],
242 )?;
243 }
244 Ok(())
245}
246
247fn backfill_graph_edge_keys(conn: &Connection) -> Result<()> {
248 if !sqlite_column_exists(conn, "graph_edges", "edge_key")? {
249 return Ok(());
250 }
251 let rows = {
252 let mut stmt = conn.prepare(
253 r#"
254 SELECT from_id, to_id, kind
255 FROM graph_edges
256 WHERE edge_key IS NULL OR edge_key = ''
257 ORDER BY from_id, kind, to_id
258 "#,
259 )?;
260 collect_rows(stmt.query_map([], |row| {
261 Ok((
262 row.get::<_, String>(0)?,
263 row.get::<_, String>(1)?,
264 row.get::<_, String>(2)?,
265 ))
266 })?)?
267 };
268 let mut update = conn.prepare(
269 r#"
270 UPDATE graph_edges
271 SET edge_key = ?4
272 WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3
273 "#,
274 )?;
275 for (from_id, to_id, kind) in rows {
276 update.execute((
277 &from_id,
278 &to_id,
279 &kind,
280 stable_graph_edge_id(&from_id, &to_id, &kind),
281 ))?;
282 }
283 Ok(())
284}
285
286fn migrate_sqlite_graph_schema(conn: &Connection, old_version: i64) -> Result<()> {
287 if old_version < 2 {
288 add_column_if_missing(conn, "graph_nodes", "row_hash", "TEXT")?;
289 add_column_if_missing(conn, "graph_nodes", "source_watermark", "TEXT")?;
290 add_column_if_missing(conn, "graph_edges", "row_hash", "TEXT")?;
291 add_column_if_missing(conn, "graph_edges", "source_watermark", "TEXT")?;
292 }
293 if old_version < 3 {
294 rebuild_graph_node_properties(conn)?;
295 }
296 if old_version < 4 {
297 ensure_sqlite_graph_operator_stats_schema(conn)?;
298 }
299 if old_version < 5 {
300 add_column_if_missing(conn, "graph_edges", "edge_key", "TEXT")?;
301 backfill_graph_edge_keys(conn)?;
302 ensure_sqlite_graph_edge_properties_schema(conn)?;
303 rebuild_graph_edge_properties(conn)?;
304 }
305 Ok(())
306}
307
308fn ensure_sqlite_graph_operator_stats_schema(conn: &Connection) -> Result<()> {
309 conn.execute_batch(
310 r#"
311 CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
312 ON graph_nodes(kind, label, id);
313 CREATE TABLE IF NOT EXISTS graph_operator_stats (
314 scope TEXT PRIMARY KEY,
315 nodes INTEGER NOT NULL,
316 edges INTEGER NOT NULL,
317 tombstone_nodes INTEGER NOT NULL,
318 tombstone_edges INTEGER NOT NULL,
319 file_size_bytes INTEGER,
320 freelist_bytes INTEGER,
321 observed_at_unix INTEGER NOT NULL
322 );
323 "#,
324 )?;
325 Ok(())
326}
327
328fn ensure_sqlite_graph_edge_properties_schema(conn: &Connection) -> Result<()> {
329 conn.execute_batch(
330 r#"
331 CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_edge_key
332 ON graph_edges(edge_key);
333 CREATE TABLE IF NOT EXISTS graph_edge_properties (
334 edge_key TEXT NOT NULL,
335 key TEXT NOT NULL,
336 value TEXT NOT NULL,
337 PRIMARY KEY (edge_key, key),
338 FOREIGN KEY (edge_key) REFERENCES graph_edges(edge_key) ON DELETE CASCADE
339 );
340 CREATE INDEX IF NOT EXISTS idx_graph_edge_properties_key_value_edge
341 ON graph_edge_properties(key, value, edge_key);
342 "#,
343 )?;
344 Ok(())
345}
346
347fn replace_node_properties(
348 conn: &Connection,
349 node_id: &str,
350 properties: &BTreeMap<String, String>,
351) -> Result<()> {
352 conn.execute(
353 "DELETE FROM graph_node_properties WHERE node_id = ?1",
354 [node_id],
355 )?;
356 let mut insert = conn.prepare(
357 r#"
358 INSERT INTO graph_node_properties (node_id, key, value)
359 VALUES (?1, ?2, ?3)
360 ON CONFLICT(node_id, key) DO UPDATE SET
361 value = excluded.value
362 "#,
363 )?;
364 for (key, value) in properties {
365 insert.execute((node_id, key, value))?;
366 }
367 Ok(())
368}
369
370fn replace_edge_properties(
371 conn: &Connection,
372 edge_key: &str,
373 properties: &BTreeMap<String, String>,
374) -> Result<()> {
375 conn.execute(
376 "DELETE FROM graph_edge_properties WHERE edge_key = ?1",
377 [edge_key],
378 )?;
379 let mut insert = conn.prepare(
380 r#"
381 INSERT INTO graph_edge_properties (edge_key, key, value)
382 VALUES (?1, ?2, ?3)
383 ON CONFLICT(edge_key, key) DO UPDATE SET
384 value = excluded.value
385 "#,
386 )?;
387 for (key, value) in properties {
388 insert.execute((edge_key, key, value))?;
389 }
390 Ok(())
391}
392
393fn rebuild_graph_node_properties(conn: &Connection) -> Result<()> {
394 if !sqlite_column_exists(conn, "graph_nodes", "properties_json")? {
395 return Ok(());
396 }
397 conn.execute_batch(
398 r#"
399 DELETE FROM graph_node_properties;
400 INSERT INTO graph_node_properties (node_id, key, value)
401 SELECT graph_nodes.id, json_each.key, CAST(json_each.value AS TEXT)
402 FROM graph_nodes, json_each(graph_nodes.properties_json)
403 WHERE json_each.key IS NOT NULL
404 AND json_each.value IS NOT NULL
405 "#,
406 )?;
407 Ok(())
408}
409
410fn rebuild_graph_edge_properties(conn: &Connection) -> Result<()> {
411 if !sqlite_column_exists(conn, "graph_edges", "properties_json")?
412 || !sqlite_column_exists(conn, "graph_edges", "edge_key")?
413 {
414 return Ok(());
415 }
416 conn.execute_batch(
417 r#"
418 DELETE FROM graph_edge_properties;
419 INSERT INTO graph_edge_properties (edge_key, key, value)
420 SELECT graph_edges.edge_key, json_each.key, CAST(json_each.value AS TEXT)
421 FROM graph_edges, json_each(graph_edges.properties_json)
422 WHERE graph_edges.edge_key IS NOT NULL
423 AND graph_edges.edge_key <> ''
424 AND json_each.key IS NOT NULL
425 AND json_each.value IS NOT NULL
426 "#,
427 )?;
428 Ok(())
429}
430
431pub fn open_graph_read_only_connection(db_path: &Path) -> Result<SqliteReadOnlyConnection> {
432 let conn = Connection::open_with_flags(
433 db_path,
434 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
435 )
436 .with_context(|| format!("opening graph.db read-only: {}", db_path.display()))?;
437 conn.busy_timeout(Duration::from_secs(5))?;
438 Ok(SqliteReadOnlyConnection {
439 conn,
440 _snapshot_copy: None,
441 recovery: None,
442 })
443}
444
445pub fn open_graph_read_only_connection_resilient(
446 db_path: &Path,
447) -> Result<SqliteReadOnlyConnection> {
448 match open_graph_read_only_connection(db_path).and_then(|connection| {
449 connection
450 .conn
451 .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
452 Ok(connection)
453 }) {
454 Ok(connection) => Ok(connection),
455 Err(err) => match read_only_snapshot_recovery(db_path, &err) {
456 Some(recovery) => open_graph_read_only_snapshot(db_path, recovery),
457 None => Err(err),
458 },
459 }
460}
461
462fn open_graph_read_only_snapshot(
463 db_path: &Path,
464 recovery: ReadOnlyRecovery,
465) -> Result<SqliteReadOnlyConnection> {
466 let (snapshot_path, cleanup_paths) = copy_read_only_snapshot(db_path, "graph")?;
467 let conn = Connection::open_with_flags(
468 &snapshot_path,
469 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
470 )
471 .with_context(|| format!("opening graph.db snapshot {}", snapshot_path.display()))?;
472 conn.busy_timeout(Duration::from_secs(5))?;
473 Ok(SqliteReadOnlyConnection {
474 conn,
475 _snapshot_copy: Some(SnapshotCopyGuard {
476 paths: cleanup_paths,
477 }),
478 recovery: Some(recovery),
479 })
480}
481
/// Timing record for one phase of a projection refresh.
///
/// Built by `sqlite_refresh_phase_timing`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefreshPhase {
    /// Short phase identifier, e.g. `sqlite_temp_table_prepare`.
    pub name: String,
    /// Wall-clock time spent in the phase, in microseconds.
    pub duration_micros: u128,
    /// Human-readable description of what the phase did.
    pub detail: String,
}
488
/// Summary returned by `replace_projection_with_version`.
///
/// NOTE(review): the code that fills these fields lies outside this view. The descriptions
/// below follow the field names and the visible staging/tombstone queries; confirm.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefresh {
    /// Scope the projection was written under (`"root"` when called via `replace_projection`).
    pub scope: String,
    /// Caller-supplied version, else one derived from the nodes, else `"unversioned"`.
    pub projection_version: String,
    /// Source watermark stamped onto the staged rows, if any.
    pub source_watermark: Option<String>,
    /// Ids of stored nodes absent from the new projection (queried in id order).
    pub tombstoned_nodes: Vec<String>,
    /// Edge keys of stored edges absent from the new projection (queried in key order).
    pub tombstoned_edges: Vec<String>,
    // Row counters: rows written vs. rows whose row hash was unchanged. The upserted totals
    // also drive the post-refresh `wal_autocheckpoint` tuning.
    pub upserted_nodes: usize,
    pub upserted_edges: usize,
    pub unchanged_nodes: usize,
    pub unchanged_edges: usize,
    // Materialized property-row counters.
    pub upserted_properties: usize,
    pub unchanged_properties: usize,
    pub deleted_properties: usize,
    // Presumably rows physically deleted and tombstones pruned during this refresh; confirm.
    pub deleted_nodes: usize,
    pub deleted_edges: usize,
    pub pruned_tombstones: usize,
    /// Database size before/after the refresh; `None` if it could not be measured.
    pub file_size_bytes_before: Option<u64>,
    pub file_size_bytes_after: Option<u64>,
    /// Per-phase timings, in the order the phases ran.
    pub phase_timings: Vec<SqliteProjectionRefreshPhase>,
}
510
/// Version metadata for a projection scope.
///
/// Presumably mirrors a `graph_projection_versions` row (minus `scope` and
/// `observed_at_unix`); confirm against its readers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionVersion {
    pub projection_version: String,
    pub content_hash: Option<String>,
    pub source_watermark: Option<String>,
}
517
518fn sqlite_refresh_phase_timing(
519 name: &str,
520 started: Instant,
521 detail: &str,
522) -> SqliteProjectionRefreshPhase {
523 SqliteProjectionRefreshPhase {
524 name: name.to_string(),
525 duration_micros: started.elapsed().as_micros(),
526 detail: detail.to_string(),
527 }
528}
529
/// Builds the `VALUES` placeholder list for a multi-row insert.
///
/// Produces `row_count` copies of a `(?, ?, ...)` tuple with `column_count` markers,
/// joined by `", "`. Example: `(2, 2)` gives `"(?, ?), (?, ?)"`.
fn sqlite_graph_staging_placeholders(column_count: usize, row_count: usize) -> String {
    let row_tuple = format!("({})", vec!["?"; column_count].join(", "));
    vec![row_tuple.as_str(); row_count].join(", ")
}
543
544fn sqlite_stage_all_ids(
545 tx: &rusqlite::Transaction<'_>,
546 nodes: &[GraphNode],
547 edges: &[GraphEdge],
548) -> Result<()> {
549 for chunk in nodes.iter().map(|n| &n.id).collect::<Vec<_>>().chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
550 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
551 let sql = format!("INSERT OR IGNORE INTO next_graph_all_node_ids (id) VALUES {}", placeholders.join(", "));
552 let values: Vec<Value> = chunk.iter().map(|id| Value::Text((*id).clone())).collect();
553 tx.execute(&sql, params_from_iter(values.iter()))?;
554 }
555 for chunk in edges.iter().map(graph_edge_id).collect::<Vec<_>>().chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
556 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
557 let sql = format!("INSERT OR IGNORE INTO next_graph_all_edge_keys (edge_key) VALUES {}", placeholders.join(", "));
558 let values: Vec<Value> = chunk.iter().map(|ek| Value::Text(ek.to_string())).collect();
559 tx.execute(&sql, params_from_iter(values.iter()))?;
560 }
561 Ok(())
562}
563
564fn sqlite_stage_projection_nodes(
565 tx: &rusqlite::Transaction<'_>,
566 nodes: &[&GraphNode],
567 source_watermark: Option<&str>,
568) -> Result<()> {
569 for chunk in nodes.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
570 let sql = format!(
571 r#"
572 INSERT INTO next_graph_nodes
573 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
574 VALUES {}
575 "#,
576 sqlite_graph_staging_placeholders(8, chunk.len())
577 );
578 let mut values = Vec::with_capacity(chunk.len() * 8);
579 for node in chunk {
580 values.push(Value::Text(node.id.clone()));
581 values.push(Value::Text(node.kind.clone()));
582 values.push(Value::Text(node.label.clone()));
583 values.push(Value::Text(to_json(&node.properties)?));
584 values.push(Value::Text(to_json(&node.provenance)?));
585 values.push(
586 optional_to_json(&node.freshness)?
587 .map(Value::Text)
588 .unwrap_or(Value::Null),
589 );
590 values.push(Value::Text(row_hash(node)?));
591 values.push(
592 source_watermark
593 .map(|watermark| Value::Text(watermark.to_string()))
594 .unwrap_or(Value::Null),
595 );
596 }
597 tx.execute(&sql, params_from_iter(values))?;
598 }
599 Ok(())
600}
601
602fn sqlite_stage_projection_edges(
603 tx: &rusqlite::Transaction<'_>,
604 edges: &[&GraphEdge],
605 source_watermark: Option<&str>,
606) -> Result<()> {
607 for chunk in edges.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
608 let sql = format!(
609 r#"
610 INSERT INTO next_graph_edges
611 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
612 VALUES {}
613 "#,
614 sqlite_graph_staging_placeholders(9, chunk.len())
615 );
616 let mut values = Vec::with_capacity(chunk.len() * 9);
617 for edge in chunk {
618 values.push(Value::Text(graph_edge_id(edge)));
619 values.push(Value::Text(edge.from_id.clone()));
620 values.push(Value::Text(edge.to_id.clone()));
621 values.push(Value::Text(edge.kind.clone()));
622 values.push(Value::Text(to_json(&edge.properties)?));
623 values.push(Value::Text(to_json(&edge.provenance)?));
624 values.push(
625 optional_to_json(&edge.freshness)?
626 .map(Value::Text)
627 .unwrap_or(Value::Null),
628 );
629 values.push(Value::Text(row_hash(edge)?));
630 values.push(
631 source_watermark
632 .map(|watermark| Value::Text(watermark.to_string()))
633 .unwrap_or(Value::Null),
634 );
635 }
636 tx.execute(&sql, params_from_iter(values))?;
637 }
638 Ok(())
639}
640
641impl SqliteGraphStore {
642 fn assert_not_in_temp_table_section(&self) {
643 if self.temp_table_active.get() {
644 panic!("SqliteGraphStore: re-entrant temp-table call detected — only one temp-table-using method may be active at a time per connection");
645 }
646 }
647
648 pub fn open(db_path: &Path) -> Result<Self> {
649 if let Some(parent) = db_path.parent() {
650 std::fs::create_dir_all(parent)
651 .with_context(|| format!("creating graph substrate dir: {}", parent.display()))?;
652 }
653 let conn = Connection::open(db_path)
654 .with_context(|| format!("opening graph substrate db: {}", db_path.display()))?;
655 configure_writable_graph_connection(&conn, db_path)?;
656 Self::from_connection(conn)
657 }
658
659 pub fn in_memory() -> Result<Self> {
660 let conn = Connection::open_in_memory()?;
661 conn.busy_timeout(Duration::from_secs(5))?;
662 Self::from_connection(conn)
663 }
664
665 pub fn open_read_only(db_path: &Path) -> Result<Self> {
666 let connection = open_graph_read_only_connection(db_path)?;
667 Self::from_read_only_connection(connection)
668 }
669
670 pub fn open_read_only_resilient(db_path: &Path) -> Result<Self> {
671 let connection = open_graph_read_only_connection_resilient(db_path)?;
672 Self::from_read_only_connection(connection)
673 }
674
675 pub fn read_only_recovery(&self) -> Option<ReadOnlyRecovery> {
676 self.read_only_recovery
677 }
678
679 pub fn has_user_triggers(&self) -> Result<bool> {
680 self.conn
681 .query_row(
682 r#"
683 SELECT EXISTS(
684 SELECT 1
685 FROM sqlite_master
686 WHERE type = 'trigger'
687 AND name NOT LIKE 'sqlite_%'
688 )
689 "#,
690 [],
691 |row| row.get::<_, bool>(0),
692 )
693 .map_err(Into::into)
694 }
695
696 fn from_connection(conn: Connection) -> Result<Self> {
697 conn.pragma_update(None, "foreign_keys", "ON")?;
698 let user_version: i64 =
699 conn.pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
700 if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
701 bail!(
702 "graph.db schema version {} is newer than supported version {}",
703 user_version,
704 SQLITE_GRAPH_SCHEMA_VERSION
705 );
706 }
707 conn.execute_batch(
708 r#"
709 CREATE TABLE IF NOT EXISTS graph_nodes (
710 id TEXT PRIMARY KEY,
711 kind TEXT NOT NULL,
712 label TEXT NOT NULL,
713 properties_json TEXT NOT NULL DEFAULT '{}',
714 provenance_json TEXT NOT NULL DEFAULT '[]',
715 freshness_json TEXT,
716 row_hash TEXT,
717 source_watermark TEXT
718 );
719 CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind
720 ON graph_nodes(kind);
721 CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
722 ON graph_nodes(kind, label, id);
723
724 CREATE TABLE IF NOT EXISTS graph_edges (
725 edge_key TEXT NOT NULL UNIQUE,
726 from_id TEXT NOT NULL,
727 to_id TEXT NOT NULL,
728 kind TEXT NOT NULL,
729 properties_json TEXT NOT NULL DEFAULT '{}',
730 provenance_json TEXT NOT NULL DEFAULT '[]',
731 freshness_json TEXT,
732 row_hash TEXT,
733 source_watermark TEXT,
734 PRIMARY KEY (from_id, to_id, kind),
735 FOREIGN KEY (from_id) REFERENCES graph_nodes(id) ON DELETE CASCADE,
736 FOREIGN KEY (to_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
737 );
738 CREATE INDEX IF NOT EXISTS idx_graph_edges_from_kind
739 ON graph_edges(from_id, kind);
740 CREATE INDEX IF NOT EXISTS idx_graph_edges_to_kind
741 ON graph_edges(to_id, kind);
742
743 CREATE TABLE IF NOT EXISTS graph_projection_versions (
744 scope TEXT PRIMARY KEY,
745 projection_version TEXT NOT NULL,
746 content_hash TEXT,
747 source_watermark TEXT,
748 observed_at_unix INTEGER NOT NULL
749 );
750
751 CREATE TABLE IF NOT EXISTS graph_tombstones (
752 row_key TEXT PRIMARY KEY,
753 row_kind TEXT NOT NULL,
754 deleted_at_unix INTEGER NOT NULL
755 );
756
757 CREATE TABLE IF NOT EXISTS graph_node_properties (
758 node_id TEXT NOT NULL,
759 key TEXT NOT NULL,
760 value TEXT NOT NULL,
761 PRIMARY KEY (node_id, key),
762 FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
763 );
764 CREATE INDEX IF NOT EXISTS idx_graph_node_properties_key_value_node
765 ON graph_node_properties(key, value, node_id);
766
767 CREATE TABLE IF NOT EXISTS graph_operator_stats (
768 scope TEXT PRIMARY KEY,
769 nodes INTEGER NOT NULL,
770 edges INTEGER NOT NULL,
771 tombstone_nodes INTEGER NOT NULL,
772 tombstone_edges INTEGER NOT NULL,
773 file_size_bytes INTEGER,
774 freelist_bytes INTEGER,
775 observed_at_unix INTEGER NOT NULL
776 );
777 "#,
778 )?;
779 if user_version < SQLITE_GRAPH_SCHEMA_VERSION {
780 migrate_sqlite_graph_schema(&conn, user_version)?;
781 conn.pragma_update(None, "user_version", SQLITE_GRAPH_SCHEMA_VERSION)?;
782 }
783 Ok(Self {
784 conn,
785 _snapshot_copy: None,
786 read_only_recovery: None,
787 temp_table_active: Cell::new(false),
788 })
789 }
790
791 fn from_read_only_connection(connection: SqliteReadOnlyConnection) -> Result<Self> {
792 connection.conn.pragma_update(None, "foreign_keys", "ON")?;
793 let user_version: i64 =
794 connection
795 .conn
796 .pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
797 if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
798 bail!(
799 "graph.db schema version {} is newer than supported version {}",
800 user_version,
801 SQLITE_GRAPH_SCHEMA_VERSION
802 );
803 }
804 connection
805 .conn
806 .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
807 Ok(Self {
808 conn: connection.conn,
809 _snapshot_copy: connection._snapshot_copy,
810 read_only_recovery: connection.recovery,
811 temp_table_active: Cell::new(false),
812 })
813 }
814
815 pub fn replace_projection(&mut self, projection: &GraphProjection) -> Result<()> {
816 self.replace_projection_with_version("root", projection, None, None)
817 .map(|_| ())
818 }
819
820 pub fn replace_projection_with_version(
821 &mut self,
822 scope: impl Into<String>,
823 projection: &GraphProjection,
824 projection_version: Option<&str>,
825 source_watermark: Option<String>,
826 ) -> Result<SqliteProjectionRefresh> {
827 self.assert_not_in_temp_table_section();
828 self.temp_table_active.set(true);
829 let scope = scope.into();
830 let result = self.replace_projection_with_version_fallible(scope, projection, projection_version, source_watermark);
831 self.temp_table_active.set(false);
832 if let Ok(ref refresh) = result {
833 let total_rows = refresh.upserted_nodes + refresh.upserted_edges;
834 let autocheckpoint = if total_rows > 10000 {
835 8192
836 } else if total_rows > 1000 {
837 4096
838 } else {
839 2048
840 };
841 let _ = self.conn.pragma_update(None, "wal_autocheckpoint", autocheckpoint);
842 let _ = self.conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)");
843 }
844 result
845 }
846
847 fn replace_projection_with_version_fallible(
848 &mut self,
849 scope: String,
850 projection: &GraphProjection,
851 projection_version: Option<&str>,
852 source_watermark: Option<String>,
853 ) -> Result<SqliteProjectionRefresh> {
854 let projection_version = projection_version
855 .map(str::to_string)
856 .or_else(|| projection_version_from_nodes(&projection.nodes))
857 .unwrap_or_else(|| "unversioned".to_string());
858 let projection_hash = projection_hash_from_nodes(&projection.nodes);
859 let observed_at_unix = unix_now();
860 let file_size_bytes_before = sqlite_database_size_bytes(&self.conn).ok();
861 let force_refresh_writes = self.has_user_triggers().unwrap_or(true);
862 let mut phase_timings = Vec::new();
863
864 let tx = self.conn.transaction()?;
865 let started = Instant::now();
866 tx.execute_batch(
867 r#"
868 CREATE TEMP TABLE IF NOT EXISTS next_graph_nodes (
869 id TEXT PRIMARY KEY,
870 kind TEXT NOT NULL,
871 label TEXT NOT NULL,
872 properties_json TEXT NOT NULL,
873 provenance_json TEXT NOT NULL,
874 freshness_json TEXT,
875 row_hash TEXT NOT NULL,
876 source_watermark TEXT
877 );
878 CREATE TEMP TABLE IF NOT EXISTS next_graph_edges (
879 edge_key TEXT PRIMARY KEY,
880 from_id TEXT NOT NULL,
881 to_id TEXT NOT NULL,
882 kind TEXT NOT NULL,
883 properties_json TEXT NOT NULL,
884 provenance_json TEXT NOT NULL,
885 freshness_json TEXT,
886 row_hash TEXT NOT NULL,
887 source_watermark TEXT
888 );
889 CREATE INDEX IF NOT EXISTS temp.idx_next_graph_edges_from_to_kind
890 ON next_graph_edges(from_id, to_id, kind);
891 CREATE TEMP TABLE IF NOT EXISTS next_graph_node_properties (
892 node_id TEXT NOT NULL,
893 key TEXT NOT NULL,
894 value TEXT NOT NULL,
895 PRIMARY KEY (node_id, key)
896 );
897 CREATE TEMP TABLE IF NOT EXISTS next_graph_edge_properties (
898 edge_key TEXT NOT NULL,
899 key TEXT NOT NULL,
900 value TEXT NOT NULL,
901 PRIMARY KEY (edge_key, key)
902 );
903 CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_nodes (
904 id TEXT PRIMARY KEY
905 );
906 CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_edges (
907 edge_key TEXT PRIMARY KEY
908 );
909 CREATE TEMP TABLE IF NOT EXISTS next_graph_all_node_ids (
910 id TEXT PRIMARY KEY
911 );
912 CREATE TEMP TABLE IF NOT EXISTS next_graph_all_edge_keys (
913 edge_key TEXT PRIMARY KEY
914 );
915 DELETE FROM next_graph_nodes;
916 DELETE FROM next_graph_edges;
917 DELETE FROM next_graph_node_properties;
918 DELETE FROM next_graph_edge_properties;
919 DELETE FROM next_graph_changed_nodes;
920 DELETE FROM next_graph_changed_edges;
921 DELETE FROM next_graph_all_node_ids;
922 DELETE FROM next_graph_all_edge_keys;
923 "#,
924 )?;
925 phase_timings.push(sqlite_refresh_phase_timing(
926 "sqlite_temp_table_prepare",
927 started,
928 "create and clear refresh staging tables before row loading",
929 ));
930 let (changed_nodes, changed_edges, skipped_nodes, skipped_edges) = if force_refresh_writes {
931 (projection.nodes.iter().collect(), projection.edges.iter().collect(), 0usize, 0usize)
932 } else {
933 let existing_node_hashes: BTreeMap<String, String> = {
934 let mut stmt = tx.prepare("SELECT id, row_hash FROM graph_nodes WHERE row_hash IS NOT NULL")?;
935 let rows = stmt.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)))?;
936 collect_rows(rows)?.into_iter().collect()
937 };
938 let existing_edge_hashes: BTreeMap<String, String> = {
939 let mut stmt = tx.prepare("SELECT edge_key, row_hash FROM graph_edges WHERE row_hash IS NOT NULL")?;
940 let rows = stmt.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)))?;
941 collect_rows(rows)?.into_iter().collect()
942 };
943 let cn: Vec<&GraphNode> = projection.nodes.iter().filter(|n| {
944 let hash = row_hash(*n).ok();
945 hash.as_ref().is_none_or(|h| existing_node_hashes.get(&n.id) != Some(h))
946 }).collect();
947 let ce: Vec<&GraphEdge> = projection.edges.iter().filter(|e| {
948 let hash = row_hash(*e).ok();
949 let ek = graph_edge_id(e);
950 hash.as_ref().is_none_or(|h| existing_edge_hashes.get(&ek) != Some(h))
951 }).collect();
952 let sn = projection.nodes.len() - cn.len();
953 let se = projection.edges.len() - ce.len();
954 (cn, ce, sn, se)
955 };
956 {
957 let started = Instant::now();
958 sqlite_stage_all_ids(&tx, &projection.nodes, &projection.edges)?;
959 sqlite_stage_projection_nodes(&tx, &changed_nodes, source_watermark.as_deref())?;
960 sqlite_stage_projection_edges(&tx, &changed_edges, source_watermark.as_deref())?;
961 phase_timings.push(sqlite_refresh_phase_timing(
962 "sqlite_node_staging",
963 started,
964 &format!(
965 "pre-filtered staging: {} nodes ({} unchanged skipped), {} edges ({} unchanged skipped) into temp tables using multi-row chunks up to {} rows",
966 changed_nodes.len(),
967 skipped_nodes,
968 changed_edges.len(),
969 skipped_edges,
970 SQLITE_GRAPH_STAGING_CHUNK_ROWS
971 ),
972 ));
973 }
974 {
975 let started = Instant::now();
976 let changed_nodes_sql = if force_refresh_writes {
977 r#"
978 INSERT INTO next_graph_changed_nodes (id)
979 SELECT id
980 FROM next_graph_nodes
981 "#
982 } else {
983 r#"
984 INSERT INTO next_graph_changed_nodes (id)
985 SELECT n.id
986 FROM next_graph_nodes n
987 LEFT JOIN graph_nodes g ON g.id = n.id
988 WHERE g.id IS NULL OR g.row_hash IS NOT n.row_hash
989 "#
990 };
991 tx.execute(changed_nodes_sql, [])?;
992 tx.execute_batch(
993 r#"
994 INSERT INTO next_graph_node_properties (node_id, key, value)
995 SELECT n.id, json_each.key, CAST(json_each.value AS TEXT)
996 FROM next_graph_nodes n
997 JOIN next_graph_changed_nodes c ON c.id = n.id,
998 json_each(n.properties_json)
999 WHERE json_each.key IS NOT NULL
1000 AND json_each.value IS NOT NULL
1001 "#,
1002 )?;
1003 phase_timings.push(sqlite_refresh_phase_timing(
1004 "sqlite_property_row_staging",
1005 started,
1006 "derive materialized node property rows only for new/changed node rows; unchanged row-hash owners reuse existing property rows",
1007 ));
1008 }
1009 {
1010 let started = Instant::now();
1011 let changed_edges_sql = if force_refresh_writes {
1012 r#"
1013 INSERT INTO next_graph_changed_edges (edge_key)
1014 SELECT edge_key
1015 FROM next_graph_edges
1016 "#
1017 } else {
1018 r#"
1019 INSERT INTO next_graph_changed_edges (edge_key)
1020 SELECT n.edge_key
1021 FROM next_graph_edges n
1022 LEFT JOIN graph_edges g ON g.edge_key = n.edge_key
1023 WHERE g.edge_key IS NULL OR g.row_hash IS NOT n.row_hash
1024 "#
1025 };
1026 tx.execute(changed_edges_sql, [])?;
1027 tx.execute_batch(
1028 r#"
1029 INSERT INTO next_graph_edge_properties (edge_key, key, value)
1030 SELECT e.edge_key, json_each.key, CAST(json_each.value AS TEXT)
1031 FROM next_graph_edges e
1032 JOIN next_graph_changed_edges c ON c.edge_key = e.edge_key,
1033 json_each(e.properties_json)
1034 WHERE json_each.key IS NOT NULL
1035 AND json_each.value IS NOT NULL
1036 "#,
1037 )?;
1038 phase_timings.push(sqlite_refresh_phase_timing(
1039 "sqlite_edge_property_row_staging",
1040 started,
1041 "derive materialized edge property rows only for new/changed edge rows; unchanged row-hash owners reuse existing property rows",
1042 ));
1043 }
1044
1045 let delta_started = Instant::now();
1046 let tombstoned_nodes = {
1047 let sql = if force_refresh_writes {
1048 r#"
1049 SELECT g.id
1050 FROM graph_nodes g
1051 LEFT JOIN next_graph_nodes n ON n.id = g.id
1052 WHERE n.id IS NULL
1053 ORDER BY g.id
1054 "#
1055 } else {
1056 r#"
1057 SELECT g.id
1058 FROM graph_nodes g
1059 LEFT JOIN next_graph_all_node_ids n ON n.id = g.id
1060 WHERE n.id IS NULL
1061 ORDER BY g.id
1062 "#
1063 };
1064 let mut stmt = tx.prepare(sql)?;
1065 collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1066 };
1067 let tombstoned_edges = {
1068 let sql = if force_refresh_writes {
1069 r#"
1070 SELECT g.edge_key
1071 FROM graph_edges g
1072 LEFT JOIN next_graph_edges n
1073 ON n.edge_key = g.edge_key
1074 WHERE n.edge_key IS NULL
1075 ORDER BY g.edge_key
1076 "#
1077 } else {
1078 r#"
1079 SELECT g.edge_key
1080 FROM graph_edges g
1081 LEFT JOIN next_graph_all_edge_keys n
1082 ON n.edge_key = g.edge_key
1083 WHERE n.edge_key IS NULL
1084 ORDER BY g.edge_key
1085 "#
1086 };
1087 let mut stmt = tx.prepare(sql)?;
1088 collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1089 };
1090 let unchanged_nodes: usize = if force_refresh_writes {
1091 tx.query_row(
1092 r#"
1093 SELECT COUNT(*)
1094 FROM next_graph_nodes n
1095 JOIN graph_nodes g ON g.id = n.id
1096 WHERE g.row_hash = n.row_hash
1097 "#,
1098 [],
1099 |row| row.get(0),
1100 )?
1101 } else {
1102 skipped_nodes
1103 };
1104 let unchanged_edges: usize = if force_refresh_writes {
1105 tx.query_row(
1106 r#"
1107 SELECT COUNT(*)
1108 FROM next_graph_edges n
1109 JOIN graph_edges g
1110 ON g.edge_key = n.edge_key
1111 WHERE g.row_hash = n.row_hash
1112 "#,
1113 [],
1114 |row| row.get(0),
1115 )?
1116 } else {
1117 skipped_edges
1118 };
1119 let reused_owner_node_properties: usize = if force_refresh_writes {
1120 tx.query_row(
1121 r#"
1122 SELECT COUNT(*)
1123 FROM graph_node_properties g
1124 JOIN next_graph_nodes n ON n.id = g.node_id
1125 LEFT JOIN next_graph_changed_nodes c ON c.id = n.id
1126 WHERE c.id IS NULL
1127 "#,
1128 [],
1129 |row| row.get(0),
1130 )?
1131 } else {
1132 tx.query_row(
1133 r#"
1134 SELECT COUNT(*)
1135 FROM graph_node_properties g
1136 JOIN next_graph_all_node_ids a ON a.id = g.node_id
1137 LEFT JOIN next_graph_changed_nodes c ON c.id = a.id
1138 WHERE c.id IS NULL
1139 "#,
1140 [],
1141 |row| row.get(0),
1142 )?
1143 };
1144 let reused_owner_edge_properties: usize = if force_refresh_writes {
1145 tx.query_row(
1146 r#"
1147 SELECT COUNT(*)
1148 FROM graph_edge_properties g
1149 JOIN next_graph_edges n ON n.edge_key = g.edge_key
1150 LEFT JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1151 WHERE c.edge_key IS NULL
1152 "#,
1153 [],
1154 |row| row.get(0),
1155 )?
1156 } else {
1157 tx.query_row(
1158 r#"
1159 SELECT COUNT(*)
1160 FROM graph_edge_properties g
1161 JOIN next_graph_all_edge_keys a ON a.edge_key = g.edge_key
1162 LEFT JOIN next_graph_changed_edges c ON c.edge_key = a.edge_key
1163 WHERE c.edge_key IS NULL
1164 "#,
1165 [],
1166 |row| row.get(0),
1167 )?
1168 };
1169 let unchanged_changed_node_properties: usize = tx.query_row(
1170 r#"
1171 SELECT COUNT(*)
1172 FROM next_graph_node_properties n
1173 JOIN graph_node_properties g
1174 ON g.node_id = n.node_id AND g.key = n.key
1175 WHERE g.value = n.value
1176 "#,
1177 [],
1178 |row| row.get(0),
1179 )?;
1180 let unchanged_changed_edge_properties: usize = tx.query_row(
1181 r#"
1182 SELECT COUNT(*)
1183 FROM next_graph_edge_properties n
1184 JOIN graph_edge_properties g
1185 ON g.edge_key = n.edge_key AND g.key = n.key
1186 WHERE g.value = n.value
1187 "#,
1188 [],
1189 |row| row.get(0),
1190 )?;
1191 let unchanged_properties = reused_owner_node_properties
1192 + reused_owner_edge_properties
1193 + unchanged_changed_node_properties
1194 + unchanged_changed_edge_properties;
1195
1196 let deleted_edges = if force_refresh_writes {
1197 tx.execute(
1198 r#"
1199 DELETE FROM graph_edges
1200 WHERE NOT EXISTS (
1201 SELECT 1
1202 FROM next_graph_edges n
1203 WHERE n.edge_key = graph_edges.edge_key
1204 )
1205 "#,
1206 [],
1207 )?
1208 } else {
1209 tx.execute(
1210 r#"
1211 DELETE FROM graph_edges
1212 WHERE NOT EXISTS (
1213 SELECT 1
1214 FROM next_graph_all_edge_keys n
1215 WHERE n.edge_key = graph_edges.edge_key
1216 )
1217 "#,
1218 [],
1219 )?
1220 };
1221 let deleted_nodes = if force_refresh_writes {
1222 tx.execute(
1223 r#"
1224 DELETE FROM graph_nodes
1225 WHERE NOT EXISTS (
1226 SELECT 1
1227 FROM next_graph_nodes n
1228 WHERE n.id = graph_nodes.id
1229 )
1230 "#,
1231 [],
1232 )?
1233 } else {
1234 tx.execute(
1235 r#"
1236 DELETE FROM graph_nodes
1237 WHERE NOT EXISTS (
1238 SELECT 1
1239 FROM next_graph_all_node_ids n
1240 WHERE n.id = graph_nodes.id
1241 )
1242 "#,
1243 [],
1244 )?
1245 };
1246
1247 let upsert_nodes_sql = r#"
1248 INSERT INTO graph_nodes
1249 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1250 SELECT
1251 n.id,
1252 n.kind,
1253 n.label,
1254 n.properties_json,
1255 n.provenance_json,
1256 n.freshness_json,
1257 n.row_hash,
1258 n.source_watermark
1259 FROM next_graph_nodes n
1260 JOIN next_graph_changed_nodes c ON c.id = n.id
1261 WHERE true
1262 ON CONFLICT(id) DO UPDATE SET
1263 kind = excluded.kind,
1264 label = excluded.label,
1265 properties_json = excluded.properties_json,
1266 provenance_json = excluded.provenance_json,
1267 freshness_json = excluded.freshness_json,
1268 row_hash = excluded.row_hash,
1269 source_watermark = excluded.source_watermark
1270 WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1271 "#;
1272 tx.execute(upsert_nodes_sql, [])?;
1273 let upsert_edges_sql = r#"
1274 INSERT INTO graph_edges
1275 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1276 SELECT
1277 n.edge_key,
1278 n.from_id,
1279 n.to_id,
1280 n.kind,
1281 n.properties_json,
1282 n.provenance_json,
1283 n.freshness_json,
1284 n.row_hash,
1285 n.source_watermark
1286 FROM next_graph_edges n
1287 JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1288 WHERE true
1289 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1290 edge_key = excluded.edge_key,
1291 properties_json = excluded.properties_json,
1292 provenance_json = excluded.provenance_json,
1293 freshness_json = excluded.freshness_json,
1294 row_hash = excluded.row_hash,
1295 source_watermark = excluded.source_watermark
1296 WHERE graph_edges.row_hash IS NOT excluded.row_hash
1297 "#;
1298 tx.execute(upsert_edges_sql, [])?;
1299 let deleted_node_properties = tx.execute(
1300 r#"
1301 DELETE FROM graph_node_properties
1302 WHERE EXISTS (
1303 SELECT 1
1304 FROM next_graph_changed_nodes c
1305 WHERE c.id = graph_node_properties.node_id
1306 )
1307 AND NOT EXISTS (
1308 SELECT 1
1309 FROM next_graph_node_properties n
1310 WHERE n.node_id = graph_node_properties.node_id
1311 AND n.key = graph_node_properties.key
1312 )
1313 "#,
1314 [],
1315 )?;
1316 let deleted_edge_properties = tx.execute(
1317 r#"
1318 DELETE FROM graph_edge_properties
1319 WHERE EXISTS (
1320 SELECT 1
1321 FROM next_graph_changed_edges c
1322 WHERE c.edge_key = graph_edge_properties.edge_key
1323 )
1324 AND NOT EXISTS (
1325 SELECT 1
1326 FROM next_graph_edge_properties n
1327 WHERE n.edge_key = graph_edge_properties.edge_key
1328 AND n.key = graph_edge_properties.key
1329 )
1330 "#,
1331 [],
1332 )?;
1333 let deleted_properties = deleted_node_properties + deleted_edge_properties;
1334 let upsert_properties_sql = r#"
1335 INSERT INTO graph_node_properties (node_id, key, value)
1336 SELECT n.node_id, n.key, n.value
1337 FROM next_graph_node_properties n
1338 LEFT JOIN graph_node_properties g
1339 ON g.node_id = n.node_id AND g.key = n.key
1340 WHERE g.node_id IS NULL OR g.value IS NOT n.value
1341 ON CONFLICT(node_id, key) DO UPDATE SET
1342 value = excluded.value
1343 WHERE graph_node_properties.value IS NOT excluded.value
1344 "#;
1345 let upserted_node_properties = tx.execute(upsert_properties_sql, [])?;
1346 let upsert_edge_properties_sql = r#"
1347 INSERT INTO graph_edge_properties (edge_key, key, value)
1348 SELECT n.edge_key, n.key, n.value
1349 FROM next_graph_edge_properties n
1350 LEFT JOIN graph_edge_properties g
1351 ON g.edge_key = n.edge_key AND g.key = n.key
1352 WHERE g.edge_key IS NULL OR g.value IS NOT n.value
1353 ON CONFLICT(edge_key, key) DO UPDATE SET
1354 value = excluded.value
1355 WHERE graph_edge_properties.value IS NOT excluded.value
1356 "#;
1357 let upserted_edge_properties = tx.execute(upsert_edge_properties_sql, [])?;
1358 let upserted_properties = upserted_node_properties + upserted_edge_properties;
1359 tx.execute(
1360 r#"
1361 INSERT INTO graph_projection_versions
1362 (scope, projection_version, content_hash, source_watermark, observed_at_unix)
1363 VALUES (?1, ?2, ?3, ?4, ?5)
1364 ON CONFLICT(scope) DO UPDATE SET
1365 projection_version = excluded.projection_version,
1366 content_hash = excluded.content_hash,
1367 source_watermark = excluded.source_watermark,
1368 observed_at_unix = excluded.observed_at_unix
1369 "#,
1370 (
1371 &scope,
1372 &projection_version,
1373 &projection_hash,
1374 &source_watermark,
1375 observed_at_unix,
1376 ),
1377 )?;
1378 let pruned_node_tombstones = tx.execute(
1379 r#"
1380 DELETE FROM graph_tombstones
1381 WHERE row_kind = 'node'
1382 AND EXISTS (
1383 SELECT 1
1384 FROM next_graph_nodes n
1385 WHERE n.id = substr(graph_tombstones.row_key, 6)
1386 )
1387 "#,
1388 [],
1389 )?;
1390 let pruned_edge_tombstones = tx.execute(
1391 r#"
1392 DELETE FROM graph_tombstones
1393 WHERE row_kind = 'edge'
1394 AND EXISTS (
1395 SELECT 1
1396 FROM next_graph_edges n
1397 WHERE n.edge_key = substr(graph_tombstones.row_key, 6)
1398 )
1399 "#,
1400 [],
1401 )?;
1402 {
1403 let mut insert_node_tombstone = tx.prepare(
1404 r#"
1405 INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1406 VALUES (?1, 'node', ?2)
1407 ON CONFLICT(row_key) DO UPDATE SET
1408 row_kind = excluded.row_kind,
1409 deleted_at_unix = excluded.deleted_at_unix
1410 "#,
1411 )?;
1412 for id in &tombstoned_nodes {
1413 insert_node_tombstone.execute((format!("node:{id}"), observed_at_unix))?;
1414 }
1415 }
1416 {
1417 let mut insert_edge_tombstone = tx.prepare(
1418 r#"
1419 INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1420 VALUES (?1, 'edge', ?2)
1421 ON CONFLICT(row_key) DO UPDATE SET
1422 row_kind = excluded.row_kind,
1423 deleted_at_unix = excluded.deleted_at_unix
1424 "#,
1425 )?;
1426 for key in &tombstoned_edges {
1427 insert_edge_tombstone.execute((format!("edge:{key}"), observed_at_unix))?;
1428 }
1429 }
1430 let tombstone_node_count: usize = tx.query_row(
1431 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1432 [],
1433 |row| row.get(0),
1434 )?;
1435 let tombstone_edge_count: usize = tx.query_row(
1436 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1437 [],
1438 |row| row.get(0),
1439 )?;
1440 tx.execute(
1441 r#"
1442 INSERT INTO graph_operator_stats
1443 (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1444 VALUES (?1, ?2, ?3, ?4, ?5, NULL, NULL, ?6)
1445 ON CONFLICT(scope) DO UPDATE SET
1446 nodes = excluded.nodes,
1447 edges = excluded.edges,
1448 tombstone_nodes = excluded.tombstone_nodes,
1449 tombstone_edges = excluded.tombstone_edges,
1450 file_size_bytes = excluded.file_size_bytes,
1451 freelist_bytes = excluded.freelist_bytes,
1452 observed_at_unix = excluded.observed_at_unix
1453 "#,
1454 (
1455 &scope,
1456 projection.nodes.len() as i64,
1457 projection.edges.len() as i64,
1458 tombstone_node_count as i64,
1459 tombstone_edge_count as i64,
1460 observed_at_unix,
1461 ),
1462 )?;
1463 phase_timings.push(sqlite_refresh_phase_timing(
1464 "sqlite_delta_write",
1465 delta_started,
1466 "apply row/property deltas, projection metadata, tombstones, and cached operator counts",
1467 ));
1468 let commit_started = Instant::now();
1469 tx.commit()?;
1470 phase_timings.push(sqlite_refresh_phase_timing(
1471 "sqlite_commit",
1472 commit_started,
1473 "commit refresh transaction and publish old-or-new graph visibility",
1474 ));
1475 let file_size_bytes_after = sqlite_database_size_bytes(&self.conn).ok();
1476 let freelist_bytes_after = sqlite_database_freelist_bytes(&self.conn).ok();
1477 let stats_started = Instant::now();
1478 self.conn.execute(
1479 r#"
1480 UPDATE graph_operator_stats
1481 SET file_size_bytes = ?2,
1482 freelist_bytes = ?3,
1483 observed_at_unix = ?4
1484 WHERE scope = ?1
1485 "#,
1486 (
1487 &scope,
1488 file_size_bytes_after.map(|value| value as i64),
1489 freelist_bytes_after.map(|value| value as i64),
1490 unix_now(),
1491 ),
1492 )?;
1493 phase_timings.push(sqlite_refresh_phase_timing(
1494 "sqlite_stats_cache_update",
1495 stats_started,
1496 "persist post-commit file and freelist proof for status/doctor",
1497 ));
1498 Ok(SqliteProjectionRefresh {
1499 scope,
1500 projection_version,
1501 source_watermark,
1502 upserted_nodes: projection.nodes.len().saturating_sub(unchanged_nodes),
1503 upserted_edges: projection.edges.len().saturating_sub(unchanged_edges),
1504 unchanged_nodes,
1505 unchanged_edges,
1506 upserted_properties,
1507 unchanged_properties,
1508 deleted_properties,
1509 deleted_nodes,
1510 deleted_edges,
1511 pruned_tombstones: pruned_node_tombstones + pruned_edge_tombstones,
1512 file_size_bytes_before,
1513 file_size_bytes_after,
1514 tombstoned_nodes,
1515 tombstoned_edges,
1516 phase_timings,
1517 })
1518 }
1519
1520 pub fn derive_ontology(&self) -> Result<GraphProjection> {
1529 let mut projection = GraphProjection::default();
1530
1531 let mut node_stmt = self.conn.prepare(
1532 "SELECT kind, COUNT(*) FROM graph_nodes \
1533 WHERE kind != 'ontology_type' \
1534 GROUP BY kind ORDER BY kind",
1535 )?;
1536 let node_rows =
1537 node_stmt.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)))?;
1538 for row in node_rows {
1539 let (kind, count) = row?;
1540 projection.nodes.push(
1541 GraphNode::new(format!("ontology_type:{kind}"), "ontology_type", &kind)
1542 .with_property("type_kind", &kind)
1543 .with_property("instance_count", count.to_string())
1544 .with_provenance(GraphProvenance::new("tsift-ontology", &kind)),
1545 );
1546 }
1547
1548 let mut rel_stmt = self.conn.prepare(
1549 "SELECT n1.kind, e.kind, n2.kind, COUNT(*) \
1550 FROM graph_edges e \
1551 JOIN graph_nodes n1 ON e.from_id = n1.id \
1552 JOIN graph_nodes n2 ON e.to_id = n2.id \
1553 WHERE e.kind NOT LIKE 'ontology_relation:%' \
1554 AND n1.kind != 'ontology_type' AND n2.kind != 'ontology_type' \
1555 GROUP BY n1.kind, e.kind, n2.kind \
1556 ORDER BY n1.kind, e.kind, n2.kind",
1557 )?;
1558 let rel_rows = rel_stmt.query_map([], |row| {
1559 Ok((
1560 row.get::<_, String>(0)?,
1561 row.get::<_, String>(1)?,
1562 row.get::<_, String>(2)?,
1563 row.get::<_, i64>(3)?,
1564 ))
1565 })?;
1566 for row in rel_rows {
1567 let (from_kind, edge_kind, to_kind, count) = row?;
1568 projection.edges.push(
1569 GraphEdge::new(
1570 format!("ontology_type:{from_kind}"),
1571 format!("ontology_type:{to_kind}"),
1572 format!("ontology_relation:{edge_kind}"),
1573 )
1574 .with_property("edge_kind", &edge_kind)
1575 .with_property("instance_count", count.to_string())
1576 .with_provenance(GraphProvenance::new("tsift-ontology", &edge_kind)),
1577 );
1578 }
1579
1580 Ok(projection)
1581 }
1582
1583 pub fn upsert_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1584 let tx = self.conn.transaction()?;
1585 {
1586 let mut insert_node = tx.prepare(
1587 r#"
1588 INSERT INTO graph_nodes
1589 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1590 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1591 ON CONFLICT(id) DO UPDATE SET
1592 kind = excluded.kind,
1593 label = excluded.label,
1594 properties_json = excluded.properties_json,
1595 provenance_json = excluded.provenance_json,
1596 freshness_json = excluded.freshness_json,
1597 row_hash = excluded.row_hash,
1598 source_watermark = excluded.source_watermark
1599 "#,
1600 )?;
1601 let mut delete_properties =
1602 tx.prepare("DELETE FROM graph_node_properties WHERE node_id = ?1")?;
1603 let mut insert_property = tx.prepare(
1604 r#"
1605 INSERT INTO graph_node_properties (node_id, key, value)
1606 VALUES (?1, ?2, ?3)
1607 "#,
1608 )?;
1609 for node in &projection.nodes {
1610 insert_node.execute((
1611 &node.id,
1612 &node.kind,
1613 &node.label,
1614 to_json(&node.properties)?,
1615 to_json(&node.provenance)?,
1616 optional_to_json(&node.freshness)?,
1617 row_hash(node)?,
1618 ))?;
1619 delete_properties.execute([&node.id])?;
1620 for (key, value) in &node.properties {
1621 insert_property.execute((&node.id, key, value))?;
1622 }
1623 }
1624 }
1625 {
1626 let mut insert_edge = tx.prepare(
1627 r#"
1628 INSERT INTO graph_edges
1629 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1630 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1631 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1632 edge_key = excluded.edge_key,
1633 properties_json = excluded.properties_json,
1634 provenance_json = excluded.provenance_json,
1635 freshness_json = excluded.freshness_json,
1636 row_hash = excluded.row_hash,
1637 source_watermark = excluded.source_watermark
1638 "#,
1639 )?;
1640 let mut delete_properties =
1641 tx.prepare("DELETE FROM graph_edge_properties WHERE edge_key = ?1")?;
1642 let mut insert_property = tx.prepare(
1643 r#"
1644 INSERT INTO graph_edge_properties (edge_key, key, value)
1645 VALUES (?1, ?2, ?3)
1646 "#,
1647 )?;
1648 for edge in &projection.edges {
1649 let edge_key = graph_edge_id(edge);
1650 insert_edge.execute((
1651 &edge_key,
1652 &edge.from_id,
1653 &edge.to_id,
1654 &edge.kind,
1655 to_json(&edge.properties)?,
1656 to_json(&edge.provenance)?,
1657 optional_to_json(&edge.freshness)?,
1658 row_hash(edge)?,
1659 ))?;
1660 delete_properties.execute([&edge_key])?;
1661 for (key, value) in &edge.properties {
1662 insert_property.execute((&edge_key, key, value))?;
1663 }
1664 }
1665 }
1666 tx.commit()?;
1667 Ok(())
1668 }
1669
1670 pub fn projection_version(&self, scope: &str) -> Result<Option<SqliteProjectionVersion>> {
1671 self.conn
1672 .query_row(
1673 r#"
1674 SELECT projection_version, content_hash, source_watermark
1675 FROM graph_projection_versions
1676 WHERE scope = ?1
1677 "#,
1678 [scope],
1679 |row| {
1680 Ok(SqliteProjectionVersion {
1681 projection_version: row.get(0)?,
1682 content_hash: row.get(1)?,
1683 source_watermark: row.get(2)?,
1684 })
1685 },
1686 )
1687 .optional()
1688 .map_err(Into::into)
1689 }
1690
1691 pub fn update_projection_source_watermark(
1692 &mut self,
1693 scope: &str,
1694 source_watermark: Option<String>,
1695 ) -> Result<()> {
1696 self.conn.execute(
1697 r#"
1698 UPDATE graph_projection_versions
1699 SET source_watermark = ?2
1700 WHERE scope = ?1
1701 "#,
1702 (scope, source_watermark),
1703 )?;
1704 Ok(())
1705 }
1706
1707 pub fn compact_storage(&mut self, scope: &str, prune_tombstones: bool) -> Result<usize> {
1708 let pruned_tombstones = if prune_tombstones {
1709 self.conn.execute("DELETE FROM graph_tombstones", [])?
1710 } else {
1711 0
1712 };
1713 self.conn.execute_batch(
1714 r#"
1715 PRAGMA wal_checkpoint(TRUNCATE);
1716 VACUUM;
1717 "#,
1718 )?;
1719 let nodes = self
1720 .conn
1721 .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
1722 row.get::<_, i64>(0)
1723 })?;
1724 let edges = self
1725 .conn
1726 .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
1727 row.get::<_, i64>(0)
1728 })?;
1729 let tombstone_nodes = self.conn.query_row(
1730 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1731 [],
1732 |row| row.get::<_, i64>(0),
1733 )?;
1734 let tombstone_edges = self.conn.query_row(
1735 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1736 [],
1737 |row| row.get::<_, i64>(0),
1738 )?;
1739 let file_size_bytes = sqlite_database_size_bytes(&self.conn)
1740 .ok()
1741 .map(|value| value as i64);
1742 let freelist_bytes = sqlite_database_freelist_bytes(&self.conn)
1743 .ok()
1744 .map(|value| value as i64);
1745 self.conn.execute(
1746 r#"
1747 INSERT INTO graph_operator_stats
1748 (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1749 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s', 'now'))
1750 ON CONFLICT(scope) DO UPDATE SET
1751 nodes = excluded.nodes,
1752 edges = excluded.edges,
1753 tombstone_nodes = excluded.tombstone_nodes,
1754 tombstone_edges = excluded.tombstone_edges,
1755 file_size_bytes = excluded.file_size_bytes,
1756 freelist_bytes = excluded.freelist_bytes,
1757 observed_at_unix = excluded.observed_at_unix
1758 "#,
1759 (
1760 scope,
1761 nodes,
1762 edges,
1763 tombstone_nodes,
1764 tombstone_edges,
1765 file_size_bytes,
1766 freelist_bytes,
1767 ),
1768 )?;
1769 Ok(pruned_tombstones)
1770 }
1771
1772 fn edges_between_nodes_inline(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
1773 let placeholders: Vec<&str> = node_ids.iter().map(|_| "?").collect();
1774 let in_clause = placeholders.join(", ");
1775 let sql = format!(
1776 "SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json \
1777 FROM graph_edges e \
1778 WHERE e.from_id IN ({in_clause}) \
1779 AND e.to_id IN ({in_clause}) \
1780 ORDER BY e.from_id, e.kind, e.to_id"
1781 );
1782 let values: Vec<Value> = node_ids
1783 .iter()
1784 .chain(node_ids.iter())
1785 .map(|id| Value::Text(id.clone()))
1786 .collect();
1787 let mut stmt = self.conn.prepare(&sql)?;
1788 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
1789 }
1790}
1791
1792fn sqlite_query_plan(conn: &Connection, sql: &str, values: &[Value]) -> Result<Vec<String>> {
1793 let mut stmt = conn.prepare(&format!("EXPLAIN QUERY PLAN {sql}"))?;
1794 collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
1795 row.get::<_, String>(3)
1796 })?)
1797}
1798
/// Build human-readable diagnostics for a captured SQLite query plan.
///
/// The first entry always echoes the whole plan, with its rows joined by ` | `.
/// It is followed by one entry per name in `expected_indexes`, in order. Each
/// entry says whether any plan row mentions that index name (substring match).
fn sqlite_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<String> {
    let summary = format!("sqlite query pushdown active; plan: {}", plan.join(" | "));
    let index_reports = expected_indexes.iter().map(|&index_name| {
        let reported = plan.iter().any(|detail| detail.contains(index_name));
        if reported {
            format!("sqlite query plan uses {index_name}")
        } else {
            format!(
                "sqlite query plan did not report {index_name}; inspect before changing graph property indexes"
            )
        }
    });
    std::iter::once(summary).chain(index_reports).collect()
}
1815
/// Compact, machine-oriented query-plan diagnostic.
///
/// `code` is a short fixed identifier. `terse_query_plan_diagnostics` produces
/// `"plan_active"`, `"idx_ok"` and `"idx_missing"`. `index` names the index the
/// diagnostic refers to and is omitted from serialized output when `None`.
///
/// NOTE(review): `code` is `&'static str`, so the derived `Deserialize` presumably
/// only accepts `'static` input (serde borrows `&str` fields). Confirm that
/// deserialization is actually needed.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TerseDiagnostic {
    /// Diagnostic kind identifier.
    pub code: &'static str,
    /// Index name for index-related diagnostics; `None` for plan-level ones.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index: Option<String>,
}
1822
1823#[allow(dead_code)]
1824fn terse_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<TerseDiagnostic> {
1825 let mut diagnostics = vec![TerseDiagnostic {
1826 code: "plan_active",
1827 index: None,
1828 }];
1829 for expected_index in expected_indexes {
1830 if plan.iter().any(|row| row.contains(expected_index)) {
1831 diagnostics.push(TerseDiagnostic {
1832 code: "idx_ok",
1833 index: Some(expected_index.to_string()),
1834 });
1835 } else {
1836 diagnostics.push(TerseDiagnostic {
1837 code: "idx_missing",
1838 index: Some(expected_index.to_string()),
1839 });
1840 }
1841 }
1842 diagnostics
1843}
1844
1845fn push_sqlite_property_filter_exists(
1846 sql: &mut String,
1847 values: &mut Vec<Value>,
1848 node_alias: &str,
1849 filters: &[GraphPropertyFilter],
1850) {
1851 for (index, filter) in filters.iter().enumerate() {
1852 sql.push_str(&format!(
1853 r#"
1854 AND EXISTS (
1855 SELECT 1
1856 FROM graph_node_properties p{index} INDEXED BY idx_graph_node_properties_key_value_node
1857 WHERE p{index}.node_id = {node_alias}.id
1858 AND p{index}.key = ?
1859 AND p{index}.value = ?
1860 )
1861 "#
1862 ));
1863 values.push(Value::Text(filter.key.clone()));
1864 values.push(Value::Text(filter.value.clone()));
1865 }
1866}
1867
1868fn push_sqlite_edge_property_filter_exists(
1869 sql: &mut String,
1870 values: &mut Vec<Value>,
1871 edge_alias: &str,
1872 filters: &[GraphPropertyFilter],
1873) {
1874 for (index, filter) in filters.iter().enumerate() {
1875 sql.push_str(&format!(
1876 r#"
1877 AND EXISTS (
1878 SELECT 1
1879 FROM graph_edge_properties ep{index} INDEXED BY idx_graph_edge_properties_key_value_edge
1880 WHERE ep{index}.edge_key = {edge_alias}.edge_key
1881 AND ep{index}.key = ?
1882 AND ep{index}.value = ?
1883 )
1884 "#
1885 ));
1886 values.push(Value::Text(filter.key.clone()));
1887 values.push(Value::Text(filter.value.clone()));
1888 }
1889}
1890
/// Parameters for one `SELECT` branch of the incident-edge `UNION` query that
/// `sqlite_incident_edges_union_query` assembles via `push_sqlite_incident_edge_branch`.
struct SqliteIncidentEdgeBranch<'a> {
    /// Index forced on `graph_edges` via `INDEXED BY`; spliced into SQL verbatim.
    index_name: &'a str,
    /// Edge column compared with `node_id` (`from_id` or `to_id`); spliced into SQL verbatim.
    endpoint_column: &'a str,
    /// Node whose incident edges are selected; bound as a parameter.
    node_id: &'a str,
    /// Optional exact edge-kind filter; bound as a parameter.
    kind: Option<&'a str>,
    /// Edge property filters, each rendered as an `AND EXISTS` clause.
    filters: &'a [GraphPropertyFilter],
    /// Optional keyset cursor: only edges whose `edge_key` is greater than this are selected.
    cursor: Option<&'a str>,
}
1899
1900fn push_sqlite_incident_edge_branch(
1901 sql: &mut String,
1902 values: &mut Vec<Value>,
1903 branch: SqliteIncidentEdgeBranch<'_>,
1904) {
1905 sql.push_str(&format!(
1906 r#"
1907 SELECT
1908 e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
1909 FROM graph_edges e INDEXED BY {index_name}
1910 WHERE e.{endpoint_column} = ?
1911 "#,
1912 index_name = branch.index_name,
1913 endpoint_column = branch.endpoint_column,
1914 ));
1915 values.push(Value::Text(branch.node_id.to_string()));
1916 if let Some(kind) = branch.kind {
1917 sql.push_str(" AND e.kind = ?");
1918 values.push(Value::Text(kind.to_string()));
1919 }
1920 push_sqlite_edge_property_filter_exists(sql, values, "e", branch.filters);
1921 if let Some(cursor) = branch.cursor {
1922 sql.push_str(" AND e.edge_key > ?");
1923 values.push(Value::Text(cursor.to_string()));
1924 }
1925}
1926
1927fn sqlite_incident_edges_union_query(
1928 node_id: &str,
1929 kind: Option<&str>,
1930 filters: &[GraphPropertyFilter],
1931 cursor: Option<&str>,
1932 limit: Option<usize>,
1933) -> (String, Vec<Value>) {
1934 let mut sql = String::from(
1935 r#"
1936 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1937 FROM (
1938 "#,
1939 );
1940 let mut values = Vec::new();
1941 push_sqlite_incident_edge_branch(
1942 &mut sql,
1943 &mut values,
1944 SqliteIncidentEdgeBranch {
1945 index_name: "idx_graph_edges_from_kind",
1946 endpoint_column: "from_id",
1947 node_id,
1948 kind,
1949 filters,
1950 cursor,
1951 },
1952 );
1953 sql.push_str(" UNION ");
1954 push_sqlite_incident_edge_branch(
1955 &mut sql,
1956 &mut values,
1957 SqliteIncidentEdgeBranch {
1958 index_name: "idx_graph_edges_to_kind",
1959 endpoint_column: "to_id",
1960 node_id,
1961 kind,
1962 filters,
1963 cursor,
1964 },
1965 );
1966 sql.push_str(
1967 r#"
1968 ) e
1969 ORDER BY e.edge_key
1970 "#,
1971 );
1972 if let Some(limit) = limit {
1973 sql.push_str(" LIMIT ?");
1974 values.push(Value::Integer(limit.saturating_add(1) as i64));
1975 }
1976 (sql, values)
1977}
1978
1979impl GraphStore for SqliteGraphStore {
1980 fn upsert_node(&self, node: &GraphNode) -> Result<()> {
1981 self.conn.execute(
1982 r#"
1983 INSERT INTO graph_nodes
1984 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1985 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1986 ON CONFLICT(id) DO UPDATE SET
1987 kind = excluded.kind,
1988 label = excluded.label,
1989 properties_json = excluded.properties_json,
1990 provenance_json = excluded.provenance_json,
1991 freshness_json = excluded.freshness_json,
1992 row_hash = excluded.row_hash,
1993 source_watermark = excluded.source_watermark
1994 "#,
1995 (
1996 &node.id,
1997 &node.kind,
1998 &node.label,
1999 to_json(&node.properties)?,
2000 to_json(&node.provenance)?,
2001 optional_to_json(&node.freshness)?,
2002 row_hash(node)?,
2003 ),
2004 )?;
2005 replace_node_properties(&self.conn, &node.id, &node.properties)?;
2006 Ok(())
2007 }
2008
2009 fn upsert_edge(&self, edge: &GraphEdge) -> Result<()> {
2010 let edge_key = graph_edge_id(edge);
2011 self.conn.execute(
2012 r#"
2013 INSERT INTO graph_edges
2014 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
2015 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
2016 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
2017 edge_key = excluded.edge_key,
2018 properties_json = excluded.properties_json,
2019 provenance_json = excluded.provenance_json,
2020 freshness_json = excluded.freshness_json,
2021 row_hash = excluded.row_hash,
2022 source_watermark = excluded.source_watermark
2023 "#,
2024 (
2025 &edge_key,
2026 &edge.from_id,
2027 &edge.to_id,
2028 &edge.kind,
2029 to_json(&edge.properties)?,
2030 to_json(&edge.provenance)?,
2031 optional_to_json(&edge.freshness)?,
2032 row_hash(edge)?,
2033 ),
2034 )?;
2035 replace_edge_properties(&self.conn, &edge_key, &edge.properties)?;
2036 Ok(())
2037 }
2038
2039 fn delete_node(&self, id: &str) -> Result<usize> {
2040 self.conn
2041 .execute("DELETE FROM graph_nodes WHERE id = ?1", [id])
2042 .map_err(Into::into)
2043 }
2044
2045 fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize> {
2046 self.conn
2047 .execute(
2048 "DELETE FROM graph_edges WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3",
2049 (from_id, to_id, kind),
2050 )
2051 .map_err(Into::into)
2052 }
2053
2054 fn node(&self, id: &str) -> Result<Option<GraphNode>> {
2055 self.conn
2056 .query_row(
2057 r#"
2058 SELECT id, kind, label, properties_json, provenance_json, freshness_json
2059 FROM graph_nodes
2060 WHERE id = ?1
2061 "#,
2062 [id],
2063 node_from_row,
2064 )
2065 .optional()
2066 .map_err(Into::into)
2067 }
2068
2069 fn all_nodes(&self) -> Result<Vec<GraphNode>> {
2070 let mut stmt = self.conn.prepare(
2071 r#"
2072 SELECT id, kind, label, properties_json, provenance_json, freshness_json
2073 FROM graph_nodes
2074 ORDER BY id
2075 "#,
2076 )?;
2077 collect_rows(stmt.query_map([], node_from_row)?)
2078 }
2079
2080 fn all_edges(&self) -> Result<Vec<GraphEdge>> {
2081 let mut stmt = self.conn.prepare(
2082 r#"
2083 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2084 FROM graph_edges
2085 ORDER BY from_id, kind, to_id
2086 "#,
2087 )?;
2088 collect_rows(stmt.query_map([], edge_from_row)?)
2089 }
2090
2091 fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
2092 self.conn
2093 .query_row(
2094 r#"
2095 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2096 FROM graph_edges INDEXED BY idx_graph_edges_edge_key
2097 WHERE edge_key = ?1
2098 "#,
2099 [edge_id],
2100 edge_from_row,
2101 )
2102 .optional()
2103 .map_err(Into::into)
2104 }
2105
2106 fn graph_counts(&self) -> Result<(usize, usize)> {
2107 let nodes = self
2108 .conn
2109 .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
2110 row.get::<_, usize>(0)
2111 })?;
2112 let edges = self
2113 .conn
2114 .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2115 row.get::<_, usize>(0)
2116 })?;
2117 Ok((nodes, edges))
2118 }
2119
2120 fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
2121 match kind {
2122 Some(kind) => self
2123 .conn
2124 .query_row(
2125 r#"
2126 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2127 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2128 WHERE from_id <> to_id AND kind = ?1
2129 ORDER BY from_id, kind, to_id
2130 LIMIT 1
2131 "#,
2132 [kind],
2133 edge_from_row,
2134 )
2135 .optional()
2136 .map_err(Into::into),
2137 None => self
2138 .conn
2139 .query_row(
2140 r#"
2141 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2142 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2143 WHERE from_id <> to_id
2144 ORDER BY from_id, kind, to_id
2145 LIMIT 1
2146 "#,
2147 [],
2148 edge_from_row,
2149 )
2150 .optional()
2151 .map_err(Into::into),
2152 }
2153 }
2154
2155 fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
2156 self.conn
2157 .query_row(
2158 r#"
2159 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2160 ep.key, ep.value
2161 FROM graph_edge_properties ep INDEXED BY idx_graph_edge_properties_key_value_edge
2162 JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2163 ON e.edge_key = ep.edge_key
2164 WHERE e.from_id <> e.to_id
2165 ORDER BY ep.key, ep.value, ep.edge_key
2166 LIMIT 1
2167 "#,
2168 [],
2169 |row| {
2170 Ok((
2171 edge_from_row(row)?,
2172 GraphPropertyFilter {
2173 key: row.get(7)?,
2174 value: row.get(8)?,
2175 },
2176 ))
2177 },
2178 )
2179 .optional()
2180 .map_err(Into::into)
2181 }
2182
2183 fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>> {
2184 let mut stmt = self.conn.prepare(
2185 r#"
2186 SELECT id, kind, label, properties_json, provenance_json, freshness_json
2187 FROM graph_nodes
2188 WHERE kind = ?1
2189 ORDER BY id
2190 "#,
2191 )?;
2192 collect_rows(stmt.query_map([kind], node_from_row)?)
2193 }
2194
    /// Returns one page of nodes whose `kind` matches, ordered by node id.
    ///
    /// Property filters, the exclusive cursor and the page limit are all pushed into a single
    /// SQLite statement. When a limit is set, `limit + 1` rows are fetched so truncation can be
    /// detected from the extra row. `page.diagnostics` starts with the query-plan check against
    /// the indexes this query shape is expected to use. It then adds notes on how property
    /// filters, the cursor and truncation were handled.
    fn paged_nodes_by_kind(
        &self,
        kind: &str,
        options: GraphQueryOptions,
    ) -> Result<GraphPagedSubgraph> {
        // Placeholders are positional `?`. Every clause appended to `sql` must push its bound
        // value(s) onto `values` in the same order.
        let mut sql = String::from(
            r#"
        SELECT id, kind, label, properties_json, provenance_json, freshness_json
        FROM graph_nodes
        WHERE kind = ?
        "#,
        );
        let mut values = vec![Value::Text(kind.to_string())];
        // NOTE(review): presumably appends one EXISTS probe per filter against the materialized
        // node-property rows, qualified by the table name. The helper body is not visible here.
        push_sqlite_property_filter_exists(
            &mut sql,
            &mut values,
            "graph_nodes",
            &options.property_filters,
        );
        // The cursor is exclusive: the page starts strictly after the id already seen.
        if let Some(cursor) = &options.cursor {
            sql.push_str(" AND id > ?");
            values.push(Value::Text(cursor.clone()));
        }
        sql.push_str(" ORDER BY id");
        // Over-fetch by one row; its presence signals that another page exists.
        if let Some(limit) = options.limit {
            sql.push_str(" LIMIT ?");
            values.push(Value::Integer(limit.saturating_add(1) as i64));
        }

        // The plan is captured for the same SQL and parameters before the query runs.
        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
        let mut stmt = self.conn.prepare(&sql)?;
        let mut nodes =
            collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)?;
        let before_limit = nodes.len();
        let mut next_cursor = None;
        if let Some(limit) = options.limit
            && nodes.len() > limit
        {
            // The next page resumes after the last node kept on this page.
            // NOTE(review): with `limit == 0`, `saturating_sub(1)` selects index 0. The cursor
            // would then name a row this page did not return, and the next page would skip it.
            // Confirm whether a zero limit can reach this method.
            next_cursor = nodes
                .get(limit.saturating_sub(1))
                .map(|node| node.id.clone());
            nodes.truncate(limit);
        }
        // Indexes the plan is checked against; the property index only matters when filters exist.
        let expected_indexes = if options.property_filters.is_empty() {
            vec!["idx_graph_nodes_kind"]
        } else {
            vec![
                "idx_graph_nodes_kind",
                "idx_graph_node_properties_key_value_node",
            ]
        };
        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
        if !options.property_filters.is_empty() {
            diagnostics.push(
                "property filters were evaluated by SQLite materialized property rows before paging"
                    .to_string(),
            );
        }
        if options.cursor.is_some() {
            diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
        }
        if next_cursor.is_some() {
            diagnostics.push(
                "result was truncated; pass page.next_cursor as --cursor for the next page"
                    .to_string(),
            );
        }
        Ok(GraphPagedSubgraph {
            page: GraphQueryPage {
                cursor: options.cursor,
                limit: options.limit,
                next_cursor,
                returned_nodes: nodes.len(),
                returned_edges: 0,
                truncated: options.limit.is_some_and(|limit| before_limit > limit),
                diagnostics,
            },
            nodes,
            edges: Vec::new(),
        })
    }
2276
2277 fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2278 match kind {
2279 Some(kind) => {
2280 let mut stmt = self.conn.prepare(
2281 r#"
2282 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2283 FROM graph_edges
2284 WHERE from_id = ?1 AND kind = ?2
2285 ORDER BY to_id, kind
2286 "#,
2287 )?;
2288 collect_rows(stmt.query_map((from_id, kind), edge_from_row)?)
2289 }
2290 None => {
2291 let mut stmt = self.conn.prepare(
2292 r#"
2293 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2294 FROM graph_edges
2295 WHERE from_id = ?1
2296 ORDER BY to_id, kind
2297 "#,
2298 )?;
2299 collect_rows(stmt.query_map([from_id], edge_from_row)?)
2300 }
2301 }
2302 }
2303
2304 fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2305 let (sql, values) = sqlite_incident_edges_union_query(node_id, kind, &[], None, None);
2306 let mut stmt = self.conn.prepare(&sql)?;
2307 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2308 }
2309
    /// Returns one page of edges, optionally restricted to one edge `kind`, ordered by edge key.
    ///
    /// With no property filters, the scan runs over `graph_edges` directly. With property
    /// filters, the first filter drives the scan from the materialized property rows
    /// (`graph_edge_properties`, alias `ep0`) joined back to `graph_edges`. Any remaining
    /// filters are appended by `push_sqlite_edge_property_filter_exists`.
    ///
    /// The cursor is an exclusive lower bound on the edge key. When a limit is set,
    /// `limit + 1` rows are fetched so truncation can be detected from the extra row. When a
    /// primary filter exists, an extra `COUNT(*)` reports how many property rows it matched;
    /// this is surfaced only as a diagnostic.
    fn paged_edges(
        &self,
        kind: Option<&str>,
        options: GraphQueryOptions,
    ) -> Result<GraphPagedSubgraph> {
        // The first property filter (if any) becomes the driving index probe.
        let primary_property_filter = options.property_filters.first();
        // Placeholders are positional `?`: values are pushed in the exact order their clauses
        // are appended to `sql`.
        let mut values = Vec::new();
        let mut sql = if let Some(filter) = primary_property_filter {
            values.push(Value::Text(filter.key.clone()));
            values.push(Value::Text(filter.value.clone()));
            String::from(
                r#"
        SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
        FROM graph_edge_properties ep0 INDEXED BY idx_graph_edge_properties_key_value_edge
        JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
        ON e.edge_key = ep0.edge_key
        WHERE ep0.key = ?
        AND ep0.value = ?
        "#,
            )
        } else {
            // `WHERE 1 = 1` lets every later clause start uniformly with ` AND`.
            String::from(
                r#"
        SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
        FROM graph_edges e
        WHERE 1 = 1
        "#,
            )
        };
        if let Some(kind) = kind {
            sql.push_str(" AND e.kind = ?");
            values.push(Value::Text(kind.to_string()));
        }
        // The remaining filters become extra predicates on alias `e`. That is all of them when
        // there is no primary filter; in that branch the slice is always empty.
        push_sqlite_edge_property_filter_exists(
            &mut sql,
            &mut values,
            "e",
            if primary_property_filter.is_some() {
                &options.property_filters[1..]
            } else {
                &options.property_filters
            },
        );
        // When driving from property rows, the cursor and ordering use `ep0.edge_key`. The join
        // makes that equal to `e.edge_key`.
        if let Some(cursor) = &options.cursor {
            if primary_property_filter.is_some() {
                sql.push_str(" AND ep0.edge_key > ?");
            } else {
                sql.push_str(" AND e.edge_key > ?");
            }
            values.push(Value::Text(cursor.clone()));
        }
        if primary_property_filter.is_some() {
            sql.push_str(" ORDER BY ep0.edge_key");
        } else {
            sql.push_str(" ORDER BY e.edge_key");
        }
        // Over-fetch by one row; its presence signals that another page exists.
        if let Some(limit) = options.limit {
            sql.push_str(" LIMIT ?");
            values.push(Value::Integer(limit.saturating_add(1) as i64));
        }

        // Diagnostic only: property rows matched by the primary filter, before the kind,
        // cursor and limit are applied.
        let primary_property_row_count = if let Some(filter) = primary_property_filter {
            Some(self.conn.query_row(
                r#"
        SELECT COUNT(*)
        FROM graph_edge_properties INDEXED BY idx_graph_edge_properties_key_value_edge
        WHERE key = ?1 AND value = ?2
        "#,
                (&filter.key, &filter.value),
                |row| row.get::<_, usize>(0),
            )?)
        } else {
            None
        };
        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
        let mut stmt = self.conn.prepare(&sql)?;
        let mut edges =
            collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
        let before_limit = edges.len();
        let mut next_cursor = None;
        if let Some(limit) = options.limit
            && edges.len() > limit
        {
            // NOTE(review): the SQL cursor compares against `edge_key`, while this value comes
            // from `graph_edge_id`. They are presumably the same string; confirm. With
            // `limit == 0`, index 0 is chosen, so the next page would skip that row.
            next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
            edges.truncate(limit);
        }
        let expected_indexes = if options.property_filters.is_empty() {
            vec!["idx_graph_edges_edge_key"]
        } else {
            vec![
                "idx_graph_edge_properties_key_value_edge",
                "idx_graph_edges_edge_key",
            ]
        };
        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
        if !options.property_filters.is_empty() {
            if let Some(row_count) = primary_property_row_count {
                diagnostics.push(format!(
                    "edge property primary filter matched {row_count} materialized row(s) before edge-kind/cursor paging"
                ));
            }
            diagnostics.push(
                "edge property scan drives from SQLite materialized property rows before joining graph_edges"
                    .to_string(),
            );
        }
        if options.cursor.is_some() {
            diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
        }
        if next_cursor.is_some() {
            diagnostics.push(
                "result was truncated; pass page.next_cursor as --cursor for the next page"
                    .to_string(),
            );
        }
        Ok(GraphPagedSubgraph {
            page: GraphQueryPage {
                cursor: options.cursor,
                limit: options.limit,
                next_cursor,
                returned_nodes: 0,
                returned_edges: edges.len(),
                truncated: options.limit.is_some_and(|limit| before_limit > limit),
                diagnostics,
            },
            nodes: Vec::new(),
            edges,
        })
    }
2439
2440 fn paged_incident_edges(
2441 &self,
2442 node_id: &str,
2443 kind: Option<&str>,
2444 options: GraphQueryOptions,
2445 ) -> Result<GraphPagedSubgraph> {
2446 let (sql, values) = sqlite_incident_edges_union_query(
2447 node_id,
2448 kind,
2449 &options.property_filters,
2450 options.cursor.as_deref(),
2451 options.limit,
2452 );
2453 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2454 let mut stmt = self.conn.prepare(&sql)?;
2455 let mut edges =
2456 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2457 let before_limit = edges.len();
2458 let mut next_cursor = None;
2459 if let Some(limit) = options.limit
2460 && edges.len() > limit
2461 {
2462 next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2463 edges.truncate(limit);
2464 }
2465 let expected_indexes = if options.property_filters.is_empty() {
2466 vec!["idx_graph_edges_from_kind", "idx_graph_edges_to_kind"]
2467 } else {
2468 vec![
2469 "idx_graph_edges_from_kind",
2470 "idx_graph_edges_to_kind",
2471 "idx_graph_edge_properties_key_value_edge",
2472 ]
2473 };
2474 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2475 diagnostics.push(
2476 "incident edge scan uses UNION over from_id/to_id index probes instead of an OR predicate"
2477 .to_string(),
2478 );
2479 if !options.property_filters.is_empty() {
2480 diagnostics.push(
2481 "edge property filters were evaluated by SQLite materialized property rows before paging"
2482 .to_string(),
2483 );
2484 }
2485 if options.cursor.is_some() {
2486 diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2487 }
2488 if next_cursor.is_some() {
2489 diagnostics.push(
2490 "result was truncated; pass page.next_cursor as --cursor for the next page"
2491 .to_string(),
2492 );
2493 }
2494 Ok(GraphPagedSubgraph {
2495 page: GraphQueryPage {
2496 cursor: options.cursor,
2497 limit: options.limit,
2498 next_cursor,
2499 returned_nodes: 0,
2500 returned_edges: edges.len(),
2501 truncated: options.limit.is_some_and(|limit| before_limit > limit),
2502 diagnostics,
2503 },
2504 nodes: Vec::new(),
2505 edges,
2506 })
2507 }
2508
2509 fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
2510 if node_ids.is_empty() {
2511 return Ok(Vec::new());
2512 }
2513 if node_ids.len() <= 20 {
2514 return self.edges_between_nodes_inline(node_ids);
2515 }
2516 self.assert_not_in_temp_table_section();
2517 self.temp_table_active.set(true);
2518 let result = (|| -> Result<Vec<GraphEdge>> {
2519 let tx = self.conn.unchecked_transaction()?;
2520 tx.execute_batch(
2521 r#"
2522 CREATE TEMP TABLE IF NOT EXISTS _edges_between_ids (id TEXT PRIMARY KEY);
2523 DELETE FROM _edges_between_ids;
2524 "#,
2525 )?;
2526 for chunk in node_ids.iter().collect::<Vec<_>>().chunks(450) {
2527 let row_placeholders: Vec<String> =
2528 chunk.iter().map(|_| "(?)".to_string()).collect();
2529 let placeholders = row_placeholders.join(", ");
2530 let sql = format!(
2531 "INSERT OR IGNORE INTO _edges_between_ids (id) VALUES {placeholders}"
2532 );
2533 let values: Vec<Value> = chunk.iter().map(|id| Value::Text((*id).clone())).collect();
2534 tx.execute(&sql, params_from_iter(values.iter()))?;
2535 }
2536 let edges = {
2537 let mut stmt = tx.prepare(
2538 r#"
2539 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2540 FROM graph_edges e
2541 WHERE EXISTS (SELECT 1 FROM _edges_between_ids f WHERE f.id = e.from_id)
2542 AND EXISTS (SELECT 1 FROM _edges_between_ids t WHERE t.id = e.to_id)
2543 ORDER BY e.from_id, e.kind, e.to_id
2544 "#,
2545 )?;
2546 collect_rows(stmt.query_map([], edge_from_row)?)?
2547 };
2548 tx.finish()?;
2549 Ok(edges)
2550 })();
2551 self.temp_table_active.set(false);
2552 result
2553 }
2554
2555 fn ranked_neighborhood(
2556 &self,
2557 center_id: &str,
2558 options: &RankedNeighborhoodOptions,
2559 ) -> Result<Option<RankedNeighborhoodResult>> {
2560 if self.node(center_id)?.is_none() {
2561 return Ok(None);
2562 }
2563 let center = self.node(center_id)?.unwrap();
2564
2565 let score_expr = match options.scoring {
2566 tsift_core::NeighborhoodScoring::BreadthFirst => {
2567 "MAX(0, 120 - (walk.depth * 18))".to_string()
2568 }
2569 tsift_core::NeighborhoodScoring::EdgeKindWeighted => {
2570 "MAX(0, 120 - (walk.depth * 18)) + CASE walk.edge_kind \
2571 WHEN 'semantic_relation' THEN 34 \
2572 WHEN 'mentions_entity' THEN 28 \
2573 WHEN 'mentions_concept' THEN 28 \
2574 WHEN 'tagged_entity' THEN 28 \
2575 WHEN 'tagged_concept' THEN 28 \
2576 WHEN 'related_concept' THEN 28 \
2577 WHEN 'mentions' THEN 22 \
2578 WHEN 'calls' THEN 20 \
2579 WHEN 'requests_context' THEN 18 \
2580 WHEN 'scopes_context' THEN 18 \
2581 WHEN 'scopes_source' THEN 18 \
2582 WHEN 'explains_result' THEN 18 \
2583 WHEN 'defines' THEN 12 \
2584 WHEN 'contains' THEN 12 \
2585 WHEN 'belongs_to' THEN 12 \
2586 ELSE 8 END".to_string()
2587 }
2588 tsift_core::NeighborhoodScoring::DegreeWeighted => {
2589 "MAX(0, 120 - (walk.depth * 18)) + CASE \
2590 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 3 THEN 20 \
2591 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 10 THEN 10 \
2592 ELSE 0 END"
2593 .to_string()
2594 }
2595 };
2596
2597 let use_degree_cache = matches!(options.scoring, tsift_core::NeighborhoodScoring::DegreeWeighted);
2598 let degree_cte = if use_degree_cache {
2599 "degree_cache AS ( \
2600 SELECT id, (SELECT COUNT(*) FROM graph_edges e WHERE e.from_id = n.id OR e.to_id = n.id) AS degree \
2601 FROM graph_nodes n), "
2602 } else {
2603 ""
2604 };
2605 let mut sql = format!(
2606 r#"
2607 WITH {degree_cte}RECURSIVE walk(id, depth, edge_kind, score) AS (
2608 SELECT ?, 0, '', ?
2609 UNION
2610 SELECT e.to_id, walk.depth + 1, e.kind,
2611 "#,
2612 );
2613 sql.push_str(&format!(" {}\n", score_expr));
2614 sql.push_str(
2615 r#"
2616 FROM walk
2617 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2618 ON e.from_id = walk.id
2619 WHERE walk.depth < ?
2620 "#,
2621 );
2622 let mut values = vec![
2623 Value::Text(center_id.to_string()),
2624 Value::Integer(i64::MAX),
2625 Value::Integer(options.depth as i64),
2626 ];
2627 if let Some(kind) = &options.edge_kind {
2628 sql.push_str(" AND e.kind = ?");
2629 values.push(Value::Text(kind.clone()));
2630 }
2631 sql.push_str(
2632 r#"
2633 ),
2634 scored_nodes AS (
2635 SELECT walk.id, walk.score,
2636 n.kind AS node_kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
2637 FROM walk
2638 JOIN graph_nodes n ON n.id = walk.id
2639 GROUP BY walk.id
2640 ),
2641 ranked AS (
2642 SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
2643 FROM scored_nodes
2644 ORDER BY score DESC, id ASC
2645 ),
2646 kept AS (
2647 SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
2648 FROM ranked
2649 LIMIT ?
2650 ),
2651 total AS (
2652 SELECT COUNT(*) AS cnt FROM scored_nodes
2653 )
2654 SELECT
2655 'meta' AS row_type,
2656 (SELECT cnt FROM total) AS total_discovered,
2657 0 AS node_id, '' AS node_kind, '' AS node_label,
2658 '' AS node_props, '' AS node_prov, '' AS node_fresh,
2659 '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
2660 '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
2661 UNION ALL
2662 SELECT
2663 'node' AS row_type,
2664 0 AS total_discovered,
2665 k.id, k.node_kind, k.label, k.properties_json, k.provenance_json, k.freshness_json,
2666 '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
2667 '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
2668 FROM kept k
2669 UNION ALL
2670 SELECT
2671 'edge' AS row_type,
2672 0 AS total_discovered,
2673 '' AS node_id, '' AS node_kind, '' AS node_label,
2674 '' AS node_props, '' AS node_prov, '' AS node_fresh,
2675 e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2676 FROM graph_edges e
2677 WHERE EXISTS (SELECT 1 FROM kept k WHERE k.id = e.from_id)
2678 AND EXISTS (SELECT 1 FROM kept k2 WHERE k2.id = e.to_id)
2679 "#,
2680 );
2681 values.push(Value::Integer(options.max_nodes as i64));
2682
2683 let mut stmt = self.conn.prepare(&sql)?;
2684 let mut nodes = vec![center.clone()];
2685 let mut edges = Vec::new();
2686 let mut total_discovered = 0usize;
2687
2688 let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
2689 let row_type: String = row.get(0)?;
2690 match row_type.as_str() {
2691 "meta" => Ok(QueryResult::Meta {
2692 total: row.get::<_, i64>(1)? as usize,
2693 }),
2694 "node" => Ok(QueryResult::Node(node_from_row_at(row, 2)?)),
2695 "edge" => Ok(QueryResult::Edge(edge_from_row_at(row, 8)?)),
2696 _ => Err(rusqlite::Error::InvalidQuery),
2697 }
2698 })?;
2699 for row_result in rows {
2700 match row_result? {
2701 QueryResult::Meta { total } => {
2702 total_discovered = total;
2703 }
2704 QueryResult::Node(node) => {
2705 if node.id != center_id {
2706 nodes.push(node);
2707 }
2708 }QueryResult::Edge(edge) => {
2709 edges.push(edge);
2710 }
2711 }
2712 }
2713
2714 let total_discovered = total_discovered.max(nodes.len());
2715 let pruned_count = total_discovered.saturating_sub(nodes.len());
2716
2717 match options.property_mode {
2718 PropertyMode::Full => {}
2719 PropertyMode::Omit => {
2720 for n in &mut nodes {
2721 n.properties.clear();
2722 }
2723 for e in &mut edges {
2724 e.properties.clear();
2725 }
2726 }
2727 PropertyMode::Sample => {
2728 let mut seen_kinds = std::collections::BTreeSet::new();
2729 for n in &mut nodes {
2730 if !seen_kinds.contains(&n.kind) {
2731 seen_kinds.insert(n.kind.clone());
2732 } else {
2733 n.properties.clear();
2734 }
2735 }
2736 for e in &mut edges {
2737 e.properties.clear();
2738 }
2739 }
2740 }
2741
2742 Ok(Some(RankedNeighborhoodResult {
2743 nodes,
2744 edges,
2745 pruned_count,
2746 total_discovered,
2747 }))
2748 }
2749
2750 fn neighborhood(
2751 &self,
2752 center_id: &str,
2753 depth: usize,
2754 kind: Option<&str>,
2755 ) -> Result<Option<GraphSubgraph>> {
2756 self.paged_neighborhood(center_id, depth, kind, GraphQueryOptions::default())
2757 .map(|result| {
2758 result.map(|result| {
2759 GraphSubgraph {
2760 nodes: result.nodes,
2761 edges: result.edges,
2762 }
2763 .sorted()
2764 })
2765 })
2766 }
2767
    /// Returns one page of the neighborhood of `center_id`, or `None` if the center node does
    /// not exist.
    ///
    /// One statement built from several CTEs does all of the work:
    /// - `walk`: a recursive walk over outgoing edges (optionally only `kind`), up to `depth`
    ///   hops.
    /// - `filtered_nodes`: reached nodes after property filters and the exclusive id cursor.
    /// - `page_nodes`: the id-ordered page, over-fetched by one row when a limit is set.
    /// - `walk_edges`: edges traversed by the walk.
    ///
    /// The final `UNION ALL` emits tagged node rows and the distinct walk edges whose endpoints
    /// are both in `page_nodes`. Rust then trims the extra row and drops edges touching it.
    /// Edges are sorted by `(from_id, kind, to_id)`.
    fn paged_neighborhood(
        &self,
        center_id: &str,
        depth: usize,
        kind: Option<&str>,
        options: GraphQueryOptions,
    ) -> Result<Option<GraphPagedSubgraph>> {
        if self.node(center_id)?.is_none() {
            return Ok(None);
        }
        // Placeholders are positional `?`. Each `values.push` below must follow the order in
        // which its clause is appended to `sql`.
        // `walk` uses UNION on (id, depth), so rows stay bounded by `depth` even on cycles.
        let mut sql = String::from(
            r#"
        WITH RECURSIVE walk(id, depth) AS (
        SELECT ?, 0
        UNION
        SELECT e.to_id, walk.depth + 1
        FROM walk
        JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
        ON e.from_id = walk.id
        WHERE walk.depth < ?
        "#,
        );
        // NOTE(review): `depth as i64` wraps for values above i64::MAX (e.g. usize::MAX becomes
        // -1), which yields only the center.
        let mut values = vec![
            Value::Text(center_id.to_string()),
            Value::Integer(depth as i64),
        ];
        if let Some(kind) = kind {
            sql.push_str(" AND e.kind = ?");
            values.push(Value::Text(kind.to_string()));
        }
        // DISTINCT collapses a node reached at several depths into one row.
        sql.push_str(
            r#"
        ),
        filtered_nodes AS (
        SELECT DISTINCT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
        FROM walk
        JOIN graph_nodes n ON n.id = walk.id
        WHERE 1 = 1
        "#,
        );
        push_sqlite_property_filter_exists(&mut sql, &mut values, "n", &options.property_filters);
        // The cursor is exclusive on node id.
        if let Some(cursor) = &options.cursor {
            sql.push_str(" AND n.id > ?");
            values.push(Value::Text(cursor.clone()));
        }
        sql.push_str(
            r#"
        ),
        page_nodes AS (
        SELECT id, kind, label, properties_json, provenance_json, freshness_json
        FROM filtered_nodes
        ORDER BY id
        "#,
        );
        // Over-fetch by one row; its presence signals that another page exists.
        if let Some(limit) = options.limit {
            sql.push_str(" LIMIT ?");
            values.push(Value::Integer(limit.saturating_add(1) as i64));
        }
        // `walk_edges` re-applies the same depth/kind bounds. Edges leaving a node reached only
        // at the maximum depth are therefore not included.
        sql.push_str(
            r#"
        ),
        walk_edges AS (
        SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
        FROM walk
        JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
        ON e.from_id = walk.id
        WHERE walk.depth < ?
        "#,
        );
        values.push(Value::Integer(depth as i64));
        if let Some(kind) = kind {
            sql.push_str(" AND e.kind = ?");
            values.push(Value::Text(kind.to_string()));
        }
        // Node rows carry node columns at 1..=6; edge rows carry edge columns at 7..=13. The
        // edge member is DISTINCT because `walk` can list a node at several depths.
        sql.push_str(
            r#"
        )
        SELECT
        'node' AS row_type,
        p.id, p.kind, p.label, p.properties_json, p.provenance_json, p.freshness_json,
        NULL AS edge_key, NULL AS from_id, NULL AS to_id, NULL AS edge_kind,
        NULL AS edge_properties_json, NULL AS edge_provenance_json, NULL AS edge_freshness_json
        FROM page_nodes p
        UNION ALL
        SELECT DISTINCT
        'edge' AS row_type,
        NULL AS id, NULL AS kind, NULL AS label, NULL AS properties_json,
        NULL AS provenance_json, NULL AS freshness_json,
        e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
        FROM walk_edges e
        WHERE e.from_id IN (SELECT id FROM page_nodes)
        AND e.to_id IN (SELECT id FROM page_nodes)
        "#,
        );

        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
        let mut stmt = self.conn.prepare(&sql)?;
        let mut nodes = Vec::new();
        let mut edges = Vec::new();
        let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
            let row_type: String = row.get(0)?;
            match row_type.as_str() {
                "node" => Ok((Some(node_from_row_at(row, 1)?), None)),
                "edge" => Ok((None, Some(edge_from_row_at(row, 7)?))),
                _ => Err(rusqlite::Error::InvalidQuery),
            }
        })?;
        for row in rows {
            let (node, edge) = row?;
            if let Some(node) = node {
                nodes.push(node);
            }
            if let Some(edge) = edge {
                edges.push(edge);
            }
        }
        // Compound-select output order is not guaranteed, so re-sort by id before paging.
        nodes.sort_by(|left, right| left.id.cmp(&right.id));
        let before_limit = nodes.len();
        let mut next_cursor = None;
        if let Some(limit) = options.limit
            && nodes.len() > limit
        {
            // NOTE(review): with `limit == 0`, index 0 is chosen, so the next page would skip
            // that node; confirm whether a zero limit can reach here.
            next_cursor = nodes
                .get(limit.saturating_sub(1))
                .map(|node| node.id.clone());
            nodes.truncate(limit);
        }
        // Drop edges that touched the over-fetched row (or anything else not on this page).
        let node_ids = nodes
            .iter()
            .map(|node| node.id.as_str())
            .collect::<BTreeSet<_>>();
        edges.retain(|edge| {
            node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
        });
        edges.sort_by(|left, right| {
            left.from_id
                .cmp(&right.from_id)
                .then(left.kind.cmp(&right.kind))
                .then(left.to_id.cmp(&right.to_id))
        });
        let expected_indexes = if options.property_filters.is_empty() {
            vec!["idx_graph_edges_from_kind"]
        } else {
            vec![
                "idx_graph_edges_from_kind",
                "idx_graph_node_properties_key_value_node",
            ]
        };
        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
        diagnostics.push(
            "neighborhood nodes and page edges share one recursive reachable-set CTE".to_string(),
        );
        if !options.property_filters.is_empty() {
            diagnostics.push(
                "property filters were evaluated by SQLite materialized property rows before paging"
                    .to_string(),
            );
        }
        if options.cursor.is_some() {
            diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
        }
        if next_cursor.is_some() {
            diagnostics.push(
                "result was truncated; pass page.next_cursor as --cursor for the next page"
                    .to_string(),
            );
        }
        Ok(Some(GraphPagedSubgraph {
            page: GraphQueryPage {
                cursor: options.cursor,
                limit: options.limit,
                next_cursor,
                returned_nodes: nodes.len(),
                returned_edges: edges.len(),
                truncated: options.limit.is_some_and(|limit| before_limit > limit),
                diagnostics,
            },
            nodes,
            edges,
        }))
    }
2949
2950 fn shortest_path(
2951 &self,
2952 from_id: &str,
2953 to_id: &str,
2954 kind: Option<&str>,
2955 ) -> Result<Option<GraphPath>> {
2956 self.shortest_path_with_max_hops(from_id, to_id, kind, None)
2957 }
2958
2959 fn shortest_path_with_max_hops(
2960 &self,
2961 from_id: &str,
2962 to_id: &str,
2963 kind: Option<&str>,
2964 max_hops: Option<usize>,
2965 ) -> Result<Option<GraphPath>> {
2966 if from_id == to_id {
2967 return Ok(Some(GraphPath {
2968 nodes: vec![from_id.to_string()],
2969 hops: 0,
2970 }));
2971 }
2972 let hop_limit = max_hops.unwrap_or(usize::MAX);
2973 if hop_limit == 0 {
2974 return Ok(None);
2975 }
2976
2977 self.assert_not_in_temp_table_section();
2978 self.temp_table_active.set(true);
2979 let result = (|| -> Result<Option<GraphPath>> {
2980 let call_id = BFS_CALL_ID.fetch_add(1, Ordering::Relaxed);
2981 let tbl = format!("_tsift_frontier_{call_id}");
2982
2983 let mut visited = BTreeSet::from([from_id.to_string()]);
2984 let mut parent = BTreeMap::<String, String>::from([(from_id.to_string(), String::new())]);
2985 let mut frontier = vec![from_id.to_string()];
2986 self.conn.execute_batch(&format!(
2987 r#"CREATE TEMP TABLE IF NOT EXISTS "{tbl}" (id TEXT PRIMARY KEY);
2988 DELETE FROM "{tbl}";"#,
2989 ))?;
2990 let select_sql = if kind.is_some() {
2991 format!(
2992 r#"SELECT e.from_id, e.to_id
2993 FROM "{tbl}" f
2994 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2995 ON e.from_id = f.id
2996 WHERE e.kind = ?
2997 ORDER BY e.from_id, e.to_id, e.kind"#,
2998 )
2999 } else {
3000 format!(
3001 r#"SELECT e.from_id, e.to_id
3002 FROM "{tbl}" f
3003 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3004 ON e.from_id = f.id
3005 ORDER BY e.from_id, e.to_id, e.kind"#,
3006 )
3007 };
3008 let insert_sql = format!(r#"INSERT OR IGNORE INTO "{tbl}" (id) VALUES (?)"#);
3009 let delete_sql = format!(r#"DELETE FROM "{tbl}""#);
3010 let drop_sql = format!(r#"DROP TABLE IF EXISTS "{tbl}""#);
3011 let mut frontier_select_stmt = self.conn.prepare(&select_sql)?;
3012 let mut frontier_insert_stmt = self.conn.prepare(&insert_sql)?;
3013 let mut found_path: Option<GraphPath> = None;
3014 for _depth in 0..hop_limit {
3015 if frontier.is_empty() {
3016 break;
3017 }
3018 self.conn.execute(&delete_sql, [])?;
3019 for id in &frontier {
3020 frontier_insert_stmt.execute([id.as_str()])?;
3021 }
3022 let mut next_frontier = BTreeSet::new();
3023 let rows = if let Some(kind) = kind {
3024 collect_rows(frontier_select_stmt.query_map([kind], |row| {
3025 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3026 })?)?
3027 } else {
3028 collect_rows(frontier_select_stmt.query_map([], |row| {
3029 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3030 })?)?
3031 };
3032 for (from, next) in rows {
3033 if !visited.insert(next.clone()) {
3034 continue;
3035 }
3036 parent.insert(next.clone(), from);
3037 if next == to_id {
3038 let mut nodes = vec![to_id.to_string()];
3039 let mut cursor = to_id;
3040 while let Some(previous) = parent.get(cursor) {
3041 if previous.is_empty() {
3042 break;
3043 }
3044 nodes.push(previous.clone());
3045 cursor = previous;
3046 }
3047 nodes.reverse();
3048 found_path = Some(GraphPath {
3049 hops: nodes.len().saturating_sub(1),
3050 nodes,
3051 });
3052 break;
3053 }
3054 next_frontier.insert(next);
3055 }
3056 if found_path.is_some() {
3057 break;
3058 }
3059 frontier = next_frontier.into_iter().collect();
3060 }
3061 let _ = self.conn.execute_batch(&drop_sql);
3062 Ok(found_path)
3063 })();
3064 self.temp_table_active.set(false);
3065 result
3066 }
3067
3068 fn reachable_nodes_by_kind(
3069 &self,
3070 from_id: &str,
3071 kind: &str,
3072 depth: usize,
3073 limit: usize,
3074 ) -> Result<Vec<(GraphNode, GraphPath)>> {
3075 Ok(self
3076 .reachable_nodes_by_kinds(from_id, &[kind], depth, limit)?
3077 .remove(kind)
3078 .unwrap_or_default())
3079 }
3080
3081 fn reachable_nodes_by_kinds(
3082 &self,
3083 from_id: &str,
3084 kinds: &[&str],
3085 depth: usize,
3086 limit: usize,
3087 ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
3088 let mut requested = kinds
3089 .iter()
3090 .map(|kind| (*kind).to_string())
3091 .collect::<BTreeSet<_>>()
3092 .into_iter()
3093 .collect::<Vec<_>>();
3094 let mut results = requested
3095 .iter()
3096 .map(|kind| (kind.clone(), Vec::new()))
3097 .collect::<BTreeMap<_, _>>();
3098 if requested.is_empty() {
3099 return Ok(results);
3100 }
3101 requested.sort();
3102 let placeholders = std::iter::repeat_n("?", requested.len())
3103 .collect::<Vec<_>>()
3104 .join(", ");
3105 let mut sql = format!(
3106 r#"
3107 WITH RECURSIVE walk(id, depth, path) AS (
3108 SELECT ?, 0, char(31) || ? || char(31)
3109 UNION ALL
3110 SELECT e.to_id, walk.depth + 1, walk.path || e.to_id || char(31)
3111 FROM walk
3112 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3113 ON e.from_id = walk.id
3114 WHERE walk.depth < ?
3115 AND instr(walk.path, char(31) || e.to_id || char(31)) = 0
3116 ),
3117 ranked AS (
3118 SELECT
3119 n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
3120 walk.path, walk.depth,
3121 ROW_NUMBER() OVER (PARTITION BY n.kind, n.id ORDER BY walk.depth, n.label, n.id) AS rn
3122 FROM walk
3123 JOIN graph_nodes n ON n.id = walk.id
3124 WHERE n.kind IN ({placeholders}) AND n.id <> ?
3125 ),
3126 kind_ranked AS (
3127 SELECT *,
3128 ROW_NUMBER() OVER (PARTITION BY kind ORDER BY depth, label, id) AS kind_rank
3129 FROM ranked
3130 WHERE rn = 1
3131 )
3132 SELECT id, kind, label, properties_json, provenance_json, freshness_json, path, depth
3133 FROM kind_ranked
3134 "#,
3135 );
3136 let mut values = vec![
3137 Value::Text(from_id.to_string()),
3138 Value::Text(from_id.to_string()),
3139 Value::Integer(depth as i64),
3140 ];
3141 values.extend(requested.iter().cloned().map(Value::Text));
3142 values.push(Value::Text(from_id.to_string()));
3143 if limit > 0 && limit != usize::MAX {
3144 sql.push_str(" WHERE kind_rank <= ?");
3145 values.push(Value::Integer(limit as i64));
3146 }
3147 sql.push_str(" ORDER BY kind, depth, label, id");
3148 let mut stmt = self.conn.prepare(&sql)?;
3149 let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
3150 let node = node_from_row(row)?;
3151 let path: String = row.get(6)?;
3152 let hops: usize = row.get(7)?;
3153 Ok((
3154 node,
3155 GraphPath {
3156 nodes: path
3157 .split('\u{1f}')
3158 .filter(|part| !part.is_empty())
3159 .map(str::to_string)
3160 .collect(),
3161 hops,
3162 },
3163 ))
3164 })?)?;
3165 for (node, path) in rows {
3166 results
3167 .entry(node.kind.clone())
3168 .or_default()
3169 .push((node, path));
3170 }
3171 Ok(results)
3172 }
3173
3174 fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
3175 if let Some(node) = self.node(target)? {
3176 return Ok(Some(node));
3177 }
3178 if kinds.is_empty() {
3179 return Ok(None);
3180 }
3181
3182 let normalized = target.trim().trim_start_matches('#');
3183 let kind_placeholders = std::iter::repeat_n("?", kinds.len())
3184 .collect::<Vec<_>>()
3185 .join(", ");
3186 let kind_rank = kinds
3187 .iter()
3188 .enumerate()
3189 .map(|(rank, _)| format!("WHEN ? THEN {rank}"))
3190 .collect::<Vec<_>>()
3191 .join(" ");
3192 let sql = format!(
3193 r#"
3194 SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3195 FROM graph_nodes n
3196 WHERE n.kind IN ({kind_placeholders})
3197 AND (
3198 EXISTS (
3199 SELECT 1
3200 FROM graph_node_properties p_handle INDEXED BY idx_graph_node_properties_key_value_node
3201 WHERE p_handle.node_id = n.id
3202 AND p_handle.key = 'handle'
3203 AND p_handle.value = ?
3204 )
3205 OR EXISTS (
3206 SELECT 1
3207 FROM graph_node_properties p_ref INDEXED BY idx_graph_node_properties_key_value_node
3208 WHERE p_ref.node_id = n.id
3209 AND p_ref.key = 'ref_id'
3210 AND p_ref.value = ?
3211 )
3212 OR n.label = ?
3213 OR n.label = ?
3214 )
3215 ORDER BY CASE n.kind {kind_rank} ELSE 999 END, n.id
3216 LIMIT 1
3217 "#
3218 );
3219 let mut values = kinds
3220 .iter()
3221 .map(|kind| Value::Text((*kind).to_string()))
3222 .collect::<Vec<_>>();
3223 values.push(Value::Text(target.to_string()));
3224 values.push(Value::Text(normalized.to_string()));
3225 values.push(Value::Text(target.to_string()));
3226 values.push(Value::Text(format!("#{normalized}")));
3227 values.extend(kinds.iter().map(|kind| Value::Text((*kind).to_string())));
3228 self.conn
3229 .query_row(&sql, params_from_iter(values.iter()), node_from_row)
3230 .optional()
3231 .map_err(Into::into)
3232 }
3233}
3234
3235fn to_json<T: Serialize>(value: &T) -> Result<String> {
3236 serde_json::to_string(value).map_err(Into::into)
3237}
3238
3239fn row_hash<T: Serialize>(value: &T) -> Result<String> {
3240 let payload = serde_json::to_vec(value)?;
3241 Ok(blake3::hash(&payload).to_hex().to_string())
3242}
3243
3244fn optional_to_json<T: Serialize>(value: &Option<T>) -> Result<Option<String>> {
3245 value.as_ref().map(to_json).transpose()
3246}
3247
3248fn collect_rows<T>(
3249 rows: impl Iterator<Item = std::result::Result<T, rusqlite::Error>>,
3250) -> Result<Vec<T>> {
3251 rows.collect::<std::result::Result<Vec<_>, _>>()
3252 .map_err(Into::into)
3253}
3254
/// One decoded row from a query that can yield metadata, node, or edge rows.
///
/// NOTE(review): the producers/consumers of this enum are outside this chunk; presumably a
/// paged query emits a `Meta` row carrying a total count alongside node or edge rows —
/// confirm against the call sites.
enum QueryResult {
    /// Metadata row; `total` is a count (presumably total matches before paging — confirm).
    Meta { total: usize },
    /// A decoded graph node row.
    Node(GraphNode),
    /// A decoded graph edge row.
    Edge(GraphEdge),
}
3260
3261fn node_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphNode> {
3262 let properties_col = offset + 3;
3263 let provenance_col = offset + 4;
3264 let freshness_col = offset + 5;
3265 let properties_json: String = row.get(properties_col)?;
3266 let provenance_json: String = row.get(provenance_col)?;
3267 let freshness_json: Option<String> = row.get(freshness_col)?;
3268 Ok(GraphNode {
3269 id: row.get(offset)?,
3270 kind: row.get(offset + 1)?,
3271 label: row.get(offset + 2)?,
3272 properties: from_json(properties_col, &properties_json)?,
3273 provenance: from_json(provenance_col, &provenance_json)?,
3274 freshness: optional_from_json(freshness_col, freshness_json)?,
3275 })
3276}
3277
3278fn node_from_row(row: &Row<'_>) -> rusqlite::Result<GraphNode> {
3279 node_from_row_at(row, 0)
3280}
3281
3282fn edge_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphEdge> {
3283 let properties_col = offset + 4;
3284 let provenance_col = offset + 5;
3285 let freshness_col = offset + 6;
3286 let properties_json: String = row.get(properties_col)?;
3287 let provenance_json: String = row.get(provenance_col)?;
3288 let freshness_json: Option<String> = row.get(freshness_col)?;
3289 Ok(GraphEdge {
3290 id: row.get(offset)?,
3291 from_id: row.get(offset + 1)?,
3292 to_id: row.get(offset + 2)?,
3293 kind: row.get(offset + 3)?,
3294 properties: from_json(properties_col, &properties_json)?,
3295 provenance: from_json(provenance_col, &provenance_json)?,
3296 freshness: optional_from_json(freshness_col, freshness_json)?,
3297 })
3298}
3299
3300fn edge_from_row(row: &Row<'_>) -> rusqlite::Result<GraphEdge> {
3301 edge_from_row_at(row, 0)
3302}
3303
3304fn from_json<T: DeserializeOwned>(column: usize, raw: &str) -> rusqlite::Result<T> {
3305 serde_json::from_str(raw)
3306 .map_err(|err| rusqlite::Error::FromSqlConversionFailure(column, Type::Text, Box::new(err)))
3307}
3308
3309fn optional_from_json<T: DeserializeOwned>(
3310 column: usize,
3311 raw: Option<String>,
3312) -> rusqlite::Result<Option<T>> {
3313 raw.map(|value| from_json(column, &value)).transpose()
3314}
3315
3316fn projection_version_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3317 nodes
3318 .iter()
3319 .find(|node| node.kind == "projection_meta")
3320 .and_then(|node| node.properties.get("projection_version").cloned())
3321}
3322
3323fn projection_hash_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3324 nodes
3325 .iter()
3326 .find(|node| node.kind == "projection_meta")
3327 .and_then(|node| node.properties.get("content_hash").cloned())
3328}
3329
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn unix_now() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
3336
3337fn sqlite_database_size_bytes(conn: &Connection) -> Result<u64> {
3338 let page_count: u64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
3339 let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3340 Ok(page_count.saturating_mul(page_size))
3341}
3342
3343fn sqlite_database_freelist_bytes(conn: &Connection) -> Result<u64> {
3344 let freelist_count: u64 = conn.query_row("PRAGMA freelist_count", [], |row| row.get(0))?;
3345 let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3346 Ok(freelist_count.saturating_mul(page_size))
3347}
3348
// Unit tests for the SQLite graph store. They cover:
// - CRUD round-trips and the backend-agnostic `GraphStore` contract;
// - materialized node/edge property rows and projection refresh bookkeeping;
// - schema migration backfill and bounded traversal;
// - the temp-table reentrancy guard and ontology derivation.
#[cfg(test)]
mod tests {
    use super::*;

    // Provenance shared by the fixture nodes and edges.
    fn sample_provenance() -> GraphProvenance {
        GraphProvenance::new("fixture", "src/lib.rs:1").with_content_hash("hash-1")
    }

    // Fixture graph: doc:livekit -mentions-> topic:rooms -related_to-> topic:egress.
    fn sample_projection() -> GraphProjection {
        let source = sample_provenance();
        GraphProjection {
            nodes: vec![
                GraphNode::new("doc:livekit", "document", "LiveKit guide")
                    .with_property("domain", "livekit")
                    .with_provenance(source.clone())
                    .with_freshness(GraphFreshness::content_hash("node-hash")),
                GraphNode::new("topic:rooms", "topic", "Rooms"),
                GraphNode::new("topic:egress", "topic", "Egress"),
            ],
            edges: vec![
                GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
                    .with_property("confidence", "0.91")
                    .with_provenance(source.clone())
                    .with_freshness(GraphFreshness::content_hash("edge-hash")),
                GraphEdge::new("topic:rooms", "topic:egress", "related_to").with_provenance(source),
            ],
        }
    }

    // Contract every `GraphStore` must satisfy after ingesting `sample_projection`:
    // - point lookup and kind listing;
    // - kind-filtered outgoing edges, including their properties;
    // - a two-hop shortest path.
    fn assert_projection_store_contract(store: &impl GraphStore) {
        let projection = sample_projection();
        projection.upsert_into(store).unwrap();

        assert_eq!(
            store.node("doc:livekit").unwrap(),
            projection
                .nodes
                .iter()
                .find(|node| node.id == "doc:livekit")
                .cloned()
        );
        assert_eq!(
            store.nodes_by_kind("topic").unwrap(),
            vec![
                GraphNode::new("topic:egress", "topic", "Egress"),
                GraphNode::new("topic:rooms", "topic", "Rooms"),
            ]
        );

        let mentions = store
            .outgoing_edges("doc:livekit", Some("mentions"))
            .unwrap();
        assert_eq!(mentions.len(), 1);
        assert_eq!(mentions[0].to_id, "topic:rooms");
        assert_eq!(
            mentions[0].properties.get("confidence"),
            Some(&"0.91".into())
        );

        let path = store
            .shortest_path("doc:livekit", "topic:egress", None)
            .unwrap()
            .unwrap();
        assert_eq!(
            path.nodes,
            vec!["doc:livekit", "topic:rooms", "topic:egress"]
        );
    }

    // Nodes and edges written individually read back unchanged, including properties,
    // provenance, and freshness.
    #[test]
    fn sqlite_store_round_trips_generic_nodes_edges() {
        let store = SqliteGraphStore::in_memory().unwrap();
        let source = sample_provenance();
        let node = GraphNode::new("doc:livekit", "document", "LiveKit guide")
            .with_property("domain", "livekit")
            .with_provenance(source.clone())
            .with_freshness(GraphFreshness::content_hash("node-hash"));
        let topic = GraphNode::new("topic:rooms", "topic", "Rooms");
        let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
            .with_property("confidence", "0.91")
            .with_provenance(source)
            .with_freshness(GraphFreshness::content_hash("edge-hash"));

        store.upsert_node(&node).unwrap();
        store.upsert_node(&topic).unwrap();
        store.upsert_edge(&edge).unwrap();

        assert_eq!(store.node("doc:livekit").unwrap(), Some(node));
        assert_eq!(store.nodes_by_kind("topic").unwrap(), vec![topic]);
        assert_eq!(store.all_nodes().unwrap().len(), 2);
        assert_eq!(store.all_edges().unwrap().len(), 1);
        assert_eq!(
            store
                .outgoing_edges("doc:livekit", Some("mentions"))
                .unwrap(),
            vec![edge]
        );
    }

    // Edge properties are materialized into `graph_edge_properties`. Edges are addressable by
    // stable id and by incidence. A property-filtered edge page reports (via diagnostics) that
    // it used the materialized property index.
    #[test]
    fn sqlite_materializes_edge_properties_and_scans_first_class_edges() {
        let store = SqliteGraphStore::in_memory().unwrap();
        for node in [
            GraphNode::new("doc:livekit", "document", "LiveKit guide"),
            GraphNode::new("topic:rooms", "topic", "Rooms"),
            GraphNode::new("topic:egress", "topic", "Egress"),
        ] {
            store.upsert_node(&node).unwrap();
        }
        let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
            .with_property("confidence", "0.91");
        let edge_id = edge.id.clone();
        store.upsert_edge(&edge).unwrap();
        store
            .upsert_edge(
                &GraphEdge::new("topic:egress", "topic:rooms", "related_to")
                    .with_property("confidence", "0.42"),
            )
            .unwrap();

        assert_eq!(store.edge(&edge_id).unwrap(), Some(edge));
        // Incident edges of topic:rooms: one incoming from each side, compared as sorted ids.
        let mut expected_incident_ids = vec![
            GraphEdge::stable_id("doc:livekit", "topic:rooms", "mentions"),
            GraphEdge::stable_id("topic:egress", "topic:rooms", "related_to"),
        ];
        expected_incident_ids.sort();
        assert_eq!(
            store
                .incident_edges("topic:rooms", None)
                .unwrap()
                .into_iter()
                .map(|edge| edge.id)
                .collect::<Vec<_>>(),
            expected_incident_ids
        );

        let page = store
            .paged_edges(
                Some("mentions"),
                GraphQueryOptions {
                    property_filters: vec![GraphPropertyFilter {
                        key: "confidence".to_string(),
                        value: "0.91".to_string(),
                    }],
                    ..GraphQueryOptions::default()
                },
            )
            .unwrap();
        assert_eq!(page.edges.len(), 1);
        assert_eq!(page.edges[0].id, edge_id);
        // Diagnostics must name the indexes used and describe the materialized-row strategy.
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic.contains("idx_graph_edge_properties_key_value_edge")),
            "{:?}",
            page.page.diagnostics
        );
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic.contains("idx_graph_edges_edge_key")),
            "{:?}",
            page.page.diagnostics
        );
        assert!(
            page.page.diagnostics.iter().any(|diagnostic| diagnostic
                .contains("edge property primary filter matched 1 materialized row")),
            "{:?}",
            page.page.diagnostics
        );
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic
                    .contains("drives from SQLite materialized property rows")),
            "{:?}",
            page.page.diagnostics
        );

        // Both edges carry a `confidence` property, so two materialized rows exist.
        let property_rows: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(property_rows, 2);
    }

    // Runs the shared contract against the SQLite backend.
    #[test]
    fn graph_projection_round_trips_through_backend_agnostic_store_contract() {
        let sqlite = SqliteGraphStore::in_memory().unwrap();
        assert_projection_store_contract(&sqlite);
    }

    // Neighborhood ordering plus edge/node deletion semantics. After `topic:rooms` is deleted,
    // `doc:livekit` has no outgoing edges left.
    #[test]
    fn graph_store_contract_covers_crud_neighborhood_and_ordering() {
        fn assert_crud_contract(store: &impl GraphStore) {
            let projection = sample_projection();
            projection.upsert_into(store).unwrap();

            let neighborhood = store.neighborhood("doc:livekit", 2, None).unwrap().unwrap();
            assert_eq!(
                neighborhood
                    .nodes
                    .iter()
                    .map(|node| node.id.as_str())
                    .collect::<Vec<_>>(),
                vec!["doc:livekit", "topic:egress", "topic:rooms"]
            );
            assert_eq!(
                neighborhood
                    .edges
                    .iter()
                    .map(|edge| (
                        edge.from_id.as_str(),
                        edge.kind.as_str(),
                        edge.to_id.as_str()
                    ))
                    .collect::<Vec<_>>(),
                vec![
                    ("doc:livekit", "mentions", "topic:rooms"),
                    ("topic:rooms", "related_to", "topic:egress"),
                ]
            );

            assert_eq!(
                store
                    .delete_edge("topic:rooms", "topic:egress", "related_to")
                    .unwrap(),
                1
            );
            assert!(
                store
                    .shortest_path("doc:livekit", "topic:egress", None)
                    .unwrap()
                    .is_none()
            );
            assert_eq!(store.delete_node("topic:rooms").unwrap(), 1);
            assert!(store.node("topic:rooms").unwrap().is_none());
            assert!(
                store
                    .outgoing_edges("doc:livekit", None)
                    .unwrap()
                    .is_empty()
            );
        }

        assert_crud_contract(&SqliteGraphStore::in_memory().unwrap());
    }

    // Re-upserting a projection with a changed node property replaces the materialized
    // property rows: the old value disappears and the new value becomes filterable.
    #[test]
    fn sqlite_upsert_projection_batches_rows_and_properties() {
        let mut store = SqliteGraphStore::in_memory().unwrap();
        let mut projection = sample_projection();
        store.upsert_projection(&projection).unwrap();

        let page = store
            .paged_nodes_by_kind(
                "document",
                GraphQueryOptions {
                    property_filters: vec![GraphPropertyFilter {
                        key: "domain".to_string(),
                        value: "livekit".to_string(),
                    }],
                    ..GraphQueryOptions::default()
                },
            )
            .unwrap();
        assert_eq!(page.nodes[0].id, "doc:livekit");

        projection.nodes[0] = GraphNode::new("doc:livekit", "document", "LiveKit guide")
            .with_property("domain", "recording");
        store.upsert_projection(&projection).unwrap();

        let old_property_count: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain' AND value = 'livekit'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        let updated_page = store
            .paged_nodes_by_kind(
                "document",
                GraphQueryOptions {
                    property_filters: vec![GraphPropertyFilter {
                        key: "domain".to_string(),
                        value: "recording".to_string(),
                    }],
                    ..GraphQueryOptions::default()
                },
            )
            .unwrap();
        assert_eq!(old_property_count, 0);
        assert_eq!(updated_page.nodes[0].id, "doc:livekit");
        // Only the one fixture edge has a `confidence` property.
        let edge_property_count: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(edge_property_count, 1);
    }

    // Kind-filtered outgoing edges, cached counts, sampling, and directed shortest paths.
    // There is no `calls` path from c back to a.
    #[test]
    fn sqlite_store_filters_edges_by_kind_and_paths() {
        let store = SqliteGraphStore::in_memory().unwrap();
        for id in ["a", "b", "c"] {
            store
                .upsert_node(&GraphNode::new(id, "symbol", id))
                .unwrap();
        }
        store
            .upsert_edge(&GraphEdge::new("a", "b", "calls"))
            .unwrap();
        store
            .upsert_edge(&GraphEdge::new("a", "c", "documents"))
            .unwrap();
        store
            .upsert_edge(&GraphEdge::new("b", "c", "calls"))
            .unwrap();

        let calls = store.outgoing_edges("a", Some("calls")).unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].to_id, "b");
        assert_eq!(store.graph_counts().unwrap(), (3, 3));
        assert_eq!(
            store.sample_edge(Some("calls")).unwrap().unwrap().to_id,
            "b"
        );

        let path = store
            .shortest_path("a", "c", Some("calls"))
            .unwrap()
            .unwrap();
        assert_eq!(path.nodes, vec!["a", "b", "c"]);
        assert_eq!(path.hops, 2);

        assert!(
            store
                .shortest_path("c", "a", Some("calls"))
                .unwrap()
                .is_none()
        );
    }

    // `edges_between_nodes` returns only edges whose endpoints are both in the scoped set;
    // edges touching "outside" are excluded.
    #[test]
    fn sqlite_store_batches_edges_between_node_sets() {
        let store = SqliteGraphStore::in_memory().unwrap();
        for id in ["a", "b", "c", "outside"] {
            store
                .upsert_node(&GraphNode::new(id, "symbol", id))
                .unwrap();
        }
        for edge in [
            GraphEdge::new("a", "b", "calls"),
            GraphEdge::new("b", "c", "calls"),
            GraphEdge::new("a", "outside", "calls"),
            GraphEdge::new("outside", "c", "calls"),
        ] {
            store.upsert_edge(&edge).unwrap();
        }

        let scoped = ["a".to_string(), "b".to_string(), "c".to_string()]
            .into_iter()
            .collect::<BTreeSet<_>>();
        let edge_keys = store
            .edges_between_nodes(&scoped)
            .unwrap()
            .into_iter()
            .map(|edge| (edge.from_id, edge.kind, edge.to_id))
            .collect::<Vec<_>>();

        assert_eq!(
            edge_keys,
            vec![
                ("a".to_string(), "calls".to_string(), "b".to_string()),
                ("b".to_string(), "calls".to_string(), "c".to_string()),
            ]
        );
    }

    // Versioned projection refreshes: the steps below depend on each other's state, so their
    // order matters.
    //   v1: initial load.
    //   v2: drops topic:egress (tombstoned node and edge; cached stats updated).
    //   v3: restores the node (its tombstone is pruned).
    //   v4: drops it again; then `compact_storage` purges the tombstones.
    #[test]
    fn sqlite_projection_refresh_tracks_versions_watermarks_and_tombstones() {
        let mut store = SqliteGraphStore::in_memory().unwrap();
        let mut projection = sample_projection();
        projection.nodes.push(
            GraphNode::new(
                "projection:fixture",
                "projection_meta",
                "fixture projection",
            )
            .with_property("projection_version", "fixture-v1")
            .with_property("content_hash", "hash-a"),
        );
        store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v1"),
                Some("commit-a".to_string()),
            )
            .unwrap();

        projection.nodes.retain(|node| node.id != "topic:egress");
        projection.edges.retain(|edge| edge.to_id != "topic:egress");
        let refresh = store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v2"),
                Some("commit-b".to_string()),
            )
            .unwrap();

        assert_eq!(refresh.projection_version, "fixture-v2");
        assert_eq!(refresh.source_watermark.as_deref(), Some("commit-b"));
        assert_eq!(refresh.tombstoned_nodes, vec!["topic:egress".to_string()]);
        assert_eq!(refresh.tombstoned_edges.len(), 1);
        assert_eq!(refresh.deleted_nodes, 1);
        assert_eq!(refresh.deleted_edges, 1);
        assert_eq!(refresh.unchanged_nodes, 3);
        assert_eq!(refresh.upserted_nodes, 0);
        assert_eq!(refresh.unchanged_properties, 4);
        assert_eq!(refresh.upserted_properties, 0);
        assert_eq!(refresh.deleted_properties, 0);
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_property_row_staging"),
            "{:?}",
            refresh.phase_timings
        );
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_edge_property_row_staging"),
            "{:?}",
            refresh.phase_timings
        );
        let version = store.projection_version("root").unwrap().unwrap();
        assert_eq!(version.projection_version, "fixture-v2");
        assert_eq!(version.source_watermark.as_deref(), Some("commit-b"));
        // Cached operator stats: (live nodes, live edges, node tombstones, edge tombstones).
        let cached_counts: (usize, usize, usize, usize) = store
            .conn
            .query_row(
                r#"
                SELECT nodes, edges, tombstone_nodes, tombstone_edges
                FROM graph_operator_stats
                WHERE scope = 'root'
                "#,
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )
            .unwrap();
        assert_eq!(cached_counts, (3, 1, 1, 1));

        projection
            .nodes
            .push(GraphNode::new("topic:egress", "topic", "Egress"));
        let refresh = store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v3"),
                Some("commit-c".to_string()),
            )
            .unwrap();
        assert_eq!(refresh.pruned_tombstones, 1);
        assert_eq!(refresh.tombstoned_nodes, Vec::<String>::new());

        projection.nodes.retain(|node| node.id != "topic:egress");
        store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v4"),
                Some("commit-d".to_string()),
            )
            .unwrap();
        assert_eq!(store.compact_storage("root", true).unwrap(), 2);
        let cached_counts: (usize, usize, usize, usize) = store
            .conn
            .query_row(
                r#"
                SELECT nodes, edges, tombstone_nodes, tombstone_edges
                FROM graph_operator_stats
                WHERE scope = 'root'
                "#,
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )
            .unwrap();
        assert_eq!(cached_counts, (3, 1, 0, 0));
    }

    // An 80-node `calls` chain (79 hops): a 64-hop cap finds no path, a 79-hop cap finds the
    // full chain. A direct `mentions` edge is found in one hop when filtering by that kind.
    #[test]
    fn sqlite_shortest_path_uses_bounded_frontier() {
        let store = SqliteGraphStore::in_memory().unwrap();
        for idx in 0..80 {
            store
                .upsert_node(&GraphNode::new(
                    format!("node:{idx:02}"),
                    "symbol",
                    format!("node {idx:02}"),
                ))
                .unwrap();
        }
        for idx in 0..79 {
            store
                .upsert_edge(&GraphEdge::new(
                    format!("node:{idx:02}"),
                    format!("node:{:02}", idx + 1),
                    "calls",
                ))
                .unwrap();
        }
        store
            .upsert_edge(&GraphEdge::new("node:00", "node:79", "mentions"))
            .unwrap();

        assert!(
            store
                .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(64))
                .unwrap()
                .is_none()
        );
        let path = store
            .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(79))
            .unwrap()
            .unwrap();
        assert_eq!(path.hops, 79);
        assert_eq!(path.nodes.first().map(String::as_str), Some("node:00"));
        assert_eq!(path.nodes.last().map(String::as_str), Some("node:79"));

        let direct = store
            .shortest_path_with_max_hops("node:00", "node:79", Some("mentions"), Some(1))
            .unwrap()
            .unwrap();
        assert_eq!(direct.nodes, vec!["node:00", "node:79"]);
    }

    // "#refresh" matches all three nodes (by ref_id or label); the backlog node wins because
    // "backlog" is listed first in `kinds`. A handle lookup resolves to the same node.
    #[test]
    fn sqlite_resolves_evidence_targets_with_indexed_properties() {
        let store = SqliteGraphStore::in_memory().unwrap();
        for node in [
            GraphNode::new("gbak-refresh", "backlog", "#refresh")
                .with_property("ref_id", "refresh")
                .with_property("handle", "backlog-handle"),
            GraphNode::new("gjob-refresh", "job_packet", "do #refresh")
                .with_property("ref_id", "refresh"),
            GraphNode::new("gwres-refresh", "worker_result", "completed #refresh")
                .with_property("ref_id", "refresh"),
        ] {
            store.upsert_node(&node).unwrap();
        }

        let by_ref = store
            .resolve_evidence_target("#refresh", &["backlog", "job_packet", "worker_result"])
            .unwrap()
            .unwrap();
        assert_eq!(by_ref.id, "gbak-refresh");
        let by_handle = store
            .resolve_evidence_target("backlog-handle", &["backlog"])
            .unwrap()
            .unwrap();
        assert_eq!(by_handle.id, "gbak-refresh");
    }

    // Opening a hand-built `user_version = 2` database migrates it to the current schema
    // version. The migration backfills materialized node and edge property rows from the
    // stored `properties_json`.
    #[test]
    fn sqlite_schema_migration_backfills_materialized_node_properties() {
        let conn = Connection::open_in_memory().unwrap();
        conn.execute_batch(
            r#"
            PRAGMA user_version = 2;
            CREATE TABLE graph_nodes (
                id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                label TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT
            );
            CREATE INDEX idx_graph_nodes_kind ON graph_nodes(kind);
            CREATE TABLE graph_edges (
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT,
                PRIMARY KEY (from_id, to_id, kind)
            );
            CREATE INDEX idx_graph_edges_from_kind ON graph_edges(from_id, kind);
            CREATE INDEX idx_graph_edges_to_kind ON graph_edges(to_id, kind);
            CREATE TABLE graph_projection_versions (
                scope TEXT PRIMARY KEY,
                projection_version TEXT NOT NULL,
                content_hash TEXT,
                source_watermark TEXT,
                observed_at_unix INTEGER NOT NULL
            );
            CREATE TABLE graph_tombstones (
                row_key TEXT PRIMARY KEY,
                row_kind TEXT NOT NULL,
                deleted_at_unix INTEGER NOT NULL
            );
            INSERT INTO graph_nodes
                (id, kind, label, properties_json, provenance_json)
            VALUES
                ('topic:rooms', 'topic', 'Rooms', '{"domain":"livekit"}', '[]'),
                ('topic:egress', 'topic', 'Egress', '{"domain":"recording"}', '[]');
            INSERT INTO graph_edges
                (from_id, to_id, kind, properties_json, provenance_json)
            VALUES
                ('topic:rooms', 'topic:egress', 'mentions', '{"confidence":"0.91"}', '[]');
            "#,
        )
        .unwrap();

        let store = SqliteGraphStore::from_connection(conn).unwrap();
        let version: i64 = store
            .conn
            .pragma_query_value(None, "user_version", |row| row.get(0))
            .unwrap();
        assert_eq!(version, SQLITE_GRAPH_SCHEMA_VERSION);
        let property_rows: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(property_rows, 2);
        let edge_property_rows: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(edge_property_rows, 1);
        // The legacy edge (which had no id column) is addressable by its stable id after migration.
        let edge = store
            .edge(&GraphEdge::stable_id(
                "topic:rooms",
                "topic:egress",
                "mentions",
            ))
            .unwrap()
            .unwrap();
        assert_eq!(edge.properties.get("confidence"), Some(&"0.91".to_string()));

        let page = store
            .paged_nodes_by_kind(
                "topic",
                GraphQueryOptions {
                    property_filters: vec![GraphPropertyFilter {
                        key: "domain".to_string(),
                        value: "livekit".to_string(),
                    }],
                    ..GraphQueryOptions::default()
                },
            )
            .unwrap();
        assert_eq!(page.nodes[0].id, "topic:rooms");
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic.contains("idx_graph_node_properties_key_value_node")),
            "{:?}",
            page.page.diagnostics
        );
    }

    // Reachable nodes are grouped by kind. Each entry carries the walk path and hop count
    // from the start node.
    #[test]
    fn sqlite_store_batches_reachable_nodes_by_kinds() {
        let store = SqliteGraphStore::in_memory().unwrap();
        for node in [
            GraphNode::new("start", "backlog", "start"),
            GraphNode::new("ctx", "worker_context", "context"),
            GraphNode::new("src", "source_handle", "source"),
            GraphNode::new("sem", "semantic_concept", "concept"),
        ] {
            store.upsert_node(&node).unwrap();
        }
        store
            .upsert_edge(&GraphEdge::new("start", "ctx", "has_context"))
            .unwrap();
        store
            .upsert_edge(&GraphEdge::new("ctx", "src", "uses_source"))
            .unwrap();
        store
            .upsert_edge(&GraphEdge::new("start", "sem", "mentions_concept"))
            .unwrap();

        let rows = store
            .reachable_nodes_by_kinds(
                "start",
                &["worker_context", "source_handle", "semantic_concept"],
                2,
                8,
            )
            .unwrap();
        assert_eq!(rows["worker_context"][0].0.id, "ctx");
        assert_eq!(
            rows["source_handle"][0].1.nodes,
            vec!["start", "ctx", "src"]
        );
        assert_eq!(rows["semantic_concept"][0].1.hops, 1);
    }

    // Bulk refresh over a 128-node chain. It removes node:000 and node:064 and the three
    // chain edges touching them. It checks the delete/unchanged bookkeeping and the staging
    // phase diagnostics, and that the temp staging tables end up empty.
    #[test]
    fn sqlite_projection_refresh_handles_bulk_row_replacement() {
        let mut store = SqliteGraphStore::in_memory().unwrap();
        let source = GraphProvenance::new("fixture", "bulk");
        let mut projection = GraphProjection::default();
        for idx in 0..128 {
            projection.nodes.push(
                GraphNode::new(
                    format!("node:{idx:03}"),
                    if idx % 2 == 0 { "symbol" } else { "file" },
                    format!("bulk node {idx:03}"),
                )
                .with_property("ordinal", idx.to_string())
                .with_provenance(source.clone())
                .with_freshness(GraphFreshness::content_hash(format!("node-hash-{idx:03}"))),
            );
        }
        for idx in 0..127 {
            projection.edges.push(
                GraphEdge::new(
                    format!("node:{idx:03}"),
                    format!("node:{:03}", idx + 1),
                    "next",
                )
                .with_property("ordinal", idx.to_string())
                .with_provenance(source.clone())
                .with_freshness(GraphFreshness::content_hash(format!("edge-hash-{idx:03}"))),
            );
        }

        store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("bulk-v1"),
                Some("commit-a".to_string()),
            )
            .unwrap();

        projection
            .nodes
            .retain(|node| !node.id.ends_with("000") && !node.id.ends_with("064"));
        projection.edges.retain(|edge| {
            !edge.from_id.ends_with("000")
                && !edge.to_id.ends_with("000")
                && !edge.from_id.ends_with("064")
                && !edge.to_id.ends_with("064")
        });
        let refresh = store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("bulk-v2"),
                Some("commit-b".to_string()),
            )
            .unwrap();

        assert_eq!(store.all_nodes().unwrap().len(), 126);
        assert_eq!(store.all_edges().unwrap().len(), 124);
        assert_eq!(
            refresh.tombstoned_nodes,
            vec!["node:000".to_string(), "node:064".to_string()]
        );
        assert_eq!(refresh.tombstoned_edges.len(), 3);
        assert_eq!(refresh.deleted_nodes, 2);
        assert_eq!(refresh.deleted_edges, 3);
        assert_eq!(refresh.unchanged_nodes, 126);
        assert_eq!(refresh.unchanged_edges, 124);
        assert_eq!(refresh.upserted_nodes, 0);
        assert_eq!(refresh.upserted_edges, 0);
        assert_eq!(refresh.unchanged_properties, 250);
        assert_eq!(refresh.upserted_properties, 0);
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_node_staging"
                    && phase.detail.contains("126 unchanged skipped")
                    && phase.detail.contains("multi-row chunks up to 500 rows")),
            "{:?}",
            refresh.phase_timings
        );
        // NOTE(review): this check reuses the `sqlite_node_staging` phase name but expects the
        // edge count (124 unchanged edges). It may have been meant to target an edge staging
        // phase. Confirm against the phase names emitted by `replace_projection_with_version`.
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_node_staging"
                    && phase.detail.contains("124 unchanged skipped")),
            "{:?}",
            refresh.phase_timings
        );
        // Nothing changed in the surviving rows, so the temp property staging tables are empty.
        let staged_node_properties: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM temp.next_graph_node_properties",
                [],
                |row| row.get(0),
            )
            .unwrap();
        let staged_edge_properties: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM temp.next_graph_edge_properties",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(staged_node_properties, 0);
        assert_eq!(staged_edge_properties, 0);
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_property_row_staging"
                    && phase.detail.contains("new/changed node rows")),
            "{:?}",
            refresh.phase_timings
        );
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_edge_property_row_staging"
                    && phase.detail.contains("new/changed edge rows")),
            "{:?}",
            refresh.phase_timings
        );
        assert_eq!(
            store
                .projection_version("root")
                .unwrap()
                .unwrap()
                .source_watermark
                .as_deref(),
            Some("commit-b")
        );
    }

    // Setting the temp-table flag by hand simulates reentry; the guard must panic.
    #[test]
    fn sqlite_reentrant_temp_table_guard_panics() {
        let store = SqliteGraphStore::in_memory().unwrap();
        store.temp_table_active.set(true);
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            store.assert_not_in_temp_table_section();
        }));
        assert!(result.is_err());
    }

    // After a projection replace completes, the temp-table flag is cleared again.
    #[test]
    fn sqlite_temp_table_guard_clears_after_method() {
        let mut store = SqliteGraphStore::in_memory().unwrap();
        let projection = GraphProjection {
            nodes: vec![],
            edges: vec![],
        };
        store.replace_projection(&projection).unwrap();
        assert!(!store.temp_table_active.get());
    }

    // Ontology derivation summarizes node kinds as `ontology_type` nodes (with instance
    // counts) and edge kinds as type-to-type relations. Deriving again after the ontology
    // itself is upserted yields the same node and edge counts.
    #[test]
    fn derive_ontology_summarizes_types_and_relations() {
        let mut store = SqliteGraphStore::in_memory().unwrap();
        let seed = GraphProjection {
            nodes: vec![
                GraphNode::new("fn:a", "function", "a"),
                GraphNode::new("fn:b", "function", "b"),
                GraphNode::new("mod:m", "module", "m"),
            ],
            edges: vec![
                GraphEdge::new("fn:a", "fn:b", "calls"),
                GraphEdge::new("mod:m", "fn:a", "contains"),
            ],
        };
        store.upsert_projection(&seed).unwrap();

        let onto = store.derive_ontology().unwrap();
        let type_kinds: std::collections::BTreeSet<_> =
            onto.nodes.iter().map(|n| n.label.clone()).collect();
        assert!(type_kinds.contains("function"));
        assert!(type_kinds.contains("module"));
        assert!(onto.nodes.iter().all(|n| n.kind == "ontology_type"));

        let rel: std::collections::BTreeSet<_> = onto
            .edges
            .iter()
            .map(|e| (e.from_id.clone(), e.kind.clone(), e.to_id.clone()))
            .collect();
        assert!(rel.contains(&(
            "ontology_type:function".into(),
            "ontology_relation:calls".into(),
            "ontology_type:function".into()
        )));
        assert!(rel.contains(&(
            "ontology_type:module".into(),
            "ontology_relation:contains".into(),
            "ontology_type:function".into()
        )));

        let function_node = onto.nodes.iter().find(|n| n.label == "function").unwrap();
        assert_eq!(function_node.properties.get("instance_count").unwrap(), "2");

        store.upsert_projection(&onto).unwrap();
        let onto2 = store.derive_ontology().unwrap();
        assert!(onto2.nodes.iter().all(|n| n.kind == "ontology_type"));
        assert_eq!(onto2.nodes.len(), onto.nodes.len());
        assert_eq!(onto2.edges.len(), onto.edges.len());
    }
}