1use anyhow::{Context, Result, bail};
2use rusqlite::{
3 Connection, OpenFlags, OptionalExtension, Row, params_from_iter,
4 types::{Type, Value},
5};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use std::cell::Cell;
8use std::collections::{BTreeMap, BTreeSet};
9use std::ffi::OsString;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14pub use tsift_core::{
15 ConvexEdgeRow, ConvexGraphClient, ConvexGraphStore, ConvexNodeRow, ConvexProjectionRows,
16 ConvexRowsGraphClient, GRAPH_SEMANTIC_VECTOR_DEFAULT_MODEL,
17 GRAPH_SEMANTIC_VECTOR_MODEL_PROPERTY_KEY, GRAPH_SEMANTIC_VECTOR_PROPERTY_KEY, GraphEdge,
18 GraphFreshness, GraphNode, GraphPagedSubgraph, GraphPath, GraphProjection, GraphPropertyFilter,
19 GraphProvenance, GraphQueryOptions, GraphQueryPage, GraphSemanticCandidate, GraphStore,
20 GraphSubgraph, PropertyMode, RankedNeighborhoodOptions, RankedNeighborhoodResult,
21 SQLITE_GRAPH_SCHEMA_VERSION, SemanticSeededNeighborhoodExpansion,
22 SemanticSeededNeighborhoodOptions, TerseGraphEdge, TerseGraphNode, apply_graph_edge_query_page,
23 apply_graph_query_page, graph_edge_id, graph_semantic_cosine,
24 graph_semantic_top_candidates_by_property_scan, parse_graph_semantic_vector_property,
25 shortest_path_using_outgoing, stable_graph_edge_id,
26};
27
/// Which snapshot fallback a read-only open used after the live database
/// reported "database is locked".
///
/// In both cases `copy_read_only_snapshot` copies the database file plus any
/// `-wal` / `-shm` sidecars that exist. Serialized in `snake_case`
/// (`snapshot_fallback`, `snapshot_fallback_wal`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReadOnlyRecovery {
    /// Chosen when no `-wal`/`-shm` sidecar exists but a rollback journal
    /// (`<db>-journal`) does.
    SnapshotFallback,
    /// Chosen when a WAL (`<db>-wal`) or shared-memory (`<db>-shm`) sidecar exists.
    SnapshotFallbackWal,
}
34
35pub fn read_only_snapshot_recovery(
36 db_path: &Path,
37 err: &anyhow::Error,
38) -> Option<ReadOnlyRecovery> {
39 if !error_mentions_locked_db(err) {
40 return None;
41 }
42 if wal_sidecar_path(db_path).exists() || shared_memory_sidecar_path(db_path).exists() {
43 Some(ReadOnlyRecovery::SnapshotFallbackWal)
44 } else if rollback_journal_path(db_path).exists() {
45 Some(ReadOnlyRecovery::SnapshotFallback)
46 } else {
47 None
48 }
49}
50
/// Returns the path of SQLite's rollback journal for `db_path`
/// (`<db_path>-journal`).
pub fn rollback_journal_path(db_path: &Path) -> PathBuf {
    sqlite_sidecar_path_with_suffix(db_path, "-journal")
}

/// Returns the path of SQLite's write-ahead log for `db_path` (`<db_path>-wal`).
pub fn wal_sidecar_path(db_path: &Path) -> PathBuf {
    sqlite_sidecar_path_with_suffix(db_path, "-wal")
}

/// Returns the path of SQLite's WAL shared-memory index for `db_path`
/// (`<db_path>-shm`).
pub fn shared_memory_sidecar_path(db_path: &Path) -> PathBuf {
    sqlite_sidecar_path_with_suffix(db_path, "-shm")
}

/// Appends `suffix` to the full file name of `db_path`.
///
/// SQLite names its sidecar files by appending to the whole database file name
/// (not by replacing the extension). The append works on the raw `OsStr`, so
/// non-UTF-8 paths are preserved byte-for-byte.
fn sqlite_sidecar_path_with_suffix(db_path: &Path, suffix: &str) -> PathBuf {
    let mut sidecar = db_path.as_os_str().to_os_string();
    sidecar.push(suffix);
    PathBuf::from(sidecar)
}
68
69pub fn copy_read_only_snapshot(
70 db_path: &Path,
71 default_stem: &str,
72) -> Result<(PathBuf, Vec<PathBuf>)> {
73 let snapshot_path = snapshot_copy_path(db_path, default_stem);
74 std::fs::copy(db_path, &snapshot_path).with_context(|| {
75 format!(
76 "copying locked db {} to snapshot {}",
77 db_path.display(),
78 snapshot_path.display()
79 )
80 })?;
81 let mut cleanup_paths = vec![snapshot_path.clone()];
82 copy_optional_snapshot_sidecar(
83 &wal_sidecar_path(db_path),
84 &wal_sidecar_path(&snapshot_path),
85 &mut cleanup_paths,
86 )?;
87 copy_optional_snapshot_sidecar(
88 &shared_memory_sidecar_path(db_path),
89 &shared_memory_sidecar_path(&snapshot_path),
90 &mut cleanup_paths,
91 )?;
92 Ok((snapshot_path, cleanup_paths))
93}
94
95pub fn error_mentions_locked_db(err: &anyhow::Error) -> bool {
96 err.chain()
97 .any(|cause| cause.to_string().contains("database is locked"))
98}
99
100fn copy_optional_snapshot_sidecar(
101 source_path: &Path,
102 snapshot_path: &Path,
103 cleanup_paths: &mut Vec<PathBuf>,
104) -> Result<()> {
105 match std::fs::copy(source_path, snapshot_path) {
106 Ok(_) => {
107 cleanup_paths.push(snapshot_path.to_path_buf());
108 Ok(())
109 }
110 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
111 Err(err) => Err(err).with_context(|| {
112 format!(
113 "copying SQLite sidecar {} to snapshot {}",
114 source_path.display(),
115 snapshot_path.display()
116 )
117 }),
118 }
119}
120
/// Builds a unique path in the system temp directory for a snapshot copy of
/// `db_path`.
///
/// The file name is `tsift-<stem>-<pid>-<nanos>-<seq>.db`, where:
/// - `<stem>` is the UTF-8 file stem of `db_path`, or `default_stem` when it has none;
/// - `<nanos>` is wall-clock time since the Unix epoch (0 if the clock is before it);
/// - `<seq>` is a per-process counter.
///
/// The counter keeps paths distinct when two snapshots are requested within one
/// clock tick. Without it, concurrent callers in one process could copy over,
/// and later delete, each other's snapshot file.
fn snapshot_copy_path(db_path: &Path, default_stem: &str) -> PathBuf {
    static SNAPSHOT_SEQUENCE: AtomicU64 = AtomicU64::new(0);
    // Relaxed is enough: only the uniqueness of each fetch_add result matters.
    let sequence = SNAPSHOT_SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO)
        .as_nanos();
    let stem = db_path
        .file_stem()
        .and_then(|stem| stem.to_str())
        .unwrap_or(default_stem);
    let mut file_name = OsString::from(format!(
        "tsift-{stem}-{}-{nanos}-{sequence}",
        std::process::id()
    ));
    file_name.push(".db");
    std::env::temp_dir().join(file_name)
}
/// `PRAGMA wal_autocheckpoint` value (in pages) applied to, and verified on,
/// writable graph connections.
const SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES: i64 = 4_096;
/// Maximum number of rows bound per multi-row `INSERT` while staging a refresh.
const SQLITE_GRAPH_STAGING_CHUNK_ROWS: usize = 5_00;
136
/// SQLite-backed graph store.
///
/// Field order matters. Rust drops fields in declaration order, so `conn` is
/// closed before `_snapshot_copy` deletes any temporary snapshot files the
/// connection was reading.
pub struct SqliteGraphStore {
    conn: Connection,
    /// Owns the temporary snapshot files (deleted on drop) when the store was
    /// opened through a snapshot fallback; `None` otherwise.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Which snapshot fallback, if any, a read-only open used.
    read_only_recovery: Option<ReadOnlyRecovery>,
    /// Set while a method that uses the per-connection `temp` staging tables
    /// is running; checked by `assert_not_in_temp_table_section`.
    temp_table_active: Cell<bool>,
}
155
/// A read-only SQLite connection to a graph database. When the live file was
/// locked, the connection may point at a temporary snapshot copy instead.
///
/// `conn` is declared before `_snapshot_copy` so the connection is dropped
/// before the snapshot files are deleted (fields drop in declaration order).
pub struct SqliteReadOnlyConnection {
    conn: Connection,
    /// Owns the snapshot files, if any, and deletes them on drop.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Which snapshot fallback was used, or `None` for a direct open.
    recovery: Option<ReadOnlyRecovery>,
}
171
/// Process-wide counter starting at 0.
// NOTE(review): its readers are outside this view; presumably it gives each
// BFS call a unique id (e.g. to name per-call temp tables). Confirm against callers.
static BFS_CALL_ID: AtomicU64 = AtomicU64::new(0);
173
174impl SqliteReadOnlyConnection {
175 pub fn conn(&self) -> &Connection {
176 &self.conn
177 }
178
179 pub fn recovery(&self) -> Option<ReadOnlyRecovery> {
180 self.recovery
181 }
182}
183
/// Owns temporary snapshot files and deletes them when dropped.
struct SnapshotCopyGuard {
    paths: Vec<PathBuf>,
}

impl Drop for SnapshotCopyGuard {
    /// Best-effort removal. Every path is attempted in order, and failures
    /// (such as a file that is already gone) are ignored because `drop` cannot
    /// report them.
    fn drop(&mut self) {
        self.paths.iter().for_each(|snapshot_file| {
            std::fs::remove_file(snapshot_file).ok();
        });
    }
}
195
196fn configure_writable_graph_connection(conn: &Connection, db_path: &Path) -> Result<()> {
197 conn.busy_timeout(Duration::from_secs(5))?;
198 conn.pragma_update(None, "journal_mode", "WAL")?;
199 let mode: String = conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
200 if mode.to_lowercase() != "wal" {
201 bail!(
202 "graph substrate db {} requires WAL mode for concurrent reads, got {}",
203 db_path.display(),
204 mode
205 );
206 }
207 conn.pragma_update(
208 None,
209 "wal_autocheckpoint",
210 SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
211 )?;
212 let checkpoint_pages: i64 =
213 conn.query_row("PRAGMA wal_autocheckpoint", [], |row| row.get(0))?;
214 if checkpoint_pages != SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES {
215 bail!(
216 "graph substrate db {} requires wal_autocheckpoint={}, got {}",
217 db_path.display(),
218 SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
219 checkpoint_pages
220 );
221 }
222 Ok(())
223}
224
225fn sqlite_column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
226 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
227 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
228 for row in rows {
229 if row? == column {
230 return Ok(true);
231 }
232 }
233 Ok(false)
234}
235
236fn sqlite_table_exists(conn: &Connection, table: &str) -> Result<bool> {
237 conn.query_row(
238 r#"
239 SELECT EXISTS(
240 SELECT 1
241 FROM sqlite_master
242 WHERE type = 'table' AND name = ?1
243 )
244 "#,
245 [table],
246 |row| row.get::<_, bool>(0),
247 )
248 .map_err(Into::into)
249}
250
251fn add_column_if_missing(
252 conn: &Connection,
253 table: &str,
254 column: &str,
255 definition: &str,
256) -> Result<()> {
257 if !sqlite_column_exists(conn, table, column)? {
258 conn.execute(
259 &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
260 [],
261 )?;
262 }
263 Ok(())
264}
265
266fn backfill_graph_edge_keys(conn: &Connection) -> Result<()> {
267 if !sqlite_column_exists(conn, "graph_edges", "edge_key")? {
268 return Ok(());
269 }
270 let rows = {
271 let mut stmt = conn.prepare(
272 r#"
273 SELECT from_id, to_id, kind
274 FROM graph_edges
275 WHERE edge_key IS NULL OR edge_key = ''
276 ORDER BY from_id, kind, to_id
277 "#,
278 )?;
279 collect_rows(stmt.query_map([], |row| {
280 Ok((
281 row.get::<_, String>(0)?,
282 row.get::<_, String>(1)?,
283 row.get::<_, String>(2)?,
284 ))
285 })?)?
286 };
287 let mut update = conn.prepare(
288 r#"
289 UPDATE graph_edges
290 SET edge_key = ?4
291 WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3
292 "#,
293 )?;
294 for (from_id, to_id, kind) in rows {
295 update.execute((
296 &from_id,
297 &to_id,
298 &kind,
299 stable_graph_edge_id(&from_id, &to_id, &kind),
300 ))?;
301 }
302 Ok(())
303}
304
/// Upgrades the graph schema from `old_version` to the current layout.
///
/// Each `old_version < N` block applies the changes introduced by schema
/// version N. Blocks run cumulatively and in order, because later steps rely on
/// earlier ones; for example, v5 backfills `edge_key` before putting a unique
/// index on it.
///
/// Every step tolerates being re-run: columns are added only when missing,
/// tables/indexes use `IF NOT EXISTS`, and derived tables are cleared before
/// they are rebuilt.
///
/// NOTE(review): this function opens no transaction itself; atomicity depends
/// on the caller.
fn migrate_sqlite_graph_schema(conn: &Connection, old_version: i64) -> Result<()> {
    // v2: change-tracking columns on nodes and edges.
    if old_version < 2 {
        add_column_if_missing(conn, "graph_nodes", "row_hash", "TEXT")?;
        add_column_if_missing(conn, "graph_nodes", "source_watermark", "TEXT")?;
        add_column_if_missing(conn, "graph_edges", "row_hash", "TEXT")?;
        add_column_if_missing(conn, "graph_edges", "source_watermark", "TEXT")?;
    }
    // v3: node properties copied out of `properties_json` into their own table.
    if old_version < 3 {
        rebuild_graph_node_properties(conn)?;
    }
    // v4: (kind, label, id) node index and the operator stats table.
    if old_version < 4 {
        ensure_sqlite_graph_operator_stats_schema(conn)?;
    }
    // v5: stable edge keys, then the edge property table keyed by them.
    if old_version < 5 {
        add_column_if_missing(conn, "graph_edges", "edge_key", "TEXT")?;
        backfill_graph_edge_keys(conn)?;
        ensure_sqlite_graph_edge_properties_schema(conn)?;
        rebuild_graph_edge_properties(conn)?;
    }
    // v6: semantic vector table, rebuilt from node properties.
    if old_version < 6 {
        ensure_sqlite_graph_semantic_vectors_schema(conn)?;
        rebuild_graph_node_semantic_vectors(conn)?;
    }
    Ok(())
}
330
331fn ensure_sqlite_graph_operator_stats_schema(conn: &Connection) -> Result<()> {
332 conn.execute_batch(
333 r#"
334 CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
335 ON graph_nodes(kind, label, id);
336 CREATE TABLE IF NOT EXISTS graph_operator_stats (
337 scope TEXT PRIMARY KEY,
338 nodes INTEGER NOT NULL,
339 edges INTEGER NOT NULL,
340 tombstone_nodes INTEGER NOT NULL,
341 tombstone_edges INTEGER NOT NULL,
342 file_size_bytes INTEGER,
343 freelist_bytes INTEGER,
344 observed_at_unix INTEGER NOT NULL
345 );
346 "#,
347 )?;
348 Ok(())
349}
350
351fn ensure_sqlite_graph_edge_properties_schema(conn: &Connection) -> Result<()> {
352 conn.execute_batch(
353 r#"
354 CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_edge_key
355 ON graph_edges(edge_key);
356 CREATE TABLE IF NOT EXISTS graph_edge_properties (
357 edge_key TEXT NOT NULL,
358 key TEXT NOT NULL,
359 value TEXT NOT NULL,
360 PRIMARY KEY (edge_key, key),
361 FOREIGN KEY (edge_key) REFERENCES graph_edges(edge_key) ON DELETE CASCADE
362 );
363 CREATE INDEX IF NOT EXISTS idx_graph_edge_properties_key_value_edge
364 ON graph_edge_properties(key, value, edge_key);
365 "#,
366 )?;
367 Ok(())
368}
369
370fn ensure_sqlite_graph_semantic_vectors_schema(conn: &Connection) -> Result<()> {
371 conn.execute_batch(
372 r#"
373 CREATE TABLE IF NOT EXISTS graph_node_semantic_vectors (
374 node_id TEXT PRIMARY KEY,
375 kind TEXT NOT NULL,
376 model TEXT NOT NULL,
377 dimensions INTEGER NOT NULL,
378 vector_blob BLOB NOT NULL,
379 FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
380 );
381 CREATE INDEX IF NOT EXISTS idx_graph_node_semantic_vectors_kind_dims
382 ON graph_node_semantic_vectors(kind, dimensions, node_id);
383 "#,
384 )?;
385 Ok(())
386}
387
388fn replace_node_properties(
389 conn: &Connection,
390 node_id: &str,
391 properties: &BTreeMap<String, String>,
392) -> Result<()> {
393 conn.execute(
394 "DELETE FROM graph_node_properties WHERE node_id = ?1",
395 [node_id],
396 )?;
397 let mut insert = conn.prepare(
398 r#"
399 INSERT INTO graph_node_properties (node_id, key, value)
400 VALUES (?1, ?2, ?3)
401 ON CONFLICT(node_id, key) DO UPDATE SET
402 value = excluded.value
403 "#,
404 )?;
405 for (key, value) in properties {
406 insert.execute((node_id, key, value))?;
407 }
408 Ok(())
409}
410
/// A node's semantic vector, encoded for a `graph_node_semantic_vectors` row.
struct GraphSemanticVectorRow {
    /// Embedding model name; falls back to `GRAPH_SEMANTIC_VECTOR_DEFAULT_MODEL`.
    model: String,
    /// Number of `f64` components in the vector.
    dimensions: usize,
    /// Little-endian `f64` bytes from `semantic_vector_to_blob` (`8 * dimensions` long).
    vector_blob: Vec<u8>,
}
416
417fn graph_semantic_vector_row(
418 properties: &BTreeMap<String, String>,
419) -> Option<GraphSemanticVectorRow> {
420 let vector = properties
421 .get(GRAPH_SEMANTIC_VECTOR_PROPERTY_KEY)
422 .and_then(|value| parse_graph_semantic_vector_property(value))?;
423 Some(GraphSemanticVectorRow {
424 model: properties
425 .get(GRAPH_SEMANTIC_VECTOR_MODEL_PROPERTY_KEY)
426 .cloned()
427 .unwrap_or_else(|| GRAPH_SEMANTIC_VECTOR_DEFAULT_MODEL.to_string()),
428 dimensions: vector.len(),
429 vector_blob: semantic_vector_to_blob(&vector),
430 })
431}
432
/// Encodes `vector` as consecutive little-endian `f64` values
/// (`8 * vector.len()` bytes), the layout `semantic_vector_from_blob` decodes.
fn semantic_vector_to_blob(vector: &[f64]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(std::mem::size_of_val(vector));
    encoded.extend(vector.iter().flat_map(|component| component.to_le_bytes()));
    encoded
}
440
/// Decodes a blob written by `semantic_vector_to_blob` back into `dimensions`
/// little-endian `f64` components.
///
/// Returns `None` when:
/// - `dimensions` is zero;
/// - the blob is not exactly `dimensions * 8` bytes, including when that product
///   overflows `usize` (e.g. a corrupt stored dimension count);
/// - any component is NaN or infinite.
fn semantic_vector_from_blob(blob: &[u8], dimensions: usize) -> Option<Vec<f64>> {
    const COMPONENT_BYTES: usize = std::mem::size_of::<f64>();
    // checked_mul: a huge `dimensions` must be rejected, not overflow (panic in
    // debug, wrap in release) or reach `with_capacity` below.
    if dimensions == 0 || dimensions.checked_mul(COMPONENT_BYTES) != Some(blob.len()) {
        return None;
    }
    let mut vector = Vec::with_capacity(dimensions);
    for chunk in blob.chunks_exact(COMPONENT_BYTES) {
        let mut bytes = [0u8; COMPONENT_BYTES];
        bytes.copy_from_slice(chunk);
        let value = f64::from_le_bytes(bytes);
        if !value.is_finite() {
            return None;
        }
        vector.push(value);
    }
    Some(vector)
}
455
456fn replace_node_semantic_vector(
457 conn: &Connection,
458 node_id: &str,
459 kind: &str,
460 properties: &BTreeMap<String, String>,
461) -> Result<()> {
462 conn.execute(
463 "DELETE FROM graph_node_semantic_vectors WHERE node_id = ?1",
464 [node_id],
465 )?;
466 let Some(row) = graph_semantic_vector_row(properties) else {
467 return Ok(());
468 };
469 conn.execute(
470 r#"
471 INSERT INTO graph_node_semantic_vectors
472 (node_id, kind, model, dimensions, vector_blob)
473 VALUES (?1, ?2, ?3, ?4, ?5)
474 "#,
475 (
476 node_id,
477 kind,
478 row.model,
479 row.dimensions as i64,
480 row.vector_blob,
481 ),
482 )?;
483 Ok(())
484}
485
486fn replace_edge_properties(
487 conn: &Connection,
488 edge_key: &str,
489 properties: &BTreeMap<String, String>,
490) -> Result<()> {
491 conn.execute(
492 "DELETE FROM graph_edge_properties WHERE edge_key = ?1",
493 [edge_key],
494 )?;
495 let mut insert = conn.prepare(
496 r#"
497 INSERT INTO graph_edge_properties (edge_key, key, value)
498 VALUES (?1, ?2, ?3)
499 ON CONFLICT(edge_key, key) DO UPDATE SET
500 value = excluded.value
501 "#,
502 )?;
503 for (key, value) in properties {
504 insert.execute((edge_key, key, value))?;
505 }
506 Ok(())
507}
508
509fn rebuild_graph_node_properties(conn: &Connection) -> Result<()> {
510 if !sqlite_column_exists(conn, "graph_nodes", "properties_json")? {
511 return Ok(());
512 }
513 conn.execute_batch(
514 r#"
515 DELETE FROM graph_node_properties;
516 INSERT INTO graph_node_properties (node_id, key, value)
517 SELECT graph_nodes.id, json_each.key, CAST(json_each.value AS TEXT)
518 FROM graph_nodes, json_each(graph_nodes.properties_json)
519 WHERE json_each.key IS NOT NULL
520 AND json_each.value IS NOT NULL
521 "#,
522 )?;
523 Ok(())
524}
525
526fn rebuild_graph_node_semantic_vectors(conn: &Connection) -> Result<()> {
527 if !sqlite_column_exists(conn, "graph_nodes", "properties_json")?
528 || !sqlite_table_exists(conn, "graph_node_semantic_vectors")?
529 {
530 return Ok(());
531 }
532 conn.execute("DELETE FROM graph_node_semantic_vectors", [])?;
533 let rows = {
534 let mut stmt = conn.prepare(
535 r#"
536 SELECT id, kind, properties_json
537 FROM graph_nodes
538 WHERE json_extract(properties_json, '$.embedding') IS NOT NULL
539 ORDER BY id
540 "#,
541 )?;
542 collect_rows(stmt.query_map([], |row| {
543 Ok((
544 row.get::<_, String>(0)?,
545 row.get::<_, String>(1)?,
546 row.get::<_, String>(2)?,
547 ))
548 })?)?
549 };
550 for (node_id, kind, properties_json) in rows {
551 let properties: BTreeMap<String, String> = serde_json::from_str(&properties_json)
552 .with_context(|| format!("parsing semantic properties for graph node {node_id}"))?;
553 replace_node_semantic_vector(conn, &node_id, &kind, &properties)?;
554 }
555 Ok(())
556}
557
558fn rebuild_graph_edge_properties(conn: &Connection) -> Result<()> {
559 if !sqlite_column_exists(conn, "graph_edges", "properties_json")?
560 || !sqlite_column_exists(conn, "graph_edges", "edge_key")?
561 {
562 return Ok(());
563 }
564 conn.execute_batch(
565 r#"
566 DELETE FROM graph_edge_properties;
567 INSERT INTO graph_edge_properties (edge_key, key, value)
568 SELECT graph_edges.edge_key, json_each.key, CAST(json_each.value AS TEXT)
569 FROM graph_edges, json_each(graph_edges.properties_json)
570 WHERE graph_edges.edge_key IS NOT NULL
571 AND graph_edges.edge_key <> ''
572 AND json_each.key IS NOT NULL
573 AND json_each.value IS NOT NULL
574 "#,
575 )?;
576 Ok(())
577}
578
579pub fn open_graph_read_only_connection(db_path: &Path) -> Result<SqliteReadOnlyConnection> {
580 let conn = Connection::open_with_flags(
581 db_path,
582 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
583 )
584 .with_context(|| format!("opening graph.db read-only: {}", db_path.display()))?;
585 conn.busy_timeout(Duration::from_secs(5))?;
586 Ok(SqliteReadOnlyConnection {
587 conn,
588 _snapshot_copy: None,
589 recovery: None,
590 })
591}
592
593pub fn open_graph_read_only_connection_resilient(
594 db_path: &Path,
595) -> Result<SqliteReadOnlyConnection> {
596 match open_graph_read_only_connection(db_path).and_then(|connection| {
597 connection
598 .conn
599 .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
600 Ok(connection)
601 }) {
602 Ok(connection) => Ok(connection),
603 Err(err) => match read_only_snapshot_recovery(db_path, &err) {
604 Some(recovery) => open_graph_read_only_snapshot(db_path, recovery),
605 None => Err(err),
606 },
607 }
608}
609
610fn open_graph_read_only_snapshot(
611 db_path: &Path,
612 recovery: ReadOnlyRecovery,
613) -> Result<SqliteReadOnlyConnection> {
614 let (snapshot_path, cleanup_paths) = copy_read_only_snapshot(db_path, "graph")?;
615 let conn = Connection::open_with_flags(
616 &snapshot_path,
617 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
618 )
619 .with_context(|| format!("opening graph.db snapshot {}", snapshot_path.display()))?;
620 conn.busy_timeout(Duration::from_secs(5))?;
621 Ok(SqliteReadOnlyConnection {
622 conn,
623 _snapshot_copy: Some(SnapshotCopyGuard {
624 paths: cleanup_paths,
625 }),
626 recovery: Some(recovery),
627 })
628}
629
/// Timing of one named phase of a projection refresh.
///
/// Field order is also the serialized field order.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefreshPhase {
    /// Phase identifier, e.g. `"sqlite_temp_table_prepare"`.
    pub name: String,
    /// Time elapsed in the phase, in microseconds.
    pub duration_micros: u128,
    /// Human-readable description of what the phase did.
    pub detail: String,
}
636
/// Summary returned by `SqliteGraphStore::replace_projection_with_version`.
///
/// NOTE(review): most fields are filled in by refresh code outside this view.
/// Comments marked "presumably" are inferred from field names; confirm them
/// against the refresh implementation. Field order is also the serialized
/// field order.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefresh {
    /// Projection scope that was refreshed (`replace_projection` uses `"root"`).
    pub scope: String,
    /// Presumably the resolved version label: the caller's value, else one
    /// derived from the nodes, else `"unversioned"`.
    pub projection_version: String,
    /// Presumably the caller-supplied source watermark.
    pub source_watermark: Option<String>,
    /// Presumably ids of nodes tombstoned by this refresh.
    pub tombstoned_nodes: Vec<String>,
    /// Presumably keys of edges tombstoned by this refresh.
    pub tombstoned_edges: Vec<String>,
    /// Count of upserted nodes. Summed with `upserted_edges` to choose the
    /// post-refresh `wal_autocheckpoint`.
    pub upserted_nodes: usize,
    /// Count of upserted edges (see `upserted_nodes`).
    pub upserted_edges: usize,
    /// Presumably nodes whose stored row already matched.
    pub unchanged_nodes: usize,
    /// Presumably edges whose stored row already matched.
    pub unchanged_edges: usize,
    /// Presumably property rows inserted or updated.
    pub upserted_properties: usize,
    /// Presumably property rows left as-is.
    pub unchanged_properties: usize,
    /// Presumably property rows removed.
    pub deleted_properties: usize,
    /// Presumably node rows removed.
    pub deleted_nodes: usize,
    /// Presumably edge rows removed.
    pub deleted_edges: usize,
    /// Presumably old tombstones purged during the refresh.
    pub pruned_tombstones: usize,
    /// Presumably the database size before the refresh, when it could be measured.
    pub file_size_bytes_before: Option<u64>,
    /// Presumably the database size after the refresh, when it could be measured.
    pub file_size_bytes_after: Option<u64>,
    /// Presumably per-phase timings, in the order the phases ran.
    pub phase_timings: Vec<SqliteProjectionRefreshPhase>,
}
658
/// A scope's recorded projection version. The fields mirror columns of the
/// `graph_projection_versions` table.
///
/// NOTE(review): the code that reads or writes this type is outside this view.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionVersion {
    /// Version label (`projection_version` column, NOT NULL).
    pub projection_version: String,
    /// Optional content hash (`content_hash` column).
    pub content_hash: Option<String>,
    /// Optional source watermark (`source_watermark` column).
    pub source_watermark: Option<String>,
}
665
666fn sqlite_refresh_phase_timing(
667 name: &str,
668 started: Instant,
669 detail: &str,
670) -> SqliteProjectionRefreshPhase {
671 SqliteProjectionRefreshPhase {
672 name: name.to_string(),
673 duration_micros: started.elapsed().as_micros(),
674 detail: detail.to_string(),
675 }
676}
677
/// Builds the `VALUES` placeholder list for a multi-row insert: `row_count`
/// copies of a `(?, ?, …)` tuple with `column_count` positional markers,
/// separated by `", "`.
///
/// For example, `(2, 2)` yields `"(?, ?), (?, ?)"`. Zero rows yields an empty string.
fn sqlite_graph_staging_placeholders(column_count: usize, row_count: usize) -> String {
    let tuple = format!("({})", vec!["?"; column_count].join(", "));
    vec![tuple.as_str(); row_count].join(", ")
}
691
692fn sqlite_stage_all_ids(
693 tx: &rusqlite::Transaction<'_>,
694 nodes: &[GraphNode],
695 edges: &[GraphEdge],
696) -> Result<()> {
697 for chunk in nodes
698 .iter()
699 .map(|n| &n.id)
700 .collect::<Vec<_>>()
701 .chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS)
702 {
703 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
704 let sql = format!(
705 "INSERT OR IGNORE INTO next_graph_all_node_ids (id) VALUES {}",
706 placeholders.join(", ")
707 );
708 let values: Vec<Value> = chunk.iter().map(|id| Value::Text((*id).clone())).collect();
709 tx.execute(&sql, params_from_iter(values.iter()))?;
710 }
711 for chunk in edges
712 .iter()
713 .map(graph_edge_id)
714 .collect::<Vec<_>>()
715 .chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS)
716 {
717 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
718 let sql = format!(
719 "INSERT OR IGNORE INTO next_graph_all_edge_keys (edge_key) VALUES {}",
720 placeholders.join(", ")
721 );
722 let values: Vec<Value> = chunk.iter().map(|ek| Value::Text(ek.to_string())).collect();
723 tx.execute(&sql, params_from_iter(values.iter()))?;
724 }
725 Ok(())
726}
727
728fn sqlite_stage_projection_nodes(
729 tx: &rusqlite::Transaction<'_>,
730 nodes: &[&GraphNode],
731 source_watermark: Option<&str>,
732) -> Result<()> {
733 let mut insert_semantic_vector = tx.prepare(
734 r#"
735 INSERT INTO next_graph_node_semantic_vectors
736 (node_id, kind, model, dimensions, vector_blob)
737 VALUES (?1, ?2, ?3, ?4, ?5)
738 "#,
739 )?;
740 for chunk in nodes.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
741 let sql = format!(
742 r#"
743 INSERT INTO next_graph_nodes
744 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
745 VALUES {}
746 "#,
747 sqlite_graph_staging_placeholders(8, chunk.len())
748 );
749 let mut values = Vec::with_capacity(chunk.len() * 8);
750 for node in chunk {
751 values.push(Value::Text(node.id.clone()));
752 values.push(Value::Text(node.kind.clone()));
753 values.push(Value::Text(node.label.clone()));
754 values.push(Value::Text(to_json(&node.properties)?));
755 values.push(Value::Text(to_json(&node.provenance)?));
756 values.push(
757 optional_to_json(&node.freshness)?
758 .map(Value::Text)
759 .unwrap_or(Value::Null),
760 );
761 values.push(Value::Text(row_hash(node)?));
762 values.push(
763 source_watermark
764 .map(|watermark| Value::Text(watermark.to_string()))
765 .unwrap_or(Value::Null),
766 );
767 }
768 tx.execute(&sql, params_from_iter(values))?;
769 for node in chunk {
770 let Some(row) = graph_semantic_vector_row(&node.properties) else {
771 continue;
772 };
773 insert_semantic_vector.execute((
774 &node.id,
775 &node.kind,
776 row.model,
777 row.dimensions as i64,
778 row.vector_blob,
779 ))?;
780 }
781 }
782 Ok(())
783}
784
785fn sqlite_stage_projection_edges(
786 tx: &rusqlite::Transaction<'_>,
787 edges: &[&GraphEdge],
788 source_watermark: Option<&str>,
789) -> Result<()> {
790 for chunk in edges.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
791 let sql = format!(
792 r#"
793 INSERT INTO next_graph_edges
794 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
795 VALUES {}
796 "#,
797 sqlite_graph_staging_placeholders(9, chunk.len())
798 );
799 let mut values = Vec::with_capacity(chunk.len() * 9);
800 for edge in chunk {
801 values.push(Value::Text(graph_edge_id(edge)));
802 values.push(Value::Text(edge.from_id.clone()));
803 values.push(Value::Text(edge.to_id.clone()));
804 values.push(Value::Text(edge.kind.clone()));
805 values.push(Value::Text(to_json(&edge.properties)?));
806 values.push(Value::Text(to_json(&edge.provenance)?));
807 values.push(
808 optional_to_json(&edge.freshness)?
809 .map(Value::Text)
810 .unwrap_or(Value::Null),
811 );
812 values.push(Value::Text(row_hash(edge)?));
813 values.push(
814 source_watermark
815 .map(|watermark| Value::Text(watermark.to_string()))
816 .unwrap_or(Value::Null),
817 );
818 }
819 tx.execute(&sql, params_from_iter(values))?;
820 }
821 Ok(())
822}
823
824impl SqliteGraphStore {
825 fn assert_not_in_temp_table_section(&self) {
826 if self.temp_table_active.get() {
827 panic!(
828 "SqliteGraphStore: re-entrant temp-table call detected — only one temp-table-using method may be active at a time per connection"
829 );
830 }
831 }
832
833 pub fn open(db_path: &Path) -> Result<Self> {
834 if let Some(parent) = db_path.parent() {
835 std::fs::create_dir_all(parent)
836 .with_context(|| format!("creating graph substrate dir: {}", parent.display()))?;
837 }
838 let conn = Connection::open(db_path)
839 .with_context(|| format!("opening graph substrate db: {}", db_path.display()))?;
840 configure_writable_graph_connection(&conn, db_path)?;
841 Self::from_connection(conn)
842 }
843
844 pub fn in_memory() -> Result<Self> {
845 let conn = Connection::open_in_memory()?;
846 conn.busy_timeout(Duration::from_secs(5))?;
847 Self::from_connection(conn)
848 }
849
850 pub fn open_read_only(db_path: &Path) -> Result<Self> {
851 let connection = open_graph_read_only_connection(db_path)?;
852 Self::from_read_only_connection(connection)
853 }
854
855 pub fn open_read_only_resilient(db_path: &Path) -> Result<Self> {
856 let connection = open_graph_read_only_connection_resilient(db_path)?;
857 Self::from_read_only_connection(connection)
858 }
859
860 pub fn read_only_recovery(&self) -> Option<ReadOnlyRecovery> {
861 self.read_only_recovery
862 }
863
864 pub fn has_user_triggers(&self) -> Result<bool> {
865 self.conn
866 .query_row(
867 r#"
868 SELECT EXISTS(
869 SELECT 1
870 FROM sqlite_master
871 WHERE type = 'trigger'
872 AND name NOT LIKE 'sqlite_%'
873 )
874 "#,
875 [],
876 |row| row.get::<_, bool>(0),
877 )
878 .map_err(Into::into)
879 }
880
881 fn from_connection(conn: Connection) -> Result<Self> {
882 conn.pragma_update(None, "foreign_keys", "ON")?;
883 let user_version: i64 =
884 conn.pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
885 if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
886 bail!(
887 "graph.db schema version {} is newer than supported version {}",
888 user_version,
889 SQLITE_GRAPH_SCHEMA_VERSION
890 );
891 }
892 conn.execute_batch(
893 r#"
894 CREATE TABLE IF NOT EXISTS graph_nodes (
895 id TEXT PRIMARY KEY,
896 kind TEXT NOT NULL,
897 label TEXT NOT NULL,
898 properties_json TEXT NOT NULL DEFAULT '{}',
899 provenance_json TEXT NOT NULL DEFAULT '[]',
900 freshness_json TEXT,
901 row_hash TEXT,
902 source_watermark TEXT
903 );
904 CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind
905 ON graph_nodes(kind);
906 CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
907 ON graph_nodes(kind, label, id);
908
909 CREATE TABLE IF NOT EXISTS graph_edges (
910 edge_key TEXT NOT NULL UNIQUE,
911 from_id TEXT NOT NULL,
912 to_id TEXT NOT NULL,
913 kind TEXT NOT NULL,
914 properties_json TEXT NOT NULL DEFAULT '{}',
915 provenance_json TEXT NOT NULL DEFAULT '[]',
916 freshness_json TEXT,
917 row_hash TEXT,
918 source_watermark TEXT,
919 PRIMARY KEY (from_id, to_id, kind),
920 FOREIGN KEY (from_id) REFERENCES graph_nodes(id) ON DELETE CASCADE,
921 FOREIGN KEY (to_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
922 );
923 CREATE INDEX IF NOT EXISTS idx_graph_edges_from_kind
924 ON graph_edges(from_id, kind);
925 CREATE INDEX IF NOT EXISTS idx_graph_edges_to_kind
926 ON graph_edges(to_id, kind);
927
928 CREATE TABLE IF NOT EXISTS graph_projection_versions (
929 scope TEXT PRIMARY KEY,
930 projection_version TEXT NOT NULL,
931 content_hash TEXT,
932 source_watermark TEXT,
933 observed_at_unix INTEGER NOT NULL
934 );
935
936 CREATE TABLE IF NOT EXISTS graph_tombstones (
937 row_key TEXT PRIMARY KEY,
938 row_kind TEXT NOT NULL,
939 deleted_at_unix INTEGER NOT NULL
940 );
941
942 CREATE TABLE IF NOT EXISTS graph_node_properties (
943 node_id TEXT NOT NULL,
944 key TEXT NOT NULL,
945 value TEXT NOT NULL,
946 PRIMARY KEY (node_id, key),
947 FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
948 );
949 CREATE INDEX IF NOT EXISTS idx_graph_node_properties_key_value_node
950 ON graph_node_properties(key, value, node_id);
951
952 CREATE TABLE IF NOT EXISTS graph_node_semantic_vectors (
953 node_id TEXT PRIMARY KEY,
954 kind TEXT NOT NULL,
955 model TEXT NOT NULL,
956 dimensions INTEGER NOT NULL,
957 vector_blob BLOB NOT NULL,
958 FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
959 );
960 CREATE INDEX IF NOT EXISTS idx_graph_node_semantic_vectors_kind_dims
961 ON graph_node_semantic_vectors(kind, dimensions, node_id);
962
963 CREATE TABLE IF NOT EXISTS graph_operator_stats (
964 scope TEXT PRIMARY KEY,
965 nodes INTEGER NOT NULL,
966 edges INTEGER NOT NULL,
967 tombstone_nodes INTEGER NOT NULL,
968 tombstone_edges INTEGER NOT NULL,
969 file_size_bytes INTEGER,
970 freelist_bytes INTEGER,
971 observed_at_unix INTEGER NOT NULL
972 );
973 "#,
974 )?;
975 if user_version < SQLITE_GRAPH_SCHEMA_VERSION {
976 migrate_sqlite_graph_schema(&conn, user_version)?;
977 conn.pragma_update(None, "user_version", SQLITE_GRAPH_SCHEMA_VERSION)?;
978 }
979 Ok(Self {
980 conn,
981 _snapshot_copy: None,
982 read_only_recovery: None,
983 temp_table_active: Cell::new(false),
984 })
985 }
986
987 fn from_read_only_connection(connection: SqliteReadOnlyConnection) -> Result<Self> {
988 connection.conn.pragma_update(None, "foreign_keys", "ON")?;
989 let user_version: i64 =
990 connection
991 .conn
992 .pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
993 if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
994 bail!(
995 "graph.db schema version {} is newer than supported version {}",
996 user_version,
997 SQLITE_GRAPH_SCHEMA_VERSION
998 );
999 }
1000 connection
1001 .conn
1002 .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
1003 Ok(Self {
1004 conn: connection.conn,
1005 _snapshot_copy: connection._snapshot_copy,
1006 read_only_recovery: connection.recovery,
1007 temp_table_active: Cell::new(false),
1008 })
1009 }
1010
1011 pub fn replace_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1012 self.replace_projection_with_version("root", projection, None, None)
1013 .map(|_| ())
1014 }
1015
1016 pub fn replace_projection_with_version(
1017 &mut self,
1018 scope: impl Into<String>,
1019 projection: &GraphProjection,
1020 projection_version: Option<&str>,
1021 source_watermark: Option<String>,
1022 ) -> Result<SqliteProjectionRefresh> {
1023 self.assert_not_in_temp_table_section();
1024 self.temp_table_active.set(true);
1025 let scope = scope.into();
1026 let result = self.replace_projection_with_version_fallible(
1027 scope,
1028 projection,
1029 projection_version,
1030 source_watermark,
1031 );
1032 self.temp_table_active.set(false);
1033 if let Ok(ref refresh) = result {
1034 let total_rows = refresh.upserted_nodes + refresh.upserted_edges;
1035 let autocheckpoint = if total_rows > 10000 {
1036 8192
1037 } else if total_rows > 1000 {
1038 4096
1039 } else {
1040 2048
1041 };
1042 let _ = self
1043 .conn
1044 .pragma_update(None, "wal_autocheckpoint", autocheckpoint);
1045 let _ = self.conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)");
1046 }
1047 result
1048 }
1049
1050 fn replace_projection_with_version_fallible(
1051 &mut self,
1052 scope: String,
1053 projection: &GraphProjection,
1054 projection_version: Option<&str>,
1055 source_watermark: Option<String>,
1056 ) -> Result<SqliteProjectionRefresh> {
1057 let projection_version = projection_version
1058 .map(str::to_string)
1059 .or_else(|| projection_version_from_nodes(&projection.nodes))
1060 .unwrap_or_else(|| "unversioned".to_string());
1061 let projection_hash = projection_hash_from_nodes(&projection.nodes);
1062 let observed_at_unix = unix_now();
1063 let file_size_bytes_before = sqlite_database_size_bytes(&self.conn).ok();
1064 let force_refresh_writes = self.has_user_triggers().unwrap_or(true);
1065 let mut phase_timings = Vec::new();
1066
1067 let tx = self.conn.transaction()?;
1068 let started = Instant::now();
1069 tx.execute_batch(
1070 r#"
1071 CREATE TEMP TABLE IF NOT EXISTS next_graph_nodes (
1072 id TEXT PRIMARY KEY,
1073 kind TEXT NOT NULL,
1074 label TEXT NOT NULL,
1075 properties_json TEXT NOT NULL,
1076 provenance_json TEXT NOT NULL,
1077 freshness_json TEXT,
1078 row_hash TEXT NOT NULL,
1079 source_watermark TEXT
1080 );
1081 CREATE TEMP TABLE IF NOT EXISTS next_graph_edges (
1082 edge_key TEXT PRIMARY KEY,
1083 from_id TEXT NOT NULL,
1084 to_id TEXT NOT NULL,
1085 kind TEXT NOT NULL,
1086 properties_json TEXT NOT NULL,
1087 provenance_json TEXT NOT NULL,
1088 freshness_json TEXT,
1089 row_hash TEXT NOT NULL,
1090 source_watermark TEXT
1091 );
1092 CREATE INDEX IF NOT EXISTS temp.idx_next_graph_edges_from_to_kind
1093 ON next_graph_edges(from_id, to_id, kind);
1094 CREATE TEMP TABLE IF NOT EXISTS next_graph_node_properties (
1095 node_id TEXT NOT NULL,
1096 key TEXT NOT NULL,
1097 value TEXT NOT NULL,
1098 PRIMARY KEY (node_id, key)
1099 );
1100 CREATE TEMP TABLE IF NOT EXISTS next_graph_node_semantic_vectors (
1101 node_id TEXT PRIMARY KEY,
1102 kind TEXT NOT NULL,
1103 model TEXT NOT NULL,
1104 dimensions INTEGER NOT NULL,
1105 vector_blob BLOB NOT NULL
1106 );
1107 CREATE TEMP TABLE IF NOT EXISTS next_graph_edge_properties (
1108 edge_key TEXT NOT NULL,
1109 key TEXT NOT NULL,
1110 value TEXT NOT NULL,
1111 PRIMARY KEY (edge_key, key)
1112 );
1113 CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_nodes (
1114 id TEXT PRIMARY KEY
1115 );
1116 CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_edges (
1117 edge_key TEXT PRIMARY KEY
1118 );
1119 CREATE TEMP TABLE IF NOT EXISTS next_graph_all_node_ids (
1120 id TEXT PRIMARY KEY
1121 );
1122 CREATE TEMP TABLE IF NOT EXISTS next_graph_all_edge_keys (
1123 edge_key TEXT PRIMARY KEY
1124 );
1125 DELETE FROM next_graph_nodes;
1126 DELETE FROM next_graph_edges;
1127 DELETE FROM next_graph_node_properties;
1128 DELETE FROM next_graph_node_semantic_vectors;
1129 DELETE FROM next_graph_edge_properties;
1130 DELETE FROM next_graph_changed_nodes;
1131 DELETE FROM next_graph_changed_edges;
1132 DELETE FROM next_graph_all_node_ids;
1133 DELETE FROM next_graph_all_edge_keys;
1134 "#,
1135 )?;
1136 phase_timings.push(sqlite_refresh_phase_timing(
1137 "sqlite_temp_table_prepare",
1138 started,
1139 "create and clear refresh staging tables before row loading",
1140 ));
1141 let (changed_nodes, changed_edges, skipped_nodes, skipped_edges) = if force_refresh_writes {
1142 (
1143 projection.nodes.iter().collect(),
1144 projection.edges.iter().collect(),
1145 0usize,
1146 0usize,
1147 )
1148 } else {
1149 let existing_node_hashes: BTreeMap<String, String> = {
1150 let mut stmt =
1151 tx.prepare("SELECT id, row_hash FROM graph_nodes WHERE row_hash IS NOT NULL")?;
1152 let rows = stmt.query_map([], |row| {
1153 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1154 })?;
1155 collect_rows(rows)?.into_iter().collect()
1156 };
1157 let existing_edge_hashes: BTreeMap<String, String> = {
1158 let mut stmt = tx.prepare(
1159 "SELECT edge_key, row_hash FROM graph_edges WHERE row_hash IS NOT NULL",
1160 )?;
1161 let rows = stmt.query_map([], |row| {
1162 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1163 })?;
1164 collect_rows(rows)?.into_iter().collect()
1165 };
1166 let cn: Vec<&GraphNode> = projection
1167 .nodes
1168 .iter()
1169 .filter(|n| {
1170 let hash = row_hash(*n).ok();
1171 hash.as_ref()
1172 .is_none_or(|h| existing_node_hashes.get(&n.id) != Some(h))
1173 })
1174 .collect();
1175 let ce: Vec<&GraphEdge> = projection
1176 .edges
1177 .iter()
1178 .filter(|e| {
1179 let hash = row_hash(*e).ok();
1180 let ek = graph_edge_id(e);
1181 hash.as_ref()
1182 .is_none_or(|h| existing_edge_hashes.get(&ek) != Some(h))
1183 })
1184 .collect();
1185 let sn = projection.nodes.len() - cn.len();
1186 let se = projection.edges.len() - ce.len();
1187 (cn, ce, sn, se)
1188 };
1189 {
1190 let started = Instant::now();
1191 sqlite_stage_all_ids(&tx, &projection.nodes, &projection.edges)?;
1192 sqlite_stage_projection_nodes(&tx, &changed_nodes, source_watermark.as_deref())?;
1193 sqlite_stage_projection_edges(&tx, &changed_edges, source_watermark.as_deref())?;
1194 phase_timings.push(sqlite_refresh_phase_timing(
1195 "sqlite_node_staging",
1196 started,
1197 &format!(
1198 "pre-filtered staging: {} nodes ({} unchanged skipped), {} edges ({} unchanged skipped) into temp tables using multi-row chunks up to {} rows",
1199 changed_nodes.len(),
1200 skipped_nodes,
1201 changed_edges.len(),
1202 skipped_edges,
1203 SQLITE_GRAPH_STAGING_CHUNK_ROWS
1204 ),
1205 ));
1206 }
1207 {
1208 let started = Instant::now();
1209 let changed_nodes_sql = if force_refresh_writes {
1210 r#"
1211 INSERT INTO next_graph_changed_nodes (id)
1212 SELECT id
1213 FROM next_graph_nodes
1214 "#
1215 } else {
1216 r#"
1217 INSERT INTO next_graph_changed_nodes (id)
1218 SELECT n.id
1219 FROM next_graph_nodes n
1220 LEFT JOIN graph_nodes g ON g.id = n.id
1221 WHERE g.id IS NULL OR g.row_hash IS NOT n.row_hash
1222 "#
1223 };
1224 tx.execute(changed_nodes_sql, [])?;
1225 tx.execute_batch(
1226 r#"
1227 INSERT INTO next_graph_node_properties (node_id, key, value)
1228 SELECT n.id, json_each.key, CAST(json_each.value AS TEXT)
1229 FROM next_graph_nodes n
1230 JOIN next_graph_changed_nodes c ON c.id = n.id,
1231 json_each(n.properties_json)
1232 WHERE json_each.key IS NOT NULL
1233 AND json_each.value IS NOT NULL
1234 "#,
1235 )?;
1236 phase_timings.push(sqlite_refresh_phase_timing(
1237 "sqlite_property_row_staging",
1238 started,
1239 "derive materialized node property rows only for new/changed node rows; unchanged row-hash owners reuse existing property rows",
1240 ));
1241 }
1242 {
1243 let started = Instant::now();
1244 let changed_edges_sql = if force_refresh_writes {
1245 r#"
1246 INSERT INTO next_graph_changed_edges (edge_key)
1247 SELECT edge_key
1248 FROM next_graph_edges
1249 "#
1250 } else {
1251 r#"
1252 INSERT INTO next_graph_changed_edges (edge_key)
1253 SELECT n.edge_key
1254 FROM next_graph_edges n
1255 LEFT JOIN graph_edges g ON g.edge_key = n.edge_key
1256 WHERE g.edge_key IS NULL OR g.row_hash IS NOT n.row_hash
1257 "#
1258 };
1259 tx.execute(changed_edges_sql, [])?;
1260 tx.execute_batch(
1261 r#"
1262 INSERT INTO next_graph_edge_properties (edge_key, key, value)
1263 SELECT e.edge_key, json_each.key, CAST(json_each.value AS TEXT)
1264 FROM next_graph_edges e
1265 JOIN next_graph_changed_edges c ON c.edge_key = e.edge_key,
1266 json_each(e.properties_json)
1267 WHERE json_each.key IS NOT NULL
1268 AND json_each.value IS NOT NULL
1269 "#,
1270 )?;
1271 phase_timings.push(sqlite_refresh_phase_timing(
1272 "sqlite_edge_property_row_staging",
1273 started,
1274 "derive materialized edge property rows only for new/changed edge rows; unchanged row-hash owners reuse existing property rows",
1275 ));
1276 }
1277
1278 let delta_started = Instant::now();
1279 let tombstoned_nodes = {
1280 let sql = if force_refresh_writes {
1281 r#"
1282 SELECT g.id
1283 FROM graph_nodes g
1284 LEFT JOIN next_graph_nodes n ON n.id = g.id
1285 WHERE n.id IS NULL
1286 ORDER BY g.id
1287 "#
1288 } else {
1289 r#"
1290 SELECT g.id
1291 FROM graph_nodes g
1292 LEFT JOIN next_graph_all_node_ids n ON n.id = g.id
1293 WHERE n.id IS NULL
1294 ORDER BY g.id
1295 "#
1296 };
1297 let mut stmt = tx.prepare(sql)?;
1298 collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1299 };
1300 let tombstoned_edges = {
1301 let sql = if force_refresh_writes {
1302 r#"
1303 SELECT g.edge_key
1304 FROM graph_edges g
1305 LEFT JOIN next_graph_edges n
1306 ON n.edge_key = g.edge_key
1307 WHERE n.edge_key IS NULL
1308 ORDER BY g.edge_key
1309 "#
1310 } else {
1311 r#"
1312 SELECT g.edge_key
1313 FROM graph_edges g
1314 LEFT JOIN next_graph_all_edge_keys n
1315 ON n.edge_key = g.edge_key
1316 WHERE n.edge_key IS NULL
1317 ORDER BY g.edge_key
1318 "#
1319 };
1320 let mut stmt = tx.prepare(sql)?;
1321 collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1322 };
1323 let unchanged_nodes: usize = if force_refresh_writes {
1324 tx.query_row(
1325 r#"
1326 SELECT COUNT(*)
1327 FROM next_graph_nodes n
1328 JOIN graph_nodes g ON g.id = n.id
1329 WHERE g.row_hash = n.row_hash
1330 "#,
1331 [],
1332 |row| row.get(0),
1333 )?
1334 } else {
1335 skipped_nodes
1336 };
1337 let unchanged_edges: usize = if force_refresh_writes {
1338 tx.query_row(
1339 r#"
1340 SELECT COUNT(*)
1341 FROM next_graph_edges n
1342 JOIN graph_edges g
1343 ON g.edge_key = n.edge_key
1344 WHERE g.row_hash = n.row_hash
1345 "#,
1346 [],
1347 |row| row.get(0),
1348 )?
1349 } else {
1350 skipped_edges
1351 };
1352 let reused_owner_node_properties: usize = if force_refresh_writes {
1353 tx.query_row(
1354 r#"
1355 SELECT COUNT(*)
1356 FROM graph_node_properties g
1357 JOIN next_graph_nodes n ON n.id = g.node_id
1358 LEFT JOIN next_graph_changed_nodes c ON c.id = n.id
1359 WHERE c.id IS NULL
1360 "#,
1361 [],
1362 |row| row.get(0),
1363 )?
1364 } else {
1365 tx.query_row(
1366 r#"
1367 SELECT COUNT(*)
1368 FROM graph_node_properties g
1369 JOIN next_graph_all_node_ids a ON a.id = g.node_id
1370 LEFT JOIN next_graph_changed_nodes c ON c.id = a.id
1371 WHERE c.id IS NULL
1372 "#,
1373 [],
1374 |row| row.get(0),
1375 )?
1376 };
1377 let reused_owner_edge_properties: usize = if force_refresh_writes {
1378 tx.query_row(
1379 r#"
1380 SELECT COUNT(*)
1381 FROM graph_edge_properties g
1382 JOIN next_graph_edges n ON n.edge_key = g.edge_key
1383 LEFT JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1384 WHERE c.edge_key IS NULL
1385 "#,
1386 [],
1387 |row| row.get(0),
1388 )?
1389 } else {
1390 tx.query_row(
1391 r#"
1392 SELECT COUNT(*)
1393 FROM graph_edge_properties g
1394 JOIN next_graph_all_edge_keys a ON a.edge_key = g.edge_key
1395 LEFT JOIN next_graph_changed_edges c ON c.edge_key = a.edge_key
1396 WHERE c.edge_key IS NULL
1397 "#,
1398 [],
1399 |row| row.get(0),
1400 )?
1401 };
1402 let unchanged_changed_node_properties: usize = tx.query_row(
1403 r#"
1404 SELECT COUNT(*)
1405 FROM next_graph_node_properties n
1406 JOIN graph_node_properties g
1407 ON g.node_id = n.node_id AND g.key = n.key
1408 WHERE g.value = n.value
1409 "#,
1410 [],
1411 |row| row.get(0),
1412 )?;
1413 let unchanged_changed_edge_properties: usize = tx.query_row(
1414 r#"
1415 SELECT COUNT(*)
1416 FROM next_graph_edge_properties n
1417 JOIN graph_edge_properties g
1418 ON g.edge_key = n.edge_key AND g.key = n.key
1419 WHERE g.value = n.value
1420 "#,
1421 [],
1422 |row| row.get(0),
1423 )?;
1424 let unchanged_properties = reused_owner_node_properties
1425 + reused_owner_edge_properties
1426 + unchanged_changed_node_properties
1427 + unchanged_changed_edge_properties;
1428
1429 let deleted_edges = if force_refresh_writes {
1430 tx.execute(
1431 r#"
1432 DELETE FROM graph_edges
1433 WHERE NOT EXISTS (
1434 SELECT 1
1435 FROM next_graph_edges n
1436 WHERE n.edge_key = graph_edges.edge_key
1437 )
1438 "#,
1439 [],
1440 )?
1441 } else {
1442 tx.execute(
1443 r#"
1444 DELETE FROM graph_edges
1445 WHERE NOT EXISTS (
1446 SELECT 1
1447 FROM next_graph_all_edge_keys n
1448 WHERE n.edge_key = graph_edges.edge_key
1449 )
1450 "#,
1451 [],
1452 )?
1453 };
1454 let deleted_nodes = if force_refresh_writes {
1455 tx.execute(
1456 r#"
1457 DELETE FROM graph_nodes
1458 WHERE NOT EXISTS (
1459 SELECT 1
1460 FROM next_graph_nodes n
1461 WHERE n.id = graph_nodes.id
1462 )
1463 "#,
1464 [],
1465 )?
1466 } else {
1467 tx.execute(
1468 r#"
1469 DELETE FROM graph_nodes
1470 WHERE NOT EXISTS (
1471 SELECT 1
1472 FROM next_graph_all_node_ids n
1473 WHERE n.id = graph_nodes.id
1474 )
1475 "#,
1476 [],
1477 )?
1478 };
1479
1480 let upsert_nodes_sql = r#"
1481 INSERT INTO graph_nodes
1482 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1483 SELECT
1484 n.id,
1485 n.kind,
1486 n.label,
1487 n.properties_json,
1488 n.provenance_json,
1489 n.freshness_json,
1490 n.row_hash,
1491 n.source_watermark
1492 FROM next_graph_nodes n
1493 JOIN next_graph_changed_nodes c ON c.id = n.id
1494 WHERE true
1495 ON CONFLICT(id) DO UPDATE SET
1496 kind = excluded.kind,
1497 label = excluded.label,
1498 properties_json = excluded.properties_json,
1499 provenance_json = excluded.provenance_json,
1500 freshness_json = excluded.freshness_json,
1501 row_hash = excluded.row_hash,
1502 source_watermark = excluded.source_watermark
1503 WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1504 "#;
1505 tx.execute(upsert_nodes_sql, [])?;
1506 tx.execute(
1507 r#"
1508 DELETE FROM graph_node_semantic_vectors
1509 WHERE EXISTS (
1510 SELECT 1
1511 FROM next_graph_changed_nodes c
1512 WHERE c.id = graph_node_semantic_vectors.node_id
1513 )
1514 AND NOT EXISTS (
1515 SELECT 1
1516 FROM next_graph_node_semantic_vectors n
1517 WHERE n.node_id = graph_node_semantic_vectors.node_id
1518 )
1519 "#,
1520 [],
1521 )?;
1522 tx.execute(
1523 r#"
1524 INSERT INTO graph_node_semantic_vectors
1525 (node_id, kind, model, dimensions, vector_blob)
1526 SELECT n.node_id, n.kind, n.model, n.dimensions, n.vector_blob
1527 FROM next_graph_node_semantic_vectors n
1528 WHERE true
1529 ON CONFLICT(node_id) DO UPDATE SET
1530 kind = excluded.kind,
1531 model = excluded.model,
1532 dimensions = excluded.dimensions,
1533 vector_blob = excluded.vector_blob
1534 "#,
1535 [],
1536 )?;
1537 let upsert_edges_sql = r#"
1538 INSERT INTO graph_edges
1539 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1540 SELECT
1541 n.edge_key,
1542 n.from_id,
1543 n.to_id,
1544 n.kind,
1545 n.properties_json,
1546 n.provenance_json,
1547 n.freshness_json,
1548 n.row_hash,
1549 n.source_watermark
1550 FROM next_graph_edges n
1551 JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1552 WHERE true
1553 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1554 edge_key = excluded.edge_key,
1555 properties_json = excluded.properties_json,
1556 provenance_json = excluded.provenance_json,
1557 freshness_json = excluded.freshness_json,
1558 row_hash = excluded.row_hash,
1559 source_watermark = excluded.source_watermark
1560 WHERE graph_edges.row_hash IS NOT excluded.row_hash
1561 "#;
1562 tx.execute(upsert_edges_sql, [])?;
1563 let deleted_node_properties = tx.execute(
1564 r#"
1565 DELETE FROM graph_node_properties
1566 WHERE EXISTS (
1567 SELECT 1
1568 FROM next_graph_changed_nodes c
1569 WHERE c.id = graph_node_properties.node_id
1570 )
1571 AND NOT EXISTS (
1572 SELECT 1
1573 FROM next_graph_node_properties n
1574 WHERE n.node_id = graph_node_properties.node_id
1575 AND n.key = graph_node_properties.key
1576 )
1577 "#,
1578 [],
1579 )?;
1580 let deleted_edge_properties = tx.execute(
1581 r#"
1582 DELETE FROM graph_edge_properties
1583 WHERE EXISTS (
1584 SELECT 1
1585 FROM next_graph_changed_edges c
1586 WHERE c.edge_key = graph_edge_properties.edge_key
1587 )
1588 AND NOT EXISTS (
1589 SELECT 1
1590 FROM next_graph_edge_properties n
1591 WHERE n.edge_key = graph_edge_properties.edge_key
1592 AND n.key = graph_edge_properties.key
1593 )
1594 "#,
1595 [],
1596 )?;
1597 let deleted_properties = deleted_node_properties + deleted_edge_properties;
1598 let upsert_properties_sql = r#"
1599 INSERT INTO graph_node_properties (node_id, key, value)
1600 SELECT n.node_id, n.key, n.value
1601 FROM next_graph_node_properties n
1602 LEFT JOIN graph_node_properties g
1603 ON g.node_id = n.node_id AND g.key = n.key
1604 WHERE g.node_id IS NULL OR g.value IS NOT n.value
1605 ON CONFLICT(node_id, key) DO UPDATE SET
1606 value = excluded.value
1607 WHERE graph_node_properties.value IS NOT excluded.value
1608 "#;
1609 let upserted_node_properties = tx.execute(upsert_properties_sql, [])?;
1610 let upsert_edge_properties_sql = r#"
1611 INSERT INTO graph_edge_properties (edge_key, key, value)
1612 SELECT n.edge_key, n.key, n.value
1613 FROM next_graph_edge_properties n
1614 LEFT JOIN graph_edge_properties g
1615 ON g.edge_key = n.edge_key AND g.key = n.key
1616 WHERE g.edge_key IS NULL OR g.value IS NOT n.value
1617 ON CONFLICT(edge_key, key) DO UPDATE SET
1618 value = excluded.value
1619 WHERE graph_edge_properties.value IS NOT excluded.value
1620 "#;
1621 let upserted_edge_properties = tx.execute(upsert_edge_properties_sql, [])?;
1622 let upserted_properties = upserted_node_properties + upserted_edge_properties;
1623 tx.execute(
1624 r#"
1625 INSERT INTO graph_projection_versions
1626 (scope, projection_version, content_hash, source_watermark, observed_at_unix)
1627 VALUES (?1, ?2, ?3, ?4, ?5)
1628 ON CONFLICT(scope) DO UPDATE SET
1629 projection_version = excluded.projection_version,
1630 content_hash = excluded.content_hash,
1631 source_watermark = excluded.source_watermark,
1632 observed_at_unix = excluded.observed_at_unix
1633 "#,
1634 (
1635 &scope,
1636 &projection_version,
1637 &projection_hash,
1638 &source_watermark,
1639 observed_at_unix,
1640 ),
1641 )?;
1642 let pruned_node_tombstones = tx.execute(
1643 r#"
1644 DELETE FROM graph_tombstones
1645 WHERE row_kind = 'node'
1646 AND EXISTS (
1647 SELECT 1
1648 FROM next_graph_nodes n
1649 WHERE n.id = substr(graph_tombstones.row_key, 6)
1650 )
1651 "#,
1652 [],
1653 )?;
1654 let pruned_edge_tombstones = tx.execute(
1655 r#"
1656 DELETE FROM graph_tombstones
1657 WHERE row_kind = 'edge'
1658 AND EXISTS (
1659 SELECT 1
1660 FROM next_graph_edges n
1661 WHERE n.edge_key = substr(graph_tombstones.row_key, 6)
1662 )
1663 "#,
1664 [],
1665 )?;
1666 {
1667 let mut insert_node_tombstone = tx.prepare(
1668 r#"
1669 INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1670 VALUES (?1, 'node', ?2)
1671 ON CONFLICT(row_key) DO UPDATE SET
1672 row_kind = excluded.row_kind,
1673 deleted_at_unix = excluded.deleted_at_unix
1674 "#,
1675 )?;
1676 for id in &tombstoned_nodes {
1677 insert_node_tombstone.execute((format!("node:{id}"), observed_at_unix))?;
1678 }
1679 }
1680 {
1681 let mut insert_edge_tombstone = tx.prepare(
1682 r#"
1683 INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1684 VALUES (?1, 'edge', ?2)
1685 ON CONFLICT(row_key) DO UPDATE SET
1686 row_kind = excluded.row_kind,
1687 deleted_at_unix = excluded.deleted_at_unix
1688 "#,
1689 )?;
1690 for key in &tombstoned_edges {
1691 insert_edge_tombstone.execute((format!("edge:{key}"), observed_at_unix))?;
1692 }
1693 }
1694 let tombstone_node_count: usize = tx.query_row(
1695 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1696 [],
1697 |row| row.get(0),
1698 )?;
1699 let tombstone_edge_count: usize = tx.query_row(
1700 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1701 [],
1702 |row| row.get(0),
1703 )?;
1704 tx.execute(
1705 r#"
1706 INSERT INTO graph_operator_stats
1707 (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1708 VALUES (?1, ?2, ?3, ?4, ?5, NULL, NULL, ?6)
1709 ON CONFLICT(scope) DO UPDATE SET
1710 nodes = excluded.nodes,
1711 edges = excluded.edges,
1712 tombstone_nodes = excluded.tombstone_nodes,
1713 tombstone_edges = excluded.tombstone_edges,
1714 file_size_bytes = excluded.file_size_bytes,
1715 freelist_bytes = excluded.freelist_bytes,
1716 observed_at_unix = excluded.observed_at_unix
1717 "#,
1718 (
1719 &scope,
1720 projection.nodes.len() as i64,
1721 projection.edges.len() as i64,
1722 tombstone_node_count as i64,
1723 tombstone_edge_count as i64,
1724 observed_at_unix,
1725 ),
1726 )?;
1727 phase_timings.push(sqlite_refresh_phase_timing(
1728 "sqlite_delta_write",
1729 delta_started,
1730 "apply row/property deltas, projection metadata, tombstones, and cached operator counts",
1731 ));
1732 let commit_started = Instant::now();
1733 tx.commit()?;
1734 phase_timings.push(sqlite_refresh_phase_timing(
1735 "sqlite_commit",
1736 commit_started,
1737 "commit refresh transaction and publish old-or-new graph visibility",
1738 ));
1739 let file_size_bytes_after = sqlite_database_size_bytes(&self.conn).ok();
1740 let freelist_bytes_after = sqlite_database_freelist_bytes(&self.conn).ok();
1741 let stats_started = Instant::now();
1742 self.conn.execute(
1743 r#"
1744 UPDATE graph_operator_stats
1745 SET file_size_bytes = ?2,
1746 freelist_bytes = ?3,
1747 observed_at_unix = ?4
1748 WHERE scope = ?1
1749 "#,
1750 (
1751 &scope,
1752 file_size_bytes_after.map(|value| value as i64),
1753 freelist_bytes_after.map(|value| value as i64),
1754 unix_now(),
1755 ),
1756 )?;
1757 phase_timings.push(sqlite_refresh_phase_timing(
1758 "sqlite_stats_cache_update",
1759 stats_started,
1760 "persist post-commit file and freelist proof for status/doctor",
1761 ));
1762 Ok(SqliteProjectionRefresh {
1763 scope,
1764 projection_version,
1765 source_watermark,
1766 upserted_nodes: projection.nodes.len().saturating_sub(unchanged_nodes),
1767 upserted_edges: projection.edges.len().saturating_sub(unchanged_edges),
1768 unchanged_nodes,
1769 unchanged_edges,
1770 upserted_properties,
1771 unchanged_properties,
1772 deleted_properties,
1773 deleted_nodes,
1774 deleted_edges,
1775 pruned_tombstones: pruned_node_tombstones + pruned_edge_tombstones,
1776 file_size_bytes_before,
1777 file_size_bytes_after,
1778 tombstoned_nodes,
1779 tombstoned_edges,
1780 phase_timings,
1781 })
1782 }
1783
1784 pub fn derive_ontology(&self) -> Result<GraphProjection> {
1793 let mut projection = GraphProjection::default();
1794
1795 let mut node_stmt = self.conn.prepare(
1796 "SELECT kind, COUNT(*) FROM graph_nodes \
1797 WHERE kind != 'ontology_type' \
1798 GROUP BY kind ORDER BY kind",
1799 )?;
1800 let node_rows = node_stmt.query_map([], |row| {
1801 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
1802 })?;
1803 for row in node_rows {
1804 let (kind, count) = row?;
1805 projection.nodes.push(
1806 GraphNode::new(format!("ontology_type:{kind}"), "ontology_type", &kind)
1807 .with_property("type_kind", &kind)
1808 .with_property("instance_count", count.to_string())
1809 .with_provenance(GraphProvenance::new("tsift-ontology", &kind)),
1810 );
1811 }
1812
1813 let mut rel_stmt = self.conn.prepare(
1814 "SELECT n1.kind, e.kind, n2.kind, COUNT(*) \
1815 FROM graph_edges e \
1816 JOIN graph_nodes n1 ON e.from_id = n1.id \
1817 JOIN graph_nodes n2 ON e.to_id = n2.id \
1818 WHERE e.kind NOT LIKE 'ontology_relation:%' \
1819 AND n1.kind != 'ontology_type' AND n2.kind != 'ontology_type' \
1820 GROUP BY n1.kind, e.kind, n2.kind \
1821 ORDER BY n1.kind, e.kind, n2.kind",
1822 )?;
1823 let rel_rows = rel_stmt.query_map([], |row| {
1824 Ok((
1825 row.get::<_, String>(0)?,
1826 row.get::<_, String>(1)?,
1827 row.get::<_, String>(2)?,
1828 row.get::<_, i64>(3)?,
1829 ))
1830 })?;
1831 for row in rel_rows {
1832 let (from_kind, edge_kind, to_kind, count) = row?;
1833 projection.edges.push(
1834 GraphEdge::new(
1835 format!("ontology_type:{from_kind}"),
1836 format!("ontology_type:{to_kind}"),
1837 format!("ontology_relation:{edge_kind}"),
1838 )
1839 .with_property("edge_kind", &edge_kind)
1840 .with_property("instance_count", count.to_string())
1841 .with_provenance(GraphProvenance::new("tsift-ontology", &edge_kind)),
1842 );
1843 }
1844
1845 Ok(projection)
1846 }
1847
1848 pub fn upsert_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1849 let tx = self.conn.transaction()?;
1850 {
1851 let mut insert_node = tx.prepare(
1852 r#"
1853 INSERT INTO graph_nodes
1854 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1855 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1856 ON CONFLICT(id) DO UPDATE SET
1857 kind = excluded.kind,
1858 label = excluded.label,
1859 properties_json = excluded.properties_json,
1860 provenance_json = excluded.provenance_json,
1861 freshness_json = excluded.freshness_json,
1862 row_hash = excluded.row_hash,
1863 source_watermark = excluded.source_watermark
1864 WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1865 OR graph_nodes.source_watermark IS NOT excluded.source_watermark
1866 "#,
1867 )?;
1868 let mut delete_properties =
1869 tx.prepare("DELETE FROM graph_node_properties WHERE node_id = ?1")?;
1870 let mut insert_property = tx.prepare(
1871 r#"
1872 INSERT INTO graph_node_properties (node_id, key, value)
1873 VALUES (?1, ?2, ?3)
1874 "#,
1875 )?;
1876 for node in &projection.nodes {
1877 let changed = insert_node.execute((
1878 &node.id,
1879 &node.kind,
1880 &node.label,
1881 to_json(&node.properties)?,
1882 to_json(&node.provenance)?,
1883 optional_to_json(&node.freshness)?,
1884 row_hash(node)?,
1885 ))?;
1886 if changed > 0 {
1887 delete_properties.execute([&node.id])?;
1888 for (key, value) in &node.properties {
1889 insert_property.execute((&node.id, key, value))?;
1890 }
1891 replace_node_semantic_vector(&tx, &node.id, &node.kind, &node.properties)?;
1892 }
1893 }
1894 }
1895 {
1896 let mut insert_edge = tx.prepare(
1897 r#"
1898 INSERT INTO graph_edges
1899 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1900 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1901 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1902 edge_key = excluded.edge_key,
1903 properties_json = excluded.properties_json,
1904 provenance_json = excluded.provenance_json,
1905 freshness_json = excluded.freshness_json,
1906 row_hash = excluded.row_hash,
1907 source_watermark = excluded.source_watermark
1908 WHERE graph_edges.row_hash IS NOT excluded.row_hash
1909 OR graph_edges.source_watermark IS NOT excluded.source_watermark
1910 "#,
1911 )?;
1912 let mut delete_properties =
1913 tx.prepare("DELETE FROM graph_edge_properties WHERE edge_key = ?1")?;
1914 let mut insert_property = tx.prepare(
1915 r#"
1916 INSERT INTO graph_edge_properties (edge_key, key, value)
1917 VALUES (?1, ?2, ?3)
1918 "#,
1919 )?;
1920 for edge in &projection.edges {
1921 let edge_key = graph_edge_id(edge);
1922 let changed = insert_edge.execute((
1923 &edge_key,
1924 &edge.from_id,
1925 &edge.to_id,
1926 &edge.kind,
1927 to_json(&edge.properties)?,
1928 to_json(&edge.provenance)?,
1929 optional_to_json(&edge.freshness)?,
1930 row_hash(edge)?,
1931 ))?;
1932 if changed > 0 {
1933 delete_properties.execute([&edge_key])?;
1934 for (key, value) in &edge.properties {
1935 insert_property.execute((&edge_key, key, value))?;
1936 }
1937 }
1938 }
1939 }
1940 tx.commit()?;
1941 Ok(())
1942 }
1943
1944 pub fn projection_version(&self, scope: &str) -> Result<Option<SqliteProjectionVersion>> {
1945 self.conn
1946 .query_row(
1947 r#"
1948 SELECT projection_version, content_hash, source_watermark
1949 FROM graph_projection_versions
1950 WHERE scope = ?1
1951 "#,
1952 [scope],
1953 |row| {
1954 Ok(SqliteProjectionVersion {
1955 projection_version: row.get(0)?,
1956 content_hash: row.get(1)?,
1957 source_watermark: row.get(2)?,
1958 })
1959 },
1960 )
1961 .optional()
1962 .map_err(Into::into)
1963 }
1964
1965 pub fn update_projection_source_watermark(
1966 &mut self,
1967 scope: &str,
1968 source_watermark: Option<String>,
1969 ) -> Result<()> {
1970 self.conn.execute(
1971 r#"
1972 UPDATE graph_projection_versions
1973 SET source_watermark = ?2
1974 WHERE scope = ?1
1975 "#,
1976 (scope, source_watermark),
1977 )?;
1978 Ok(())
1979 }
1980
1981 pub fn compact_storage(&mut self, scope: &str, prune_tombstones: bool) -> Result<usize> {
1982 let pruned_tombstones = if prune_tombstones {
1983 self.conn.execute("DELETE FROM graph_tombstones", [])?
1984 } else {
1985 0
1986 };
1987 self.conn.execute_batch(
1988 r#"
1989 PRAGMA wal_checkpoint(TRUNCATE);
1990 VACUUM;
1991 "#,
1992 )?;
1993 let nodes = self
1994 .conn
1995 .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
1996 row.get::<_, i64>(0)
1997 })?;
1998 let edges = self
1999 .conn
2000 .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2001 row.get::<_, i64>(0)
2002 })?;
2003 let tombstone_nodes = self.conn.query_row(
2004 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
2005 [],
2006 |row| row.get::<_, i64>(0),
2007 )?;
2008 let tombstone_edges = self.conn.query_row(
2009 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
2010 [],
2011 |row| row.get::<_, i64>(0),
2012 )?;
2013 let file_size_bytes = sqlite_database_size_bytes(&self.conn)
2014 .ok()
2015 .map(|value| value as i64);
2016 let freelist_bytes = sqlite_database_freelist_bytes(&self.conn)
2017 .ok()
2018 .map(|value| value as i64);
2019 self.conn.execute(
2020 r#"
2021 INSERT INTO graph_operator_stats
2022 (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
2023 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s', 'now'))
2024 ON CONFLICT(scope) DO UPDATE SET
2025 nodes = excluded.nodes,
2026 edges = excluded.edges,
2027 tombstone_nodes = excluded.tombstone_nodes,
2028 tombstone_edges = excluded.tombstone_edges,
2029 file_size_bytes = excluded.file_size_bytes,
2030 freelist_bytes = excluded.freelist_bytes,
2031 observed_at_unix = excluded.observed_at_unix
2032 "#,
2033 (
2034 scope,
2035 nodes,
2036 edges,
2037 tombstone_nodes,
2038 tombstone_edges,
2039 file_size_bytes,
2040 freelist_bytes,
2041 ),
2042 )?;
2043 Ok(pruned_tombstones)
2044 }
2045
2046 fn edges_between_nodes_inline(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
2047 let placeholders: Vec<&str> = node_ids.iter().map(|_| "?").collect();
2048 let in_clause = placeholders.join(", ");
2049 let sql = format!(
2050 "SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json \
2051 FROM graph_edges e \
2052 WHERE e.from_id IN ({in_clause}) \
2053 AND e.to_id IN ({in_clause}) \
2054 ORDER BY e.from_id, e.kind, e.to_id"
2055 );
2056 let values: Vec<Value> = node_ids
2057 .iter()
2058 .chain(node_ids.iter())
2059 .map(|id| Value::Text(id.clone()))
2060 .collect();
2061 let mut stmt = self.conn.prepare(&sql)?;
2062 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2063 }
2064}
2065
2066fn sqlite_query_plan(conn: &Connection, sql: &str, values: &[Value]) -> Result<Vec<String>> {
2067 let mut stmt = conn.prepare(&format!("EXPLAIN QUERY PLAN {sql}"))?;
2068 collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2069 row.get::<_, String>(3)
2070 })?)
2071}
2072
/// Turns a query plan into human-readable diagnostics.
///
/// The first message echoes the whole plan, joined with ` | `. Each entry of
/// `expected_indexes` then adds one message, in order. The message says whether
/// any plan row mentions that index name (a plain substring match).
fn sqlite_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<String> {
    let summary = format!("sqlite query pushdown active; plan: {}", plan.join(" | "));
    let index_reports = expected_indexes.iter().map(|&index_name| {
        let reported = plan.iter().any(|step| step.contains(index_name));
        if reported {
            format!("sqlite query plan uses {index_name}")
        } else {
            format!(
                "sqlite query plan did not report {index_name}; inspect before changing graph property indexes"
            )
        }
    });
    std::iter::once(summary).chain(index_reports).collect()
}
2089
2090#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
2091pub struct TerseDiagnostic {
2092 pub code: &'static str,
2093 #[serde(skip_serializing_if = "Option::is_none")]
2094 pub index: Option<String>,
2095}
2096
2097#[allow(dead_code)]
2098fn terse_query_plan_diagnostics(
2099 plan: &[String],
2100 expected_indexes: &[&str],
2101) -> Vec<TerseDiagnostic> {
2102 let mut diagnostics = vec![TerseDiagnostic {
2103 code: "plan_active",
2104 index: None,
2105 }];
2106 for expected_index in expected_indexes {
2107 if plan.iter().any(|row| row.contains(expected_index)) {
2108 diagnostics.push(TerseDiagnostic {
2109 code: "idx_ok",
2110 index: Some(expected_index.to_string()),
2111 });
2112 } else {
2113 diagnostics.push(TerseDiagnostic {
2114 code: "idx_missing",
2115 index: Some(expected_index.to_string()),
2116 });
2117 }
2118 }
2119 diagnostics
2120}
2121
2122fn push_sqlite_property_filter_exists(
2123 sql: &mut String,
2124 values: &mut Vec<Value>,
2125 node_alias: &str,
2126 filters: &[GraphPropertyFilter],
2127) {
2128 for (index, filter) in filters.iter().enumerate() {
2129 sql.push_str(&format!(
2130 r#"
2131 AND EXISTS (
2132 SELECT 1
2133 FROM graph_node_properties p{index} INDEXED BY idx_graph_node_properties_key_value_node
2134 WHERE p{index}.node_id = {node_alias}.id
2135 AND p{index}.key = ?
2136 AND p{index}.value = ?
2137 )
2138 "#
2139 ));
2140 values.push(Value::Text(filter.key.clone()));
2141 values.push(Value::Text(filter.value.clone()));
2142 }
2143}
2144
2145fn push_sqlite_edge_property_filter_exists(
2146 sql: &mut String,
2147 values: &mut Vec<Value>,
2148 edge_alias: &str,
2149 filters: &[GraphPropertyFilter],
2150) {
2151 for (index, filter) in filters.iter().enumerate() {
2152 sql.push_str(&format!(
2153 r#"
2154 AND EXISTS (
2155 SELECT 1
2156 FROM graph_edge_properties ep{index} INDEXED BY idx_graph_edge_properties_key_value_edge
2157 WHERE ep{index}.edge_key = {edge_alias}.edge_key
2158 AND ep{index}.key = ?
2159 AND ep{index}.value = ?
2160 )
2161 "#
2162 ));
2163 values.push(Value::Text(filter.key.clone()));
2164 values.push(Value::Text(filter.value.clone()));
2165 }
2166}
2167
/// Parameters for one arm of the incident-edge `UNION` that
/// `push_sqlite_incident_edge_branch` renders.
///
/// `index_name` and `endpoint_column` are interpolated verbatim into SQL, so
/// they must be trusted, internally chosen identifiers.
struct SqliteIncidentEdgeBranch<'a> {
    /// Index forced with `INDEXED BY` for this arm.
    index_name: &'a str,
    /// Edge column compared against `node_id` (the visible callers use `from_id` / `to_id`).
    endpoint_column: &'a str,
    /// Node whose incident edges are wanted.
    node_id: &'a str,
    /// Optional exact edge-kind filter.
    kind: Option<&'a str>,
    /// Edge property filters, each rendered as an `AND EXISTS` clause.
    filters: &'a [GraphPropertyFilter],
    /// Exclusive lower bound on `edge_key`, used for paging.
    cursor: Option<&'a str>,
}
2176
2177fn push_sqlite_incident_edge_branch(
2178 sql: &mut String,
2179 values: &mut Vec<Value>,
2180 branch: SqliteIncidentEdgeBranch<'_>,
2181) {
2182 sql.push_str(&format!(
2183 r#"
2184 SELECT
2185 e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2186 FROM graph_edges e INDEXED BY {index_name}
2187 WHERE e.{endpoint_column} = ?
2188 "#,
2189 index_name = branch.index_name,
2190 endpoint_column = branch.endpoint_column,
2191 ));
2192 values.push(Value::Text(branch.node_id.to_string()));
2193 if let Some(kind) = branch.kind {
2194 sql.push_str(" AND e.kind = ?");
2195 values.push(Value::Text(kind.to_string()));
2196 }
2197 push_sqlite_edge_property_filter_exists(sql, values, "e", branch.filters);
2198 if let Some(cursor) = branch.cursor {
2199 sql.push_str(" AND e.edge_key > ?");
2200 values.push(Value::Text(cursor.to_string()));
2201 }
2202}
2203
2204fn sqlite_incident_edges_union_query(
2205 node_id: &str,
2206 kind: Option<&str>,
2207 filters: &[GraphPropertyFilter],
2208 cursor: Option<&str>,
2209 limit: Option<usize>,
2210) -> (String, Vec<Value>) {
2211 let mut sql = String::from(
2212 r#"
2213 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2214 FROM (
2215 "#,
2216 );
2217 let mut values = Vec::new();
2218 push_sqlite_incident_edge_branch(
2219 &mut sql,
2220 &mut values,
2221 SqliteIncidentEdgeBranch {
2222 index_name: "idx_graph_edges_from_kind",
2223 endpoint_column: "from_id",
2224 node_id,
2225 kind,
2226 filters,
2227 cursor,
2228 },
2229 );
2230 sql.push_str(" UNION ");
2231 push_sqlite_incident_edge_branch(
2232 &mut sql,
2233 &mut values,
2234 SqliteIncidentEdgeBranch {
2235 index_name: "idx_graph_edges_to_kind",
2236 endpoint_column: "to_id",
2237 node_id,
2238 kind,
2239 filters,
2240 cursor,
2241 },
2242 );
2243 sql.push_str(
2244 r#"
2245 ) e
2246 ORDER BY e.edge_key
2247 "#,
2248 );
2249 if let Some(limit) = limit {
2250 sql.push_str(" LIMIT ?");
2251 values.push(Value::Integer(limit.saturating_add(1) as i64));
2252 }
2253 (sql, values)
2254}
2255
/// Builds a SQL scalar expression that scores an edge for semantic-seeded
/// neighborhood expansion.
///
/// The expression is the sum of three terms:
/// 1. A base weight chosen by edge kind. Exact kinds are checked first, then
///    substring fallbacks: kinds containing `community` score 200; kinds
///    containing `semantic`, `concept`, or `entity` score 240. Anything else
///    scores 80.
/// 2. The caller-supplied `direction_bonus` SQL expression.
/// 3. A smaller per-kind tie-break bonus (0 for unlisted kinds).
///
/// `edge_alias` and `direction_bonus` are interpolated verbatim into SQL, so
/// they must be trusted, internally generated fragments, never user input.
fn sqlite_semantic_seeded_edge_score_expr(edge_alias: &str, direction_bonus: &str) -> String {
    // The base term must be a *searched* CASE (`CASE WHEN <cond> ...`). In a
    // simple CASE (`CASE x WHEN y ...`) every arm is compared for equality
    // against `x`, so `WHEN x LIKE '%community%'` would compare the kind text
    // against the 0/1 result of LIKE, and the substring fallbacks could never
    // match. The arms keep first-match order: exact kinds win over fallbacks.
    format!(
        "(CASE \
            WHEN {edge_alias}.kind = 'semantic_relation' THEN 340 \
            WHEN {edge_alias}.kind IN ('mentions_entity', 'mentions_concept', 'tagged_entity', \
                'tagged_concept', 'related_concept') THEN 280 \
            WHEN {edge_alias}.kind = 'mentions' THEN 220 \
            WHEN {edge_alias}.kind = 'calls' THEN 200 \
            WHEN {edge_alias}.kind IN ('requests_context', 'scopes_context', 'scopes_source', \
                'explains_result') THEN 180 \
            WHEN {edge_alias}.kind IN ('defines', 'contains', 'belongs_to') THEN 120 \
            WHEN {edge_alias}.kind LIKE '%community%' THEN 200 \
            WHEN {edge_alias}.kind LIKE '%semantic%' \
                OR {edge_alias}.kind LIKE '%concept%' \
                OR {edge_alias}.kind LIKE '%entity%' THEN 240 \
            ELSE 80 END) \
        + ({direction_bonus}) \
        + (CASE {edge_alias}.kind \
            WHEN 'mentions_concept' THEN 30 \
            WHEN 'mentions_entity' THEN 30 \
            WHEN 'tagged_concept' THEN 30 \
            WHEN 'tagged_entity' THEN 30 \
            WHEN 'related_concept' THEN 30 \
            WHEN 'semantic_relation' THEN 28 \
            WHEN 'calls' THEN 24 \
            WHEN 'mentions' THEN 22 \
            WHEN 'requests_context' THEN 18 \
            WHEN 'scopes_context' THEN 18 \
            WHEN 'scopes_source' THEN 18 \
            WHEN 'explains_result' THEN 18 \
            WHEN 'defines' THEN 12 \
            WHEN 'contains' THEN 12 \
            WHEN 'belongs_to' THEN 12 \
            ELSE 0 END)"
    )
}
2299
2300impl GraphStore for SqliteGraphStore {
2301 fn upsert_node(&self, node: &GraphNode) -> Result<()> {
2302 self.conn.execute(
2303 r#"
2304 INSERT INTO graph_nodes
2305 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
2306 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
2307 ON CONFLICT(id) DO UPDATE SET
2308 kind = excluded.kind,
2309 label = excluded.label,
2310 properties_json = excluded.properties_json,
2311 provenance_json = excluded.provenance_json,
2312 freshness_json = excluded.freshness_json,
2313 row_hash = excluded.row_hash,
2314 source_watermark = excluded.source_watermark
2315 "#,
2316 (
2317 &node.id,
2318 &node.kind,
2319 &node.label,
2320 to_json(&node.properties)?,
2321 to_json(&node.provenance)?,
2322 optional_to_json(&node.freshness)?,
2323 row_hash(node)?,
2324 ),
2325 )?;
2326 replace_node_properties(&self.conn, &node.id, &node.properties)?;
2327 replace_node_semantic_vector(&self.conn, &node.id, &node.kind, &node.properties)?;
2328 Ok(())
2329 }
2330
2331 fn upsert_edge(&self, edge: &GraphEdge) -> Result<()> {
2332 let edge_key = graph_edge_id(edge);
2333 self.conn.execute(
2334 r#"
2335 INSERT INTO graph_edges
2336 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
2337 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
2338 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
2339 edge_key = excluded.edge_key,
2340 properties_json = excluded.properties_json,
2341 provenance_json = excluded.provenance_json,
2342 freshness_json = excluded.freshness_json,
2343 row_hash = excluded.row_hash,
2344 source_watermark = excluded.source_watermark
2345 "#,
2346 (
2347 &edge_key,
2348 &edge.from_id,
2349 &edge.to_id,
2350 &edge.kind,
2351 to_json(&edge.properties)?,
2352 to_json(&edge.provenance)?,
2353 optional_to_json(&edge.freshness)?,
2354 row_hash(edge)?,
2355 ),
2356 )?;
2357 replace_edge_properties(&self.conn, &edge_key, &edge.properties)?;
2358 Ok(())
2359 }
2360
2361 fn delete_node(&self, id: &str) -> Result<usize> {
2362 self.conn
2363 .execute("DELETE FROM graph_nodes WHERE id = ?1", [id])
2364 .map_err(Into::into)
2365 }
2366
2367 fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize> {
2368 self.conn
2369 .execute(
2370 "DELETE FROM graph_edges WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3",
2371 (from_id, to_id, kind),
2372 )
2373 .map_err(Into::into)
2374 }
2375
2376 fn node(&self, id: &str) -> Result<Option<GraphNode>> {
2377 self.conn
2378 .query_row(
2379 r#"
2380SELECT id, kind, label, properties_json, provenance_json, freshness_json
2381 FROM graph_nodes
2382 WHERE id = ?1
2383 "#,
2384 [id],
2385 node_from_row,
2386 )
2387 .optional()
2388 .map_err(Into::into)
2389 }
2390
2391 fn nodes_by_ids(&self, ids: &[String]) -> Result<Vec<GraphNode>> {
2392 let unique_ids = ids.iter().cloned().collect::<BTreeSet<_>>();
2393 if unique_ids.is_empty() {
2394 return Ok(Vec::new());
2395 }
2396
2397 let mut nodes = Vec::new();
2398 let id_refs = unique_ids.iter().collect::<Vec<_>>();
2399 for chunk in id_refs.chunks(450) {
2400 let placeholders = chunk.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
2401 let sql = format!(
2402 "SELECT id, kind, label, properties_json, provenance_json, freshness_json \
2403FROM graph_nodes \
2404WHERE id IN ({placeholders}) \
2405ORDER BY id"
2406 );
2407 let values = chunk
2408 .iter()
2409 .map(|id| Value::Text((*id).clone()))
2410 .collect::<Vec<_>>();
2411 let mut stmt = self.conn.prepare(&sql)?;
2412 nodes.extend(collect_rows(
2413 stmt.query_map(params_from_iter(values.iter()), node_from_row)?,
2414 )?);
2415 }
2416 nodes.sort_by(|left, right| left.id.cmp(&right.id));
2417 Ok(nodes)
2418 }
2419
2420 fn all_nodes(&self) -> Result<Vec<GraphNode>> {
2421 let mut stmt = self.conn.prepare(
2422 r#"
2423SELECT id, kind, label, properties_json, provenance_json, freshness_json
2424 FROM graph_nodes
2425 ORDER BY id
2426 "#,
2427 )?;
2428 collect_rows(stmt.query_map([], node_from_row)?)
2429 }
2430
2431 fn all_edges(&self) -> Result<Vec<GraphEdge>> {
2432 let mut stmt = self.conn.prepare(
2433 r#"
2434 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2435 FROM graph_edges
2436 ORDER BY from_id, kind, to_id
2437 "#,
2438 )?;
2439 collect_rows(stmt.query_map([], edge_from_row)?)
2440 }
2441
2442 fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
2443 self.conn
2444 .query_row(
2445 r#"
2446 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2447 FROM graph_edges INDEXED BY idx_graph_edges_edge_key
2448 WHERE edge_key = ?1
2449 "#,
2450 [edge_id],
2451 edge_from_row,
2452 )
2453 .optional()
2454 .map_err(Into::into)
2455 }
2456
2457 fn graph_counts(&self) -> Result<(usize, usize)> {
2458 let nodes = self
2459 .conn
2460 .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
2461 row.get::<_, usize>(0)
2462 })?;
2463 let edges = self
2464 .conn
2465 .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2466 row.get::<_, usize>(0)
2467 })?;
2468 Ok((nodes, edges))
2469 }
2470
2471 fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
2472 match kind {
2473 Some(kind) => self
2474 .conn
2475 .query_row(
2476 r#"
2477 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2478 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2479 WHERE from_id <> to_id AND kind = ?1
2480 ORDER BY from_id, kind, to_id
2481 LIMIT 1
2482 "#,
2483 [kind],
2484 edge_from_row,
2485 )
2486 .optional()
2487 .map_err(Into::into),
2488 None => self
2489 .conn
2490 .query_row(
2491 r#"
2492 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2493 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2494 WHERE from_id <> to_id
2495 ORDER BY from_id, kind, to_id
2496 LIMIT 1
2497 "#,
2498 [],
2499 edge_from_row,
2500 )
2501 .optional()
2502 .map_err(Into::into),
2503 }
2504 }
2505
2506 fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
2507 self.conn
2508 .query_row(
2509 r#"
2510 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2511 ep.key, ep.value
2512 FROM graph_edge_properties ep INDEXED BY idx_graph_edge_properties_key_value_edge
2513 JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2514 ON e.edge_key = ep.edge_key
2515 WHERE e.from_id <> e.to_id
2516 ORDER BY ep.key, ep.value, ep.edge_key
2517 LIMIT 1
2518 "#,
2519 [],
2520 |row| {
2521 Ok((
2522 edge_from_row(row)?,
2523 GraphPropertyFilter {
2524 key: row.get(7)?,
2525 value: row.get(8)?,
2526 },
2527 ))
2528 },
2529 )
2530 .optional()
2531 .map_err(Into::into)
2532 }
2533
2534 fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>> {
2535 let mut stmt = self.conn.prepare(
2536 r#"
2537 SELECT id, kind, label, properties_json, provenance_json, freshness_json
2538 FROM graph_nodes
2539 WHERE kind = ?1
2540 ORDER BY id
2541 "#,
2542 )?;
2543 collect_rows(stmt.query_map([kind], node_from_row)?)
2544 }
2545
2546 fn semantic_top_candidates(
2547 &self,
2548 query_vector: &[f64],
2549 kinds: &[&str],
2550 limit: usize,
2551 ) -> Result<Vec<GraphSemanticCandidate>> {
2552 if query_vector.is_empty() || kinds.is_empty() {
2553 return Ok(Vec::new());
2554 }
2555 if !sqlite_table_exists(&self.conn, "graph_node_semantic_vectors")? {
2556 return graph_semantic_top_candidates_by_property_scan(
2557 self,
2558 query_vector,
2559 kinds,
2560 limit,
2561 );
2562 }
2563
2564 let unique_kinds = kinds.iter().copied().collect::<BTreeSet<_>>();
2565 if unique_kinds.is_empty() {
2566 return Ok(Vec::new());
2567 }
2568 let kind_placeholders = unique_kinds
2569 .iter()
2570 .map(|_| "?")
2571 .collect::<Vec<_>>()
2572 .join(", ");
2573 let sql = format!(
2574 r#"
2575 SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
2576 graph_node_semantic_vectors.vector_blob,
2577 graph_node_semantic_vectors.dimensions
2578 FROM graph_node_semantic_vectors INDEXED BY idx_graph_node_semantic_vectors_kind_dims
2579 JOIN graph_nodes n ON n.id = graph_node_semantic_vectors.node_id
2580 WHERE graph_node_semantic_vectors.dimensions = ?
2581 AND graph_node_semantic_vectors.kind IN ({kind_placeholders})
2582 ORDER BY graph_node_semantic_vectors.kind, n.label, n.id
2583 "#
2584 );
2585 let mut values = vec![Value::Integer(query_vector.len() as i64)];
2586 values.extend(
2587 unique_kinds
2588 .into_iter()
2589 .map(|kind| Value::Text(kind.to_string())),
2590 );
2591 let rows = {
2592 let mut stmt = self.conn.prepare(&sql)?;
2593 collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2594 Ok((
2595 node_from_row_at(row, 0)?,
2596 row.get::<_, Vec<u8>>(6)?,
2597 row.get::<_, i64>(7)?,
2598 ))
2599 })?)?
2600 };
2601
2602 let mut candidates = rows
2603 .into_iter()
2604 .filter_map(|(node, blob, dimensions)| {
2605 let dimensions = usize::try_from(dimensions).ok()?;
2606 let vector = semantic_vector_from_blob(&blob, dimensions)?;
2607 Some(GraphSemanticCandidate {
2608 score: graph_semantic_cosine(query_vector, &vector),
2609 node,
2610 })
2611 })
2612 .collect::<Vec<_>>();
2613 candidates.sort_by(|left, right| {
2614 right
2615 .score
2616 .partial_cmp(&left.score)
2617 .unwrap_or(std::cmp::Ordering::Equal)
2618 .then_with(|| left.node.kind.cmp(&right.node.kind))
2619 .then_with(|| left.node.label.cmp(&right.node.label))
2620 .then_with(|| left.node.id.cmp(&right.node.id))
2621 });
2622 if limit > 0 && candidates.len() > limit {
2623 candidates.truncate(limit);
2624 }
2625 Ok(candidates)
2626 }
2627
2628 fn paged_nodes_by_kind(
2629 &self,
2630 kind: &str,
2631 options: GraphQueryOptions,
2632 ) -> Result<GraphPagedSubgraph> {
2633 let mut sql = String::from(
2634 r#"
2635 SELECT id, kind, label, properties_json, provenance_json, freshness_json
2636 FROM graph_nodes
2637 WHERE kind = ?
2638 "#,
2639 );
2640 let mut values = vec![Value::Text(kind.to_string())];
2641 push_sqlite_property_filter_exists(
2642 &mut sql,
2643 &mut values,
2644 "graph_nodes",
2645 &options.property_filters,
2646 );
2647 if let Some(cursor) = &options.cursor {
2648 sql.push_str(" AND id > ?");
2649 values.push(Value::Text(cursor.clone()));
2650 }
2651 sql.push_str(" ORDER BY id");
2652 if let Some(limit) = options.limit {
2653 sql.push_str(" LIMIT ?");
2654 values.push(Value::Integer(limit.saturating_add(1) as i64));
2655 }
2656
2657 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2658 let mut stmt = self.conn.prepare(&sql)?;
2659 let mut nodes =
2660 collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)?;
2661 let before_limit = nodes.len();
2662 let mut next_cursor = None;
2663 if let Some(limit) = options.limit
2664 && nodes.len() > limit
2665 {
2666 next_cursor = nodes
2667 .get(limit.saturating_sub(1))
2668 .map(|node| node.id.clone());
2669 nodes.truncate(limit);
2670 }
2671 let expected_indexes = if options.property_filters.is_empty() {
2672 vec!["idx_graph_nodes_kind"]
2673 } else {
2674 vec![
2675 "idx_graph_nodes_kind",
2676 "idx_graph_node_properties_key_value_node",
2677 ]
2678 };
2679 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2680 if !options.property_filters.is_empty() {
2681 diagnostics.push(
2682 "property filters were evaluated by SQLite materialized property rows before paging"
2683 .to_string(),
2684 );
2685 }
2686 if options.cursor.is_some() {
2687 diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
2688 }
2689 if next_cursor.is_some() {
2690 diagnostics.push(
2691 "result was truncated; pass page.next_cursor as --cursor for the next page"
2692 .to_string(),
2693 );
2694 }
2695 Ok(GraphPagedSubgraph {
2696 page: GraphQueryPage {
2697 cursor: options.cursor,
2698 limit: options.limit,
2699 next_cursor,
2700 returned_nodes: nodes.len(),
2701 returned_edges: 0,
2702 truncated: options.limit.is_some_and(|limit| before_limit > limit),
2703 diagnostics,
2704 },
2705 nodes,
2706 edges: Vec::new(),
2707 })
2708 }
2709
2710 fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2711 match kind {
2712 Some(kind) => {
2713 let mut stmt = self.conn.prepare(
2714 r#"
2715 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2716 FROM graph_edges
2717 WHERE from_id = ?1 AND kind = ?2
2718 ORDER BY to_id, kind
2719 "#,
2720 )?;
2721 collect_rows(stmt.query_map((from_id, kind), edge_from_row)?)
2722 }
2723 None => {
2724 let mut stmt = self.conn.prepare(
2725 r#"
2726 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2727 FROM graph_edges
2728 WHERE from_id = ?1
2729 ORDER BY to_id, kind
2730 "#,
2731 )?;
2732 collect_rows(stmt.query_map([from_id], edge_from_row)?)
2733 }
2734 }
2735 }
2736
2737 fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2738 let (sql, values) = sqlite_incident_edges_union_query(node_id, kind, &[], None, None);
2739 let mut stmt = self.conn.prepare(&sql)?;
2740 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2741 }
2742
2743 fn semantic_seeded_expansion_edges(
2744 &self,
2745 current_id: &str,
2746 options: &SemanticSeededNeighborhoodOptions,
2747 ) -> Result<SemanticSeededNeighborhoodExpansion> {
2748 let from_score = sqlite_semantic_seeded_edge_score_expr("e", "8");
2749 let to_score = sqlite_semantic_seeded_edge_score_expr(
2750 "e",
2751 "CASE WHEN e.from_id = e.to_id THEN 8 ELSE 4 END",
2752 );
2753 let limit_clause = if options.edge_scan_cap > 0 {
2754 "LIMIT ?"
2755 } else {
2756 ""
2757 };
2758 let sql = format!(
2759 r#"
2760WITH candidate_edges AS (
2761 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2762 {from_score} AS score
2763 FROM graph_edges e INDEXED BY idx_graph_edges_from_kind
2764 WHERE e.from_id = ?
2765 UNION ALL
2766 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2767 {to_score} AS score
2768 FROM graph_edges e INDEXED BY idx_graph_edges_to_kind
2769 WHERE e.to_id = ?
2770),
2771ranked_edges AS (
2772 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json,
2773 MAX(score) AS score
2774 FROM candidate_edges
2775 GROUP BY edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2776),
2777limited_edges AS (
2778 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json,
2779 score, COUNT(*) OVER () AS total_edges
2780 FROM ranked_edges
2781 ORDER BY score DESC, edge_key ASC
2782 {limit_clause}
2783)
2784SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, total_edges
2785FROM limited_edges
2786ORDER BY score DESC, edge_key ASC
2787"#
2788 );
2789 let mut values = vec![
2790 Value::Text(current_id.to_string()),
2791 Value::Text(current_id.to_string()),
2792 ];
2793 if options.edge_scan_cap > 0 {
2794 values.push(Value::Integer(
2795 options
2796 .edge_scan_cap
2797 .saturating_add(1)
2798 .min(i64::MAX as usize) as i64,
2799 ));
2800 }
2801 let mut stmt = self.conn.prepare(&sql)?;
2802 let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2803 Ok((edge_from_row_at(row, 0)?, row.get::<_, i64>(7)? as usize))
2804 })?)?;
2805 let total_candidates = rows.first().map(|(_, total)| *total).unwrap_or(0);
2806 let mut edges = rows.into_iter().map(|(edge, _)| edge).collect::<Vec<_>>();
2807 let mut skipped_by_edge_cap = 0usize;
2808 if options.edge_scan_cap > 0 && total_candidates > options.edge_scan_cap {
2809 skipped_by_edge_cap = total_candidates - options.edge_scan_cap;
2810 edges.truncate(options.edge_scan_cap);
2811 }
2812 Ok(SemanticSeededNeighborhoodExpansion {
2813 edges,
2814 skipped_by_edge_cap,
2815 })
2816 }
2817
2818 fn paged_edges(
2819 &self,
2820 kind: Option<&str>,
2821 options: GraphQueryOptions,
2822 ) -> Result<GraphPagedSubgraph> {
2823 let primary_property_filter = options.property_filters.first();
2824 let mut values = Vec::new();
2825 let mut sql = if let Some(filter) = primary_property_filter {
2826 values.push(Value::Text(filter.key.clone()));
2827 values.push(Value::Text(filter.value.clone()));
2828 String::from(
2829 r#"
2830 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2831 FROM graph_edge_properties ep0 INDEXED BY idx_graph_edge_properties_key_value_edge
2832 JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2833 ON e.edge_key = ep0.edge_key
2834 WHERE ep0.key = ?
2835 AND ep0.value = ?
2836 "#,
2837 )
2838 } else {
2839 String::from(
2840 r#"
2841 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2842 FROM graph_edges e
2843 WHERE 1 = 1
2844 "#,
2845 )
2846 };
2847 if let Some(kind) = kind {
2848 sql.push_str(" AND e.kind = ?");
2849 values.push(Value::Text(kind.to_string()));
2850 }
2851 push_sqlite_edge_property_filter_exists(
2852 &mut sql,
2853 &mut values,
2854 "e",
2855 if primary_property_filter.is_some() {
2856 &options.property_filters[1..]
2857 } else {
2858 &options.property_filters
2859 },
2860 );
2861 if let Some(cursor) = &options.cursor {
2862 if primary_property_filter.is_some() {
2863 sql.push_str(" AND ep0.edge_key > ?");
2864 } else {
2865 sql.push_str(" AND e.edge_key > ?");
2866 }
2867 values.push(Value::Text(cursor.clone()));
2868 }
2869 if primary_property_filter.is_some() {
2870 sql.push_str(" ORDER BY ep0.edge_key");
2871 } else {
2872 sql.push_str(" ORDER BY e.edge_key");
2873 }
2874 if let Some(limit) = options.limit {
2875 sql.push_str(" LIMIT ?");
2876 values.push(Value::Integer(limit.saturating_add(1) as i64));
2877 }
2878
2879 let primary_property_row_count = if let Some(filter) = primary_property_filter {
2880 Some(self.conn.query_row(
2881 r#"
2882 SELECT COUNT(*)
2883 FROM graph_edge_properties INDEXED BY idx_graph_edge_properties_key_value_edge
2884 WHERE key = ?1 AND value = ?2
2885 "#,
2886 (&filter.key, &filter.value),
2887 |row| row.get::<_, usize>(0),
2888 )?)
2889 } else {
2890 None
2891 };
2892 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2893 let mut stmt = self.conn.prepare(&sql)?;
2894 let mut edges =
2895 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2896 let before_limit = edges.len();
2897 let mut next_cursor = None;
2898 if let Some(limit) = options.limit
2899 && edges.len() > limit
2900 {
2901 next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2902 edges.truncate(limit);
2903 }
2904 let expected_indexes = if options.property_filters.is_empty() {
2905 vec!["idx_graph_edges_edge_key"]
2906 } else {
2907 vec![
2908 "idx_graph_edge_properties_key_value_edge",
2909 "idx_graph_edges_edge_key",
2910 ]
2911 };
2912 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2913 if !options.property_filters.is_empty() {
2914 if let Some(row_count) = primary_property_row_count {
2915 diagnostics.push(format!(
2916 "edge property primary filter matched {row_count} materialized row(s) before edge-kind/cursor paging"
2917 ));
2918 }
2919 diagnostics.push(
2920 "edge property scan drives from SQLite materialized property rows before joining graph_edges"
2921 .to_string(),
2922 );
2923 }
2924 if options.cursor.is_some() {
2925 diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2926 }
2927 if next_cursor.is_some() {
2928 diagnostics.push(
2929 "result was truncated; pass page.next_cursor as --cursor for the next page"
2930 .to_string(),
2931 );
2932 }
2933 Ok(GraphPagedSubgraph {
2934 page: GraphQueryPage {
2935 cursor: options.cursor,
2936 limit: options.limit,
2937 next_cursor,
2938 returned_nodes: 0,
2939 returned_edges: edges.len(),
2940 truncated: options.limit.is_some_and(|limit| before_limit > limit),
2941 diagnostics,
2942 },
2943 nodes: Vec::new(),
2944 edges,
2945 })
2946 }
2947
2948 fn paged_incident_edges(
2949 &self,
2950 node_id: &str,
2951 kind: Option<&str>,
2952 options: GraphQueryOptions,
2953 ) -> Result<GraphPagedSubgraph> {
2954 let (sql, values) = sqlite_incident_edges_union_query(
2955 node_id,
2956 kind,
2957 &options.property_filters,
2958 options.cursor.as_deref(),
2959 options.limit,
2960 );
2961 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2962 let mut stmt = self.conn.prepare(&sql)?;
2963 let mut edges =
2964 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2965 let before_limit = edges.len();
2966 let mut next_cursor = None;
2967 if let Some(limit) = options.limit
2968 && edges.len() > limit
2969 {
2970 next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2971 edges.truncate(limit);
2972 }
2973 let expected_indexes = if options.property_filters.is_empty() {
2974 vec!["idx_graph_edges_from_kind", "idx_graph_edges_to_kind"]
2975 } else {
2976 vec![
2977 "idx_graph_edges_from_kind",
2978 "idx_graph_edges_to_kind",
2979 "idx_graph_edge_properties_key_value_edge",
2980 ]
2981 };
2982 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2983 diagnostics.push(
2984 "incident edge scan uses UNION over from_id/to_id index probes instead of an OR predicate"
2985 .to_string(),
2986 );
2987 if !options.property_filters.is_empty() {
2988 diagnostics.push(
2989 "edge property filters were evaluated by SQLite materialized property rows before paging"
2990 .to_string(),
2991 );
2992 }
2993 if options.cursor.is_some() {
2994 diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2995 }
2996 if next_cursor.is_some() {
2997 diagnostics.push(
2998 "result was truncated; pass page.next_cursor as --cursor for the next page"
2999 .to_string(),
3000 );
3001 }
3002 Ok(GraphPagedSubgraph {
3003 page: GraphQueryPage {
3004 cursor: options.cursor,
3005 limit: options.limit,
3006 next_cursor,
3007 returned_nodes: 0,
3008 returned_edges: edges.len(),
3009 truncated: options.limit.is_some_and(|limit| before_limit > limit),
3010 diagnostics,
3011 },
3012 nodes: Vec::new(),
3013 edges,
3014 })
3015 }
3016
3017 fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
3018 if node_ids.is_empty() {
3019 return Ok(Vec::new());
3020 }
3021 if node_ids.len() <= 20 {
3022 return self.edges_between_nodes_inline(node_ids);
3023 }
3024 self.assert_not_in_temp_table_section();
3025 self.temp_table_active.set(true);
3026 let result = (|| -> Result<Vec<GraphEdge>> {
3027 let tx = self.conn.unchecked_transaction()?;
3028 tx.execute_batch(
3029 r#"
3030 CREATE TEMP TABLE IF NOT EXISTS _edges_between_ids (id TEXT PRIMARY KEY);
3031 DELETE FROM _edges_between_ids;
3032 "#,
3033 )?;
3034 for chunk in node_ids.iter().collect::<Vec<_>>().chunks(450) {
3035 let row_placeholders: Vec<String> =
3036 chunk.iter().map(|_| "(?)".to_string()).collect();
3037 let placeholders = row_placeholders.join(", ");
3038 let sql =
3039 format!("INSERT OR IGNORE INTO _edges_between_ids (id) VALUES {placeholders}");
3040 let values: Vec<Value> =
3041 chunk.iter().map(|id| Value::Text((*id).clone())).collect();
3042 tx.execute(&sql, params_from_iter(values.iter()))?;
3043 }
3044 let edges = {
3045 let mut stmt = tx.prepare(
3046 r#"
3047 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
3048 FROM graph_edges e
3049 WHERE EXISTS (SELECT 1 FROM _edges_between_ids f WHERE f.id = e.from_id)
3050 AND EXISTS (SELECT 1 FROM _edges_between_ids t WHERE t.id = e.to_id)
3051 ORDER BY e.from_id, e.kind, e.to_id
3052 "#,
3053 )?;
3054 collect_rows(stmt.query_map([], edge_from_row)?)?
3055 };
3056 tx.finish()?;
3057 Ok(edges)
3058 })();
3059 self.temp_table_active.set(false);
3060 result
3061 }
3062
3063 fn ranked_neighborhood(
3064 &self,
3065 center_id: &str,
3066 options: &RankedNeighborhoodOptions,
3067 ) -> Result<Option<RankedNeighborhoodResult>> {
3068 if self.node(center_id)?.is_none() {
3069 return Ok(None);
3070 }
3071 let center = self.node(center_id)?.unwrap();
3072
3073 let base_score_expr = match options.scoring {
3074 tsift_core::NeighborhoodScoring::BreadthFirst => {
3075 "MAX(0, 120 - (walk.depth * 18))".to_string()
3076 }
3077 tsift_core::NeighborhoodScoring::EdgeKindWeighted => {
3078 "MAX(0, 120 - (walk.depth * 18)) + CASE walk.edge_kind \
3079 WHEN 'semantic_relation' THEN 34 \
3080 WHEN 'mentions_entity' THEN 28 \
3081 WHEN 'mentions_concept' THEN 28 \
3082 WHEN 'tagged_entity' THEN 28 \
3083 WHEN 'tagged_concept' THEN 28 \
3084 WHEN 'related_concept' THEN 28 \
3085 WHEN 'mentions' THEN 22 \
3086 WHEN 'calls' THEN 20 \
3087 WHEN 'requests_context' THEN 18 \
3088 WHEN 'scopes_context' THEN 18 \
3089 WHEN 'scopes_source' THEN 18 \
3090 WHEN 'explains_result' THEN 18 \
3091 WHEN 'defines' THEN 12 \
3092 WHEN 'contains' THEN 12 \
3093 WHEN 'belongs_to' THEN 12 \
3094 ELSE 8 END".to_string()
3095 }
3096 tsift_core::NeighborhoodScoring::DegreeWeighted => {
3097 "MAX(0, 120 - (walk.depth * 18)) + CASE \
3098 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 3 THEN 20 \
3099 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 10 THEN 10 \
3100 ELSE 0 END"
3101 .to_string()
3102 }
3103 };
3104 let now_unix = options.observed_at_now_unix.unwrap_or_else(unix_now);
3105 let observed_at_half_life_secs = options.observed_at_half_life_secs.max(1);
3106 let observed_at_weight = options.observed_at_weight.max(0);
3107 let memory_node_boost = options.memory_node_boost.max(0);
3108 let observed_at_value = "COALESCE(\
3109 CAST(json_extract(n_score.freshness_json, '$.observed_at_unix') AS INTEGER), \
3110 CAST(json_extract(n_score.properties_json, '$.observed_at_unix') AS INTEGER), \
3111 CAST(json_extract(n_score.properties_json, '$.max_observed_at_unix') AS INTEGER)\
3112 )";
3113 let observed_at_expr = if observed_at_weight == 0 {
3114 "0".to_string()
3115 } else {
3116 format!(
3117 "CASE \
3118 WHEN {observed_at_value} IS NULL THEN 0 \
3119 WHEN ({now_unix} - {observed_at_value}) < {observed_at_half_life_secs} THEN {observed_at_weight} \
3120 WHEN ({now_unix} - {observed_at_value}) < ({observed_at_half_life_secs} * 2) THEN {observed_at_weight} / 2 \
3121 WHEN ({now_unix} - {observed_at_value}) < ({observed_at_half_life_secs} * 4) THEN {observed_at_weight} / 4 \
3122 ELSE 0 END"
3123 )
3124 };
3125 let confidence_value =
3126 "CAST(json_extract(n_score.properties_json, '$.confidence') AS REAL)";
3127 let memory_signal_expr = if memory_node_boost == 0 {
3128 "0".to_string()
3129 } else {
3130 format!(
3131 "(CASE \
3132 WHEN n_score.kind = 'memory_projection' THEN 0 \
3133 WHEN n_score.kind IN ('finding', 'decision', 'memory_event') THEN {memory_node_boost} \
3134 WHEN n_score.kind IN ('note', 'memory_session') THEN {memory_node_boost} / 2 \
3135 WHEN n_score.kind IN ('source_handle', 'semantic_concept', 'semantic_vector_handle') \
3136 AND json_extract(n_score.properties_json, '$.provider') = 'tsift-memory' THEN {memory_node_boost} / 2 \
3137 WHEN n_score.kind LIKE 'memory_%' THEN {memory_node_boost} \
3138 WHEN json_extract(n_score.properties_json, '$.provider') = 'tsift-memory' THEN {memory_node_boost} / 2 \
3139 ELSE 0 END) \
3140 + (CASE \
3141 WHEN json_extract(n_score.properties_json, '$.confidence') IS NULL THEN 0 \
3142 WHEN {confidence_value} <= 0.0 THEN 0 \
3143 WHEN {confidence_value} >= 1.0 THEN {memory_node_boost} \
3144 ELSE CAST(ROUND({memory_node_boost} * {confidence_value}) AS INTEGER) END)"
3145 )
3146 };
3147 let score_expr =
3148 format!("({base_score_expr}) + ({observed_at_expr}) + ({memory_signal_expr})");
3149
3150 let use_degree_cache = matches!(
3151 options.scoring,
3152 tsift_core::NeighborhoodScoring::DegreeWeighted
3153 );
3154 let degree_cte = if use_degree_cache {
3155 "degree_cache AS ( \
3156 SELECT id, (SELECT COUNT(*) FROM graph_edges e WHERE e.from_id = n.id OR e.to_id = n.id) AS degree \
3157 FROM graph_nodes n), "
3158 } else {
3159 ""
3160 };
3161 let mut sql = format!(
3162 r#"
3163WITH RECURSIVE {degree_cte}walk(id, depth, edge_kind, score) AS (
3164SELECT ?, 0, '', ?
3165UNION
3166SELECT e.to_id, walk.depth + 1, e.kind,
3167"#,
3168 );
3169 sql.push_str(&format!(" {}\n", score_expr));
3170 sql.push_str(
3171 r#"
3172FROM walk
3173JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3174ON e.from_id = walk.id
3175JOIN graph_nodes n_score ON n_score.id = e.to_id
3176WHERE walk.depth < ?
3177"#,
3178 );
3179 let mut values = vec![
3180 Value::Text(center_id.to_string()),
3181 Value::Integer(i64::MAX),
3182 Value::Integer(options.depth as i64),
3183 ];
3184 if let Some(kind) = &options.edge_kind {
3185 sql.push_str(" AND e.kind = ?");
3186 values.push(Value::Text(kind.clone()));
3187 }
3188 sql.push_str(
3189 r#"
3190 ),
3191scored_nodes AS (
3192SELECT walk.id, MAX(walk.score) AS score,
3193n.kind AS node_kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3194FROM walk
3195JOIN graph_nodes n ON n.id = walk.id
3196GROUP BY walk.id
3197 ),
3198 ranked AS (
3199 SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
3200 FROM scored_nodes
3201 ORDER BY score DESC, id ASC
3202 ),
3203 kept AS (
3204 SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
3205 FROM ranked
3206 LIMIT ?
3207 ),
3208 total AS (
3209 SELECT COUNT(*) AS cnt FROM scored_nodes
3210 )
3211 SELECT
3212 'meta' AS row_type,
3213 (SELECT cnt FROM total) AS total_discovered,
3214 0 AS node_id, '' AS node_kind, '' AS node_label,
3215 '' AS node_props, '' AS node_prov, '' AS node_fresh,
3216 '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
3217 '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
3218 UNION ALL
3219 SELECT
3220 'node' AS row_type,
3221 0 AS total_discovered,
3222 k.id, k.node_kind, k.label, k.properties_json, k.provenance_json, k.freshness_json,
3223 '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
3224 '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
3225 FROM kept k
3226 UNION ALL
3227 SELECT
3228 'edge' AS row_type,
3229 0 AS total_discovered,
3230 '' AS node_id, '' AS node_kind, '' AS node_label,
3231 '' AS node_props, '' AS node_prov, '' AS node_fresh,
3232 e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
3233 FROM graph_edges e
3234 WHERE EXISTS (SELECT 1 FROM kept k WHERE k.id = e.from_id)
3235 AND EXISTS (SELECT 1 FROM kept k2 WHERE k2.id = e.to_id)
3236"#,
3237 );
3238 values.push(Value::Integer(options.max_nodes.saturating_add(1) as i64));
3239
3240 let mut stmt = self.conn.prepare(&sql)?;
3241 let mut nodes = vec![center.clone()];
3242 let mut edges = Vec::new();
3243 let mut total_discovered = 0usize;
3244
3245 let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
3246 let row_type: String = row.get(0)?;
3247 match row_type.as_str() {
3248 "meta" => Ok(QueryResult::Meta {
3249 total: row.get::<_, i64>(1)? as usize,
3250 }),
3251 "node" => Ok(QueryResult::Node(node_from_row_at(row, 2)?)),
3252 "edge" => Ok(QueryResult::Edge(edge_from_row_at(row, 8)?)),
3253 _ => Err(rusqlite::Error::InvalidQuery),
3254 }
3255 })?;
3256 for row_result in rows {
3257 match row_result? {
3258 QueryResult::Meta { total } => {
3259 total_discovered = total;
3260 }
3261 QueryResult::Node(node) => {
3262 if node.id != center_id {
3263 nodes.push(node);
3264 }
3265 }
3266 QueryResult::Edge(edge) => {
3267 edges.push(edge);
3268 }
3269 }
3270 }
3271
3272 let total_discovered = total_discovered.max(nodes.len());
3273 let pruned_count = total_discovered.saturating_sub(nodes.len());
3274
3275 match options.property_mode {
3276 PropertyMode::Full => {}
3277 PropertyMode::Omit => {
3278 for n in &mut nodes {
3279 n.properties.clear();
3280 }
3281 for e in &mut edges {
3282 e.properties.clear();
3283 }
3284 }
3285 PropertyMode::Sample => {
3286 let mut seen_kinds = std::collections::BTreeSet::new();
3287 for n in &mut nodes {
3288 if !seen_kinds.contains(&n.kind) {
3289 seen_kinds.insert(n.kind.clone());
3290 } else {
3291 n.properties.clear();
3292 }
3293 }
3294 for e in &mut edges {
3295 e.properties.clear();
3296 }
3297 }
3298 }
3299
3300 Ok(Some(RankedNeighborhoodResult {
3301 nodes,
3302 edges,
3303 pruned_count,
3304 total_discovered,
3305 }))
3306 }
3307
3308 fn neighborhood(
3309 &self,
3310 center_id: &str,
3311 depth: usize,
3312 kind: Option<&str>,
3313 ) -> Result<Option<GraphSubgraph>> {
3314 self.paged_neighborhood(center_id, depth, kind, GraphQueryOptions::default())
3315 .map(|result| {
3316 result.map(|result| {
3317 GraphSubgraph {
3318 nodes: result.nodes,
3319 edges: result.edges,
3320 }
3321 .sorted()
3322 })
3323 })
3324 }
3325
/// Returns one page of the outgoing-edge neighborhood around `center_id`.
///
/// The walk follows outgoing edges (optionally only edges of `kind`) up to `depth`
/// hops from the center. It keeps reached nodes that match `options.property_filters`
/// and sort strictly after `options.cursor`. It returns them ordered by id, together
/// with the walked edges whose endpoints are both on the page.
///
/// Returns `Ok(None)` when the center node does not exist.
///
/// When `options.limit` is set, one extra node row is fetched so truncation can be
/// detected. In that case `page.next_cursor` is the id of the last returned node.
///
/// NOTE(review): with `options.limit == Some(0)` the single probe row's id becomes
/// `next_cursor`, so a follow-up page starting after it would skip that node.
/// Confirm whether a zero limit is expected here.
fn paged_neighborhood(
    &self,
    center_id: &str,
    depth: usize,
    kind: Option<&str>,
    options: GraphQueryOptions,
) -> Result<Option<GraphPagedSubgraph>> {
    // An unknown center yields `None` rather than an empty page.
    if self.node(center_id)?.is_none() {
        return Ok(None);
    }
    // `walk`: recursive reachable set seeded with the center at depth 0.
    // It follows outgoing edges (from_id -> to_id) while below `depth`.
    let mut sql = String::from(
        r#"
    WITH RECURSIVE walk(id, depth) AS (
        SELECT ?, 0
        UNION
        SELECT e.to_id, walk.depth + 1
        FROM walk
        JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
            ON e.from_id = walk.id
        WHERE walk.depth < ?
    "#,
    );
    // Bind values are pushed in the same order their `?` placeholders appear in `sql`.
    // NOTE(review): `depth as i64` wraps for depths above `i64::MAX`.
    let mut values = vec![
        Value::Text(center_id.to_string()),
        Value::Integer(depth as i64),
    ];
    if let Some(kind) = kind {
        sql.push_str(" AND e.kind = ?");
        values.push(Value::Text(kind.to_string()));
    }
    // `filtered_nodes`: distinct reached nodes, narrowed by property filters and cursor below.
    sql.push_str(
        r#"
    ),
    filtered_nodes AS (
        SELECT DISTINCT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
        FROM walk
        JOIN graph_nodes n ON n.id = walk.id
        WHERE 1 = 1
    "#,
    );
    push_sqlite_property_filter_exists(&mut sql, &mut values, "n", &options.property_filters);
    // The cursor is exclusive: the page starts strictly after the given node id.
    if let Some(cursor) = &options.cursor {
        sql.push_str(" AND n.id > ?");
        values.push(Value::Text(cursor.clone()));
    }
    // `page_nodes`: filtered nodes in id order, optionally limited.
    sql.push_str(
        r#"
    ),
    page_nodes AS (
        SELECT id, kind, label, properties_json, provenance_json, freshness_json
        FROM filtered_nodes
        ORDER BY id
    "#,
    );
    // Fetch one row past the limit so truncation can be detected after the query.
    if let Some(limit) = options.limit {
        sql.push_str(" LIMIT ?");
        values.push(Value::Integer(limit.saturating_add(1) as i64));
    }
    // `walk_edges`: the edges traversed by the walk, under the same depth and kind constraints.
    sql.push_str(
        r#"
    ),
    walk_edges AS (
        SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
        FROM walk
        JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
            ON e.from_id = walk.id
        WHERE walk.depth < ?
    "#,
    );
    values.push(Value::Integer(depth as i64));
    if let Some(kind) = kind {
        sql.push_str(" AND e.kind = ?");
        values.push(Value::Text(kind.to_string()));
    }
    // Final statement: page nodes plus walked edges with both endpoints on the page.
    // Rows are tagged by `row_type` so one statement returns both.
    sql.push_str(
        r#"
    )
    SELECT
        'node' AS row_type,
        p.id, p.kind, p.label, p.properties_json, p.provenance_json, p.freshness_json,
        NULL AS edge_key, NULL AS from_id, NULL AS to_id, NULL AS edge_kind,
        NULL AS edge_properties_json, NULL AS edge_provenance_json, NULL AS edge_freshness_json
    FROM page_nodes p
    UNION ALL
    SELECT DISTINCT
        'edge' AS row_type,
        NULL AS id, NULL AS kind, NULL AS label, NULL AS properties_json,
        NULL AS provenance_json, NULL AS freshness_json,
        e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
    FROM walk_edges e
    WHERE e.from_id IN (SELECT id FROM page_nodes)
        AND e.to_id IN (SELECT id FROM page_nodes)
    "#,
    );

    // The query plan is captured so index usage can be reported in `page.diagnostics`.
    let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
    let mut stmt = self.conn.prepare(&sql)?;
    let mut nodes = Vec::new();
    let mut edges = Vec::new();
    // Node columns start at index 1 and edge columns at index 7 (column 0 is `row_type`).
    let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
        let row_type: String = row.get(0)?;
        match row_type.as_str() {
            "node" => Ok((Some(node_from_row_at(row, 1)?), None)),
            "edge" => Ok((None, Some(edge_from_row_at(row, 7)?))),
            _ => Err(rusqlite::Error::InvalidQuery),
        }
    })?;
    for row in rows {
        let (node, edge) = row?;
        if let Some(node) = node {
            nodes.push(node);
        }
        if let Some(edge) = edge {
            edges.push(edge);
        }
    }
    // `UNION ALL` output order is not guaranteed, so restore id order here.
    nodes.sort_by(|left, right| left.id.cmp(&right.id));
    let before_limit = nodes.len();
    let mut next_cursor = None;
    if let Some(limit) = options.limit
        && nodes.len() > limit
    {
        next_cursor = nodes
            .get(limit.saturating_sub(1))
            .map(|node| node.id.clone());
        nodes.truncate(limit);
    }
    // Edges were filtered against the page including the probe row.
    // Drop any that touch a node removed by truncation.
    let node_ids = nodes
        .iter()
        .map(|node| node.id.as_str())
        .collect::<BTreeSet<_>>();
    edges.retain(|edge| {
        node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
    });
    edges.sort_by(|left, right| {
        left.from_id
            .cmp(&right.from_id)
            .then(left.kind.cmp(&right.kind))
            .then(left.to_id.cmp(&right.to_id))
    });
    // The indexes the plan is expected to use depend on whether property filters were applied.
    let expected_indexes = if options.property_filters.is_empty() {
        vec!["idx_graph_edges_from_kind"]
    } else {
        vec![
            "idx_graph_edges_from_kind",
            "idx_graph_node_properties_key_value_node",
        ]
    };
    let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
    diagnostics.push(
        "neighborhood nodes and page edges share one recursive reachable-set CTE".to_string(),
    );
    if !options.property_filters.is_empty() {
        diagnostics.push(
            "property filters were evaluated by SQLite materialized property rows before paging"
                .to_string(),
        );
    }
    if options.cursor.is_some() {
        diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
    }
    if next_cursor.is_some() {
        diagnostics.push(
            "result was truncated; pass page.next_cursor as --cursor for the next page"
                .to_string(),
        );
    }
    Ok(Some(GraphPagedSubgraph {
        page: GraphQueryPage {
            cursor: options.cursor,
            limit: options.limit,
            next_cursor,
            returned_nodes: nodes.len(),
            returned_edges: edges.len(),
            truncated: options.limit.is_some_and(|limit| before_limit > limit),
            diagnostics,
        },
        nodes,
        edges,
    }))
}
3507
3508 fn shortest_path(
3509 &self,
3510 from_id: &str,
3511 to_id: &str,
3512 kind: Option<&str>,
3513 ) -> Result<Option<GraphPath>> {
3514 self.shortest_path_with_max_hops(from_id, to_id, kind, None)
3515 }
3516
3517 fn shortest_path_with_max_hops(
3518 &self,
3519 from_id: &str,
3520 to_id: &str,
3521 kind: Option<&str>,
3522 max_hops: Option<usize>,
3523 ) -> Result<Option<GraphPath>> {
3524 if from_id == to_id {
3525 return Ok(Some(GraphPath {
3526 nodes: vec![from_id.to_string()],
3527 hops: 0,
3528 }));
3529 }
3530 let hop_limit = max_hops.unwrap_or(usize::MAX);
3531 if hop_limit == 0 {
3532 return Ok(None);
3533 }
3534
3535 self.assert_not_in_temp_table_section();
3536 self.temp_table_active.set(true);
3537 let result = (|| -> Result<Option<GraphPath>> {
3538 let call_id = BFS_CALL_ID.fetch_add(1, Ordering::Relaxed);
3539 let tbl = format!("_tsift_frontier_{call_id}");
3540
3541 let mut visited = BTreeSet::from([from_id.to_string()]);
3542 let mut parent =
3543 BTreeMap::<String, String>::from([(from_id.to_string(), String::new())]);
3544 let mut frontier = vec![from_id.to_string()];
3545 self.conn.execute_batch(&format!(
3546 r#"CREATE TEMP TABLE IF NOT EXISTS "{tbl}" (id TEXT PRIMARY KEY);
3547 DELETE FROM "{tbl}";"#,
3548 ))?;
3549 let select_sql = if kind.is_some() {
3550 format!(
3551 r#"SELECT e.from_id, e.to_id
3552 FROM "{tbl}" f
3553 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3554 ON e.from_id = f.id
3555 WHERE e.kind = ?
3556 ORDER BY e.from_id, e.to_id, e.kind"#,
3557 )
3558 } else {
3559 format!(
3560 r#"SELECT e.from_id, e.to_id
3561 FROM "{tbl}" f
3562 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3563 ON e.from_id = f.id
3564 ORDER BY e.from_id, e.to_id, e.kind"#,
3565 )
3566 };
3567 let insert_sql = format!(r#"INSERT OR IGNORE INTO "{tbl}" (id) VALUES (?)"#);
3568 let delete_sql = format!(r#"DELETE FROM "{tbl}""#);
3569 let drop_sql = format!(r#"DROP TABLE IF EXISTS "{tbl}""#);
3570 let mut frontier_select_stmt = self.conn.prepare(&select_sql)?;
3571 let mut frontier_insert_stmt = self.conn.prepare(&insert_sql)?;
3572 let mut found_path: Option<GraphPath> = None;
3573 for _depth in 0..hop_limit {
3574 if frontier.is_empty() {
3575 break;
3576 }
3577 self.conn.execute(&delete_sql, [])?;
3578 for id in &frontier {
3579 frontier_insert_stmt.execute([id.as_str()])?;
3580 }
3581 let mut next_frontier = BTreeSet::new();
3582 let rows = if let Some(kind) = kind {
3583 collect_rows(frontier_select_stmt.query_map([kind], |row| {
3584 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3585 })?)?
3586 } else {
3587 collect_rows(frontier_select_stmt.query_map([], |row| {
3588 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3589 })?)?
3590 };
3591 for (from, next) in rows {
3592 if !visited.insert(next.clone()) {
3593 continue;
3594 }
3595 parent.insert(next.clone(), from);
3596 if next == to_id {
3597 let mut nodes = vec![to_id.to_string()];
3598 let mut cursor = to_id;
3599 while let Some(previous) = parent.get(cursor) {
3600 if previous.is_empty() {
3601 break;
3602 }
3603 nodes.push(previous.clone());
3604 cursor = previous;
3605 }
3606 nodes.reverse();
3607 found_path = Some(GraphPath {
3608 hops: nodes.len().saturating_sub(1),
3609 nodes,
3610 });
3611 break;
3612 }
3613 next_frontier.insert(next);
3614 }
3615 if found_path.is_some() {
3616 break;
3617 }
3618 frontier = next_frontier.into_iter().collect();
3619 }
3620 let _ = self.conn.execute_batch(&drop_sql);
3621 Ok(found_path)
3622 })();
3623 self.temp_table_active.set(false);
3624 result
3625 }
3626
3627 fn reachable_nodes_by_kind(
3628 &self,
3629 from_id: &str,
3630 kind: &str,
3631 depth: usize,
3632 limit: usize,
3633 ) -> Result<Vec<(GraphNode, GraphPath)>> {
3634 Ok(self
3635 .reachable_nodes_by_kinds(from_id, &[kind], depth, limit)?
3636 .remove(kind)
3637 .unwrap_or_default())
3638 }
3639
3640 fn reachable_nodes_by_kinds(
3641 &self,
3642 from_id: &str,
3643 kinds: &[&str],
3644 depth: usize,
3645 limit: usize,
3646 ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
3647 let mut requested = kinds
3648 .iter()
3649 .map(|kind| (*kind).to_string())
3650 .collect::<BTreeSet<_>>()
3651 .into_iter()
3652 .collect::<Vec<_>>();
3653 let mut results = requested
3654 .iter()
3655 .map(|kind| (kind.clone(), Vec::new()))
3656 .collect::<BTreeMap<_, _>>();
3657 if requested.is_empty() {
3658 return Ok(results);
3659 }
3660 requested.sort();
3661 let placeholders = std::iter::repeat_n("?", requested.len())
3662 .collect::<Vec<_>>()
3663 .join(", ");
3664 let mut sql = format!(
3665 r#"
3666 WITH RECURSIVE walk(id, depth, path) AS (
3667 SELECT ?, 0, char(31) || ? || char(31)
3668 UNION ALL
3669 SELECT e.to_id, walk.depth + 1, walk.path || e.to_id || char(31)
3670 FROM walk
3671 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3672 ON e.from_id = walk.id
3673 WHERE walk.depth < ?
3674 AND instr(walk.path, char(31) || e.to_id || char(31)) = 0
3675 ),
3676 ranked AS (
3677 SELECT
3678 n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
3679 walk.path, walk.depth,
3680 ROW_NUMBER() OVER (PARTITION BY n.kind, n.id ORDER BY walk.depth, n.label, n.id) AS rn
3681 FROM walk
3682 JOIN graph_nodes n ON n.id = walk.id
3683 WHERE n.kind IN ({placeholders}) AND n.id <> ?
3684 ),
3685 kind_ranked AS (
3686 SELECT *,
3687 ROW_NUMBER() OVER (PARTITION BY kind ORDER BY depth, label, id) AS kind_rank
3688 FROM ranked
3689 WHERE rn = 1
3690 )
3691 SELECT id, kind, label, properties_json, provenance_json, freshness_json, path, depth
3692 FROM kind_ranked
3693 "#,
3694 );
3695 let mut values = vec![
3696 Value::Text(from_id.to_string()),
3697 Value::Text(from_id.to_string()),
3698 Value::Integer(depth as i64),
3699 ];
3700 values.extend(requested.iter().cloned().map(Value::Text));
3701 values.push(Value::Text(from_id.to_string()));
3702 if limit > 0 && limit != usize::MAX {
3703 sql.push_str(" WHERE kind_rank <= ?");
3704 values.push(Value::Integer(limit as i64));
3705 }
3706 sql.push_str(" ORDER BY kind, depth, label, id");
3707 let mut stmt = self.conn.prepare(&sql)?;
3708 let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
3709 let node = node_from_row(row)?;
3710 let path: String = row.get(6)?;
3711 let hops: usize = row.get(7)?;
3712 Ok((
3713 node,
3714 GraphPath {
3715 nodes: path
3716 .split('\u{1f}')
3717 .filter(|part| !part.is_empty())
3718 .map(str::to_string)
3719 .collect(),
3720 hops,
3721 },
3722 ))
3723 })?)?;
3724 for (node, path) in rows {
3725 results
3726 .entry(node.kind.clone())
3727 .or_default()
3728 .push((node, path));
3729 }
3730 Ok(results)
3731 }
3732
3733 fn evidence_target_candidates(
3734 &self,
3735 target: &str,
3736 kinds: &[&str],
3737 preferred_path: Option<&str>,
3738 ) -> Result<Vec<GraphNode>> {
3739 if kinds.is_empty() {
3740 return Ok(Vec::new());
3741 }
3742
3743 let normalized = target.trim().trim_start_matches('#');
3744 let kind_placeholders = std::iter::repeat_n("?", kinds.len())
3745 .collect::<Vec<_>>()
3746 .join(", ");
3747 let kind_rank = kinds
3748 .iter()
3749 .enumerate()
3750 .map(|(rank, _)| format!("WHEN ? THEN {rank}"))
3751 .collect::<Vec<_>>()
3752 .join(" ");
3753 let path_filter = if preferred_path.is_some() {
3754 r#"
3755AND EXISTS (
3756 SELECT 1
3757 FROM graph_node_properties p_path INDEXED BY idx_graph_node_properties_key_value_node
3758 WHERE p_path.node_id = n.id
3759 AND p_path.key = 'path'
3760 AND p_path.value = ?
3761)
3762"#
3763 } else {
3764 ""
3765 };
3766 let sql = format!(
3767 r#"
3768SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3769FROM graph_nodes n
3770WHERE n.kind IN ({kind_placeholders})
3771 AND (
3772 EXISTS (
3773 SELECT 1
3774 FROM graph_node_properties p_handle INDEXED BY idx_graph_node_properties_key_value_node
3775 WHERE p_handle.node_id = n.id
3776 AND p_handle.key = 'handle'
3777 AND p_handle.value = ?
3778 )
3779 OR EXISTS (
3780 SELECT 1
3781 FROM graph_node_properties p_ref INDEXED BY idx_graph_node_properties_key_value_node
3782 WHERE p_ref.node_id = n.id
3783 AND p_ref.key = 'ref_id'
3784 AND p_ref.value = ?
3785)
3786OR n.label = ?
3787OR n.label = ?
3788)
3789{path_filter}
3790ORDER BY CASE n.kind {kind_rank} ELSE 999 END, n.id
3791"#
3792 );
3793 let mut values = kinds
3794 .iter()
3795 .map(|kind| Value::Text((*kind).to_string()))
3796 .collect::<Vec<_>>();
3797 values.push(Value::Text(target.to_string()));
3798 values.push(Value::Text(normalized.to_string()));
3799 values.push(Value::Text(target.to_string()));
3800 values.push(Value::Text(format!("#{normalized}")));
3801 if let Some(path) = preferred_path {
3802 values.push(Value::Text(path.to_string()));
3803 }
3804 values.extend(kinds.iter().map(|kind| Value::Text((*kind).to_string())));
3805 let mut stmt = self.conn.prepare(&sql)?;
3806 collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)
3807 }
3808
3809 fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
3810 if let Some(node) = self.node(target)? {
3811 return Ok(Some(node));
3812 }
3813 Ok(self
3814 .evidence_target_candidates(target, kinds, None)?
3815 .into_iter()
3816 .next())
3817 }
3818}
3819
3820fn to_json<T: Serialize>(value: &T) -> Result<String> {
3821 serde_json::to_string(value).map_err(Into::into)
3822}
3823
3824fn row_hash<T: Serialize>(value: &T) -> Result<String> {
3825 let payload = serde_json::to_vec(value)?;
3826 Ok(blake3::hash(&payload).to_hex().to_string())
3827}
3828
3829fn optional_to_json<T: Serialize>(value: &Option<T>) -> Result<Option<String>> {
3830 value.as_ref().map(to_json).transpose()
3831}
3832
3833fn collect_rows<T>(
3834 rows: impl Iterator<Item = std::result::Result<T, rusqlite::Error>>,
3835) -> Result<Vec<T>> {
3836 rows.collect::<std::result::Result<Vec<_>, _>>()
3837 .map_err(Into::into)
3838}
3839
/// One decoded row of a single-statement query whose rows are tagged by a `row_type`
/// discriminator column (`'meta'`, `'node'`, or `'edge'`).
enum QueryResult {
    /// The `'meta'` row: the number of distinct scored nodes discovered before capping.
    Meta { total: usize },
    /// A `'node'` row decoded from the node columns.
    Node(GraphNode),
    /// An `'edge'` row decoded from the edge columns.
    Edge(GraphEdge),
}
3845
3846fn node_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphNode> {
3847 let properties_col = offset + 3;
3848 let provenance_col = offset + 4;
3849 let freshness_col = offset + 5;
3850 let properties_json: String = row.get(properties_col)?;
3851 let provenance_json: String = row.get(provenance_col)?;
3852 let freshness_json: Option<String> = row.get(freshness_col)?;
3853 Ok(GraphNode {
3854 id: row.get(offset)?,
3855 kind: row.get(offset + 1)?,
3856 label: row.get(offset + 2)?,
3857 properties: from_json(properties_col, &properties_json)?,
3858 provenance: from_json(provenance_col, &provenance_json)?,
3859 freshness: optional_from_json(freshness_col, freshness_json)?,
3860 })
3861}
3862
3863fn node_from_row(row: &Row<'_>) -> rusqlite::Result<GraphNode> {
3864 node_from_row_at(row, 0)
3865}
3866
3867fn edge_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphEdge> {
3868 let properties_col = offset + 4;
3869 let provenance_col = offset + 5;
3870 let freshness_col = offset + 6;
3871 let properties_json: String = row.get(properties_col)?;
3872 let provenance_json: String = row.get(provenance_col)?;
3873 let freshness_json: Option<String> = row.get(freshness_col)?;
3874 Ok(GraphEdge {
3875 id: row.get(offset)?,
3876 from_id: row.get(offset + 1)?,
3877 to_id: row.get(offset + 2)?,
3878 kind: row.get(offset + 3)?,
3879 properties: from_json(properties_col, &properties_json)?,
3880 provenance: from_json(provenance_col, &provenance_json)?,
3881 freshness: optional_from_json(freshness_col, freshness_json)?,
3882 })
3883}
3884
3885fn edge_from_row(row: &Row<'_>) -> rusqlite::Result<GraphEdge> {
3886 edge_from_row_at(row, 0)
3887}
3888
3889fn from_json<T: DeserializeOwned>(column: usize, raw: &str) -> rusqlite::Result<T> {
3890 serde_json::from_str(raw)
3891 .map_err(|err| rusqlite::Error::FromSqlConversionFailure(column, Type::Text, Box::new(err)))
3892}
3893
3894fn optional_from_json<T: DeserializeOwned>(
3895 column: usize,
3896 raw: Option<String>,
3897) -> rusqlite::Result<Option<T>> {
3898 raw.map(|value| from_json(column, &value)).transpose()
3899}
3900
3901fn projection_version_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3902 nodes
3903 .iter()
3904 .find(|node| node.kind == "projection_meta")
3905 .and_then(|node| node.properties.get("projection_version").cloned())
3906}
3907
3908fn projection_hash_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3909 nodes
3910 .iter()
3911 .find(|node| node.kind == "projection_meta")
3912 .and_then(|node| node.properties.get("content_hash").cloned())
3913}
3914
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// A clock set before the epoch yields `0` rather than an error.
fn unix_now() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
3921
3922fn sqlite_database_size_bytes(conn: &Connection) -> Result<u64> {
3923 let page_count: u64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
3924 let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3925 Ok(page_count.saturating_mul(page_size))
3926}
3927
3928fn sqlite_database_freelist_bytes(conn: &Connection) -> Result<u64> {
3929 let freelist_count: u64 = conn.query_row("PRAGMA freelist_count", [], |row| row.get(0))?;
3930 let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3931 Ok(freelist_count.saturating_mul(page_size))
3932}
3933
3934#[cfg(test)]
3935mod tests {
3936 use super::*;
3937
3938 fn sample_provenance() -> GraphProvenance {
3939 GraphProvenance::new("fixture", "src/lib.rs:1").with_content_hash("hash-1")
3940 }
3941
3942 fn sample_projection() -> GraphProjection {
3943 let source = sample_provenance();
3944 GraphProjection {
3945 nodes: vec![
3946 GraphNode::new("doc:livekit", "document", "LiveKit guide")
3947 .with_property("domain", "livekit")
3948 .with_provenance(source.clone())
3949 .with_freshness(GraphFreshness::content_hash("node-hash")),
3950 GraphNode::new("topic:rooms", "topic", "Rooms"),
3951 GraphNode::new("topic:egress", "topic", "Egress"),
3952 ],
3953 edges: vec![
3954 GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
3955 .with_property("confidence", "0.91")
3956 .with_provenance(source.clone())
3957 .with_freshness(GraphFreshness::content_hash("edge-hash")),
3958 GraphEdge::new("topic:rooms", "topic:egress", "related_to").with_provenance(source),
3959 ],
3960 }
3961 }
3962
3963 fn assert_projection_store_contract(store: &impl GraphStore) {
3964 let projection = sample_projection();
3965 projection.upsert_into(store).unwrap();
3966
3967 assert_eq!(
3968 store.node("doc:livekit").unwrap(),
3969 projection
3970 .nodes
3971 .iter()
3972 .find(|node| node.id == "doc:livekit")
3973 .cloned()
3974 );
3975 assert_eq!(
3976 store.nodes_by_kind("topic").unwrap(),
3977 vec![
3978 GraphNode::new("topic:egress", "topic", "Egress"),
3979 GraphNode::new("topic:rooms", "topic", "Rooms"),
3980 ]
3981 );
3982
3983 let mentions = store
3984 .outgoing_edges("doc:livekit", Some("mentions"))
3985 .unwrap();
3986 assert_eq!(mentions.len(), 1);
3987 assert_eq!(mentions[0].to_id, "topic:rooms");
3988 assert_eq!(
3989 mentions[0].properties.get("confidence"),
3990 Some(&"0.91".into())
3991 );
3992
3993 let path = store
3994 .shortest_path("doc:livekit", "topic:egress", None)
3995 .unwrap()
3996 .unwrap();
3997 assert_eq!(
3998 path.nodes,
3999 vec!["doc:livekit", "topic:rooms", "topic:egress"]
4000 );
4001 }
4002
4003 #[test]
4004 fn sqlite_store_round_trips_generic_nodes_edges() {
4005 let store = SqliteGraphStore::in_memory().unwrap();
4006 let source = sample_provenance();
4007 let node = GraphNode::new("doc:livekit", "document", "LiveKit guide")
4008 .with_property("domain", "livekit")
4009 .with_provenance(source.clone())
4010 .with_freshness(GraphFreshness::content_hash("node-hash"));
4011 let topic = GraphNode::new("topic:rooms", "topic", "Rooms");
4012 let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
4013 .with_property("confidence", "0.91")
4014 .with_provenance(source)
4015 .with_freshness(GraphFreshness::content_hash("edge-hash"));
4016
4017 store.upsert_node(&node).unwrap();
4018 store.upsert_node(&topic).unwrap();
4019 store.upsert_edge(&edge).unwrap();
4020
4021 assert_eq!(store.node("doc:livekit").unwrap(), Some(node));
4022 assert_eq!(store.nodes_by_kind("topic").unwrap(), vec![topic]);
4023 assert_eq!(store.all_nodes().unwrap().len(), 2);
4024 assert_eq!(store.all_edges().unwrap().len(), 1);
4025 assert_eq!(
4026 store
4027 .outgoing_edges("doc:livekit", Some("mentions"))
4028 .unwrap(),
4029 vec![edge]
4030 );
4031 }
4032
4033 #[test]
4034 fn sqlite_materializes_edge_properties_and_scans_first_class_edges() {
4035 let store = SqliteGraphStore::in_memory().unwrap();
4036 for node in [
4037 GraphNode::new("doc:livekit", "document", "LiveKit guide"),
4038 GraphNode::new("topic:rooms", "topic", "Rooms"),
4039 GraphNode::new("topic:egress", "topic", "Egress"),
4040 ] {
4041 store.upsert_node(&node).unwrap();
4042 }
4043 let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
4044 .with_property("confidence", "0.91");
4045 let edge_id = edge.id.clone();
4046 store.upsert_edge(&edge).unwrap();
4047 store
4048 .upsert_edge(
4049 &GraphEdge::new("topic:egress", "topic:rooms", "related_to")
4050 .with_property("confidence", "0.42"),
4051 )
4052 .unwrap();
4053
4054 assert_eq!(store.edge(&edge_id).unwrap(), Some(edge));
4055 let mut expected_incident_ids = vec![
4056 GraphEdge::stable_id("doc:livekit", "topic:rooms", "mentions"),
4057 GraphEdge::stable_id("topic:egress", "topic:rooms", "related_to"),
4058 ];
4059 expected_incident_ids.sort();
4060 assert_eq!(
4061 store
4062 .incident_edges("topic:rooms", None)
4063 .unwrap()
4064 .into_iter()
4065 .map(|edge| edge.id)
4066 .collect::<Vec<_>>(),
4067 expected_incident_ids
4068 );
4069
4070 let page = store
4071 .paged_edges(
4072 Some("mentions"),
4073 GraphQueryOptions {
4074 property_filters: vec![GraphPropertyFilter {
4075 key: "confidence".to_string(),
4076 value: "0.91".to_string(),
4077 }],
4078 ..GraphQueryOptions::default()
4079 },
4080 )
4081 .unwrap();
4082 assert_eq!(page.edges.len(), 1);
4083 assert_eq!(page.edges[0].id, edge_id);
4084 assert!(
4085 page.page
4086 .diagnostics
4087 .iter()
4088 .any(|diagnostic| diagnostic.contains("idx_graph_edge_properties_key_value_edge")),
4089 "{:?}",
4090 page.page.diagnostics
4091 );
4092 assert!(
4093 page.page
4094 .diagnostics
4095 .iter()
4096 .any(|diagnostic| diagnostic.contains("idx_graph_edges_edge_key")),
4097 "{:?}",
4098 page.page.diagnostics
4099 );
4100 assert!(
4101 page.page.diagnostics.iter().any(|diagnostic| diagnostic
4102 .contains("edge property primary filter matched 1 materialized row")),
4103 "{:?}",
4104 page.page.diagnostics
4105 );
4106 assert!(
4107 page.page
4108 .diagnostics
4109 .iter()
4110 .any(|diagnostic| diagnostic
4111 .contains("drives from SQLite materialized property rows")),
4112 "{:?}",
4113 page.page.diagnostics
4114 );
4115
4116 let property_rows: usize = store
4117 .conn
4118 .query_row(
4119 "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
4120 [],
4121 |row| row.get(0),
4122 )
4123 .unwrap();
4124 assert_eq!(property_rows, 2);
4125 }
4126
4127 #[test]
4128 fn graph_projection_round_trips_through_backend_agnostic_store_contract() {
4129 let sqlite = SqliteGraphStore::in_memory().unwrap();
4130 assert_projection_store_contract(&sqlite);
4131 }
4132
4133 #[test]
4134 fn graph_store_contract_covers_crud_neighborhood_and_ordering() {
4135 fn assert_crud_contract(store: &impl GraphStore) {
4136 let projection = sample_projection();
4137 projection.upsert_into(store).unwrap();
4138
4139 let neighborhood = store.neighborhood("doc:livekit", 2, None).unwrap().unwrap();
4140 assert_eq!(
4141 neighborhood
4142 .nodes
4143 .iter()
4144 .map(|node| node.id.as_str())
4145 .collect::<Vec<_>>(),
4146 vec!["doc:livekit", "topic:egress", "topic:rooms"]
4147 );
4148 assert_eq!(
4149 neighborhood
4150 .edges
4151 .iter()
4152 .map(|edge| (
4153 edge.from_id.as_str(),
4154 edge.kind.as_str(),
4155 edge.to_id.as_str()
4156 ))
4157 .collect::<Vec<_>>(),
4158 vec![
4159 ("doc:livekit", "mentions", "topic:rooms"),
4160 ("topic:rooms", "related_to", "topic:egress"),
4161 ]
4162 );
4163
4164 assert_eq!(
4165 store
4166 .delete_edge("topic:rooms", "topic:egress", "related_to")
4167 .unwrap(),
4168 1
4169 );
4170 assert!(
4171 store
4172 .shortest_path("doc:livekit", "topic:egress", None)
4173 .unwrap()
4174 .is_none()
4175 );
4176 assert_eq!(store.delete_node("topic:rooms").unwrap(), 1);
4177 assert!(store.node("topic:rooms").unwrap().is_none());
4178 assert!(
4179 store
4180 .outgoing_edges("doc:livekit", None)
4181 .unwrap()
4182 .is_empty()
4183 );
4184 }
4185
4186 assert_crud_contract(&SqliteGraphStore::in_memory().unwrap());
4187 }
4188
4189 #[test]
4190 fn sqlite_ranked_neighborhood_prefers_recent_memory_nodes_when_pruning() {
4191 let store = SqliteGraphStore::in_memory().unwrap();
4192 store
4193 .upsert_node(&GraphNode::new("center", "file", "center"))
4194 .unwrap();
4195 store
4196 .upsert_node(&GraphNode::new("aaa-code", "symbol", "code candidate"))
4197 .unwrap();
4198 store
4199 .upsert_node(
4200 &GraphNode::new("mmm-stale", "memory_event", "stale memory")
4201 .with_property("provider", "tsift-memory")
4202 .with_property("observed_at_unix", "1000"),
4203 )
4204 .unwrap();
4205 store
4206 .upsert_node(
4207 &GraphNode::new("zzz-fresh", "memory_event", "fresh memory")
4208 .with_property("provider", "tsift-memory")
4209 .with_property("observed_at_unix", "1995"),
4210 )
4211 .unwrap();
4212 store
4213 .upsert_edge(&GraphEdge::new("center", "aaa-code", "mentions"))
4214 .unwrap();
4215 store
4216 .upsert_edge(&GraphEdge::new("center", "mmm-stale", "mentions"))
4217 .unwrap();
4218 store
4219 .upsert_edge(&GraphEdge::new("center", "zzz-fresh", "mentions"))
4220 .unwrap();
4221
4222 let options = RankedNeighborhoodOptions::new(1, 1)
4223 .with_observed_at_now_unix(2000)
4224 .with_observed_at_half_life_secs(100);
4225 let result = store
4226 .ranked_neighborhood("center", &options)
4227 .unwrap()
4228 .unwrap();
4229 let ids: Vec<_> = result.nodes.iter().map(|node| node.id.as_str()).collect();
4230 assert!(ids.contains(&"center"));
4231 assert!(
4232 ids.contains(&"zzz-fresh"),
4233 "fresh memory node should survive pruning: {ids:?}"
4234 );
4235 assert!(!ids.contains(&"aaa-code"));
4236 assert!(!ids.contains(&"mmm-stale"));
4237 }
4238
4239 #[test]
4240 fn sqlite_semantic_seeded_neighborhood_scores_before_sql_edge_cap() {
4241 let store = SqliteGraphStore::in_memory().unwrap();
4242 store
4243 .upsert_node(&GraphNode::new("seed", "semantic_concept", "graph budget"))
4244 .unwrap();
4245 store
4246 .upsert_node(&GraphNode::new("zzz_high", "symbol", "high_signal"))
4247 .unwrap();
4248 store
4249 .upsert_edge(&GraphEdge::new("zzz_high", "seed", "mentions_concept"))
4250 .unwrap();
4251 for idx in 0..24 {
4252 let id = format!("aaa_low_{idx:02}");
4253 store
4254 .upsert_node(&GraphNode::new(id.clone(), "note", format!("low {idx}")))
4255 .unwrap();
4256 store
4257 .upsert_edge(&GraphEdge::new(id, "seed", "weak_link"))
4258 .unwrap();
4259 }
4260
4261 let options = SemanticSeededNeighborhoodOptions::new(1, 3)
4262 .with_edge_scan_cap(16)
4263 .with_node_discovery_cap(9);
4264 let result = store
4265 .semantic_seeded_neighborhood(&["seed".to_string()], &options)
4266 .unwrap();
4267 let ids = result
4268 .nodes
4269 .iter()
4270 .map(|node| node.id.as_str())
4271 .collect::<Vec<_>>();
4272
4273 assert_eq!(ids.len(), 3);
4274 assert_eq!(ids[0], "seed");
4275 assert_eq!(ids[1], "zzz_high");
4276 assert_eq!(result.skipped_by_edge_cap, 9);
4277 assert!(result.truncated);
4278 }
4279
4280 #[test]
4281 fn sqlite_upsert_projection_batches_rows_and_properties() {
4282 let mut store = SqliteGraphStore::in_memory().unwrap();
4283 let mut projection = sample_projection();
4284 store.upsert_projection(&projection).unwrap();
4285
4286 let page = store
4287 .paged_nodes_by_kind(
4288 "document",
4289 GraphQueryOptions {
4290 property_filters: vec![GraphPropertyFilter {
4291 key: "domain".to_string(),
4292 value: "livekit".to_string(),
4293 }],
4294 ..GraphQueryOptions::default()
4295 },
4296 )
4297 .unwrap();
4298 assert_eq!(page.nodes[0].id, "doc:livekit");
4299
4300 projection.nodes[0] = GraphNode::new("doc:livekit", "document", "LiveKit guide")
4301 .with_property("domain", "recording");
4302 store.upsert_projection(&projection).unwrap();
4303
4304 let old_property_count: usize = store
4305 .conn
4306 .query_row(
4307 "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain' AND value = 'livekit'",
4308 [],
4309 |row| row.get(0),
4310 )
4311 .unwrap();
4312 let updated_page = store
4313 .paged_nodes_by_kind(
4314 "document",
4315 GraphQueryOptions {
4316 property_filters: vec![GraphPropertyFilter {
4317 key: "domain".to_string(),
4318 value: "recording".to_string(),
4319 }],
4320 ..GraphQueryOptions::default()
4321 },
4322 )
4323 .unwrap();
4324 assert_eq!(old_property_count, 0);
4325 assert_eq!(updated_page.nodes[0].id, "doc:livekit");
4326 let edge_property_count: usize = store
4327 .conn
4328 .query_row(
4329 "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
4330 [],
4331 |row| row.get(0),
4332 )
4333 .unwrap();
4334 assert_eq!(edge_property_count, 1);
4335
4336 let changes_before = store.conn.total_changes();
4337 store.upsert_projection(&projection).unwrap();
4338 assert_eq!(
4339 store.conn.total_changes(),
4340 changes_before,
4341 "unchanged projection rows should not rewrite graph rows or materialized properties"
4342 );
4343 }
4344
4345 #[test]
4346 fn sqlite_semantic_top_candidates_use_materialized_vector_table() {
4347 let store = SqliteGraphStore::in_memory().unwrap();
4348 store
4349 .upsert_node(
4350 &GraphNode::new("concept:graph", "semantic_concept", "graph navigation")
4351 .with_property("embedding_model", "fixture-v1")
4352 .with_property("embedding", "1.0,0.0"),
4353 )
4354 .unwrap();
4355 store
4356 .upsert_node(
4357 &GraphNode::new("concept:sqlite", "semantic_concept", "sqlite search")
4358 .with_property("embedding", "0.0,1.0"),
4359 )
4360 .unwrap();
4361 store
4362 .upsert_node(&GraphNode::new("entity:skip", "semantic_entity", "skipped"))
4363 .unwrap();
4364
4365 let vector_rows: usize = store
4366 .conn
4367 .query_row(
4368 "SELECT COUNT(*) FROM graph_node_semantic_vectors",
4369 [],
4370 |row| row.get(0),
4371 )
4372 .unwrap();
4373 assert_eq!(vector_rows, 2);
4374
4375 let candidates = store
4376 .semantic_top_candidates(&[1.0, 0.0], &["semantic_concept"], 1)
4377 .unwrap();
4378 assert_eq!(candidates.len(), 1);
4379 assert_eq!(candidates[0].node.id, "concept:graph");
4380 assert_eq!(candidates[0].score, 1.0);
4381
4382 store
4383 .upsert_node(&GraphNode::new(
4384 "concept:graph",
4385 "semantic_concept",
4386 "graph navigation",
4387 ))
4388 .unwrap();
4389 let vector_rows_after_update: usize = store
4390 .conn
4391 .query_row(
4392 "SELECT COUNT(*) FROM graph_node_semantic_vectors WHERE node_id = 'concept:graph'",
4393 [],
4394 |row| row.get(0),
4395 )
4396 .unwrap();
4397 assert_eq!(vector_rows_after_update, 0);
4398 }
4399
4400 #[test]
4401 fn sqlite_store_filters_edges_by_kind_and_paths() {
4402 let store = SqliteGraphStore::in_memory().unwrap();
4403 for id in ["a", "b", "c"] {
4404 store
4405 .upsert_node(&GraphNode::new(id, "symbol", id))
4406 .unwrap();
4407 }
4408 store
4409 .upsert_edge(&GraphEdge::new("a", "b", "calls"))
4410 .unwrap();
4411 store
4412 .upsert_edge(&GraphEdge::new("a", "c", "documents"))
4413 .unwrap();
4414 store
4415 .upsert_edge(&GraphEdge::new("b", "c", "calls"))
4416 .unwrap();
4417
4418 let calls = store.outgoing_edges("a", Some("calls")).unwrap();
4419 assert_eq!(calls.len(), 1);
4420 assert_eq!(calls[0].to_id, "b");
4421 assert_eq!(store.graph_counts().unwrap(), (3, 3));
4422 assert_eq!(
4423 store.sample_edge(Some("calls")).unwrap().unwrap().to_id,
4424 "b"
4425 );
4426
4427 let path = store
4428 .shortest_path("a", "c", Some("calls"))
4429 .unwrap()
4430 .unwrap();
4431 assert_eq!(path.nodes, vec!["a", "b", "c"]);
4432 assert_eq!(path.hops, 2);
4433
4434 assert!(
4435 store
4436 .shortest_path("c", "a", Some("calls"))
4437 .unwrap()
4438 .is_none()
4439 );
4440 }
4441
4442 #[test]
4443 fn sqlite_store_batches_edges_between_node_sets() {
4444 let store = SqliteGraphStore::in_memory().unwrap();
4445 for id in ["a", "b", "c", "outside"] {
4446 store
4447 .upsert_node(&GraphNode::new(id, "symbol", id))
4448 .unwrap();
4449 }
4450 for edge in [
4451 GraphEdge::new("a", "b", "calls"),
4452 GraphEdge::new("b", "c", "calls"),
4453 GraphEdge::new("a", "outside", "calls"),
4454 GraphEdge::new("outside", "c", "calls"),
4455 ] {
4456 store.upsert_edge(&edge).unwrap();
4457 }
4458
4459 let scoped = ["a".to_string(), "b".to_string(), "c".to_string()]
4460 .into_iter()
4461 .collect::<BTreeSet<_>>();
4462 let edge_keys = store
4463 .edges_between_nodes(&scoped)
4464 .unwrap()
4465 .into_iter()
4466 .map(|edge| (edge.from_id, edge.kind, edge.to_id))
4467 .collect::<Vec<_>>();
4468
4469 assert_eq!(
4470 edge_keys,
4471 vec![
4472 ("a".to_string(), "calls".to_string(), "b".to_string()),
4473 ("b".to_string(), "calls".to_string(), "c".to_string()),
4474 ]
4475 );
4476 }
4477
4478 #[test]
4479 fn sqlite_projection_refresh_tracks_versions_watermarks_and_tombstones() {
4480 let mut store = SqliteGraphStore::in_memory().unwrap();
4481 let mut projection = sample_projection();
4482 projection.nodes.push(
4483 GraphNode::new(
4484 "projection:fixture",
4485 "projection_meta",
4486 "fixture projection",
4487 )
4488 .with_property("projection_version", "fixture-v1")
4489 .with_property("content_hash", "hash-a"),
4490 );
4491 store
4492 .replace_projection_with_version(
4493 "root",
4494 &projection,
4495 Some("fixture-v1"),
4496 Some("commit-a".to_string()),
4497 )
4498 .unwrap();
4499
4500 projection.nodes.retain(|node| node.id != "topic:egress");
4501 projection.edges.retain(|edge| edge.to_id != "topic:egress");
4502 let refresh = store
4503 .replace_projection_with_version(
4504 "root",
4505 &projection,
4506 Some("fixture-v2"),
4507 Some("commit-b".to_string()),
4508 )
4509 .unwrap();
4510
4511 assert_eq!(refresh.projection_version, "fixture-v2");
4512 assert_eq!(refresh.source_watermark.as_deref(), Some("commit-b"));
4513 assert_eq!(refresh.tombstoned_nodes, vec!["topic:egress".to_string()]);
4514 assert_eq!(refresh.tombstoned_edges.len(), 1);
4515 assert_eq!(refresh.deleted_nodes, 1);
4516 assert_eq!(refresh.deleted_edges, 1);
4517 assert_eq!(refresh.unchanged_nodes, 3);
4518 assert_eq!(refresh.upserted_nodes, 0);
4519 assert_eq!(refresh.unchanged_properties, 4);
4520 assert_eq!(refresh.upserted_properties, 0);
4521 assert_eq!(refresh.deleted_properties, 0);
4522 assert!(
4523 refresh
4524 .phase_timings
4525 .iter()
4526 .any(|phase| phase.name == "sqlite_property_row_staging"),
4527 "{:?}",
4528 refresh.phase_timings
4529 );
4530 assert!(
4531 refresh
4532 .phase_timings
4533 .iter()
4534 .any(|phase| phase.name == "sqlite_edge_property_row_staging"),
4535 "{:?}",
4536 refresh.phase_timings
4537 );
4538 let version = store.projection_version("root").unwrap().unwrap();
4539 assert_eq!(version.projection_version, "fixture-v2");
4540 assert_eq!(version.source_watermark.as_deref(), Some("commit-b"));
4541 let cached_counts: (usize, usize, usize, usize) = store
4542 .conn
4543 .query_row(
4544 r#"
4545 SELECT nodes, edges, tombstone_nodes, tombstone_edges
4546 FROM graph_operator_stats
4547 WHERE scope = 'root'
4548 "#,
4549 [],
4550 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
4551 )
4552 .unwrap();
4553 assert_eq!(cached_counts, (3, 1, 1, 1));
4554
4555 projection
4556 .nodes
4557 .push(GraphNode::new("topic:egress", "topic", "Egress"));
4558 let refresh = store
4559 .replace_projection_with_version(
4560 "root",
4561 &projection,
4562 Some("fixture-v3"),
4563 Some("commit-c".to_string()),
4564 )
4565 .unwrap();
4566 assert_eq!(refresh.pruned_tombstones, 1);
4567 assert_eq!(refresh.tombstoned_nodes, Vec::<String>::new());
4568
4569 projection.nodes.retain(|node| node.id != "topic:egress");
4570 store
4571 .replace_projection_with_version(
4572 "root",
4573 &projection,
4574 Some("fixture-v4"),
4575 Some("commit-d".to_string()),
4576 )
4577 .unwrap();
4578 assert_eq!(store.compact_storage("root", true).unwrap(), 2);
4579 let cached_counts: (usize, usize, usize, usize) = store
4580 .conn
4581 .query_row(
4582 r#"
4583 SELECT nodes, edges, tombstone_nodes, tombstone_edges
4584 FROM graph_operator_stats
4585 WHERE scope = 'root'
4586 "#,
4587 [],
4588 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
4589 )
4590 .unwrap();
4591 assert_eq!(cached_counts, (3, 1, 0, 0));
4592 }
4593
4594 #[test]
4595 fn sqlite_shortest_path_uses_bounded_frontier() {
4596 let store = SqliteGraphStore::in_memory().unwrap();
4597 for idx in 0..80 {
4598 store
4599 .upsert_node(&GraphNode::new(
4600 format!("node:{idx:02}"),
4601 "symbol",
4602 format!("node {idx:02}"),
4603 ))
4604 .unwrap();
4605 }
4606 for idx in 0..79 {
4607 store
4608 .upsert_edge(&GraphEdge::new(
4609 format!("node:{idx:02}"),
4610 format!("node:{:02}", idx + 1),
4611 "calls",
4612 ))
4613 .unwrap();
4614 }
4615 store
4616 .upsert_edge(&GraphEdge::new("node:00", "node:79", "mentions"))
4617 .unwrap();
4618
4619 assert!(
4620 store
4621 .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(64))
4622 .unwrap()
4623 .is_none()
4624 );
4625 let path = store
4626 .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(79))
4627 .unwrap()
4628 .unwrap();
4629 assert_eq!(path.hops, 79);
4630 assert_eq!(path.nodes.first().map(String::as_str), Some("node:00"));
4631 assert_eq!(path.nodes.last().map(String::as_str), Some("node:79"));
4632
4633 let direct = store
4634 .shortest_path_with_max_hops("node:00", "node:79", Some("mentions"), Some(1))
4635 .unwrap()
4636 .unwrap();
4637 assert_eq!(direct.nodes, vec!["node:00", "node:79"]);
4638 }
4639
4640 #[test]
4641 fn sqlite_resolves_evidence_targets_with_indexed_properties() {
4642 let store = SqliteGraphStore::in_memory().unwrap();
4643 for node in [
4644 GraphNode::new("gbak-refresh", "backlog", "#refresh")
4645 .with_property("ref_id", "refresh")
4646 .with_property("path", "tasks/current.md")
4647 .with_property("handle", "backlog-handle"),
4648 GraphNode::new("gbak-zrefresh", "backlog", "#refresh")
4649 .with_property("ref_id", "refresh")
4650 .with_property("path", "tasks/other.md"),
4651 GraphNode::new("gjob-refresh", "job_packet", "do #refresh")
4652 .with_property("ref_id", "refresh"),
4653 GraphNode::new("gwres-refresh", "worker_result", "completed #refresh")
4654 .with_property("ref_id", "refresh"),
4655 ] {
4656 store.upsert_node(&node).unwrap();
4657 }
4658
4659 let by_ref = store
4660 .resolve_evidence_target("#refresh", &["backlog", "job_packet", "worker_result"])
4661 .unwrap()
4662 .unwrap();
4663 assert_eq!(by_ref.id, "gbak-refresh");
4664 let by_handle = store
4665 .resolve_evidence_target("backlog-handle", &["backlog"])
4666 .unwrap()
4667 .unwrap();
4668 assert_eq!(by_handle.id, "gbak-refresh");
4669 let by_path = store
4670 .evidence_target_candidates("#refresh", &["backlog"], Some("tasks/other.md"))
4671 .unwrap();
4672 assert_eq!(by_path.len(), 1);
4673 assert_eq!(by_path[0].id, "gbak-zrefresh");
4674 }
4675
4676 #[test]
4677 fn sqlite_schema_migration_backfills_materialized_node_properties() {
4678 let conn = Connection::open_in_memory().unwrap();
4679 conn.execute_batch(
4680 r#"
4681 PRAGMA user_version = 2;
4682 CREATE TABLE graph_nodes (
4683 id TEXT PRIMARY KEY,
4684 kind TEXT NOT NULL,
4685 label TEXT NOT NULL,
4686 properties_json TEXT NOT NULL DEFAULT '{}',
4687 provenance_json TEXT NOT NULL DEFAULT '[]',
4688 freshness_json TEXT,
4689 row_hash TEXT,
4690 source_watermark TEXT
4691 );
4692 CREATE INDEX idx_graph_nodes_kind ON graph_nodes(kind);
4693 CREATE TABLE graph_edges (
4694 from_id TEXT NOT NULL,
4695 to_id TEXT NOT NULL,
4696 kind TEXT NOT NULL,
4697 properties_json TEXT NOT NULL DEFAULT '{}',
4698 provenance_json TEXT NOT NULL DEFAULT '[]',
4699 freshness_json TEXT,
4700 row_hash TEXT,
4701 source_watermark TEXT,
4702 PRIMARY KEY (from_id, to_id, kind)
4703 );
4704 CREATE INDEX idx_graph_edges_from_kind ON graph_edges(from_id, kind);
4705 CREATE INDEX idx_graph_edges_to_kind ON graph_edges(to_id, kind);
4706 CREATE TABLE graph_projection_versions (
4707 scope TEXT PRIMARY KEY,
4708 projection_version TEXT NOT NULL,
4709 content_hash TEXT,
4710 source_watermark TEXT,
4711 observed_at_unix INTEGER NOT NULL
4712 );
4713 CREATE TABLE graph_tombstones (
4714 row_key TEXT PRIMARY KEY,
4715 row_kind TEXT NOT NULL,
4716 deleted_at_unix INTEGER NOT NULL
4717 );
4718 INSERT INTO graph_nodes
4719 (id, kind, label, properties_json, provenance_json)
4720 VALUES
4721 ('topic:rooms', 'topic', 'Rooms', '{"domain":"livekit"}', '[]'),
4722 ('topic:egress', 'topic', 'Egress', '{"domain":"recording"}', '[]'),
4723 ('concept:graph', 'semantic_concept', 'Graph navigation', '{"embedding":"1.0,0.0","embedding_model":"fixture-v1"}', '[]');
4724 INSERT INTO graph_edges
4725 (from_id, to_id, kind, properties_json, provenance_json)
4726 VALUES
4727 ('topic:rooms', 'topic:egress', 'mentions', '{"confidence":"0.91"}', '[]');
4728 "#,
4729 )
4730 .unwrap();
4731
4732 let store = SqliteGraphStore::from_connection(conn).unwrap();
4733 let version: i64 = store
4734 .conn
4735 .pragma_query_value(None, "user_version", |row| row.get(0))
4736 .unwrap();
4737 assert_eq!(version, SQLITE_GRAPH_SCHEMA_VERSION);
4738 let property_rows: usize = store
4739 .conn
4740 .query_row(
4741 "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain'",
4742 [],
4743 |row| row.get(0),
4744 )
4745 .unwrap();
4746 assert_eq!(property_rows, 2);
4747 let edge_property_rows: usize = store
4748 .conn
4749 .query_row(
4750 "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
4751 [],
4752 |row| row.get(0),
4753 )
4754 .unwrap();
4755 assert_eq!(edge_property_rows, 1);
4756 let semantic_vector_rows: usize = store
4757 .conn
4758 .query_row(
4759 "SELECT COUNT(*) FROM graph_node_semantic_vectors WHERE model = 'fixture-v1'",
4760 [],
4761 |row| row.get(0),
4762 )
4763 .unwrap();
4764 assert_eq!(semantic_vector_rows, 1);
4765 let edge = store
4766 .edge(&GraphEdge::stable_id(
4767 "topic:rooms",
4768 "topic:egress",
4769 "mentions",
4770 ))
4771 .unwrap()
4772 .unwrap();
4773 assert_eq!(edge.properties.get("confidence"), Some(&"0.91".to_string()));
4774
4775 let page = store
4776 .paged_nodes_by_kind(
4777 "topic",
4778 GraphQueryOptions {
4779 property_filters: vec![GraphPropertyFilter {
4780 key: "domain".to_string(),
4781 value: "livekit".to_string(),
4782 }],
4783 ..GraphQueryOptions::default()
4784 },
4785 )
4786 .unwrap();
4787 assert_eq!(page.nodes[0].id, "topic:rooms");
4788 assert!(
4789 page.page
4790 .diagnostics
4791 .iter()
4792 .any(|diagnostic| diagnostic.contains("idx_graph_node_properties_key_value_node")),
4793 "{:?}",
4794 page.page.diagnostics
4795 );
4796 }
4797
4798 #[test]
4799 fn sqlite_store_batches_reachable_nodes_by_kinds() {
4800 let store = SqliteGraphStore::in_memory().unwrap();
4801 for node in [
4802 GraphNode::new("start", "backlog", "start"),
4803 GraphNode::new("ctx", "worker_context", "context"),
4804 GraphNode::new("src", "source_handle", "source"),
4805 GraphNode::new("sem", "semantic_concept", "concept"),
4806 ] {
4807 store.upsert_node(&node).unwrap();
4808 }
4809 store
4810 .upsert_edge(&GraphEdge::new("start", "ctx", "has_context"))
4811 .unwrap();
4812 store
4813 .upsert_edge(&GraphEdge::new("ctx", "src", "uses_source"))
4814 .unwrap();
4815 store
4816 .upsert_edge(&GraphEdge::new("start", "sem", "mentions_concept"))
4817 .unwrap();
4818
4819 let rows = store
4820 .reachable_nodes_by_kinds(
4821 "start",
4822 &["worker_context", "source_handle", "semantic_concept"],
4823 2,
4824 8,
4825 )
4826 .unwrap();
4827 assert_eq!(rows["worker_context"][0].0.id, "ctx");
4828 assert_eq!(
4829 rows["source_handle"][0].1.nodes,
4830 vec!["start", "ctx", "src"]
4831 );
4832 assert_eq!(rows["semantic_concept"][0].1.hops, 1);
4833 }
4834
4835 #[test]
4836 fn sqlite_projection_refresh_handles_bulk_row_replacement() {
4837 let mut store = SqliteGraphStore::in_memory().unwrap();
4838 let source = GraphProvenance::new("fixture", "bulk");
4839 let mut projection = GraphProjection::default();
4840 for idx in 0..128 {
4841 projection.nodes.push(
4842 GraphNode::new(
4843 format!("node:{idx:03}"),
4844 if idx % 2 == 0 { "symbol" } else { "file" },
4845 format!("bulk node {idx:03}"),
4846 )
4847 .with_property("ordinal", idx.to_string())
4848 .with_provenance(source.clone())
4849 .with_freshness(GraphFreshness::content_hash(format!("node-hash-{idx:03}"))),
4850 );
4851 }
4852 for idx in 0..127 {
4853 projection.edges.push(
4854 GraphEdge::new(
4855 format!("node:{idx:03}"),
4856 format!("node:{:03}", idx + 1),
4857 "next",
4858 )
4859 .with_property("ordinal", idx.to_string())
4860 .with_provenance(source.clone())
4861 .with_freshness(GraphFreshness::content_hash(format!("edge-hash-{idx:03}"))),
4862 );
4863 }
4864
4865 store
4866 .replace_projection_with_version(
4867 "root",
4868 &projection,
4869 Some("bulk-v1"),
4870 Some("commit-a".to_string()),
4871 )
4872 .unwrap();
4873
4874 projection
4875 .nodes
4876 .retain(|node| !node.id.ends_with("000") && !node.id.ends_with("064"));
4877 projection.edges.retain(|edge| {
4878 !edge.from_id.ends_with("000")
4879 && !edge.to_id.ends_with("000")
4880 && !edge.from_id.ends_with("064")
4881 && !edge.to_id.ends_with("064")
4882 });
4883 let refresh = store
4884 .replace_projection_with_version(
4885 "root",
4886 &projection,
4887 Some("bulk-v2"),
4888 Some("commit-b".to_string()),
4889 )
4890 .unwrap();
4891
4892 assert_eq!(store.all_nodes().unwrap().len(), 126);
4893 assert_eq!(store.all_edges().unwrap().len(), 124);
4894 assert_eq!(
4895 refresh.tombstoned_nodes,
4896 vec!["node:000".to_string(), "node:064".to_string()]
4897 );
4898 assert_eq!(refresh.tombstoned_edges.len(), 3);
4899 assert_eq!(refresh.deleted_nodes, 2);
4900 assert_eq!(refresh.deleted_edges, 3);
4901 assert_eq!(refresh.unchanged_nodes, 126);
4902 assert_eq!(refresh.unchanged_edges, 124);
4903 assert_eq!(refresh.upserted_nodes, 0);
4904 assert_eq!(refresh.upserted_edges, 0);
4905 assert_eq!(refresh.unchanged_properties, 250);
4906 assert_eq!(refresh.upserted_properties, 0);
4907 assert!(
4908 refresh
4909 .phase_timings
4910 .iter()
4911 .any(|phase| phase.name == "sqlite_node_staging"
4912 && phase.detail.contains("126 unchanged skipped")
4913 && phase.detail.contains("multi-row chunks up to 500 rows")),
4914 "{:?}",
4915 refresh.phase_timings
4916 );
4917 assert!(
4918 refresh
4919 .phase_timings
4920 .iter()
4921 .any(|phase| phase.name == "sqlite_node_staging"
4922 && phase.detail.contains("124 unchanged skipped")),
4923 "{:?}",
4924 refresh.phase_timings
4925 );
4926 let staged_node_properties: usize = store
4927 .conn
4928 .query_row(
4929 "SELECT COUNT(*) FROM temp.next_graph_node_properties",
4930 [],
4931 |row| row.get(0),
4932 )
4933 .unwrap();
4934 let staged_edge_properties: usize = store
4935 .conn
4936 .query_row(
4937 "SELECT COUNT(*) FROM temp.next_graph_edge_properties",
4938 [],
4939 |row| row.get(0),
4940 )
4941 .unwrap();
4942 assert_eq!(staged_node_properties, 0);
4943 assert_eq!(staged_edge_properties, 0);
4944 assert!(
4945 refresh
4946 .phase_timings
4947 .iter()
4948 .any(|phase| phase.name == "sqlite_property_row_staging"
4949 && phase.detail.contains("new/changed node rows")),
4950 "{:?}",
4951 refresh.phase_timings
4952 );
4953 assert!(
4954 refresh
4955 .phase_timings
4956 .iter()
4957 .any(|phase| phase.name == "sqlite_edge_property_row_staging"
4958 && phase.detail.contains("new/changed edge rows")),
4959 "{:?}",
4960 refresh.phase_timings
4961 );
4962 assert_eq!(
4963 store
4964 .projection_version("root")
4965 .unwrap()
4966 .unwrap()
4967 .source_watermark
4968 .as_deref(),
4969 Some("commit-b")
4970 );
4971 }
4972
4973 #[test]
4974 fn sqlite_reentrant_temp_table_guard_panics() {
4975 let store = SqliteGraphStore::in_memory().unwrap();
4976 store.temp_table_active.set(true);
4977 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
4978 store.assert_not_in_temp_table_section();
4979 }));
4980 assert!(result.is_err());
4981 }
4982
4983 #[test]
4984 fn sqlite_temp_table_guard_clears_after_method() {
4985 let mut store = SqliteGraphStore::in_memory().unwrap();
4986 let projection = GraphProjection {
4987 nodes: vec![],
4988 edges: vec![],
4989 };
4990 store.replace_projection(&projection).unwrap();
4991 assert!(!store.temp_table_active.get());
4992 }
4993
4994 #[test]
4995 fn derive_ontology_summarizes_types_and_relations() {
4996 let mut store = SqliteGraphStore::in_memory().unwrap();
4997 let seed = GraphProjection {
4998 nodes: vec![
4999 GraphNode::new("fn:a", "function", "a"),
5000 GraphNode::new("fn:b", "function", "b"),
5001 GraphNode::new("mod:m", "module", "m"),
5002 ],
5003 edges: vec![
5004 GraphEdge::new("fn:a", "fn:b", "calls"),
5005 GraphEdge::new("mod:m", "fn:a", "contains"),
5006 ],
5007 };
5008 store.upsert_projection(&seed).unwrap();
5009
5010 let onto = store.derive_ontology().unwrap();
5011 let type_kinds: std::collections::BTreeSet<_> =
5012 onto.nodes.iter().map(|n| n.label.clone()).collect();
5013 assert!(type_kinds.contains("function"));
5014 assert!(type_kinds.contains("module"));
5015 assert!(onto.nodes.iter().all(|n| n.kind == "ontology_type"));
5016
5017 let rel: std::collections::BTreeSet<_> = onto
5018 .edges
5019 .iter()
5020 .map(|e| (e.from_id.clone(), e.kind.clone(), e.to_id.clone()))
5021 .collect();
5022 assert!(rel.contains(&(
5023 "ontology_type:function".into(),
5024 "ontology_relation:calls".into(),
5025 "ontology_type:function".into()
5026 )));
5027 assert!(rel.contains(&(
5028 "ontology_type:module".into(),
5029 "ontology_relation:contains".into(),
5030 "ontology_type:function".into()
5031 )));
5032
5033 let function_node = onto.nodes.iter().find(|n| n.label == "function").unwrap();
5034 assert_eq!(function_node.properties.get("instance_count").unwrap(), "2");
5035
5036 store.upsert_projection(&onto).unwrap();
5038 let onto2 = store.derive_ontology().unwrap();
5039 assert!(onto2.nodes.iter().all(|n| n.kind == "ontology_type"));
5040 assert_eq!(onto2.nodes.len(), onto.nodes.len());
5041 assert_eq!(onto2.edges.len(), onto.edges.len());
5042 }
5043}