1use anyhow::{Context, Result, bail};
2use rusqlite::{
3 Connection, OpenFlags, OptionalExtension, Row, params_from_iter,
4 types::{Type, Value},
5};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use std::cell::Cell;
8use std::collections::{BTreeMap, BTreeSet};
9use std::ffi::OsString;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14pub use tsift_core::{
15 ConvexEdgeRow, ConvexGraphClient, ConvexGraphStore, ConvexNodeRow, ConvexProjectionRows,
16 ConvexRowsGraphClient, GRAPH_SEMANTIC_VECTOR_DEFAULT_MODEL,
17 GRAPH_SEMANTIC_VECTOR_MODEL_PROPERTY_KEY, GRAPH_SEMANTIC_VECTOR_PROPERTY_KEY, GraphEdge,
18 GraphFreshness, GraphNode, GraphPagedSubgraph, GraphPath, GraphProjection, GraphPropertyFilter,
19 GraphProvenance, GraphQueryOptions, GraphQueryPage, GraphSemanticCandidate, GraphStore,
20 GraphSubgraph, PropertyMode, RankedNeighborhoodOptions, RankedNeighborhoodResult,
21 SQLITE_GRAPH_SCHEMA_VERSION, SemanticSeededNeighborhoodExpansion,
22 SemanticSeededNeighborhoodOptions, TerseGraphEdge, TerseGraphNode, apply_graph_edge_query_page,
23 apply_graph_query_page, graph_edge_id, graph_semantic_cosine,
24 graph_semantic_top_candidates_by_property_scan, parse_graph_semantic_vector_property,
25 shortest_path_using_outgoing, stable_graph_edge_id,
26};
27
/// Which snapshot-based fallback a read-only open should use after the live
/// database reported a lock.
///
/// Variants serialize with `snake_case` names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReadOnlyRecovery {
    /// Selected by `read_only_snapshot_recovery` when no `-wal`/`-shm` sidecar
    /// exists but a `-journal` rollback journal does.
    SnapshotFallback,
    /// Selected by `read_only_snapshot_recovery` when a `-wal` or `-shm`
    /// sidecar exists next to the database.
    SnapshotFallbackWal,
}
34
35pub fn read_only_snapshot_recovery(
36 db_path: &Path,
37 err: &anyhow::Error,
38) -> Option<ReadOnlyRecovery> {
39 if !error_mentions_locked_db(err) {
40 return None;
41 }
42 if wal_sidecar_path(db_path).exists() || shared_memory_sidecar_path(db_path).exists() {
43 Some(ReadOnlyRecovery::SnapshotFallbackWal)
44 } else if rollback_journal_path(db_path).exists() {
45 Some(ReadOnlyRecovery::SnapshotFallback)
46 } else {
47 None
48 }
49}
50
/// Returns `db_path` with `-journal` appended to its final component text
/// (the suffix is appended to the raw path, not set as an extension).
pub fn rollback_journal_path(db_path: &Path) -> PathBuf {
    let mut suffixed = db_path.to_path_buf().into_os_string();
    suffixed.push("-journal");
    suffixed.into()
}
56
/// Returns `db_path` with `-wal` appended to the raw path text.
pub fn wal_sidecar_path(db_path: &Path) -> PathBuf {
    let mut suffixed = db_path.to_path_buf().into_os_string();
    suffixed.push("-wal");
    suffixed.into()
}
62
/// Returns `db_path` with `-shm` appended to the raw path text.
pub fn shared_memory_sidecar_path(db_path: &Path) -> PathBuf {
    let mut suffixed = db_path.to_path_buf().into_os_string();
    suffixed.push("-shm");
    suffixed.into()
}
68
69pub fn copy_read_only_snapshot(
70 db_path: &Path,
71 default_stem: &str,
72) -> Result<(PathBuf, Vec<PathBuf>)> {
73 let snapshot_path = snapshot_copy_path(db_path, default_stem);
74 std::fs::copy(db_path, &snapshot_path).with_context(|| {
75 format!(
76 "copying locked db {} to snapshot {}",
77 db_path.display(),
78 snapshot_path.display()
79 )
80 })?;
81 let mut cleanup_paths = vec![snapshot_path.clone()];
82 copy_optional_snapshot_sidecar(
83 &wal_sidecar_path(db_path),
84 &wal_sidecar_path(&snapshot_path),
85 &mut cleanup_paths,
86 )?;
87 copy_optional_snapshot_sidecar(
88 &shared_memory_sidecar_path(db_path),
89 &shared_memory_sidecar_path(&snapshot_path),
90 &mut cleanup_paths,
91 )?;
92 Ok((snapshot_path, cleanup_paths))
93}
94
95pub fn error_mentions_locked_db(err: &anyhow::Error) -> bool {
96 err.chain()
97 .any(|cause| cause.to_string().contains("database is locked"))
98}
99
100fn copy_optional_snapshot_sidecar(
101 source_path: &Path,
102 snapshot_path: &Path,
103 cleanup_paths: &mut Vec<PathBuf>,
104) -> Result<()> {
105 match std::fs::copy(source_path, snapshot_path) {
106 Ok(_) => {
107 cleanup_paths.push(snapshot_path.to_path_buf());
108 Ok(())
109 }
110 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
111 Err(err) => Err(err).with_context(|| {
112 format!(
113 "copying SQLite sidecar {} to snapshot {}",
114 source_path.display(),
115 snapshot_path.display()
116 )
117 }),
118 }
119}
120
/// Builds a temp-directory path for a snapshot copy of `db_path`.
///
/// The file name is `tsift-<stem>-<pid>-<nanos>.db`, where `<stem>` is the
/// UTF-8 file stem of `db_path` (or `default_stem` when there is none),
/// `<pid>` is the current process id and `<nanos>` is the time since the Unix
/// epoch in nanoseconds (zero if the clock reads earlier than the epoch).
fn snapshot_copy_path(db_path: &Path, default_stem: &str) -> PathBuf {
    let stem = match db_path.file_stem().and_then(|stem| stem.to_str()) {
        Some(stem) => stem,
        None => default_stem,
    };
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => Duration::ZERO.as_nanos(),
    };
    let pid = std::process::id();
    let file_name = OsString::from(format!("tsift-{stem}-{pid}-{nanos}.db"));
    std::env::temp_dir().join(file_name)
}
/// `wal_autocheckpoint` value (in pages) that `configure_writable_graph_connection`
/// sets on, and then verifies for, every writable graph connection.
const SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES: i64 = 4096;
/// Maximum number of rows bound into one multi-row `INSERT` by the staging helpers.
// NOTE(review): each row binds up to 9 parameters, so a full chunk binds 4500 —
// confirm this stays below the SQLite variable limit of the linked library.
const SQLITE_GRAPH_STAGING_CHUNK_ROWS: usize = 500;
136
/// Graph store backed by a single SQLite connection.
pub struct SqliteGraphStore {
    /// The underlying SQLite connection. Declared before `_snapshot_copy` so it
    /// is dropped (closed) before the snapshot files are removed.
    conn: Connection,
    /// Deletes the temporary snapshot files on drop; `Some` only when the store
    /// was opened from a snapshot copy.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Recovery path taken while opening read-only, if any.
    read_only_recovery: Option<ReadOnlyRecovery>,
    /// Set while a temp-table-using method is running; checked by
    /// `assert_not_in_temp_table_section` to reject re-entrant use.
    temp_table_active: Cell<bool>,
}
155
/// A read-only SQLite connection, optionally backed by a temporary snapshot copy.
pub struct SqliteReadOnlyConnection {
    /// The read-only connection. Declared before `_snapshot_copy` so it is
    /// dropped (closed) before the snapshot files are removed.
    conn: Connection,
    /// Deletes the snapshot files on drop; `None` when reading the live database.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Which snapshot fallback was used to open the connection, if any.
    recovery: Option<ReadOnlyRecovery>,
}
171
// Process-wide atomic counter starting at zero.
// NOTE(review): no use is visible in this part of the file; presumably it hands
// out a unique id per BFS call — confirm against the traversal code.
static BFS_CALL_ID: AtomicU64 = AtomicU64::new(0);
173
impl SqliteReadOnlyConnection {
    /// Borrows the underlying read-only SQLite connection.
    pub fn conn(&self) -> &Connection {
        &self.conn
    }

    /// Returns the snapshot fallback used to open this connection, or `None`
    /// when the live database was opened directly.
    pub fn recovery(&self) -> Option<ReadOnlyRecovery> {
        self.recovery
    }
}
183
/// Owns the temporary files of a snapshot copy and removes them when dropped.
struct SnapshotCopyGuard {
    /// Every file created for the snapshot (database copy plus copied sidecars).
    paths: Vec<PathBuf>,
}

impl Drop for SnapshotCopyGuard {
    /// Removes each tracked file; failures (for example a file that is already
    /// gone) are deliberately ignored because cleanup is best effort.
    fn drop(&mut self) {
        self.paths.iter().for_each(|path| {
            let _ = std::fs::remove_file(path);
        });
    }
}
195
196fn configure_writable_graph_connection(conn: &Connection, db_path: &Path) -> Result<()> {
197 conn.busy_timeout(Duration::from_secs(5))?;
198 conn.pragma_update(None, "journal_mode", "WAL")?;
199 let mode: String = conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
200 if mode.to_lowercase() != "wal" {
201 bail!(
202 "graph substrate db {} requires WAL mode for concurrent reads, got {}",
203 db_path.display(),
204 mode
205 );
206 }
207 conn.pragma_update(
208 None,
209 "wal_autocheckpoint",
210 SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
211 )?;
212 let checkpoint_pages: i64 =
213 conn.query_row("PRAGMA wal_autocheckpoint", [], |row| row.get(0))?;
214 if checkpoint_pages != SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES {
215 bail!(
216 "graph substrate db {} requires wal_autocheckpoint={}, got {}",
217 db_path.display(),
218 SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
219 checkpoint_pages
220 );
221 }
222 Ok(())
223}
224
225fn sqlite_column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
226 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
227 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
228 for row in rows {
229 if row? == column {
230 return Ok(true);
231 }
232 }
233 Ok(false)
234}
235
236fn sqlite_table_exists(conn: &Connection, table: &str) -> Result<bool> {
237 conn.query_row(
238 r#"
239 SELECT EXISTS(
240 SELECT 1
241 FROM sqlite_master
242 WHERE type = 'table' AND name = ?1
243 )
244 "#,
245 [table],
246 |row| row.get::<_, bool>(0),
247 )
248 .map_err(Into::into)
249}
250
251fn add_column_if_missing(
252 conn: &Connection,
253 table: &str,
254 column: &str,
255 definition: &str,
256) -> Result<()> {
257 if !sqlite_column_exists(conn, table, column)? {
258 conn.execute(
259 &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
260 [],
261 )?;
262 }
263 Ok(())
264}
265
266fn backfill_graph_edge_keys(conn: &Connection) -> Result<()> {
267 if !sqlite_column_exists(conn, "graph_edges", "edge_key")? {
268 return Ok(());
269 }
270 let rows = {
271 let mut stmt = conn.prepare(
272 r#"
273 SELECT from_id, to_id, kind
274 FROM graph_edges
275 WHERE edge_key IS NULL OR edge_key = ''
276 ORDER BY from_id, kind, to_id
277 "#,
278 )?;
279 collect_rows(stmt.query_map([], |row| {
280 Ok((
281 row.get::<_, String>(0)?,
282 row.get::<_, String>(1)?,
283 row.get::<_, String>(2)?,
284 ))
285 })?)?
286 };
287 let mut update = conn.prepare(
288 r#"
289 UPDATE graph_edges
290 SET edge_key = ?4
291 WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3
292 "#,
293 )?;
294 for (from_id, to_id, kind) in rows {
295 update.execute((
296 &from_id,
297 &to_id,
298 &kind,
299 stable_graph_edge_id(&from_id, &to_id, &kind),
300 ))?;
301 }
302 Ok(())
303}
304
/// Upgrades an existing graph database from `old_version` to the current
/// schema by running every migration step newer than `old_version`, in
/// ascending version order.
///
/// Each step is guarded by `old_version < N`, so a database several versions
/// behind runs all the steps it is missing in one call. This function does not
/// write `user_version`; the caller does that after it returns.
fn migrate_sqlite_graph_schema(conn: &Connection, old_version: i64) -> Result<()> {
    // v2: per-row hash and source watermark columns on nodes and edges.
    if old_version < 2 {
        add_column_if_missing(conn, "graph_nodes", "row_hash", "TEXT")?;
        add_column_if_missing(conn, "graph_nodes", "source_watermark", "TEXT")?;
        add_column_if_missing(conn, "graph_edges", "row_hash", "TEXT")?;
        add_column_if_missing(conn, "graph_edges", "source_watermark", "TEXT")?;
    }
    // v3: repopulate the node property table from `properties_json`.
    if old_version < 3 {
        rebuild_graph_node_properties(conn)?;
    }
    // v4: operator stats table and the (kind, label, id) node index.
    if old_version < 4 {
        ensure_sqlite_graph_operator_stats_schema(conn)?;
    }
    // v5: edge keys. The column must exist and be backfilled before the unique
    // index and the edge property table that reference it are created.
    if old_version < 5 {
        add_column_if_missing(conn, "graph_edges", "edge_key", "TEXT")?;
        backfill_graph_edge_keys(conn)?;
        ensure_sqlite_graph_edge_properties_schema(conn)?;
        rebuild_graph_edge_properties(conn)?;
    }
    // v6: semantic vector table, repopulated from node properties.
    if old_version < 6 {
        ensure_sqlite_graph_semantic_vectors_schema(conn)?;
        rebuild_graph_node_semantic_vectors(conn)?;
    }
    Ok(())
}
330
331fn ensure_sqlite_graph_operator_stats_schema(conn: &Connection) -> Result<()> {
332 conn.execute_batch(
333 r#"
334 CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
335 ON graph_nodes(kind, label, id);
336 CREATE TABLE IF NOT EXISTS graph_operator_stats (
337 scope TEXT PRIMARY KEY,
338 nodes INTEGER NOT NULL,
339 edges INTEGER NOT NULL,
340 tombstone_nodes INTEGER NOT NULL,
341 tombstone_edges INTEGER NOT NULL,
342 file_size_bytes INTEGER,
343 freelist_bytes INTEGER,
344 observed_at_unix INTEGER NOT NULL
345 );
346 "#,
347 )?;
348 Ok(())
349}
350
351fn ensure_sqlite_graph_edge_properties_schema(conn: &Connection) -> Result<()> {
352 conn.execute_batch(
353 r#"
354 CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_edge_key
355 ON graph_edges(edge_key);
356 CREATE TABLE IF NOT EXISTS graph_edge_properties (
357 edge_key TEXT NOT NULL,
358 key TEXT NOT NULL,
359 value TEXT NOT NULL,
360 PRIMARY KEY (edge_key, key),
361 FOREIGN KEY (edge_key) REFERENCES graph_edges(edge_key) ON DELETE CASCADE
362 );
363 CREATE INDEX IF NOT EXISTS idx_graph_edge_properties_key_value_edge
364 ON graph_edge_properties(key, value, edge_key);
365 "#,
366 )?;
367 Ok(())
368}
369
370fn ensure_sqlite_graph_semantic_vectors_schema(conn: &Connection) -> Result<()> {
371 conn.execute_batch(
372 r#"
373 CREATE TABLE IF NOT EXISTS graph_node_semantic_vectors (
374 node_id TEXT PRIMARY KEY,
375 kind TEXT NOT NULL,
376 model TEXT NOT NULL,
377 dimensions INTEGER NOT NULL,
378 vector_blob BLOB NOT NULL,
379 FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
380 );
381 CREATE INDEX IF NOT EXISTS idx_graph_node_semantic_vectors_kind_dims
382 ON graph_node_semantic_vectors(kind, dimensions, node_id);
383 "#,
384 )?;
385 Ok(())
386}
387
388fn replace_node_properties(
389 conn: &Connection,
390 node_id: &str,
391 properties: &BTreeMap<String, String>,
392) -> Result<()> {
393 conn.execute(
394 "DELETE FROM graph_node_properties WHERE node_id = ?1",
395 [node_id],
396 )?;
397 let mut insert = conn.prepare(
398 r#"
399 INSERT INTO graph_node_properties (node_id, key, value)
400 VALUES (?1, ?2, ?3)
401 ON CONFLICT(node_id, key) DO UPDATE SET
402 value = excluded.value
403 "#,
404 )?;
405 for (key, value) in properties {
406 insert.execute((node_id, key, value))?;
407 }
408 Ok(())
409}
410
/// Column values for one `graph_node_semantic_vectors` row, derived from a
/// node's properties by `graph_semantic_vector_row`.
struct GraphSemanticVectorRow {
    /// Model name from the properties, or the default model when absent.
    model: String,
    /// Number of components in the vector.
    dimensions: usize,
    /// The vector encoded as consecutive little-endian `f64` values.
    vector_blob: Vec<u8>,
}
416
417fn graph_semantic_vector_row(
418 properties: &BTreeMap<String, String>,
419) -> Option<GraphSemanticVectorRow> {
420 let vector = properties
421 .get(GRAPH_SEMANTIC_VECTOR_PROPERTY_KEY)
422 .and_then(|value| parse_graph_semantic_vector_property(value))?;
423 Some(GraphSemanticVectorRow {
424 model: properties
425 .get(GRAPH_SEMANTIC_VECTOR_MODEL_PROPERTY_KEY)
426 .cloned()
427 .unwrap_or_else(|| GRAPH_SEMANTIC_VECTOR_DEFAULT_MODEL.to_string()),
428 dimensions: vector.len(),
429 vector_blob: semantic_vector_to_blob(&vector),
430 })
431}
432
/// Encodes `vector` as a byte blob: each component's 8 little-endian bytes,
/// concatenated in order.
fn semantic_vector_to_blob(vector: &[f64]) -> Vec<u8> {
    let mut blob = Vec::with_capacity(std::mem::size_of_val(vector));
    blob.extend(vector.iter().flat_map(|component| component.to_le_bytes()));
    blob
}
440
/// Decodes a blob of consecutive little-endian `f64` values into a vector of
/// exactly `dimensions` components.
///
/// Returns `None` when `dimensions` is zero, when the blob length is not
/// exactly `dimensions * 8` bytes, or when any decoded component is not
/// finite. The expected byte length is computed with a checked multiply, so a
/// `dimensions` value too large to represent in bytes is rejected instead of
/// overflowing.
fn semantic_vector_from_blob(blob: &[u8], dimensions: usize) -> Option<Vec<f64>> {
    const BYTES_PER_VALUE: usize = std::mem::size_of::<f64>();
    if dimensions == 0 || dimensions.checked_mul(BYTES_PER_VALUE) != Some(blob.len()) {
        return None;
    }
    // The length check above guarantees the blob splits into whole chunks.
    blob.chunks_exact(BYTES_PER_VALUE)
        .map(|chunk| {
            let mut bytes = [0u8; BYTES_PER_VALUE];
            bytes.copy_from_slice(chunk);
            let value = f64::from_le_bytes(bytes);
            value.is_finite().then_some(value)
        })
        .collect()
}
455
456fn replace_node_semantic_vector(
457 conn: &Connection,
458 node_id: &str,
459 kind: &str,
460 properties: &BTreeMap<String, String>,
461) -> Result<()> {
462 conn.execute(
463 "DELETE FROM graph_node_semantic_vectors WHERE node_id = ?1",
464 [node_id],
465 )?;
466 let Some(row) = graph_semantic_vector_row(properties) else {
467 return Ok(());
468 };
469 conn.execute(
470 r#"
471 INSERT INTO graph_node_semantic_vectors
472 (node_id, kind, model, dimensions, vector_blob)
473 VALUES (?1, ?2, ?3, ?4, ?5)
474 "#,
475 (
476 node_id,
477 kind,
478 row.model,
479 row.dimensions as i64,
480 row.vector_blob,
481 ),
482 )?;
483 Ok(())
484}
485
486fn replace_edge_properties(
487 conn: &Connection,
488 edge_key: &str,
489 properties: &BTreeMap<String, String>,
490) -> Result<()> {
491 conn.execute(
492 "DELETE FROM graph_edge_properties WHERE edge_key = ?1",
493 [edge_key],
494 )?;
495 let mut insert = conn.prepare(
496 r#"
497 INSERT INTO graph_edge_properties (edge_key, key, value)
498 VALUES (?1, ?2, ?3)
499 ON CONFLICT(edge_key, key) DO UPDATE SET
500 value = excluded.value
501 "#,
502 )?;
503 for (key, value) in properties {
504 insert.execute((edge_key, key, value))?;
505 }
506 Ok(())
507}
508
509fn rebuild_graph_node_properties(conn: &Connection) -> Result<()> {
510 if !sqlite_column_exists(conn, "graph_nodes", "properties_json")? {
511 return Ok(());
512 }
513 conn.execute_batch(
514 r#"
515 DELETE FROM graph_node_properties;
516 INSERT INTO graph_node_properties (node_id, key, value)
517 SELECT graph_nodes.id, json_each.key, CAST(json_each.value AS TEXT)
518 FROM graph_nodes, json_each(graph_nodes.properties_json)
519 WHERE json_each.key IS NOT NULL
520 AND json_each.value IS NOT NULL
521 "#,
522 )?;
523 Ok(())
524}
525
526fn rebuild_graph_node_semantic_vectors(conn: &Connection) -> Result<()> {
527 if !sqlite_column_exists(conn, "graph_nodes", "properties_json")?
528 || !sqlite_table_exists(conn, "graph_node_semantic_vectors")?
529 {
530 return Ok(());
531 }
532 conn.execute("DELETE FROM graph_node_semantic_vectors", [])?;
533 let rows = {
534 let mut stmt = conn.prepare(
535 r#"
536 SELECT id, kind, properties_json
537 FROM graph_nodes
538 WHERE json_extract(properties_json, '$.embedding') IS NOT NULL
539 ORDER BY id
540 "#,
541 )?;
542 collect_rows(stmt.query_map([], |row| {
543 Ok((
544 row.get::<_, String>(0)?,
545 row.get::<_, String>(1)?,
546 row.get::<_, String>(2)?,
547 ))
548 })?)?
549 };
550 for (node_id, kind, properties_json) in rows {
551 let properties: BTreeMap<String, String> = serde_json::from_str(&properties_json)
552 .with_context(|| format!("parsing semantic properties for graph node {node_id}"))?;
553 replace_node_semantic_vector(conn, &node_id, &kind, &properties)?;
554 }
555 Ok(())
556}
557
558fn rebuild_graph_edge_properties(conn: &Connection) -> Result<()> {
559 if !sqlite_column_exists(conn, "graph_edges", "properties_json")?
560 || !sqlite_column_exists(conn, "graph_edges", "edge_key")?
561 {
562 return Ok(());
563 }
564 conn.execute_batch(
565 r#"
566 DELETE FROM graph_edge_properties;
567 INSERT INTO graph_edge_properties (edge_key, key, value)
568 SELECT graph_edges.edge_key, json_each.key, CAST(json_each.value AS TEXT)
569 FROM graph_edges, json_each(graph_edges.properties_json)
570 WHERE graph_edges.edge_key IS NOT NULL
571 AND graph_edges.edge_key <> ''
572 AND json_each.key IS NOT NULL
573 AND json_each.value IS NOT NULL
574 "#,
575 )?;
576 Ok(())
577}
578
579pub fn open_graph_read_only_connection(db_path: &Path) -> Result<SqliteReadOnlyConnection> {
580 let conn = Connection::open_with_flags(
581 db_path,
582 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
583 )
584 .with_context(|| format!("opening graph.db read-only: {}", db_path.display()))?;
585 conn.busy_timeout(Duration::from_secs(5))?;
586 Ok(SqliteReadOnlyConnection {
587 conn,
588 _snapshot_copy: None,
589 recovery: None,
590 })
591}
592
593pub fn open_graph_read_only_connection_resilient(
594 db_path: &Path,
595) -> Result<SqliteReadOnlyConnection> {
596 match open_graph_read_only_connection(db_path).and_then(|connection| {
597 connection
598 .conn
599 .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
600 Ok(connection)
601 }) {
602 Ok(connection) => Ok(connection),
603 Err(err) => match read_only_snapshot_recovery(db_path, &err) {
604 Some(recovery) => open_graph_read_only_snapshot(db_path, recovery),
605 None => Err(err),
606 },
607 }
608}
609
610fn open_graph_read_only_snapshot(
611 db_path: &Path,
612 recovery: ReadOnlyRecovery,
613) -> Result<SqliteReadOnlyConnection> {
614 let (snapshot_path, cleanup_paths) = copy_read_only_snapshot(db_path, "graph")?;
615 let conn = Connection::open_with_flags(
616 &snapshot_path,
617 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
618 )
619 .with_context(|| format!("opening graph.db snapshot {}", snapshot_path.display()))?;
620 conn.busy_timeout(Duration::from_secs(5))?;
621 Ok(SqliteReadOnlyConnection {
622 conn,
623 _snapshot_copy: Some(SnapshotCopyGuard {
624 paths: cleanup_paths,
625 }),
626 recovery: Some(recovery),
627 })
628}
629
/// Timing record for one named phase of a projection refresh.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefreshPhase {
    /// Phase name.
    pub name: String,
    /// Elapsed time of the phase in microseconds.
    pub duration_micros: u128,
    /// Free-form description of what the phase did.
    pub detail: String,
}
636
/// Report returned by `replace_projection_with_version` describing one refresh.
// NOTE(review): the code that fills this struct is outside the visible part of
// the file; the field notes below follow the field names — confirm before
// relying on exact semantics.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefresh {
    /// Scope the projection was written under.
    pub scope: String,
    /// Version label recorded for the projection.
    pub projection_version: String,
    /// Source watermark supplied by the caller, if any.
    pub source_watermark: Option<String>,
    /// Presumably the ids of nodes tombstoned by the refresh.
    pub tombstoned_nodes: Vec<String>,
    /// Presumably the keys of edges tombstoned by the refresh.
    pub tombstoned_edges: Vec<String>,
    /// Together with `upserted_edges`, used by `replace_projection_with_version`
    /// to size the WAL auto-checkpoint after a refresh.
    pub upserted_nodes: usize,
    /// See `upserted_nodes`.
    pub upserted_edges: usize,
    pub unchanged_nodes: usize,
    pub unchanged_edges: usize,
    pub upserted_properties: usize,
    pub unchanged_properties: usize,
    pub deleted_properties: usize,
    pub deleted_nodes: usize,
    pub deleted_edges: usize,
    pub pruned_tombstones: usize,
    /// Presumably the database size before the refresh, when it could be measured.
    pub file_size_bytes_before: Option<u64>,
    /// Presumably the database size after the refresh, when it could be measured.
    pub file_size_bytes_after: Option<u64>,
    /// Per-phase timings collected during the refresh.
    pub phase_timings: Vec<SqliteProjectionRefreshPhase>,
}
658
/// Version metadata for a stored projection.
// NOTE(review): the fields mirror the `projection_version`, `content_hash` and
// `source_watermark` columns of `graph_projection_versions`; the code that
// reads them is not visible here — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionVersion {
    /// Version label of the projection.
    pub projection_version: String,
    /// Content hash of the projection, if recorded.
    pub content_hash: Option<String>,
    /// Source watermark of the projection, if recorded.
    pub source_watermark: Option<String>,
}
665
666fn sqlite_refresh_phase_timing(
667 name: &str,
668 started: Instant,
669 detail: &str,
670) -> SqliteProjectionRefreshPhase {
671 SqliteProjectionRefreshPhase {
672 name: name.to_string(),
673 duration_micros: started.elapsed().as_micros(),
674 detail: detail.to_string(),
675 }
676}
677
/// Builds the `VALUES` placeholder list for a multi-row insert: `row_count`
/// groups of `column_count` `?` markers, e.g. `(?, ?), (?, ?)`.
///
/// A zero `row_count` yields an empty string; a zero `column_count` yields
/// `()` groups.
fn sqlite_graph_staging_placeholders(column_count: usize, row_count: usize) -> String {
    let row = format!("({})", vec!["?"; column_count].join(", "));
    vec![row.as_str(); row_count].join(", ")
}
691
692fn sqlite_stage_all_ids(
693 tx: &rusqlite::Transaction<'_>,
694 nodes: &[GraphNode],
695 edges: &[GraphEdge],
696) -> Result<()> {
697 for chunk in nodes
698 .iter()
699 .map(|n| &n.id)
700 .collect::<Vec<_>>()
701 .chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS)
702 {
703 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
704 let sql = format!(
705 "INSERT OR IGNORE INTO next_graph_all_node_ids (id) VALUES {}",
706 placeholders.join(", ")
707 );
708 let values: Vec<Value> = chunk.iter().map(|id| Value::Text((*id).clone())).collect();
709 tx.execute(&sql, params_from_iter(values.iter()))?;
710 }
711 for chunk in edges
712 .iter()
713 .map(graph_edge_id)
714 .collect::<Vec<_>>()
715 .chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS)
716 {
717 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
718 let sql = format!(
719 "INSERT OR IGNORE INTO next_graph_all_edge_keys (edge_key) VALUES {}",
720 placeholders.join(", ")
721 );
722 let values: Vec<Value> = chunk.iter().map(|ek| Value::Text(ek.to_string())).collect();
723 tx.execute(&sql, params_from_iter(values.iter()))?;
724 }
725 Ok(())
726}
727
728fn sqlite_stage_projection_nodes(
729 tx: &rusqlite::Transaction<'_>,
730 nodes: &[&GraphNode],
731 source_watermark: Option<&str>,
732) -> Result<()> {
733 let mut insert_semantic_vector = tx.prepare(
734 r#"
735 INSERT INTO next_graph_node_semantic_vectors
736 (node_id, kind, model, dimensions, vector_blob)
737 VALUES (?1, ?2, ?3, ?4, ?5)
738 "#,
739 )?;
740 for chunk in nodes.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
741 let sql = format!(
742 r#"
743 INSERT INTO next_graph_nodes
744 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
745 VALUES {}
746 "#,
747 sqlite_graph_staging_placeholders(8, chunk.len())
748 );
749 let mut values = Vec::with_capacity(chunk.len() * 8);
750 for node in chunk {
751 values.push(Value::Text(node.id.clone()));
752 values.push(Value::Text(node.kind.clone()));
753 values.push(Value::Text(node.label.clone()));
754 values.push(Value::Text(to_json(&node.properties)?));
755 values.push(Value::Text(to_json(&node.provenance)?));
756 values.push(
757 optional_to_json(&node.freshness)?
758 .map(Value::Text)
759 .unwrap_or(Value::Null),
760 );
761 values.push(Value::Text(row_hash(node)?));
762 values.push(
763 source_watermark
764 .map(|watermark| Value::Text(watermark.to_string()))
765 .unwrap_or(Value::Null),
766 );
767 }
768 tx.execute(&sql, params_from_iter(values))?;
769 for node in chunk {
770 let Some(row) = graph_semantic_vector_row(&node.properties) else {
771 continue;
772 };
773 insert_semantic_vector.execute((
774 &node.id,
775 &node.kind,
776 row.model,
777 row.dimensions as i64,
778 row.vector_blob,
779 ))?;
780 }
781 }
782 Ok(())
783}
784
785fn sqlite_stage_projection_edges(
786 tx: &rusqlite::Transaction<'_>,
787 edges: &[&GraphEdge],
788 source_watermark: Option<&str>,
789) -> Result<()> {
790 for chunk in edges.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
791 let sql = format!(
792 r#"
793 INSERT INTO next_graph_edges
794 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
795 VALUES {}
796 "#,
797 sqlite_graph_staging_placeholders(9, chunk.len())
798 );
799 let mut values = Vec::with_capacity(chunk.len() * 9);
800 for edge in chunk {
801 values.push(Value::Text(graph_edge_id(edge)));
802 values.push(Value::Text(edge.from_id.clone()));
803 values.push(Value::Text(edge.to_id.clone()));
804 values.push(Value::Text(edge.kind.clone()));
805 values.push(Value::Text(to_json(&edge.properties)?));
806 values.push(Value::Text(to_json(&edge.provenance)?));
807 values.push(
808 optional_to_json(&edge.freshness)?
809 .map(Value::Text)
810 .unwrap_or(Value::Null),
811 );
812 values.push(Value::Text(row_hash(edge)?));
813 values.push(
814 source_watermark
815 .map(|watermark| Value::Text(watermark.to_string()))
816 .unwrap_or(Value::Null),
817 );
818 }
819 tx.execute(&sql, params_from_iter(values))?;
820 }
821 Ok(())
822}
823
824impl SqliteGraphStore {
825 fn assert_not_in_temp_table_section(&self) {
826 if self.temp_table_active.get() {
827 panic!(
828 "SqliteGraphStore: re-entrant temp-table call detected — only one temp-table-using method may be active at a time per connection"
829 );
830 }
831 }
832
833 pub fn open(db_path: &Path) -> Result<Self> {
834 if let Some(parent) = db_path.parent() {
835 std::fs::create_dir_all(parent)
836 .with_context(|| format!("creating graph substrate dir: {}", parent.display()))?;
837 }
838 let conn = Connection::open(db_path)
839 .with_context(|| format!("opening graph substrate db: {}", db_path.display()))?;
840 configure_writable_graph_connection(&conn, db_path)?;
841 Self::from_connection(conn)
842 }
843
844 pub fn in_memory() -> Result<Self> {
845 let conn = Connection::open_in_memory()?;
846 conn.busy_timeout(Duration::from_secs(5))?;
847 Self::from_connection(conn)
848 }
849
850 pub fn open_read_only(db_path: &Path) -> Result<Self> {
851 let connection = open_graph_read_only_connection(db_path)?;
852 Self::from_read_only_connection(connection)
853 }
854
855 pub fn open_read_only_resilient(db_path: &Path) -> Result<Self> {
856 let connection = open_graph_read_only_connection_resilient(db_path)?;
857 Self::from_read_only_connection(connection)
858 }
859
    /// Returns the snapshot fallback used when this store was opened read-only,
    /// or `None` when no fallback was needed (always `None` for writable stores).
    pub fn read_only_recovery(&self) -> Option<ReadOnlyRecovery> {
        self.read_only_recovery
    }
863
864 pub fn has_user_triggers(&self) -> Result<bool> {
865 self.conn
866 .query_row(
867 r#"
868 SELECT EXISTS(
869 SELECT 1
870 FROM sqlite_master
871 WHERE type = 'trigger'
872 AND name NOT LIKE 'sqlite_%'
873 )
874 "#,
875 [],
876 |row| row.get::<_, bool>(0),
877 )
878 .map_err(Into::into)
879 }
880
    /// Wraps a writable connection and brings its schema up to date.
    ///
    /// Steps, in order: enable foreign keys, reject databases whose
    /// `user_version` is newer than `SQLITE_GRAPH_SCHEMA_VERSION`, create any
    /// missing tables and indexes, and — when the stored version is older —
    /// run the migrations and stamp the current version.
    ///
    /// # Errors
    /// Fails if the schema version is newer than supported or if any pragma,
    /// DDL statement, or migration step fails.
    fn from_connection(conn: Connection) -> Result<Self> {
        conn.pragma_update(None, "foreign_keys", "ON")?;
        let user_version: i64 =
            conn.pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
        // Refuse to touch a database written by a newer build.
        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
            bail!(
                "graph.db schema version {} is newer than supported version {}",
                user_version,
                SQLITE_GRAPH_SCHEMA_VERSION
            );
        }
        // Every statement is `IF NOT EXISTS`, so this creates the full schema
        // on a fresh database and leaves tables that already exist untouched.
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS graph_nodes (
                id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                label TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT
            );
            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind
                ON graph_nodes(kind);
            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
                ON graph_nodes(kind, label, id);

            CREATE TABLE IF NOT EXISTS graph_edges (
                edge_key TEXT NOT NULL UNIQUE,
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT,
                PRIMARY KEY (from_id, to_id, kind),
                FOREIGN KEY (from_id) REFERENCES graph_nodes(id) ON DELETE CASCADE,
                FOREIGN KEY (to_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_edges_from_kind
                ON graph_edges(from_id, kind);
            CREATE INDEX IF NOT EXISTS idx_graph_edges_to_kind
                ON graph_edges(to_id, kind);

            CREATE TABLE IF NOT EXISTS graph_projection_versions (
                scope TEXT PRIMARY KEY,
                projection_version TEXT NOT NULL,
                content_hash TEXT,
                source_watermark TEXT,
                observed_at_unix INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS graph_tombstones (
                row_key TEXT PRIMARY KEY,
                row_kind TEXT NOT NULL,
                deleted_at_unix INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS graph_node_properties (
                node_id TEXT NOT NULL,
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                PRIMARY KEY (node_id, key),
                FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_node_properties_key_value_node
                ON graph_node_properties(key, value, node_id);

            CREATE TABLE IF NOT EXISTS graph_node_semantic_vectors (
                node_id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                model TEXT NOT NULL,
                dimensions INTEGER NOT NULL,
                vector_blob BLOB NOT NULL,
                FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_node_semantic_vectors_kind_dims
                ON graph_node_semantic_vectors(kind, dimensions, node_id);

            CREATE TABLE IF NOT EXISTS graph_operator_stats (
                scope TEXT PRIMARY KEY,
                nodes INTEGER NOT NULL,
                edges INTEGER NOT NULL,
                tombstone_nodes INTEGER NOT NULL,
                tombstone_edges INTEGER NOT NULL,
                file_size_bytes INTEGER,
                freelist_bytes INTEGER,
                observed_at_unix INTEGER NOT NULL
            );
            "#,
        )?;
        // Older databases get the column additions and backfills; the version
        // is stamped only after every migration step succeeded.
        if user_version < SQLITE_GRAPH_SCHEMA_VERSION {
            migrate_sqlite_graph_schema(&conn, user_version)?;
            conn.pragma_update(None, "user_version", SQLITE_GRAPH_SCHEMA_VERSION)?;
        }
        Ok(Self {
            conn,
            _snapshot_copy: None,
            read_only_recovery: None,
            temp_table_active: Cell::new(false),
        })
    }
986
987 fn from_read_only_connection(connection: SqliteReadOnlyConnection) -> Result<Self> {
988 connection.conn.pragma_update(None, "foreign_keys", "ON")?;
989 let user_version: i64 =
990 connection
991 .conn
992 .pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
993 if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
994 bail!(
995 "graph.db schema version {} is newer than supported version {}",
996 user_version,
997 SQLITE_GRAPH_SCHEMA_VERSION
998 );
999 }
1000 connection
1001 .conn
1002 .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
1003 Ok(Self {
1004 conn: connection.conn,
1005 _snapshot_copy: connection._snapshot_copy,
1006 read_only_recovery: connection.recovery,
1007 temp_table_active: Cell::new(false),
1008 })
1009 }
1010
1011 pub fn replace_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1012 self.replace_projection_with_version("root", projection, None, None)
1013 .map(|_| ())
1014 }
1015
1016 pub fn replace_projection_with_version(
1017 &mut self,
1018 scope: impl Into<String>,
1019 projection: &GraphProjection,
1020 projection_version: Option<&str>,
1021 source_watermark: Option<String>,
1022 ) -> Result<SqliteProjectionRefresh> {
1023 self.assert_not_in_temp_table_section();
1024 self.temp_table_active.set(true);
1025 let scope = scope.into();
1026 let result = self.replace_projection_with_version_fallible(
1027 scope,
1028 projection,
1029 projection_version,
1030 source_watermark,
1031 );
1032 self.temp_table_active.set(false);
1033 if let Ok(ref refresh) = result {
1034 let total_rows = refresh.upserted_nodes + refresh.upserted_edges;
1035 let autocheckpoint = if total_rows > 10000 {
1036 8192
1037 } else if total_rows > 1000 {
1038 4096
1039 } else {
1040 2048
1041 };
1042 let _ = self
1043 .conn
1044 .pragma_update(None, "wal_autocheckpoint", autocheckpoint);
1045 let _ = self.conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)");
1046 }
1047 result
1048 }
1049
    /// Replaces the stored graph with `projection` inside a single transaction and
    /// reports what changed.
    ///
    /// The refresh stages the incoming rows in connection-local temp tables, diffs
    /// them against `graph_nodes` / `graph_edges` by `row_hash`, deletes rows that
    /// are no longer projected, upserts new/changed rows and their materialized
    /// property rows, records the projection version for `scope`, maintains
    /// `graph_tombstones`, and caches operator counts. File-size and freelist
    /// figures are written to `graph_operator_stats` after the commit.
    ///
    /// * `scope` - key of the `graph_projection_versions` / `graph_operator_stats` row.
    /// * `projection` - the full node/edge set that should exist after the refresh.
    /// * `projection_version` - explicit version; when `None` the version is taken
    ///   from `projection_version_from_nodes`, falling back to `"unversioned"`.
    /// * `source_watermark` - stored on the version row and passed to row staging.
    ///
    /// # Errors
    /// Any SQLite or staging-helper error is propagated. Errors raised before
    /// `tx.commit()` drop the transaction without committing it; an error from the
    /// post-commit stats `UPDATE` is returned even though the refresh itself has
    /// already been committed.
    fn replace_projection_with_version_fallible(
        &mut self,
        scope: String,
        projection: &GraphProjection,
        projection_version: Option<&str>,
        source_watermark: Option<String>,
    ) -> Result<SqliteProjectionRefresh> {
        // Version precedence: explicit argument, then node-derived, then a fixed placeholder.
        let projection_version = projection_version
            .map(str::to_string)
            .or_else(|| projection_version_from_nodes(&projection.nodes))
            .unwrap_or_else(|| "unversioned".to_string());
        let projection_hash = projection_hash_from_nodes(&projection.nodes);
        let observed_at_unix = unix_now();
        let file_size_bytes_before = sqlite_database_size_bytes(&self.conn).ok();
        // If the trigger probe itself fails, fall back to the SQL-side diff path
        // that stages every projected row.
        let force_refresh_writes = self.has_user_triggers().unwrap_or(true);
        let mut phase_timings = Vec::new();

        let tx = self.conn.transaction()?;
        let started = Instant::now();
        // Create the staging tables on first use and empty them so leftovers from a
        // previous refresh on this connection cannot leak into this one.
        tx.execute_batch(
            r#"
            CREATE TEMP TABLE IF NOT EXISTS next_graph_nodes (
                id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                label TEXT NOT NULL,
                properties_json TEXT NOT NULL,
                provenance_json TEXT NOT NULL,
                freshness_json TEXT,
                row_hash TEXT NOT NULL,
                source_watermark TEXT
            );
            CREATE TEMP TABLE IF NOT EXISTS next_graph_edges (
                edge_key TEXT PRIMARY KEY,
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL,
                provenance_json TEXT NOT NULL,
                freshness_json TEXT,
                row_hash TEXT NOT NULL,
                source_watermark TEXT
            );
            CREATE INDEX IF NOT EXISTS temp.idx_next_graph_edges_from_to_kind
                ON next_graph_edges(from_id, to_id, kind);
            CREATE TEMP TABLE IF NOT EXISTS next_graph_node_properties (
                node_id TEXT NOT NULL,
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                PRIMARY KEY (node_id, key)
            );
            CREATE TEMP TABLE IF NOT EXISTS next_graph_node_semantic_vectors (
                node_id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                model TEXT NOT NULL,
                dimensions INTEGER NOT NULL,
                vector_blob BLOB NOT NULL
            );
            CREATE TEMP TABLE IF NOT EXISTS next_graph_edge_properties (
                edge_key TEXT NOT NULL,
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                PRIMARY KEY (edge_key, key)
            );
            CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_nodes (
                id TEXT PRIMARY KEY
            );
            CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_edges (
                edge_key TEXT PRIMARY KEY
            );
            CREATE TEMP TABLE IF NOT EXISTS next_graph_all_node_ids (
                id TEXT PRIMARY KEY
            );
            CREATE TEMP TABLE IF NOT EXISTS next_graph_all_edge_keys (
                edge_key TEXT PRIMARY KEY
            );
            DELETE FROM next_graph_nodes;
            DELETE FROM next_graph_edges;
            DELETE FROM next_graph_node_properties;
            DELETE FROM next_graph_node_semantic_vectors;
            DELETE FROM next_graph_edge_properties;
            DELETE FROM next_graph_changed_nodes;
            DELETE FROM next_graph_changed_edges;
            DELETE FROM next_graph_all_node_ids;
            DELETE FROM next_graph_all_edge_keys;
            "#,
        )?;
        phase_timings.push(sqlite_refresh_phase_timing(
            "sqlite_temp_table_prepare",
            started,
            "create and clear refresh staging tables before row loading",
        ));
        // Decide which rows get fully staged. In the forced path every projected
        // row is staged and the diff is done in SQL; otherwise rows whose hash
        // already matches the stored hash are filtered out here and only counted.
        let (changed_nodes, changed_edges, skipped_nodes, skipped_edges) = if force_refresh_writes {
            (
                projection.nodes.iter().collect(),
                projection.edges.iter().collect(),
                0usize,
                0usize,
            )
        } else {
            let existing_node_hashes: BTreeMap<String, String> = {
                let mut stmt =
                    tx.prepare("SELECT id, row_hash FROM graph_nodes WHERE row_hash IS NOT NULL")?;
                let rows = stmt.query_map([], |row| {
                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
                })?;
                collect_rows(rows)?.into_iter().collect()
            };
            let existing_edge_hashes: BTreeMap<String, String> = {
                let mut stmt = tx.prepare(
                    "SELECT edge_key, row_hash FROM graph_edges WHERE row_hash IS NOT NULL",
                )?;
                let rows = stmt.query_map([], |row| {
                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
                })?;
                collect_rows(rows)?.into_iter().collect()
            };
            // A row whose hash cannot be computed is kept as "changed" rather than
            // silently skipped.
            let cn: Vec<&GraphNode> = projection
                .nodes
                .iter()
                .filter(|n| {
                    let hash = row_hash(*n).ok();
                    hash.as_ref()
                        .is_none_or(|h| existing_node_hashes.get(&n.id) != Some(h))
                })
                .collect();
            let ce: Vec<&GraphEdge> = projection
                .edges
                .iter()
                .filter(|e| {
                    let hash = row_hash(*e).ok();
                    let ek = graph_edge_id(e);
                    hash.as_ref()
                        .is_none_or(|h| existing_edge_hashes.get(&ek) != Some(h))
                })
                .collect();
            let sn = projection.nodes.len() - cn.len();
            let se = projection.edges.len() - ce.len();
            (cn, ce, sn, se)
        };
        {
            let started = Instant::now();
            // NOTE(review): the staging helpers are defined elsewhere; presumably
            // `sqlite_stage_all_ids` fills next_graph_all_node_ids /
            // next_graph_all_edge_keys and node staging also fills
            // next_graph_node_semantic_vectors - confirm against the helpers.
            sqlite_stage_all_ids(&tx, &projection.nodes, &projection.edges)?;
            sqlite_stage_projection_nodes(&tx, &changed_nodes, source_watermark.as_deref())?;
            sqlite_stage_projection_edges(&tx, &changed_edges, source_watermark.as_deref())?;
            phase_timings.push(sqlite_refresh_phase_timing(
                "sqlite_node_staging",
                started,
                &format!(
                    "pre-filtered staging: {} nodes ({} unchanged skipped), {} edges ({} unchanged skipped) into temp tables using multi-row chunks up to {} rows",
                    changed_nodes.len(),
                    skipped_nodes,
                    changed_edges.len(),
                    skipped_edges,
                    SQLITE_GRAPH_STAGING_CHUNK_ROWS
                ),
            ));
        }
        {
            let started = Instant::now();
            // Forced path: every staged node counts as changed. Otherwise a staged
            // node is changed when it is missing from graph_nodes or its hash differs.
            let changed_nodes_sql = if force_refresh_writes {
                r#"
                INSERT INTO next_graph_changed_nodes (id)
                SELECT id
                FROM next_graph_nodes
                "#
            } else {
                r#"
                INSERT INTO next_graph_changed_nodes (id)
                SELECT n.id
                FROM next_graph_nodes n
                LEFT JOIN graph_nodes g ON g.id = n.id
                WHERE g.id IS NULL OR g.row_hash IS NOT n.row_hash
                "#
            };
            tx.execute(changed_nodes_sql, [])?;
            // Explode properties_json of the changed nodes into key/value rows,
            // skipping entries whose key or value is NULL.
            tx.execute_batch(
                r#"
                INSERT INTO next_graph_node_properties (node_id, key, value)
                SELECT n.id, json_each.key, CAST(json_each.value AS TEXT)
                FROM next_graph_nodes n
                JOIN next_graph_changed_nodes c ON c.id = n.id,
                     json_each(n.properties_json)
                WHERE json_each.key IS NOT NULL
                  AND json_each.value IS NOT NULL
                "#,
            )?;
            phase_timings.push(sqlite_refresh_phase_timing(
                "sqlite_property_row_staging",
                started,
                "derive materialized node property rows only for new/changed node rows; unchanged row-hash owners reuse existing property rows",
            ));
        }
        {
            let started = Instant::now();
            // Same changed-set computation and property explosion, for edges.
            let changed_edges_sql = if force_refresh_writes {
                r#"
                INSERT INTO next_graph_changed_edges (edge_key)
                SELECT edge_key
                FROM next_graph_edges
                "#
            } else {
                r#"
                INSERT INTO next_graph_changed_edges (edge_key)
                SELECT n.edge_key
                FROM next_graph_edges n
                LEFT JOIN graph_edges g ON g.edge_key = n.edge_key
                WHERE g.edge_key IS NULL OR g.row_hash IS NOT n.row_hash
                "#
            };
            tx.execute(changed_edges_sql, [])?;
            tx.execute_batch(
                r#"
                INSERT INTO next_graph_edge_properties (edge_key, key, value)
                SELECT e.edge_key, json_each.key, CAST(json_each.value AS TEXT)
                FROM next_graph_edges e
                JOIN next_graph_changed_edges c ON c.edge_key = e.edge_key,
                     json_each(e.properties_json)
                WHERE json_each.key IS NOT NULL
                  AND json_each.value IS NOT NULL
                "#,
            )?;
            phase_timings.push(sqlite_refresh_phase_timing(
                "sqlite_edge_property_row_staging",
                started,
                "derive materialized edge property rows only for new/changed edge rows; unchanged row-hash owners reuse existing property rows",
            ));
        }

        let delta_started = Instant::now();
        // Capture the ids about to disappear BEFORE the deletes below, so they can
        // be written to graph_tombstones afterwards. The forced path compares
        // against the fully staged rows; the pre-filtered path compares against the
        // all-ids tables because next_graph_nodes/edges only hold changed rows there.
        let tombstoned_nodes = {
            let sql = if force_refresh_writes {
                r#"
                SELECT g.id
                FROM graph_nodes g
                LEFT JOIN next_graph_nodes n ON n.id = g.id
                WHERE n.id IS NULL
                ORDER BY g.id
                "#
            } else {
                r#"
                SELECT g.id
                FROM graph_nodes g
                LEFT JOIN next_graph_all_node_ids n ON n.id = g.id
                WHERE n.id IS NULL
                ORDER BY g.id
                "#
            };
            let mut stmt = tx.prepare(sql)?;
            collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
        };
        let tombstoned_edges = {
            let sql = if force_refresh_writes {
                r#"
                SELECT g.edge_key
                FROM graph_edges g
                LEFT JOIN next_graph_edges n
                  ON n.edge_key = g.edge_key
                WHERE n.edge_key IS NULL
                ORDER BY g.edge_key
                "#
            } else {
                r#"
                SELECT g.edge_key
                FROM graph_edges g
                LEFT JOIN next_graph_all_edge_keys n
                  ON n.edge_key = g.edge_key
                WHERE n.edge_key IS NULL
                ORDER BY g.edge_key
                "#
            };
            let mut stmt = tx.prepare(sql)?;
            collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
        };
        // Unchanged-row counts: counted in SQL on the forced path, taken from the
        // Rust-side pre-filter otherwise.
        let unchanged_nodes: usize = if force_refresh_writes {
            tx.query_row(
                r#"
                SELECT COUNT(*)
                FROM next_graph_nodes n
                JOIN graph_nodes g ON g.id = n.id
                WHERE g.row_hash = n.row_hash
                "#,
                [],
                |row| row.get(0),
            )?
        } else {
            skipped_nodes
        };
        let unchanged_edges: usize = if force_refresh_writes {
            tx.query_row(
                r#"
                SELECT COUNT(*)
                FROM next_graph_edges n
                JOIN graph_edges g
                  ON g.edge_key = n.edge_key
                WHERE g.row_hash = n.row_hash
                "#,
                [],
                |row| row.get(0),
            )?
        } else {
            skipped_edges
        };
        // Existing property rows whose owning node is projected but not in the
        // changed set, i.e. rows that are kept without being touched.
        let reused_owner_node_properties: usize = if force_refresh_writes {
            tx.query_row(
                r#"
                SELECT COUNT(*)
                FROM graph_node_properties g
                JOIN next_graph_nodes n ON n.id = g.node_id
                LEFT JOIN next_graph_changed_nodes c ON c.id = n.id
                WHERE c.id IS NULL
                "#,
                [],
                |row| row.get(0),
            )?
        } else {
            tx.query_row(
                r#"
                SELECT COUNT(*)
                FROM graph_node_properties g
                JOIN next_graph_all_node_ids a ON a.id = g.node_id
                LEFT JOIN next_graph_changed_nodes c ON c.id = a.id
                WHERE c.id IS NULL
                "#,
                [],
                |row| row.get(0),
            )?
        };
        let reused_owner_edge_properties: usize = if force_refresh_writes {
            tx.query_row(
                r#"
                SELECT COUNT(*)
                FROM graph_edge_properties g
                JOIN next_graph_edges n ON n.edge_key = g.edge_key
                LEFT JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
                WHERE c.edge_key IS NULL
                "#,
                [],
                |row| row.get(0),
            )?
        } else {
            tx.query_row(
                r#"
                SELECT COUNT(*)
                FROM graph_edge_properties g
                JOIN next_graph_all_edge_keys a ON a.edge_key = g.edge_key
                LEFT JOIN next_graph_changed_edges c ON c.edge_key = a.edge_key
                WHERE c.edge_key IS NULL
                "#,
                [],
                |row| row.get(0),
            )?
        };
        // Property rows of changed owners whose stored value already equals the
        // staged value.
        let unchanged_changed_node_properties: usize = tx.query_row(
            r#"
            SELECT COUNT(*)
            FROM next_graph_node_properties n
            JOIN graph_node_properties g
              ON g.node_id = n.node_id AND g.key = n.key
            WHERE g.value = n.value
            "#,
            [],
            |row| row.get(0),
        )?;
        let unchanged_changed_edge_properties: usize = tx.query_row(
            r#"
            SELECT COUNT(*)
            FROM next_graph_edge_properties n
            JOIN graph_edge_properties g
              ON g.edge_key = n.edge_key AND g.key = n.key
            WHERE g.value = n.value
            "#,
            [],
            |row| row.get(0),
        )?;
        let unchanged_properties = reused_owner_node_properties
            + reused_owner_edge_properties
            + unchanged_changed_node_properties
            + unchanged_changed_edge_properties;

        // Remove rows that are no longer projected; edges are deleted before nodes.
        let deleted_edges = if force_refresh_writes {
            tx.execute(
                r#"
                DELETE FROM graph_edges
                WHERE NOT EXISTS (
                    SELECT 1
                    FROM next_graph_edges n
                    WHERE n.edge_key = graph_edges.edge_key
                )
                "#,
                [],
            )?
        } else {
            tx.execute(
                r#"
                DELETE FROM graph_edges
                WHERE NOT EXISTS (
                    SELECT 1
                    FROM next_graph_all_edge_keys n
                    WHERE n.edge_key = graph_edges.edge_key
                )
                "#,
                [],
            )?
        };
        let deleted_nodes = if force_refresh_writes {
            tx.execute(
                r#"
                DELETE FROM graph_nodes
                WHERE NOT EXISTS (
                    SELECT 1
                    FROM next_graph_nodes n
                    WHERE n.id = graph_nodes.id
                )
                "#,
                [],
            )?
        } else {
            tx.execute(
                r#"
                DELETE FROM graph_nodes
                WHERE NOT EXISTS (
                    SELECT 1
                    FROM next_graph_all_node_ids n
                    WHERE n.id = graph_nodes.id
                )
                "#,
                [],
            )?
        };

        // Upsert changed nodes. The trailing WHERE on DO UPDATE leaves an existing
        // row untouched when its stored hash already equals the staged hash.
        let upsert_nodes_sql = r#"
            INSERT INTO graph_nodes
                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
            SELECT
                n.id,
                n.kind,
                n.label,
                n.properties_json,
                n.provenance_json,
                n.freshness_json,
                n.row_hash,
                n.source_watermark
            FROM next_graph_nodes n
            JOIN next_graph_changed_nodes c ON c.id = n.id
            WHERE true
            ON CONFLICT(id) DO UPDATE SET
                kind = excluded.kind,
                label = excluded.label,
                properties_json = excluded.properties_json,
                provenance_json = excluded.provenance_json,
                freshness_json = excluded.freshness_json,
                row_hash = excluded.row_hash,
                source_watermark = excluded.source_watermark
            WHERE graph_nodes.row_hash IS NOT excluded.row_hash
        "#;
        tx.execute(upsert_nodes_sql, [])?;
        // Drop stored semantic vectors of changed nodes that no longer stage one,
        // then upsert every staged vector.
        tx.execute(
            r#"
            DELETE FROM graph_node_semantic_vectors
            WHERE EXISTS (
                SELECT 1
                FROM next_graph_changed_nodes c
                WHERE c.id = graph_node_semantic_vectors.node_id
            )
              AND NOT EXISTS (
                SELECT 1
                FROM next_graph_node_semantic_vectors n
                WHERE n.node_id = graph_node_semantic_vectors.node_id
            )
            "#,
            [],
        )?;
        tx.execute(
            r#"
            INSERT INTO graph_node_semantic_vectors
                (node_id, kind, model, dimensions, vector_blob)
            SELECT n.node_id, n.kind, n.model, n.dimensions, n.vector_blob
            FROM next_graph_node_semantic_vectors n
            WHERE true
            ON CONFLICT(node_id) DO UPDATE SET
                kind = excluded.kind,
                model = excluded.model,
                dimensions = excluded.dimensions,
                vector_blob = excluded.vector_blob
            "#,
            [],
        )?;
        // Upsert changed edges; the conflict target is the (from_id, to_id, kind)
        // triple, and edge_key is rewritten on update.
        let upsert_edges_sql = r#"
            INSERT INTO graph_edges
                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
            SELECT
                n.edge_key,
                n.from_id,
                n.to_id,
                n.kind,
                n.properties_json,
                n.provenance_json,
                n.freshness_json,
                n.row_hash,
                n.source_watermark
            FROM next_graph_edges n
            JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
            WHERE true
            ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
                edge_key = excluded.edge_key,
                properties_json = excluded.properties_json,
                provenance_json = excluded.provenance_json,
                freshness_json = excluded.freshness_json,
                row_hash = excluded.row_hash,
                source_watermark = excluded.source_watermark
            WHERE graph_edges.row_hash IS NOT excluded.row_hash
        "#;
        tx.execute(upsert_edges_sql, [])?;
        // For changed owners only: delete property keys that vanished, then
        // insert/update keys that are new or whose value differs.
        let deleted_node_properties = tx.execute(
            r#"
            DELETE FROM graph_node_properties
            WHERE EXISTS (
                SELECT 1
                FROM next_graph_changed_nodes c
                WHERE c.id = graph_node_properties.node_id
            )
              AND NOT EXISTS (
                SELECT 1
                FROM next_graph_node_properties n
                WHERE n.node_id = graph_node_properties.node_id
                  AND n.key = graph_node_properties.key
            )
            "#,
            [],
        )?;
        let deleted_edge_properties = tx.execute(
            r#"
            DELETE FROM graph_edge_properties
            WHERE EXISTS (
                SELECT 1
                FROM next_graph_changed_edges c
                WHERE c.edge_key = graph_edge_properties.edge_key
            )
              AND NOT EXISTS (
                SELECT 1
                FROM next_graph_edge_properties n
                WHERE n.edge_key = graph_edge_properties.edge_key
                  AND n.key = graph_edge_properties.key
            )
            "#,
            [],
        )?;
        let deleted_properties = deleted_node_properties + deleted_edge_properties;
        let upsert_properties_sql = r#"
            INSERT INTO graph_node_properties (node_id, key, value)
            SELECT n.node_id, n.key, n.value
            FROM next_graph_node_properties n
            LEFT JOIN graph_node_properties g
              ON g.node_id = n.node_id AND g.key = n.key
            WHERE g.node_id IS NULL OR g.value IS NOT n.value
            ON CONFLICT(node_id, key) DO UPDATE SET
                value = excluded.value
            WHERE graph_node_properties.value IS NOT excluded.value
        "#;
        let upserted_node_properties = tx.execute(upsert_properties_sql, [])?;
        let upsert_edge_properties_sql = r#"
            INSERT INTO graph_edge_properties (edge_key, key, value)
            SELECT n.edge_key, n.key, n.value
            FROM next_graph_edge_properties n
            LEFT JOIN graph_edge_properties g
              ON g.edge_key = n.edge_key AND g.key = n.key
            WHERE g.edge_key IS NULL OR g.value IS NOT n.value
            ON CONFLICT(edge_key, key) DO UPDATE SET
                value = excluded.value
            WHERE graph_edge_properties.value IS NOT excluded.value
        "#;
        let upserted_edge_properties = tx.execute(upsert_edge_properties_sql, [])?;
        let upserted_properties = upserted_node_properties + upserted_edge_properties;
        // Record (or overwrite) the version row for this scope.
        tx.execute(
            r#"
            INSERT INTO graph_projection_versions
                (scope, projection_version, content_hash, source_watermark, observed_at_unix)
            VALUES (?1, ?2, ?3, ?4, ?5)
            ON CONFLICT(scope) DO UPDATE SET
                projection_version = excluded.projection_version,
                content_hash = excluded.content_hash,
                source_watermark = excluded.source_watermark,
                observed_at_unix = excluded.observed_at_unix
            "#,
            (
                &scope,
                &projection_version,
                &projection_hash,
                &source_watermark,
                observed_at_unix,
            ),
        )?;
        // Drop tombstones for rows that are staged again. Tombstone keys are
        // written below as "node:<id>" / "edge:<key>", so substr(row_key, 6)
        // strips that five-character prefix.
        let pruned_node_tombstones = tx.execute(
            r#"
            DELETE FROM graph_tombstones
            WHERE row_kind = 'node'
              AND EXISTS (
                SELECT 1
                FROM next_graph_nodes n
                WHERE n.id = substr(graph_tombstones.row_key, 6)
            )
            "#,
            [],
        )?;
        let pruned_edge_tombstones = tx.execute(
            r#"
            DELETE FROM graph_tombstones
            WHERE row_kind = 'edge'
              AND EXISTS (
                SELECT 1
                FROM next_graph_edges n
                WHERE n.edge_key = substr(graph_tombstones.row_key, 6)
            )
            "#,
            [],
        )?;
        // Write one tombstone per row captured before the deletes; an existing
        // tombstone for the same key gets its timestamp refreshed.
        {
            let mut insert_node_tombstone = tx.prepare(
                r#"
                INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
                VALUES (?1, 'node', ?2)
                ON CONFLICT(row_key) DO UPDATE SET
                    row_kind = excluded.row_kind,
                    deleted_at_unix = excluded.deleted_at_unix
                "#,
            )?;
            for id in &tombstoned_nodes {
                insert_node_tombstone.execute((format!("node:{id}"), observed_at_unix))?;
            }
        }
        {
            let mut insert_edge_tombstone = tx.prepare(
                r#"
                INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
                VALUES (?1, 'edge', ?2)
                ON CONFLICT(row_key) DO UPDATE SET
                    row_kind = excluded.row_kind,
                    deleted_at_unix = excluded.deleted_at_unix
                "#,
            )?;
            for key in &tombstoned_edges {
                insert_edge_tombstone.execute((format!("edge:{key}"), observed_at_unix))?;
            }
        }
        let tombstone_node_count: usize = tx.query_row(
            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
            [],
            |row| row.get(0),
        )?;
        let tombstone_edge_count: usize = tx.query_row(
            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
            [],
            |row| row.get(0),
        )?;
        // Cache row counts for this scope. File size and freelist are stored as
        // NULL here and filled in by the UPDATE issued after the commit.
        tx.execute(
            r#"
            INSERT INTO graph_operator_stats
                (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
            VALUES (?1, ?2, ?3, ?4, ?5, NULL, NULL, ?6)
            ON CONFLICT(scope) DO UPDATE SET
                nodes = excluded.nodes,
                edges = excluded.edges,
                tombstone_nodes = excluded.tombstone_nodes,
                tombstone_edges = excluded.tombstone_edges,
                file_size_bytes = excluded.file_size_bytes,
                freelist_bytes = excluded.freelist_bytes,
                observed_at_unix = excluded.observed_at_unix
            "#,
            (
                &scope,
                projection.nodes.len() as i64,
                projection.edges.len() as i64,
                tombstone_node_count as i64,
                tombstone_edge_count as i64,
                observed_at_unix,
            ),
        )?;
        phase_timings.push(sqlite_refresh_phase_timing(
            "sqlite_delta_write",
            delta_started,
            "apply row/property deltas, projection metadata, tombstones, and cached operator counts",
        ));
        let commit_started = Instant::now();
        tx.commit()?;
        phase_timings.push(sqlite_refresh_phase_timing(
            "sqlite_commit",
            commit_started,
            "commit refresh transaction and publish old-or-new graph visibility",
        ));
        // Size figures are sampled after the commit and written outside the
        // refresh transaction; failed probes are stored as NULL.
        let file_size_bytes_after = sqlite_database_size_bytes(&self.conn).ok();
        let freelist_bytes_after = sqlite_database_freelist_bytes(&self.conn).ok();
        let stats_started = Instant::now();
        self.conn.execute(
            r#"
            UPDATE graph_operator_stats
            SET file_size_bytes = ?2,
                freelist_bytes = ?3,
                observed_at_unix = ?4
            WHERE scope = ?1
            "#,
            (
                &scope,
                file_size_bytes_after.map(|value| value as i64),
                freelist_bytes_after.map(|value| value as i64),
                unix_now(),
            ),
        )?;
        phase_timings.push(sqlite_refresh_phase_timing(
            "sqlite_stats_cache_update",
            stats_started,
            "persist post-commit file and freelist proof for status/doctor",
        ));
        // Upserted counts are derived as "projected minus unchanged".
        Ok(SqliteProjectionRefresh {
            scope,
            projection_version,
            source_watermark,
            upserted_nodes: projection.nodes.len().saturating_sub(unchanged_nodes),
            upserted_edges: projection.edges.len().saturating_sub(unchanged_edges),
            unchanged_nodes,
            unchanged_edges,
            upserted_properties,
            unchanged_properties,
            deleted_properties,
            deleted_nodes,
            deleted_edges,
            pruned_tombstones: pruned_node_tombstones + pruned_edge_tombstones,
            file_size_bytes_before,
            file_size_bytes_after,
            tombstoned_nodes,
            tombstoned_edges,
            phase_timings,
        })
    }
1783
1784 pub fn derive_ontology(&self) -> Result<GraphProjection> {
1793 let mut projection = GraphProjection::default();
1794
1795 let mut node_stmt = self.conn.prepare(
1796 "SELECT kind, COUNT(*) FROM graph_nodes \
1797 WHERE kind != 'ontology_type' \
1798 GROUP BY kind ORDER BY kind",
1799 )?;
1800 let node_rows = node_stmt.query_map([], |row| {
1801 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
1802 })?;
1803 for row in node_rows {
1804 let (kind, count) = row?;
1805 projection.nodes.push(
1806 GraphNode::new(format!("ontology_type:{kind}"), "ontology_type", &kind)
1807 .with_property("type_kind", &kind)
1808 .with_property("instance_count", count.to_string())
1809 .with_provenance(GraphProvenance::new("tsift-ontology", &kind)),
1810 );
1811 }
1812
1813 let mut rel_stmt = self.conn.prepare(
1814 "SELECT n1.kind, e.kind, n2.kind, COUNT(*) \
1815 FROM graph_edges e \
1816 JOIN graph_nodes n1 ON e.from_id = n1.id \
1817 JOIN graph_nodes n2 ON e.to_id = n2.id \
1818 WHERE e.kind NOT LIKE 'ontology_relation:%' \
1819 AND n1.kind != 'ontology_type' AND n2.kind != 'ontology_type' \
1820 GROUP BY n1.kind, e.kind, n2.kind \
1821 ORDER BY n1.kind, e.kind, n2.kind",
1822 )?;
1823 let rel_rows = rel_stmt.query_map([], |row| {
1824 Ok((
1825 row.get::<_, String>(0)?,
1826 row.get::<_, String>(1)?,
1827 row.get::<_, String>(2)?,
1828 row.get::<_, i64>(3)?,
1829 ))
1830 })?;
1831 for row in rel_rows {
1832 let (from_kind, edge_kind, to_kind, count) = row?;
1833 projection.edges.push(
1834 GraphEdge::new(
1835 format!("ontology_type:{from_kind}"),
1836 format!("ontology_type:{to_kind}"),
1837 format!("ontology_relation:{edge_kind}"),
1838 )
1839 .with_property("edge_kind", &edge_kind)
1840 .with_property("instance_count", count.to_string())
1841 .with_provenance(GraphProvenance::new("tsift-ontology", &edge_kind)),
1842 );
1843 }
1844
1845 Ok(projection)
1846 }
1847
1848 pub fn upsert_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1849 let tx = self.conn.transaction()?;
1850 {
1851 let mut insert_node = tx.prepare(
1852 r#"
1853 INSERT INTO graph_nodes
1854 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1855 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1856 ON CONFLICT(id) DO UPDATE SET
1857 kind = excluded.kind,
1858 label = excluded.label,
1859 properties_json = excluded.properties_json,
1860 provenance_json = excluded.provenance_json,
1861 freshness_json = excluded.freshness_json,
1862 row_hash = excluded.row_hash,
1863 source_watermark = excluded.source_watermark
1864 WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1865 OR graph_nodes.source_watermark IS NOT excluded.source_watermark
1866 "#,
1867 )?;
1868 let mut delete_properties =
1869 tx.prepare("DELETE FROM graph_node_properties WHERE node_id = ?1")?;
1870 let mut insert_property = tx.prepare(
1871 r#"
1872 INSERT INTO graph_node_properties (node_id, key, value)
1873 VALUES (?1, ?2, ?3)
1874 "#,
1875 )?;
1876 for node in &projection.nodes {
1877 let changed = insert_node.execute((
1878 &node.id,
1879 &node.kind,
1880 &node.label,
1881 to_json(&node.properties)?,
1882 to_json(&node.provenance)?,
1883 optional_to_json(&node.freshness)?,
1884 row_hash(node)?,
1885 ))?;
1886 if changed > 0 {
1887 delete_properties.execute([&node.id])?;
1888 for (key, value) in &node.properties {
1889 insert_property.execute((&node.id, key, value))?;
1890 }
1891 replace_node_semantic_vector(&tx, &node.id, &node.kind, &node.properties)?;
1892 }
1893 }
1894 }
1895 {
1896 let mut insert_edge = tx.prepare(
1897 r#"
1898 INSERT INTO graph_edges
1899 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1900 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1901 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1902 edge_key = excluded.edge_key,
1903 properties_json = excluded.properties_json,
1904 provenance_json = excluded.provenance_json,
1905 freshness_json = excluded.freshness_json,
1906 row_hash = excluded.row_hash,
1907 source_watermark = excluded.source_watermark
1908 WHERE graph_edges.row_hash IS NOT excluded.row_hash
1909 OR graph_edges.source_watermark IS NOT excluded.source_watermark
1910 "#,
1911 )?;
1912 let mut delete_properties =
1913 tx.prepare("DELETE FROM graph_edge_properties WHERE edge_key = ?1")?;
1914 let mut insert_property = tx.prepare(
1915 r#"
1916 INSERT INTO graph_edge_properties (edge_key, key, value)
1917 VALUES (?1, ?2, ?3)
1918 "#,
1919 )?;
1920 for edge in &projection.edges {
1921 let edge_key = graph_edge_id(edge);
1922 let changed = insert_edge.execute((
1923 &edge_key,
1924 &edge.from_id,
1925 &edge.to_id,
1926 &edge.kind,
1927 to_json(&edge.properties)?,
1928 to_json(&edge.provenance)?,
1929 optional_to_json(&edge.freshness)?,
1930 row_hash(edge)?,
1931 ))?;
1932 if changed > 0 {
1933 delete_properties.execute([&edge_key])?;
1934 for (key, value) in &edge.properties {
1935 insert_property.execute((&edge_key, key, value))?;
1936 }
1937 }
1938 }
1939 }
1940 tx.commit()?;
1941 Ok(())
1942 }
1943
1944 pub fn projection_version(&self, scope: &str) -> Result<Option<SqliteProjectionVersion>> {
1945 self.conn
1946 .query_row(
1947 r#"
1948 SELECT projection_version, content_hash, source_watermark
1949 FROM graph_projection_versions
1950 WHERE scope = ?1
1951 "#,
1952 [scope],
1953 |row| {
1954 Ok(SqliteProjectionVersion {
1955 projection_version: row.get(0)?,
1956 content_hash: row.get(1)?,
1957 source_watermark: row.get(2)?,
1958 })
1959 },
1960 )
1961 .optional()
1962 .map_err(Into::into)
1963 }
1964
1965 pub fn update_projection_source_watermark(
1966 &mut self,
1967 scope: &str,
1968 source_watermark: Option<String>,
1969 ) -> Result<()> {
1970 self.conn.execute(
1971 r#"
1972 UPDATE graph_projection_versions
1973 SET source_watermark = ?2
1974 WHERE scope = ?1
1975 "#,
1976 (scope, source_watermark),
1977 )?;
1978 Ok(())
1979 }
1980
1981 pub fn compact_storage(&mut self, scope: &str, prune_tombstones: bool) -> Result<usize> {
1982 let pruned_tombstones = if prune_tombstones {
1983 self.conn.execute("DELETE FROM graph_tombstones", [])?
1984 } else {
1985 0
1986 };
1987 self.conn.execute_batch(
1988 r#"
1989 PRAGMA wal_checkpoint(TRUNCATE);
1990 VACUUM;
1991 "#,
1992 )?;
1993 let nodes = self
1994 .conn
1995 .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
1996 row.get::<_, i64>(0)
1997 })?;
1998 let edges = self
1999 .conn
2000 .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2001 row.get::<_, i64>(0)
2002 })?;
2003 let tombstone_nodes = self.conn.query_row(
2004 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
2005 [],
2006 |row| row.get::<_, i64>(0),
2007 )?;
2008 let tombstone_edges = self.conn.query_row(
2009 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
2010 [],
2011 |row| row.get::<_, i64>(0),
2012 )?;
2013 let file_size_bytes = sqlite_database_size_bytes(&self.conn)
2014 .ok()
2015 .map(|value| value as i64);
2016 let freelist_bytes = sqlite_database_freelist_bytes(&self.conn)
2017 .ok()
2018 .map(|value| value as i64);
2019 self.conn.execute(
2020 r#"
2021 INSERT INTO graph_operator_stats
2022 (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
2023 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s', 'now'))
2024 ON CONFLICT(scope) DO UPDATE SET
2025 nodes = excluded.nodes,
2026 edges = excluded.edges,
2027 tombstone_nodes = excluded.tombstone_nodes,
2028 tombstone_edges = excluded.tombstone_edges,
2029 file_size_bytes = excluded.file_size_bytes,
2030 freelist_bytes = excluded.freelist_bytes,
2031 observed_at_unix = excluded.observed_at_unix
2032 "#,
2033 (
2034 scope,
2035 nodes,
2036 edges,
2037 tombstone_nodes,
2038 tombstone_edges,
2039 file_size_bytes,
2040 freelist_bytes,
2041 ),
2042 )?;
2043 Ok(pruned_tombstones)
2044 }
2045
2046 fn edges_between_nodes_inline(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
2047 let placeholders: Vec<&str> = node_ids.iter().map(|_| "?").collect();
2048 let in_clause = placeholders.join(", ");
2049 let sql = format!(
2050 "SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json \
2051 FROM graph_edges e \
2052 WHERE e.from_id IN ({in_clause}) \
2053 AND e.to_id IN ({in_clause}) \
2054 ORDER BY e.from_id, e.kind, e.to_id"
2055 );
2056 let values: Vec<Value> = node_ids
2057 .iter()
2058 .chain(node_ids.iter())
2059 .map(|id| Value::Text(id.clone()))
2060 .collect();
2061 let mut stmt = self.conn.prepare(&sql)?;
2062 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2063 }
2064}
2065
2066fn sqlite_query_plan(conn: &Connection, sql: &str, values: &[Value]) -> Result<Vec<String>> {
2067 let mut stmt = conn.prepare(&format!("EXPLAIN QUERY PLAN {sql}"))?;
2068 collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2069 row.get::<_, String>(3)
2070 })?)
2071}
2072
/// Renders human-readable diagnostics for a query plan.
///
/// The first entry echoes the plan rows joined with `" | "`. It is followed by one
/// entry per expected index, stating whether any plan row mentions that index
/// name as a substring.
fn sqlite_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<String> {
    let summary = format!(
        "sqlite query pushdown active; plan: {}",
        plan.join(" | ")
    );
    let per_index = expected_indexes.iter().map(|&expected_index| {
        let reported = plan.iter().any(|row| row.contains(expected_index));
        if reported {
            format!("sqlite query plan uses {expected_index}")
        } else {
            format!(
                "sqlite query plan did not report {expected_index}; inspect before changing graph property indexes"
            )
        }
    });
    std::iter::once(summary).chain(per_index).collect()
}
2089
/// Compact, machine-oriented query-plan diagnostic.
///
/// `terse_query_plan_diagnostics` in this file produces the codes
/// `"plan_active"`, `"idx_ok"` and `"idx_missing"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TerseDiagnostic {
    /// Short static identifier of the diagnostic.
    pub code: &'static str,
    /// Index name the diagnostic refers to; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index: Option<String>,
}
2096
2097#[allow(dead_code)]
2098fn terse_query_plan_diagnostics(
2099 plan: &[String],
2100 expected_indexes: &[&str],
2101) -> Vec<TerseDiagnostic> {
2102 let mut diagnostics = vec![TerseDiagnostic {
2103 code: "plan_active",
2104 index: None,
2105 }];
2106 for expected_index in expected_indexes {
2107 if plan.iter().any(|row| row.contains(expected_index)) {
2108 diagnostics.push(TerseDiagnostic {
2109 code: "idx_ok",
2110 index: Some(expected_index.to_string()),
2111 });
2112 } else {
2113 diagnostics.push(TerseDiagnostic {
2114 code: "idx_missing",
2115 index: Some(expected_index.to_string()),
2116 });
2117 }
2118 }
2119 diagnostics
2120}
2121
2122fn push_sqlite_property_filter_exists(
2123 sql: &mut String,
2124 values: &mut Vec<Value>,
2125 node_alias: &str,
2126 filters: &[GraphPropertyFilter],
2127) {
2128 for (index, filter) in filters.iter().enumerate() {
2129 sql.push_str(&format!(
2130 r#"
2131 AND EXISTS (
2132 SELECT 1
2133 FROM graph_node_properties p{index} INDEXED BY idx_graph_node_properties_key_value_node
2134 WHERE p{index}.node_id = {node_alias}.id
2135 AND p{index}.key = ?
2136 AND p{index}.value = ?
2137 )
2138 "#
2139 ));
2140 values.push(Value::Text(filter.key.clone()));
2141 values.push(Value::Text(filter.value.clone()));
2142 }
2143}
2144
2145fn push_sqlite_edge_property_filter_exists(
2146 sql: &mut String,
2147 values: &mut Vec<Value>,
2148 edge_alias: &str,
2149 filters: &[GraphPropertyFilter],
2150) {
2151 for (index, filter) in filters.iter().enumerate() {
2152 sql.push_str(&format!(
2153 r#"
2154 AND EXISTS (
2155 SELECT 1
2156 FROM graph_edge_properties ep{index} INDEXED BY idx_graph_edge_properties_key_value_edge
2157 WHERE ep{index}.edge_key = {edge_alias}.edge_key
2158 AND ep{index}.key = ?
2159 AND ep{index}.value = ?
2160 )
2161 "#
2162 ));
2163 values.push(Value::Text(filter.key.clone()));
2164 values.push(Value::Text(filter.value.clone()));
2165 }
2166}
2167
/// Inputs for one `SELECT` branch of the incident-edge query.
struct SqliteIncidentEdgeBranch<'a> {
    /// Index named in the branch's `INDEXED BY` clause.
    index_name: &'a str,
    /// `graph_edges` column compared against `node_id`.
    endpoint_column: &'a str,
    /// Node id bound to the endpoint comparison.
    node_id: &'a str,
    /// When set, restricts the branch to edges of this kind.
    kind: Option<&'a str>,
    /// Edge property filters applied to the branch.
    filters: &'a [GraphPropertyFilter],
    /// When set, only edges whose `edge_key` is greater than this are selected.
    cursor: Option<&'a str>,
}
2176
2177fn push_sqlite_incident_edge_branch(
2178 sql: &mut String,
2179 values: &mut Vec<Value>,
2180 branch: SqliteIncidentEdgeBranch<'_>,
2181) {
2182 sql.push_str(&format!(
2183 r#"
2184 SELECT
2185 e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2186 FROM graph_edges e INDEXED BY {index_name}
2187 WHERE e.{endpoint_column} = ?
2188 "#,
2189 index_name = branch.index_name,
2190 endpoint_column = branch.endpoint_column,
2191 ));
2192 values.push(Value::Text(branch.node_id.to_string()));
2193 if let Some(kind) = branch.kind {
2194 sql.push_str(" AND e.kind = ?");
2195 values.push(Value::Text(kind.to_string()));
2196 }
2197 push_sqlite_edge_property_filter_exists(sql, values, "e", branch.filters);
2198 if let Some(cursor) = branch.cursor {
2199 sql.push_str(" AND e.edge_key > ?");
2200 values.push(Value::Text(cursor.to_string()));
2201 }
2202}
2203
2204fn sqlite_incident_edges_union_query(
2205 node_id: &str,
2206 kind: Option<&str>,
2207 filters: &[GraphPropertyFilter],
2208 cursor: Option<&str>,
2209 limit: Option<usize>,
2210) -> (String, Vec<Value>) {
2211 let mut sql = String::from(
2212 r#"
2213 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2214 FROM (
2215 "#,
2216 );
2217 let mut values = Vec::new();
2218 push_sqlite_incident_edge_branch(
2219 &mut sql,
2220 &mut values,
2221 SqliteIncidentEdgeBranch {
2222 index_name: "idx_graph_edges_from_kind",
2223 endpoint_column: "from_id",
2224 node_id,
2225 kind,
2226 filters,
2227 cursor,
2228 },
2229 );
2230 sql.push_str(" UNION ");
2231 push_sqlite_incident_edge_branch(
2232 &mut sql,
2233 &mut values,
2234 SqliteIncidentEdgeBranch {
2235 index_name: "idx_graph_edges_to_kind",
2236 endpoint_column: "to_id",
2237 node_id,
2238 kind,
2239 filters,
2240 cursor,
2241 },
2242 );
2243 sql.push_str(
2244 r#"
2245 ) e
2246 ORDER BY e.edge_key
2247 "#,
2248 );
2249 if let Some(limit) = limit {
2250 sql.push_str(" LIMIT ?");
2251 values.push(Value::Integer(limit.saturating_add(1) as i64));
2252 }
2253 (sql, values)
2254}
2255
/// Returns a SQL expression that scores an edge for semantic-seeded expansion.
///
/// The expression is the sum of three parts:
/// 1. a base weight chosen from `{edge_alias}.kind` (exact kind names first,
///    then `LIKE` pattern fallbacks, then 80),
/// 2. `direction_bonus`, a caller-supplied SQL expression, wrapped in parentheses,
/// 3. a smaller per-kind bonus (0 for kinds not listed).
///
/// `edge_alias` is the SQL alias of the `graph_edges` row being scored. Both
/// arguments are spliced into the SQL text verbatim, so they must be trusted
/// SQL fragments, not user input.
fn sqlite_semantic_seeded_edge_score_expr(edge_alias: &str, direction_bonus: &str) -> String {
    // The base weight must be a *searched* CASE (`CASE WHEN <condition>`). In a
    // simple `CASE <alias>.kind WHEN <expr>` each WHEN value is compared for
    // equality against the kind, so a `LIKE` predicate (which evaluates to 0 or 1)
    // would never match and the pattern fallbacks would be dead code.
    format!(
        "(CASE \
         WHEN {edge_alias}.kind = 'semantic_relation' THEN 340 \
         WHEN {edge_alias}.kind = 'mentions_entity' THEN 280 \
         WHEN {edge_alias}.kind = 'mentions_concept' THEN 280 \
         WHEN {edge_alias}.kind = 'tagged_entity' THEN 280 \
         WHEN {edge_alias}.kind = 'tagged_concept' THEN 280 \
         WHEN {edge_alias}.kind = 'related_concept' THEN 280 \
         WHEN {edge_alias}.kind = 'mentions' THEN 220 \
         WHEN {edge_alias}.kind = 'calls' THEN 200 \
         WHEN {edge_alias}.kind = 'requests_context' THEN 180 \
         WHEN {edge_alias}.kind = 'scopes_context' THEN 180 \
         WHEN {edge_alias}.kind = 'scopes_source' THEN 180 \
         WHEN {edge_alias}.kind = 'explains_result' THEN 180 \
         WHEN {edge_alias}.kind = 'defines' THEN 120 \
         WHEN {edge_alias}.kind = 'contains' THEN 120 \
         WHEN {edge_alias}.kind = 'belongs_to' THEN 120 \
         WHEN {edge_alias}.kind LIKE '%community%' THEN 200 \
         WHEN {edge_alias}.kind LIKE '%semantic%' \
         OR {edge_alias}.kind LIKE '%concept%' \
         OR {edge_alias}.kind LIKE '%entity%' THEN 240 \
         ELSE 80 END) \
         + ({direction_bonus}) \
         + (CASE {edge_alias}.kind \
         WHEN 'mentions_concept' THEN 30 \
         WHEN 'mentions_entity' THEN 30 \
         WHEN 'tagged_concept' THEN 30 \
         WHEN 'tagged_entity' THEN 30 \
         WHEN 'related_concept' THEN 30 \
         WHEN 'semantic_relation' THEN 28 \
         WHEN 'calls' THEN 24 \
         WHEN 'mentions' THEN 22 \
         WHEN 'requests_context' THEN 18 \
         WHEN 'scopes_context' THEN 18 \
         WHEN 'scopes_source' THEN 18 \
         WHEN 'explains_result' THEN 18 \
         WHEN 'defines' THEN 12 \
         WHEN 'contains' THEN 12 \
         WHEN 'belongs_to' THEN 12 \
         ELSE 0 END)"
    )
}
2299
2300impl GraphStore for SqliteGraphStore {
2301 fn upsert_node(&self, node: &GraphNode) -> Result<()> {
2302 self.conn.execute(
2303 r#"
2304 INSERT INTO graph_nodes
2305 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
2306 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
2307 ON CONFLICT(id) DO UPDATE SET
2308 kind = excluded.kind,
2309 label = excluded.label,
2310 properties_json = excluded.properties_json,
2311 provenance_json = excluded.provenance_json,
2312 freshness_json = excluded.freshness_json,
2313 row_hash = excluded.row_hash,
2314 source_watermark = excluded.source_watermark
2315 "#,
2316 (
2317 &node.id,
2318 &node.kind,
2319 &node.label,
2320 to_json(&node.properties)?,
2321 to_json(&node.provenance)?,
2322 optional_to_json(&node.freshness)?,
2323 row_hash(node)?,
2324 ),
2325 )?;
2326 replace_node_properties(&self.conn, &node.id, &node.properties)?;
2327 replace_node_semantic_vector(&self.conn, &node.id, &node.kind, &node.properties)?;
2328 Ok(())
2329 }
2330
2331 fn upsert_edge(&self, edge: &GraphEdge) -> Result<()> {
2332 let edge_key = graph_edge_id(edge);
2333 self.conn.execute(
2334 r#"
2335 INSERT INTO graph_edges
2336 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
2337 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
2338 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
2339 edge_key = excluded.edge_key,
2340 properties_json = excluded.properties_json,
2341 provenance_json = excluded.provenance_json,
2342 freshness_json = excluded.freshness_json,
2343 row_hash = excluded.row_hash,
2344 source_watermark = excluded.source_watermark
2345 "#,
2346 (
2347 &edge_key,
2348 &edge.from_id,
2349 &edge.to_id,
2350 &edge.kind,
2351 to_json(&edge.properties)?,
2352 to_json(&edge.provenance)?,
2353 optional_to_json(&edge.freshness)?,
2354 row_hash(edge)?,
2355 ),
2356 )?;
2357 replace_edge_properties(&self.conn, &edge_key, &edge.properties)?;
2358 Ok(())
2359 }
2360
2361 fn delete_node(&self, id: &str) -> Result<usize> {
2362 self.conn
2363 .execute("DELETE FROM graph_nodes WHERE id = ?1", [id])
2364 .map_err(Into::into)
2365 }
2366
2367 fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize> {
2368 self.conn
2369 .execute(
2370 "DELETE FROM graph_edges WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3",
2371 (from_id, to_id, kind),
2372 )
2373 .map_err(Into::into)
2374 }
2375
2376 fn node(&self, id: &str) -> Result<Option<GraphNode>> {
2377 self.conn
2378 .query_row(
2379 r#"
2380SELECT id, kind, label, properties_json, provenance_json, freshness_json
2381 FROM graph_nodes
2382 WHERE id = ?1
2383 "#,
2384 [id],
2385 node_from_row,
2386 )
2387 .optional()
2388 .map_err(Into::into)
2389 }
2390
2391 fn nodes_by_ids(&self, ids: &[String]) -> Result<Vec<GraphNode>> {
2392 let unique_ids = ids.iter().cloned().collect::<BTreeSet<_>>();
2393 if unique_ids.is_empty() {
2394 return Ok(Vec::new());
2395 }
2396
2397 let mut nodes = Vec::new();
2398 let id_refs = unique_ids.iter().collect::<Vec<_>>();
2399 for chunk in id_refs.chunks(450) {
2400 let placeholders = chunk.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
2401 let sql = format!(
2402 "SELECT id, kind, label, properties_json, provenance_json, freshness_json \
2403FROM graph_nodes \
2404WHERE id IN ({placeholders}) \
2405ORDER BY id"
2406 );
2407 let values = chunk
2408 .iter()
2409 .map(|id| Value::Text((*id).clone()))
2410 .collect::<Vec<_>>();
2411 let mut stmt = self.conn.prepare(&sql)?;
2412 nodes.extend(collect_rows(
2413 stmt.query_map(params_from_iter(values.iter()), node_from_row)?,
2414 )?);
2415 }
2416 nodes.sort_by(|left, right| left.id.cmp(&right.id));
2417 Ok(nodes)
2418 }
2419
2420 fn all_nodes(&self) -> Result<Vec<GraphNode>> {
2421 let mut stmt = self.conn.prepare(
2422 r#"
2423SELECT id, kind, label, properties_json, provenance_json, freshness_json
2424 FROM graph_nodes
2425 ORDER BY id
2426 "#,
2427 )?;
2428 collect_rows(stmt.query_map([], node_from_row)?)
2429 }
2430
2431 fn all_edges(&self) -> Result<Vec<GraphEdge>> {
2432 let mut stmt = self.conn.prepare(
2433 r#"
2434 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2435 FROM graph_edges
2436 ORDER BY from_id, kind, to_id
2437 "#,
2438 )?;
2439 collect_rows(stmt.query_map([], edge_from_row)?)
2440 }
2441
2442 fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
2443 self.conn
2444 .query_row(
2445 r#"
2446 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2447 FROM graph_edges INDEXED BY idx_graph_edges_edge_key
2448 WHERE edge_key = ?1
2449 "#,
2450 [edge_id],
2451 edge_from_row,
2452 )
2453 .optional()
2454 .map_err(Into::into)
2455 }
2456
2457 fn graph_counts(&self) -> Result<(usize, usize)> {
2458 let nodes = self
2459 .conn
2460 .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
2461 row.get::<_, usize>(0)
2462 })?;
2463 let edges = self
2464 .conn
2465 .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2466 row.get::<_, usize>(0)
2467 })?;
2468 Ok((nodes, edges))
2469 }
2470
2471 fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
2472 match kind {
2473 Some(kind) => self
2474 .conn
2475 .query_row(
2476 r#"
2477 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2478 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2479 WHERE from_id <> to_id AND kind = ?1
2480 ORDER BY from_id, kind, to_id
2481 LIMIT 1
2482 "#,
2483 [kind],
2484 edge_from_row,
2485 )
2486 .optional()
2487 .map_err(Into::into),
2488 None => self
2489 .conn
2490 .query_row(
2491 r#"
2492 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2493 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2494 WHERE from_id <> to_id
2495 ORDER BY from_id, kind, to_id
2496 LIMIT 1
2497 "#,
2498 [],
2499 edge_from_row,
2500 )
2501 .optional()
2502 .map_err(Into::into),
2503 }
2504 }
2505
2506 fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
2507 self.conn
2508 .query_row(
2509 r#"
2510 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2511 ep.key, ep.value
2512 FROM graph_edge_properties ep INDEXED BY idx_graph_edge_properties_key_value_edge
2513 JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2514 ON e.edge_key = ep.edge_key
2515 WHERE e.from_id <> e.to_id
2516 ORDER BY ep.key, ep.value, ep.edge_key
2517 LIMIT 1
2518 "#,
2519 [],
2520 |row| {
2521 Ok((
2522 edge_from_row(row)?,
2523 GraphPropertyFilter {
2524 key: row.get(7)?,
2525 value: row.get(8)?,
2526 },
2527 ))
2528 },
2529 )
2530 .optional()
2531 .map_err(Into::into)
2532 }
2533
2534 fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>> {
2535 let mut stmt = self.conn.prepare(
2536 r#"
2537 SELECT id, kind, label, properties_json, provenance_json, freshness_json
2538 FROM graph_nodes
2539 WHERE kind = ?1
2540 ORDER BY id
2541 "#,
2542 )?;
2543 collect_rows(stmt.query_map([kind], node_from_row)?)
2544 }
2545
2546 fn semantic_top_candidates(
2547 &self,
2548 query_vector: &[f64],
2549 kinds: &[&str],
2550 limit: usize,
2551 ) -> Result<Vec<GraphSemanticCandidate>> {
2552 if query_vector.is_empty() || kinds.is_empty() {
2553 return Ok(Vec::new());
2554 }
2555 if !sqlite_table_exists(&self.conn, "graph_node_semantic_vectors")? {
2556 return graph_semantic_top_candidates_by_property_scan(
2557 self,
2558 query_vector,
2559 kinds,
2560 limit,
2561 );
2562 }
2563
2564 let unique_kinds = kinds.iter().copied().collect::<BTreeSet<_>>();
2565 if unique_kinds.is_empty() {
2566 return Ok(Vec::new());
2567 }
2568 let kind_placeholders = unique_kinds
2569 .iter()
2570 .map(|_| "?")
2571 .collect::<Vec<_>>()
2572 .join(", ");
2573 let sql = format!(
2574 r#"
2575 SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
2576 graph_node_semantic_vectors.vector_blob,
2577 graph_node_semantic_vectors.dimensions
2578 FROM graph_node_semantic_vectors INDEXED BY idx_graph_node_semantic_vectors_kind_dims
2579 JOIN graph_nodes n ON n.id = graph_node_semantic_vectors.node_id
2580 WHERE graph_node_semantic_vectors.dimensions = ?
2581 AND graph_node_semantic_vectors.kind IN ({kind_placeholders})
2582 ORDER BY graph_node_semantic_vectors.kind, n.label, n.id
2583 "#
2584 );
2585 let mut values = vec![Value::Integer(query_vector.len() as i64)];
2586 values.extend(
2587 unique_kinds
2588 .into_iter()
2589 .map(|kind| Value::Text(kind.to_string())),
2590 );
2591 let rows = {
2592 let mut stmt = self.conn.prepare(&sql)?;
2593 collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2594 Ok((
2595 node_from_row_at(row, 0)?,
2596 row.get::<_, Vec<u8>>(6)?,
2597 row.get::<_, i64>(7)?,
2598 ))
2599 })?)?
2600 };
2601
2602 let mut candidates = rows
2603 .into_iter()
2604 .filter_map(|(node, blob, dimensions)| {
2605 let dimensions = usize::try_from(dimensions).ok()?;
2606 let vector = semantic_vector_from_blob(&blob, dimensions)?;
2607 Some(GraphSemanticCandidate {
2608 score: graph_semantic_cosine(query_vector, &vector),
2609 node,
2610 })
2611 })
2612 .collect::<Vec<_>>();
2613 candidates.sort_by(|left, right| {
2614 right
2615 .score
2616 .partial_cmp(&left.score)
2617 .unwrap_or(std::cmp::Ordering::Equal)
2618 .then_with(|| left.node.kind.cmp(&right.node.kind))
2619 .then_with(|| left.node.label.cmp(&right.node.label))
2620 .then_with(|| left.node.id.cmp(&right.node.id))
2621 });
2622 if limit > 0 && candidates.len() > limit {
2623 candidates.truncate(limit);
2624 }
2625 Ok(candidates)
2626 }
2627
2628 fn paged_nodes_by_kind(
2629 &self,
2630 kind: &str,
2631 options: GraphQueryOptions,
2632 ) -> Result<GraphPagedSubgraph> {
2633 let mut sql = String::from(
2634 r#"
2635 SELECT id, kind, label, properties_json, provenance_json, freshness_json
2636 FROM graph_nodes
2637 WHERE kind = ?
2638 "#,
2639 );
2640 let mut values = vec![Value::Text(kind.to_string())];
2641 push_sqlite_property_filter_exists(
2642 &mut sql,
2643 &mut values,
2644 "graph_nodes",
2645 &options.property_filters,
2646 );
2647 if let Some(cursor) = &options.cursor {
2648 sql.push_str(" AND id > ?");
2649 values.push(Value::Text(cursor.clone()));
2650 }
2651 sql.push_str(" ORDER BY id");
2652 if let Some(limit) = options.limit {
2653 sql.push_str(" LIMIT ?");
2654 values.push(Value::Integer(limit.saturating_add(1) as i64));
2655 }
2656
2657 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2658 let mut stmt = self.conn.prepare(&sql)?;
2659 let mut nodes =
2660 collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)?;
2661 let before_limit = nodes.len();
2662 let mut next_cursor = None;
2663 if let Some(limit) = options.limit
2664 && nodes.len() > limit
2665 {
2666 next_cursor = nodes
2667 .get(limit.saturating_sub(1))
2668 .map(|node| node.id.clone());
2669 nodes.truncate(limit);
2670 }
2671 let expected_indexes = if options.property_filters.is_empty() {
2672 vec!["idx_graph_nodes_kind"]
2673 } else {
2674 vec![
2675 "idx_graph_nodes_kind",
2676 "idx_graph_node_properties_key_value_node",
2677 ]
2678 };
2679 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2680 if !options.property_filters.is_empty() {
2681 diagnostics.push(
2682 "property filters were evaluated by SQLite materialized property rows before paging"
2683 .to_string(),
2684 );
2685 }
2686 if options.cursor.is_some() {
2687 diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
2688 }
2689 if next_cursor.is_some() {
2690 diagnostics.push(
2691 "result was truncated; pass page.next_cursor as --cursor for the next page"
2692 .to_string(),
2693 );
2694 }
2695 Ok(GraphPagedSubgraph {
2696 page: GraphQueryPage {
2697 cursor: options.cursor,
2698 limit: options.limit,
2699 next_cursor,
2700 returned_nodes: nodes.len(),
2701 returned_edges: 0,
2702 truncated: options.limit.is_some_and(|limit| before_limit > limit),
2703 diagnostics,
2704 },
2705 nodes,
2706 edges: Vec::new(),
2707 })
2708 }
2709
2710 fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2711 match kind {
2712 Some(kind) => {
2713 let mut stmt = self.conn.prepare(
2714 r#"
2715 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2716 FROM graph_edges
2717 WHERE from_id = ?1 AND kind = ?2
2718 ORDER BY to_id, kind
2719 "#,
2720 )?;
2721 collect_rows(stmt.query_map((from_id, kind), edge_from_row)?)
2722 }
2723 None => {
2724 let mut stmt = self.conn.prepare(
2725 r#"
2726 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2727 FROM graph_edges
2728 WHERE from_id = ?1
2729 ORDER BY to_id, kind
2730 "#,
2731 )?;
2732 collect_rows(stmt.query_map([from_id], edge_from_row)?)
2733 }
2734 }
2735 }
2736
2737 fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2738 let (sql, values) = sqlite_incident_edges_union_query(node_id, kind, &[], None, None);
2739 let mut stmt = self.conn.prepare(&sql)?;
2740 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2741 }
2742
2743 fn semantic_seeded_expansion_edges(
2744 &self,
2745 current_id: &str,
2746 options: &SemanticSeededNeighborhoodOptions,
2747 ) -> Result<SemanticSeededNeighborhoodExpansion> {
2748 let from_score = sqlite_semantic_seeded_edge_score_expr("e", "8");
2749 let to_score = sqlite_semantic_seeded_edge_score_expr(
2750 "e",
2751 "CASE WHEN e.from_id = e.to_id THEN 8 ELSE 4 END",
2752 );
2753 let limit_clause = if options.edge_scan_cap > 0 {
2754 "LIMIT ?"
2755 } else {
2756 ""
2757 };
2758 let sql = format!(
2759 r#"
2760WITH candidate_edges AS (
2761 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2762 {from_score} AS score
2763 FROM graph_edges e INDEXED BY idx_graph_edges_from_kind
2764 WHERE e.from_id = ?
2765 UNION ALL
2766 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2767 {to_score} AS score
2768 FROM graph_edges e INDEXED BY idx_graph_edges_to_kind
2769 WHERE e.to_id = ?
2770),
2771ranked_edges AS (
2772 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json,
2773 MAX(score) AS score
2774 FROM candidate_edges
2775 GROUP BY edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2776),
2777limited_edges AS (
2778 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json,
2779 score, COUNT(*) OVER () AS total_edges
2780 FROM ranked_edges
2781 ORDER BY score DESC, edge_key ASC
2782 {limit_clause}
2783)
2784SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, total_edges
2785FROM limited_edges
2786ORDER BY score DESC, edge_key ASC
2787"#
2788 );
2789 let mut values = vec![
2790 Value::Text(current_id.to_string()),
2791 Value::Text(current_id.to_string()),
2792 ];
2793 if options.edge_scan_cap > 0 {
2794 values.push(Value::Integer(
2795 options
2796 .edge_scan_cap
2797 .saturating_add(1)
2798 .min(i64::MAX as usize) as i64,
2799 ));
2800 }
2801 let mut stmt = self.conn.prepare(&sql)?;
2802 let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2803 Ok((edge_from_row_at(row, 0)?, row.get::<_, i64>(7)? as usize))
2804 })?)?;
2805 let total_candidates = rows.first().map(|(_, total)| *total).unwrap_or(0);
2806 let mut edges = rows.into_iter().map(|(edge, _)| edge).collect::<Vec<_>>();
2807 let mut skipped_by_edge_cap = 0usize;
2808 if options.edge_scan_cap > 0 && total_candidates > options.edge_scan_cap {
2809 skipped_by_edge_cap = total_candidates - options.edge_scan_cap;
2810 edges.truncate(options.edge_scan_cap);
2811 }
2812 Ok(SemanticSeededNeighborhoodExpansion {
2813 edges,
2814 skipped_by_edge_cap,
2815 })
2816 }
2817
2818 fn paged_edges(
2819 &self,
2820 kind: Option<&str>,
2821 options: GraphQueryOptions,
2822 ) -> Result<GraphPagedSubgraph> {
2823 let primary_property_filter = options.property_filters.first();
2824 let mut values = Vec::new();
2825 let mut sql = if let Some(filter) = primary_property_filter {
2826 values.push(Value::Text(filter.key.clone()));
2827 values.push(Value::Text(filter.value.clone()));
2828 String::from(
2829 r#"
2830 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2831 FROM graph_edge_properties ep0 INDEXED BY idx_graph_edge_properties_key_value_edge
2832 JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2833 ON e.edge_key = ep0.edge_key
2834 WHERE ep0.key = ?
2835 AND ep0.value = ?
2836 "#,
2837 )
2838 } else {
2839 String::from(
2840 r#"
2841 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2842 FROM graph_edges e
2843 WHERE 1 = 1
2844 "#,
2845 )
2846 };
2847 if let Some(kind) = kind {
2848 sql.push_str(" AND e.kind = ?");
2849 values.push(Value::Text(kind.to_string()));
2850 }
2851 push_sqlite_edge_property_filter_exists(
2852 &mut sql,
2853 &mut values,
2854 "e",
2855 if primary_property_filter.is_some() {
2856 &options.property_filters[1..]
2857 } else {
2858 &options.property_filters
2859 },
2860 );
2861 if let Some(cursor) = &options.cursor {
2862 if primary_property_filter.is_some() {
2863 sql.push_str(" AND ep0.edge_key > ?");
2864 } else {
2865 sql.push_str(" AND e.edge_key > ?");
2866 }
2867 values.push(Value::Text(cursor.clone()));
2868 }
2869 if primary_property_filter.is_some() {
2870 sql.push_str(" ORDER BY ep0.edge_key");
2871 } else {
2872 sql.push_str(" ORDER BY e.edge_key");
2873 }
2874 if let Some(limit) = options.limit {
2875 sql.push_str(" LIMIT ?");
2876 values.push(Value::Integer(limit.saturating_add(1) as i64));
2877 }
2878
2879 let primary_property_row_count = if let Some(filter) = primary_property_filter {
2880 Some(self.conn.query_row(
2881 r#"
2882 SELECT COUNT(*)
2883 FROM graph_edge_properties INDEXED BY idx_graph_edge_properties_key_value_edge
2884 WHERE key = ?1 AND value = ?2
2885 "#,
2886 (&filter.key, &filter.value),
2887 |row| row.get::<_, usize>(0),
2888 )?)
2889 } else {
2890 None
2891 };
2892 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2893 let mut stmt = self.conn.prepare(&sql)?;
2894 let mut edges =
2895 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2896 let before_limit = edges.len();
2897 let mut next_cursor = None;
2898 if let Some(limit) = options.limit
2899 && edges.len() > limit
2900 {
2901 next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2902 edges.truncate(limit);
2903 }
2904 let expected_indexes = if options.property_filters.is_empty() {
2905 vec!["idx_graph_edges_edge_key"]
2906 } else {
2907 vec![
2908 "idx_graph_edge_properties_key_value_edge",
2909 "idx_graph_edges_edge_key",
2910 ]
2911 };
2912 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2913 if !options.property_filters.is_empty() {
2914 if let Some(row_count) = primary_property_row_count {
2915 diagnostics.push(format!(
2916 "edge property primary filter matched {row_count} materialized row(s) before edge-kind/cursor paging"
2917 ));
2918 }
2919 diagnostics.push(
2920 "edge property scan drives from SQLite materialized property rows before joining graph_edges"
2921 .to_string(),
2922 );
2923 }
2924 if options.cursor.is_some() {
2925 diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2926 }
2927 if next_cursor.is_some() {
2928 diagnostics.push(
2929 "result was truncated; pass page.next_cursor as --cursor for the next page"
2930 .to_string(),
2931 );
2932 }
2933 Ok(GraphPagedSubgraph {
2934 page: GraphQueryPage {
2935 cursor: options.cursor,
2936 limit: options.limit,
2937 next_cursor,
2938 returned_nodes: 0,
2939 returned_edges: edges.len(),
2940 truncated: options.limit.is_some_and(|limit| before_limit > limit),
2941 diagnostics,
2942 },
2943 nodes: Vec::new(),
2944 edges,
2945 })
2946 }
2947
2948 fn paged_incident_edges(
2949 &self,
2950 node_id: &str,
2951 kind: Option<&str>,
2952 options: GraphQueryOptions,
2953 ) -> Result<GraphPagedSubgraph> {
2954 let (sql, values) = sqlite_incident_edges_union_query(
2955 node_id,
2956 kind,
2957 &options.property_filters,
2958 options.cursor.as_deref(),
2959 options.limit,
2960 );
2961 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2962 let mut stmt = self.conn.prepare(&sql)?;
2963 let mut edges =
2964 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2965 let before_limit = edges.len();
2966 let mut next_cursor = None;
2967 if let Some(limit) = options.limit
2968 && edges.len() > limit
2969 {
2970 next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2971 edges.truncate(limit);
2972 }
2973 let expected_indexes = if options.property_filters.is_empty() {
2974 vec!["idx_graph_edges_from_kind", "idx_graph_edges_to_kind"]
2975 } else {
2976 vec![
2977 "idx_graph_edges_from_kind",
2978 "idx_graph_edges_to_kind",
2979 "idx_graph_edge_properties_key_value_edge",
2980 ]
2981 };
2982 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2983 diagnostics.push(
2984 "incident edge scan uses UNION over from_id/to_id index probes instead of an OR predicate"
2985 .to_string(),
2986 );
2987 if !options.property_filters.is_empty() {
2988 diagnostics.push(
2989 "edge property filters were evaluated by SQLite materialized property rows before paging"
2990 .to_string(),
2991 );
2992 }
2993 if options.cursor.is_some() {
2994 diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2995 }
2996 if next_cursor.is_some() {
2997 diagnostics.push(
2998 "result was truncated; pass page.next_cursor as --cursor for the next page"
2999 .to_string(),
3000 );
3001 }
3002 Ok(GraphPagedSubgraph {
3003 page: GraphQueryPage {
3004 cursor: options.cursor,
3005 limit: options.limit,
3006 next_cursor,
3007 returned_nodes: 0,
3008 returned_edges: edges.len(),
3009 truncated: options.limit.is_some_and(|limit| before_limit > limit),
3010 diagnostics,
3011 },
3012 nodes: Vec::new(),
3013 edges,
3014 })
3015 }
3016
3017 fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
3018 if node_ids.is_empty() {
3019 return Ok(Vec::new());
3020 }
3021 if node_ids.len() <= 20 {
3022 return self.edges_between_nodes_inline(node_ids);
3023 }
3024 self.assert_not_in_temp_table_section();
3025 self.temp_table_active.set(true);
3026 let result = (|| -> Result<Vec<GraphEdge>> {
3027 let tx = self.conn.unchecked_transaction()?;
3028 tx.execute_batch(
3029 r#"
3030 CREATE TEMP TABLE IF NOT EXISTS _edges_between_ids (id TEXT PRIMARY KEY);
3031 DELETE FROM _edges_between_ids;
3032 "#,
3033 )?;
3034 for chunk in node_ids.iter().collect::<Vec<_>>().chunks(450) {
3035 let row_placeholders: Vec<String> =
3036 chunk.iter().map(|_| "(?)".to_string()).collect();
3037 let placeholders = row_placeholders.join(", ");
3038 let sql =
3039 format!("INSERT OR IGNORE INTO _edges_between_ids (id) VALUES {placeholders}");
3040 let values: Vec<Value> =
3041 chunk.iter().map(|id| Value::Text((*id).clone())).collect();
3042 tx.execute(&sql, params_from_iter(values.iter()))?;
3043 }
3044 let edges = {
3045 let mut stmt = tx.prepare(
3046 r#"
3047 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
3048 FROM graph_edges e
3049 WHERE EXISTS (SELECT 1 FROM _edges_between_ids f WHERE f.id = e.from_id)
3050 AND EXISTS (SELECT 1 FROM _edges_between_ids t WHERE t.id = e.to_id)
3051 ORDER BY e.from_id, e.kind, e.to_id
3052 "#,
3053 )?;
3054 collect_rows(stmt.query_map([], edge_from_row)?)?
3055 };
3056 tx.finish()?;
3057 Ok(edges)
3058 })();
3059 self.temp_table_active.set(false);
3060 result
3061 }
3062
3063 fn ranked_neighborhood(
3064 &self,
3065 center_id: &str,
3066 options: &RankedNeighborhoodOptions,
3067 ) -> Result<Option<RankedNeighborhoodResult>> {
3068 if self.node(center_id)?.is_none() {
3069 return Ok(None);
3070 }
3071 let center = self.node(center_id)?.unwrap();
3072
3073 let base_score_expr = match options.scoring {
3074 tsift_core::NeighborhoodScoring::BreadthFirst => {
3075 "MAX(0, 120 - (walk.depth * 18))".to_string()
3076 }
3077 tsift_core::NeighborhoodScoring::EdgeKindWeighted => {
3078 "MAX(0, 120 - (walk.depth * 18)) + CASE walk.edge_kind \
3079 WHEN 'semantic_relation' THEN 34 \
3080 WHEN 'mentions_entity' THEN 28 \
3081 WHEN 'mentions_concept' THEN 28 \
3082 WHEN 'tagged_entity' THEN 28 \
3083 WHEN 'tagged_concept' THEN 28 \
3084 WHEN 'related_concept' THEN 28 \
3085 WHEN 'mentions' THEN 22 \
3086 WHEN 'calls' THEN 20 \
3087 WHEN 'requests_context' THEN 18 \
3088 WHEN 'scopes_context' THEN 18 \
3089 WHEN 'scopes_source' THEN 18 \
3090 WHEN 'explains_result' THEN 18 \
3091 WHEN 'defines' THEN 12 \
3092 WHEN 'contains' THEN 12 \
3093 WHEN 'belongs_to' THEN 12 \
3094 ELSE 8 END".to_string()
3095 }
3096 tsift_core::NeighborhoodScoring::DegreeWeighted => {
3097 "MAX(0, 120 - (walk.depth * 18)) + CASE \
3098 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 3 THEN 20 \
3099 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 10 THEN 10 \
3100 ELSE 0 END"
3101 .to_string()
3102 }
3103 };
3104 let now_unix = options.observed_at_now_unix.unwrap_or_else(unix_now);
3105 let observed_at_half_life_secs = options.observed_at_half_life_secs.max(1);
3106 let observed_at_weight = options.observed_at_weight.max(0);
3107 let memory_node_boost = options.memory_node_boost.max(0);
3108 let observed_at_value = "COALESCE(\
3109 CAST(json_extract(n_score.freshness_json, '$.observed_at_unix') AS INTEGER), \
3110 CAST(json_extract(n_score.properties_json, '$.observed_at_unix') AS INTEGER), \
3111 CAST(json_extract(n_score.properties_json, '$.max_observed_at_unix') AS INTEGER)\
3112 )";
3113 let observed_at_expr = if observed_at_weight == 0 {
3114 "0".to_string()
3115 } else {
3116 format!(
3117 "CASE \
3118 WHEN {observed_at_value} IS NULL THEN 0 \
3119 WHEN ({now_unix} - {observed_at_value}) < {observed_at_half_life_secs} THEN {observed_at_weight} \
3120 WHEN ({now_unix} - {observed_at_value}) < ({observed_at_half_life_secs} * 2) THEN {observed_at_weight} / 2 \
3121 WHEN ({now_unix} - {observed_at_value}) < ({observed_at_half_life_secs} * 4) THEN {observed_at_weight} / 4 \
3122 ELSE 0 END"
3123 )
3124 };
3125 let confidence_value =
3126 "CAST(json_extract(n_score.properties_json, '$.confidence') AS REAL)";
3127 let memory_signal_expr = if memory_node_boost == 0 {
3128 "0".to_string()
3129 } else {
3130 format!(
3131 "(CASE \
3132 WHEN n_score.kind = 'memory_projection' THEN 0 \
3133 WHEN n_score.kind IN ('finding', 'decision', 'memory_event') THEN {memory_node_boost} \
3134 WHEN n_score.kind IN ('note', 'memory_session') THEN {memory_node_boost} / 2 \
3135 WHEN n_score.kind IN ('source_handle', 'semantic_concept', 'semantic_vector_handle') \
3136 AND json_extract(n_score.properties_json, '$.provider') = 'tsift-memory' THEN {memory_node_boost} / 2 \
3137 WHEN n_score.kind LIKE 'memory_%' THEN {memory_node_boost} \
3138 WHEN json_extract(n_score.properties_json, '$.provider') = 'tsift-memory' THEN {memory_node_boost} / 2 \
3139 ELSE 0 END) \
3140 + (CASE \
3141 WHEN json_extract(n_score.properties_json, '$.confidence') IS NULL THEN 0 \
3142 WHEN {confidence_value} <= 0.0 THEN 0 \
3143 WHEN {confidence_value} >= 1.0 THEN {memory_node_boost} \
3144 ELSE CAST(ROUND({memory_node_boost} * {confidence_value}) AS INTEGER) END)"
3145 )
3146 };
3147 let score_expr =
3148 format!("({base_score_expr}) + ({observed_at_expr}) + ({memory_signal_expr})");
3149
3150 let use_degree_cache = matches!(
3151 options.scoring,
3152 tsift_core::NeighborhoodScoring::DegreeWeighted
3153 );
3154 let degree_cte = if use_degree_cache {
3155 "degree_cache AS ( \
3156 SELECT id, (SELECT COUNT(*) FROM graph_edges e WHERE e.from_id = n.id OR e.to_id = n.id) AS degree \
3157 FROM graph_nodes n), "
3158 } else {
3159 ""
3160 };
3161 let mut sql = format!(
3162 r#"
3163WITH RECURSIVE {degree_cte}walk(id, depth, edge_kind, score) AS (
3164SELECT ?, 0, '', ?
3165UNION
3166SELECT e.to_id, walk.depth + 1, e.kind,
3167"#,
3168 );
3169 sql.push_str(&format!(" {}\n", score_expr));
3170 sql.push_str(
3171 r#"
3172FROM walk
3173JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3174ON e.from_id = walk.id
3175JOIN graph_nodes n_score ON n_score.id = e.to_id
3176WHERE walk.depth < ?
3177"#,
3178 );
3179 let mut values = vec![
3180 Value::Text(center_id.to_string()),
3181 Value::Integer(i64::MAX),
3182 Value::Integer(options.depth as i64),
3183 ];
3184 if let Some(kind) = &options.edge_kind {
3185 sql.push_str(" AND e.kind = ?");
3186 values.push(Value::Text(kind.clone()));
3187 }
3188 sql.push_str(
3189 r#"
3190 ),
3191scored_nodes AS (
3192SELECT walk.id, MAX(walk.score) AS score,
3193n.kind AS node_kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3194FROM walk
3195JOIN graph_nodes n ON n.id = walk.id
3196GROUP BY walk.id
3197 ),
3198 ranked AS (
3199 SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
3200 FROM scored_nodes
3201 ORDER BY score DESC, id ASC
3202 ),
3203 kept AS (
3204 SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
3205 FROM ranked
3206 LIMIT ?
3207 ),
3208 total AS (
3209 SELECT COUNT(*) AS cnt FROM scored_nodes
3210 )
3211 SELECT
3212 'meta' AS row_type,
3213 (SELECT cnt FROM total) AS total_discovered,
3214 0 AS node_id, '' AS node_kind, '' AS node_label,
3215 '' AS node_props, '' AS node_prov, '' AS node_fresh,
3216 '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
3217 '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
3218 UNION ALL
3219 SELECT
3220 'node' AS row_type,
3221 0 AS total_discovered,
3222 k.id, k.node_kind, k.label, k.properties_json, k.provenance_json, k.freshness_json,
3223 '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
3224 '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
3225 FROM kept k
3226 UNION ALL
3227 SELECT
3228 'edge' AS row_type,
3229 0 AS total_discovered,
3230 '' AS node_id, '' AS node_kind, '' AS node_label,
3231 '' AS node_props, '' AS node_prov, '' AS node_fresh,
3232 e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
3233 FROM graph_edges e
3234 WHERE EXISTS (SELECT 1 FROM kept k WHERE k.id = e.from_id)
3235 AND EXISTS (SELECT 1 FROM kept k2 WHERE k2.id = e.to_id)
3236"#,
3237 );
3238 values.push(Value::Integer(options.max_nodes.saturating_add(1) as i64));
3239
3240 let mut stmt = self.conn.prepare(&sql)?;
3241 let mut nodes = vec![center.clone()];
3242 let mut edges = Vec::new();
3243 let mut total_discovered = 0usize;
3244
3245 let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
3246 let row_type: String = row.get(0)?;
3247 match row_type.as_str() {
3248 "meta" => Ok(QueryResult::Meta {
3249 total: row.get::<_, i64>(1)? as usize,
3250 }),
3251 "node" => Ok(QueryResult::Node(node_from_row_at(row, 2)?)),
3252 "edge" => Ok(QueryResult::Edge(edge_from_row_at(row, 8)?)),
3253 _ => Err(rusqlite::Error::InvalidQuery),
3254 }
3255 })?;
3256 for row_result in rows {
3257 match row_result? {
3258 QueryResult::Meta { total } => {
3259 total_discovered = total;
3260 }
3261 QueryResult::Node(node) => {
3262 if node.id != center_id {
3263 nodes.push(node);
3264 }
3265 }
3266 QueryResult::Edge(edge) => {
3267 edges.push(edge);
3268 }
3269 }
3270 }
3271
3272 let total_discovered = total_discovered.max(nodes.len());
3273 let pruned_count = total_discovered.saturating_sub(nodes.len());
3274
3275 match options.property_mode {
3276 PropertyMode::Full => {}
3277 PropertyMode::Omit => {
3278 for n in &mut nodes {
3279 n.properties.clear();
3280 }
3281 for e in &mut edges {
3282 e.properties.clear();
3283 }
3284 }
3285 PropertyMode::Sample => {
3286 let mut seen_kinds = std::collections::BTreeSet::new();
3287 for n in &mut nodes {
3288 if !seen_kinds.contains(&n.kind) {
3289 seen_kinds.insert(n.kind.clone());
3290 } else {
3291 n.properties.clear();
3292 }
3293 }
3294 for e in &mut edges {
3295 e.properties.clear();
3296 }
3297 }
3298 }
3299
3300 Ok(Some(RankedNeighborhoodResult {
3301 nodes,
3302 edges,
3303 pruned_count,
3304 total_discovered,
3305 }))
3306 }
3307
3308 fn neighborhood(
3309 &self,
3310 center_id: &str,
3311 depth: usize,
3312 kind: Option<&str>,
3313 ) -> Result<Option<GraphSubgraph>> {
3314 self.paged_neighborhood(center_id, depth, kind, GraphQueryOptions::default())
3315 .map(|result| {
3316 result.map(|result| {
3317 GraphSubgraph {
3318 nodes: result.nodes,
3319 edges: result.edges,
3320 }
3321 .sorted()
3322 })
3323 })
3324 }
3325
/// Loads one page of the neighborhood reachable from `center_id` by following
/// outgoing edges for at most `depth` hops.
///
/// * `kind` - when set, only edges of this kind are traversed and returned.
/// * `options` - property filters, an exclusive node-id cursor and an optional
///   page size.
///
/// Returns `Ok(None)` when `center_id` does not resolve to a node. Otherwise
/// the page holds nodes sorted by id, the edges whose two endpoints are both in
/// the returned node set (sorted by `from_id`, `kind`, `to_id`), and paging
/// metadata including query-plan diagnostics.
///
/// All SQL placeholders are positional, so every `values.push` below must stay
/// in the same order as the `?` it feeds.
fn paged_neighborhood(
    &self,
    center_id: &str,
    depth: usize,
    kind: Option<&str>,
    options: GraphQueryOptions,
) -> Result<Option<GraphPagedSubgraph>> {
    if self.node(center_id)?.is_none() {
        return Ok(None);
    }
    // Recursive CTE: start at the center (depth 0) and step along outgoing
    // edges while the current depth is below the requested bound.
    let mut sql = String::from(
        r#"
            WITH RECURSIVE walk(id, depth) AS (
              SELECT ?, 0
              UNION
              SELECT e.to_id, walk.depth + 1
              FROM walk
              JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
                ON e.from_id = walk.id
              WHERE walk.depth < ?
            "#,
    );
    // NOTE(review): `depth as i64` wraps negative for values above i64::MAX,
    // which would stop the walk at the center — confirm callers never pass such depths.
    let mut values = vec![
        Value::Text(center_id.to_string()),
        Value::Integer(depth as i64),
    ];
    if let Some(kind) = kind {
        sql.push_str(" AND e.kind = ?");
        values.push(Value::Text(kind.to_string()));
    }
    // Distinct reachable nodes; the trailing `WHERE 1 = 1` lets the optional
    // filters below be appended uniformly as `AND ...` clauses.
    sql.push_str(
        r#"
            ),
            filtered_nodes AS (
              SELECT DISTINCT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
              FROM walk
              JOIN graph_nodes n ON n.id = walk.id
              WHERE 1 = 1
            "#,
    );
    push_sqlite_property_filter_exists(&mut sql, &mut values, "n", &options.property_filters);
    if let Some(cursor) = &options.cursor {
        // Exclusive cursor: only ids strictly greater than the cursor qualify.
        sql.push_str(" AND n.id > ?");
        values.push(Value::Text(cursor.clone()));
    }
    sql.push_str(
        r#"
            ),
            page_nodes AS (
              SELECT id, kind, label, properties_json, provenance_json, freshness_json
              FROM filtered_nodes
              ORDER BY id
            "#,
    );
    if let Some(limit) = options.limit {
        // Fetch one row beyond the page size so truncation can be detected.
        sql.push_str(" LIMIT ?");
        values.push(Value::Integer(limit.saturating_add(1) as i64));
    }
    // Candidate edges: the same outgoing edges the walk was allowed to follow.
    sql.push_str(
        r#"
            ),
            walk_edges AS (
              SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
              FROM walk
              JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
                ON e.from_id = walk.id
              WHERE walk.depth < ?
            "#,
    );
    values.push(Value::Integer(depth as i64));
    if let Some(kind) = kind {
        sql.push_str(" AND e.kind = ?");
        values.push(Value::Text(kind.to_string()));
    }
    // Final projection: node rows and edge rows share one 14-column shape,
    // tagged by `row_type`; the unused half of each row is NULL.
    sql.push_str(
        r#"
            )
            SELECT
              'node' AS row_type,
              p.id, p.kind, p.label, p.properties_json, p.provenance_json, p.freshness_json,
              NULL AS edge_key, NULL AS from_id, NULL AS to_id, NULL AS edge_kind,
              NULL AS edge_properties_json, NULL AS edge_provenance_json, NULL AS edge_freshness_json
            FROM page_nodes p
            UNION ALL
            SELECT DISTINCT
              'edge' AS row_type,
              NULL AS id, NULL AS kind, NULL AS label, NULL AS properties_json,
              NULL AS provenance_json, NULL AS freshness_json,
              e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
            FROM walk_edges e
            WHERE e.from_id IN (SELECT id FROM page_nodes)
              AND e.to_id IN (SELECT id FROM page_nodes)
            "#,
    );

    let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
    let mut stmt = self.conn.prepare(&sql)?;
    let mut nodes = Vec::new();
    let mut edges = Vec::new();
    // Column 0 is the row tag; node columns start at 1, edge columns at 7.
    let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
        let row_type: String = row.get(0)?;
        match row_type.as_str() {
            "node" => Ok((Some(node_from_row_at(row, 1)?), None)),
            "edge" => Ok((None, Some(edge_from_row_at(row, 7)?))),
            _ => Err(rusqlite::Error::InvalidQuery),
        }
    })?;
    for row in rows {
        let (node, edge) = row?;
        if let Some(node) = node {
            nodes.push(node);
        }
        if let Some(edge) = edge {
            edges.push(edge);
        }
    }
    nodes.sort_by(|left, right| left.id.cmp(&right.id));
    let before_limit = nodes.len();
    let mut next_cursor = None;
    // More rows than the page size means the look-ahead row exists: the cursor
    // for the next page is the id of the last node that is actually returned.
    // NOTE(review): with `limit == Some(0)` the index below is 0, so the cursor
    // becomes the id of a node that is not returned — confirm this is intended.
    if let Some(limit) = options.limit
        && nodes.len() > limit
    {
        next_cursor = nodes
            .get(limit.saturating_sub(1))
            .map(|node| node.id.clone());
        nodes.truncate(limit);
    }
    // Drop edges that touch the look-ahead node removed by the truncation.
    let node_ids = nodes
        .iter()
        .map(|node| node.id.as_str())
        .collect::<BTreeSet<_>>();
    edges.retain(|edge| {
        node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
    });
    edges.sort_by(|left, right| {
        left.from_id
            .cmp(&right.from_id)
            .then(left.kind.cmp(&right.kind))
            .then(left.to_id.cmp(&right.to_id))
    });
    // Indexes the query plan is checked against; the property index is only
    // expected when property filters were supplied.
    let expected_indexes = if options.property_filters.is_empty() {
        vec!["idx_graph_edges_from_kind"]
    } else {
        vec![
            "idx_graph_edges_from_kind",
            "idx_graph_node_properties_key_value_node",
        ]
    };
    let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
    diagnostics.push(
        "neighborhood nodes and page edges share one recursive reachable-set CTE".to_string(),
    );
    if !options.property_filters.is_empty() {
        diagnostics.push(
            "property filters were evaluated by SQLite materialized property rows before paging"
                .to_string(),
        );
    }
    if options.cursor.is_some() {
        diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
    }
    if next_cursor.is_some() {
        diagnostics.push(
            "result was truncated; pass page.next_cursor as --cursor for the next page"
                .to_string(),
        );
    }
    Ok(Some(GraphPagedSubgraph {
        page: GraphQueryPage {
            cursor: options.cursor,
            limit: options.limit,
            next_cursor,
            returned_nodes: nodes.len(),
            returned_edges: edges.len(),
            truncated: options.limit.is_some_and(|limit| before_limit > limit),
            diagnostics,
        },
        nodes,
        edges,
    }))
}
3507
3508 fn shortest_path(
3509 &self,
3510 from_id: &str,
3511 to_id: &str,
3512 kind: Option<&str>,
3513 ) -> Result<Option<GraphPath>> {
3514 self.shortest_path_with_max_hops(from_id, to_id, kind, None)
3515 }
3516
3517 fn shortest_path_with_max_hops(
3518 &self,
3519 from_id: &str,
3520 to_id: &str,
3521 kind: Option<&str>,
3522 max_hops: Option<usize>,
3523 ) -> Result<Option<GraphPath>> {
3524 if from_id == to_id {
3525 return Ok(Some(GraphPath {
3526 nodes: vec![from_id.to_string()],
3527 hops: 0,
3528 }));
3529 }
3530 let hop_limit = max_hops.unwrap_or(usize::MAX);
3531 if hop_limit == 0 {
3532 return Ok(None);
3533 }
3534
3535 self.assert_not_in_temp_table_section();
3536 self.temp_table_active.set(true);
3537 let result = (|| -> Result<Option<GraphPath>> {
3538 let call_id = BFS_CALL_ID.fetch_add(1, Ordering::Relaxed);
3539 let tbl = format!("_tsift_frontier_{call_id}");
3540
3541 let mut visited = BTreeSet::from([from_id.to_string()]);
3542 let mut parent =
3543 BTreeMap::<String, String>::from([(from_id.to_string(), String::new())]);
3544 let mut frontier = vec![from_id.to_string()];
3545 self.conn.execute_batch(&format!(
3546 r#"CREATE TEMP TABLE IF NOT EXISTS "{tbl}" (id TEXT PRIMARY KEY);
3547 DELETE FROM "{tbl}";"#,
3548 ))?;
3549 let select_sql = if kind.is_some() {
3550 format!(
3551 r#"SELECT e.from_id, e.to_id
3552 FROM "{tbl}" f
3553 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3554 ON e.from_id = f.id
3555 WHERE e.kind = ?
3556 ORDER BY e.from_id, e.to_id, e.kind"#,
3557 )
3558 } else {
3559 format!(
3560 r#"SELECT e.from_id, e.to_id
3561 FROM "{tbl}" f
3562 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3563 ON e.from_id = f.id
3564 ORDER BY e.from_id, e.to_id, e.kind"#,
3565 )
3566 };
3567 let insert_sql = format!(r#"INSERT OR IGNORE INTO "{tbl}" (id) VALUES (?)"#);
3568 let delete_sql = format!(r#"DELETE FROM "{tbl}""#);
3569 let drop_sql = format!(r#"DROP TABLE IF EXISTS "{tbl}""#);
3570 let mut frontier_select_stmt = self.conn.prepare(&select_sql)?;
3571 let mut frontier_insert_stmt = self.conn.prepare(&insert_sql)?;
3572 let mut found_path: Option<GraphPath> = None;
3573 for _depth in 0..hop_limit {
3574 if frontier.is_empty() {
3575 break;
3576 }
3577 self.conn.execute(&delete_sql, [])?;
3578 for id in &frontier {
3579 frontier_insert_stmt.execute([id.as_str()])?;
3580 }
3581 let mut next_frontier = BTreeSet::new();
3582 let rows = if let Some(kind) = kind {
3583 collect_rows(frontier_select_stmt.query_map([kind], |row| {
3584 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3585 })?)?
3586 } else {
3587 collect_rows(frontier_select_stmt.query_map([], |row| {
3588 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3589 })?)?
3590 };
3591 for (from, next) in rows {
3592 if !visited.insert(next.clone()) {
3593 continue;
3594 }
3595 parent.insert(next.clone(), from);
3596 if next == to_id {
3597 let mut nodes = vec![to_id.to_string()];
3598 let mut cursor = to_id;
3599 while let Some(previous) = parent.get(cursor) {
3600 if previous.is_empty() {
3601 break;
3602 }
3603 nodes.push(previous.clone());
3604 cursor = previous;
3605 }
3606 nodes.reverse();
3607 found_path = Some(GraphPath {
3608 hops: nodes.len().saturating_sub(1),
3609 nodes,
3610 });
3611 break;
3612 }
3613 next_frontier.insert(next);
3614 }
3615 if found_path.is_some() {
3616 break;
3617 }
3618 frontier = next_frontier.into_iter().collect();
3619 }
3620 let _ = self.conn.execute_batch(&drop_sql);
3621 Ok(found_path)
3622 })();
3623 self.temp_table_active.set(false);
3624 result
3625 }
3626
3627 fn reachable_nodes_by_kind(
3628 &self,
3629 from_id: &str,
3630 kind: &str,
3631 depth: usize,
3632 limit: usize,
3633 ) -> Result<Vec<(GraphNode, GraphPath)>> {
3634 Ok(self
3635 .reachable_nodes_by_kinds(from_id, &[kind], depth, limit)?
3636 .remove(kind)
3637 .unwrap_or_default())
3638 }
3639
3640 fn reachable_nodes_by_kinds(
3641 &self,
3642 from_id: &str,
3643 kinds: &[&str],
3644 depth: usize,
3645 limit: usize,
3646 ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
3647 let mut requested = kinds
3648 .iter()
3649 .map(|kind| (*kind).to_string())
3650 .collect::<BTreeSet<_>>()
3651 .into_iter()
3652 .collect::<Vec<_>>();
3653 let mut results = requested
3654 .iter()
3655 .map(|kind| (kind.clone(), Vec::new()))
3656 .collect::<BTreeMap<_, _>>();
3657 if requested.is_empty() {
3658 return Ok(results);
3659 }
3660 requested.sort();
3661 let placeholders = std::iter::repeat_n("?", requested.len())
3662 .collect::<Vec<_>>()
3663 .join(", ");
3664 let mut sql = format!(
3665 r#"
3666 WITH RECURSIVE walk(id, depth, path) AS (
3667 SELECT ?, 0, char(31) || ? || char(31)
3668 UNION ALL
3669 SELECT e.to_id, walk.depth + 1, walk.path || e.to_id || char(31)
3670 FROM walk
3671 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3672 ON e.from_id = walk.id
3673 WHERE walk.depth < ?
3674 AND instr(walk.path, char(31) || e.to_id || char(31)) = 0
3675 ),
3676 ranked AS (
3677 SELECT
3678 n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
3679 walk.path, walk.depth,
3680 ROW_NUMBER() OVER (PARTITION BY n.kind, n.id ORDER BY walk.depth, n.label, n.id) AS rn
3681 FROM walk
3682 JOIN graph_nodes n ON n.id = walk.id
3683 WHERE n.kind IN ({placeholders}) AND n.id <> ?
3684 ),
3685 kind_ranked AS (
3686 SELECT *,
3687 ROW_NUMBER() OVER (PARTITION BY kind ORDER BY depth, label, id) AS kind_rank
3688 FROM ranked
3689 WHERE rn = 1
3690 )
3691 SELECT id, kind, label, properties_json, provenance_json, freshness_json, path, depth
3692 FROM kind_ranked
3693 "#,
3694 );
3695 let mut values = vec![
3696 Value::Text(from_id.to_string()),
3697 Value::Text(from_id.to_string()),
3698 Value::Integer(depth as i64),
3699 ];
3700 values.extend(requested.iter().cloned().map(Value::Text));
3701 values.push(Value::Text(from_id.to_string()));
3702 if limit > 0 && limit != usize::MAX {
3703 sql.push_str(" WHERE kind_rank <= ?");
3704 values.push(Value::Integer(limit as i64));
3705 }
3706 sql.push_str(" ORDER BY kind, depth, label, id");
3707 let mut stmt = self.conn.prepare(&sql)?;
3708 let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
3709 let node = node_from_row(row)?;
3710 let path: String = row.get(6)?;
3711 let hops: usize = row.get(7)?;
3712 Ok((
3713 node,
3714 GraphPath {
3715 nodes: path
3716 .split('\u{1f}')
3717 .filter(|part| !part.is_empty())
3718 .map(str::to_string)
3719 .collect(),
3720 hops,
3721 },
3722 ))
3723 })?)?;
3724 for (node, path) in rows {
3725 results
3726 .entry(node.kind.clone())
3727 .or_default()
3728 .push((node, path));
3729 }
3730 Ok(results)
3731 }
3732
3733 fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
3734 if let Some(node) = self.node(target)? {
3735 return Ok(Some(node));
3736 }
3737 if kinds.is_empty() {
3738 return Ok(None);
3739 }
3740
3741 let normalized = target.trim().trim_start_matches('#');
3742 let kind_placeholders = std::iter::repeat_n("?", kinds.len())
3743 .collect::<Vec<_>>()
3744 .join(", ");
3745 let kind_rank = kinds
3746 .iter()
3747 .enumerate()
3748 .map(|(rank, _)| format!("WHEN ? THEN {rank}"))
3749 .collect::<Vec<_>>()
3750 .join(" ");
3751 let sql = format!(
3752 r#"
3753 SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3754 FROM graph_nodes n
3755 WHERE n.kind IN ({kind_placeholders})
3756 AND (
3757 EXISTS (
3758 SELECT 1
3759 FROM graph_node_properties p_handle INDEXED BY idx_graph_node_properties_key_value_node
3760 WHERE p_handle.node_id = n.id
3761 AND p_handle.key = 'handle'
3762 AND p_handle.value = ?
3763 )
3764 OR EXISTS (
3765 SELECT 1
3766 FROM graph_node_properties p_ref INDEXED BY idx_graph_node_properties_key_value_node
3767 WHERE p_ref.node_id = n.id
3768 AND p_ref.key = 'ref_id'
3769 AND p_ref.value = ?
3770 )
3771 OR n.label = ?
3772 OR n.label = ?
3773 )
3774 ORDER BY CASE n.kind {kind_rank} ELSE 999 END, n.id
3775 LIMIT 1
3776 "#
3777 );
3778 let mut values = kinds
3779 .iter()
3780 .map(|kind| Value::Text((*kind).to_string()))
3781 .collect::<Vec<_>>();
3782 values.push(Value::Text(target.to_string()));
3783 values.push(Value::Text(normalized.to_string()));
3784 values.push(Value::Text(target.to_string()));
3785 values.push(Value::Text(format!("#{normalized}")));
3786 values.extend(kinds.iter().map(|kind| Value::Text((*kind).to_string())));
3787 self.conn
3788 .query_row(&sql, params_from_iter(values.iter()), node_from_row)
3789 .optional()
3790 .map_err(Into::into)
3791 }
3792}
3793
3794fn to_json<T: Serialize>(value: &T) -> Result<String> {
3795 serde_json::to_string(value).map_err(Into::into)
3796}
3797
3798fn row_hash<T: Serialize>(value: &T) -> Result<String> {
3799 let payload = serde_json::to_vec(value)?;
3800 Ok(blake3::hash(&payload).to_hex().to_string())
3801}
3802
3803fn optional_to_json<T: Serialize>(value: &Option<T>) -> Result<Option<String>> {
3804 value.as_ref().map(to_json).transpose()
3805}
3806
3807fn collect_rows<T>(
3808 rows: impl Iterator<Item = std::result::Result<T, rusqlite::Error>>,
3809) -> Result<Vec<T>> {
3810 rows.collect::<std::result::Result<Vec<_>, _>>()
3811 .map_err(Into::into)
3812}
3813
/// One decoded row of a combined result set whose first column tags the row
/// as `meta`, `node` or `edge`.
enum QueryResult {
    /// Summary row carrying a total count read from the query.
    Meta { total: usize },
    /// Row decoded into a graph node.
    Node(GraphNode),
    /// Row decoded into a graph edge.
    Edge(GraphEdge),
}
3819
3820fn node_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphNode> {
3821 let properties_col = offset + 3;
3822 let provenance_col = offset + 4;
3823 let freshness_col = offset + 5;
3824 let properties_json: String = row.get(properties_col)?;
3825 let provenance_json: String = row.get(provenance_col)?;
3826 let freshness_json: Option<String> = row.get(freshness_col)?;
3827 Ok(GraphNode {
3828 id: row.get(offset)?,
3829 kind: row.get(offset + 1)?,
3830 label: row.get(offset + 2)?,
3831 properties: from_json(properties_col, &properties_json)?,
3832 provenance: from_json(provenance_col, &provenance_json)?,
3833 freshness: optional_from_json(freshness_col, freshness_json)?,
3834 })
3835}
3836
3837fn node_from_row(row: &Row<'_>) -> rusqlite::Result<GraphNode> {
3838 node_from_row_at(row, 0)
3839}
3840
3841fn edge_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphEdge> {
3842 let properties_col = offset + 4;
3843 let provenance_col = offset + 5;
3844 let freshness_col = offset + 6;
3845 let properties_json: String = row.get(properties_col)?;
3846 let provenance_json: String = row.get(provenance_col)?;
3847 let freshness_json: Option<String> = row.get(freshness_col)?;
3848 Ok(GraphEdge {
3849 id: row.get(offset)?,
3850 from_id: row.get(offset + 1)?,
3851 to_id: row.get(offset + 2)?,
3852 kind: row.get(offset + 3)?,
3853 properties: from_json(properties_col, &properties_json)?,
3854 provenance: from_json(provenance_col, &provenance_json)?,
3855 freshness: optional_from_json(freshness_col, freshness_json)?,
3856 })
3857}
3858
3859fn edge_from_row(row: &Row<'_>) -> rusqlite::Result<GraphEdge> {
3860 edge_from_row_at(row, 0)
3861}
3862
3863fn from_json<T: DeserializeOwned>(column: usize, raw: &str) -> rusqlite::Result<T> {
3864 serde_json::from_str(raw)
3865 .map_err(|err| rusqlite::Error::FromSqlConversionFailure(column, Type::Text, Box::new(err)))
3866}
3867
3868fn optional_from_json<T: DeserializeOwned>(
3869 column: usize,
3870 raw: Option<String>,
3871) -> rusqlite::Result<Option<T>> {
3872 raw.map(|value| from_json(column, &value)).transpose()
3873}
3874
3875fn projection_version_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3876 nodes
3877 .iter()
3878 .find(|node| node.kind == "projection_meta")
3879 .and_then(|node| node.properties.get("projection_version").cloned())
3880}
3881
3882fn projection_hash_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3883 nodes
3884 .iter()
3885 .find(|node| node.kind == "projection_meta")
3886 .and_then(|node| node.properties.get("content_hash").cloned())
3887}
3888
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn unix_now() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
3895
3896fn sqlite_database_size_bytes(conn: &Connection) -> Result<u64> {
3897 let page_count: u64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
3898 let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3899 Ok(page_count.saturating_mul(page_size))
3900}
3901
3902fn sqlite_database_freelist_bytes(conn: &Connection) -> Result<u64> {
3903 let freelist_count: u64 = conn.query_row("PRAGMA freelist_count", [], |row| row.get(0))?;
3904 let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3905 Ok(freelist_count.saturating_mul(page_size))
3906}
3907
3908#[cfg(test)]
3909mod tests {
3910 use super::*;
3911
3912 fn sample_provenance() -> GraphProvenance {
3913 GraphProvenance::new("fixture", "src/lib.rs:1").with_content_hash("hash-1")
3914 }
3915
3916 fn sample_projection() -> GraphProjection {
3917 let source = sample_provenance();
3918 GraphProjection {
3919 nodes: vec![
3920 GraphNode::new("doc:livekit", "document", "LiveKit guide")
3921 .with_property("domain", "livekit")
3922 .with_provenance(source.clone())
3923 .with_freshness(GraphFreshness::content_hash("node-hash")),
3924 GraphNode::new("topic:rooms", "topic", "Rooms"),
3925 GraphNode::new("topic:egress", "topic", "Egress"),
3926 ],
3927 edges: vec![
3928 GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
3929 .with_property("confidence", "0.91")
3930 .with_provenance(source.clone())
3931 .with_freshness(GraphFreshness::content_hash("edge-hash")),
3932 GraphEdge::new("topic:rooms", "topic:egress", "related_to").with_provenance(source),
3933 ],
3934 }
3935 }
3936
/// Shared contract check: upserts the sample projection into `store` and
/// verifies node lookup, kind listing, filtered outgoing edges and the
/// shortest path across the two sample edges.
fn assert_projection_store_contract(store: &impl GraphStore) {
    let projection = sample_projection();
    projection.upsert_into(store).unwrap();

    // The stored node must equal the projection's own copy, properties included.
    assert_eq!(
        store.node("doc:livekit").unwrap(),
        projection
            .nodes
            .iter()
            .find(|node| node.id == "doc:livekit")
            .cloned()
    );
    // Expected in id order (`topic:egress` before `topic:rooms`), not insertion order.
    assert_eq!(
        store.nodes_by_kind("topic").unwrap(),
        vec![
            GraphNode::new("topic:egress", "topic", "Egress"),
            GraphNode::new("topic:rooms", "topic", "Rooms"),
        ]
    );

    let mentions = store
        .outgoing_edges("doc:livekit", Some("mentions"))
        .unwrap();
    assert_eq!(mentions.len(), 1);
    assert_eq!(mentions[0].to_id, "topic:rooms");
    assert_eq!(
        mentions[0].properties.get("confidence"),
        Some(&"0.91".into())
    );

    // Two hops: document -> rooms -> egress.
    let path = store
        .shortest_path("doc:livekit", "topic:egress", None)
        .unwrap()
        .unwrap();
    assert_eq!(
        path.nodes,
        vec!["doc:livekit", "topic:rooms", "topic:egress"]
    );
}
3976
/// Upserts two nodes and one edge into an in-memory store and checks that
/// each comes back equal to what was written, through every read accessor used here.
#[test]
fn sqlite_store_round_trips_generic_nodes_edges() {
    let store = SqliteGraphStore::in_memory().unwrap();
    let source = sample_provenance();
    let node = GraphNode::new("doc:livekit", "document", "LiveKit guide")
        .with_property("domain", "livekit")
        .with_provenance(source.clone())
        .with_freshness(GraphFreshness::content_hash("node-hash"));
    let topic = GraphNode::new("topic:rooms", "topic", "Rooms");
    let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
        .with_property("confidence", "0.91")
        .with_provenance(source)
        .with_freshness(GraphFreshness::content_hash("edge-hash"));

    store.upsert_node(&node).unwrap();
    store.upsert_node(&topic).unwrap();
    store.upsert_edge(&edge).unwrap();

    assert_eq!(store.node("doc:livekit").unwrap(), Some(node));
    assert_eq!(store.nodes_by_kind("topic").unwrap(), vec![topic]);
    assert_eq!(store.all_nodes().unwrap().len(), 2);
    assert_eq!(store.all_edges().unwrap().len(), 1);
    assert_eq!(
        store
            .outgoing_edges("doc:livekit", Some("mentions"))
            .unwrap(),
        vec![edge]
    );
}
4006
/// Checks that edges are addressable by id, that incident-edge lookup returns
/// both edges touching `topic:rooms`, that a property-filtered edge page
/// reports the expected index and materialized-row diagnostics, and that each
/// edge's `confidence` property produced one row in `graph_edge_properties`.
#[test]
fn sqlite_materializes_edge_properties_and_scans_first_class_edges() {
    let store = SqliteGraphStore::in_memory().unwrap();
    for node in [
        GraphNode::new("doc:livekit", "document", "LiveKit guide"),
        GraphNode::new("topic:rooms", "topic", "Rooms"),
        GraphNode::new("topic:egress", "topic", "Egress"),
    ] {
        store.upsert_node(&node).unwrap();
    }
    let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
        .with_property("confidence", "0.91");
    let edge_id = edge.id.clone();
    store.upsert_edge(&edge).unwrap();
    store
        .upsert_edge(
            &GraphEdge::new("topic:egress", "topic:rooms", "related_to")
                .with_property("confidence", "0.42"),
        )
        .unwrap();

    assert_eq!(store.edge(&edge_id).unwrap(), Some(edge));
    // Both edges point at `topic:rooms`; the expectation is compared in sorted id order.
    let mut expected_incident_ids = vec![
        GraphEdge::stable_id("doc:livekit", "topic:rooms", "mentions"),
        GraphEdge::stable_id("topic:egress", "topic:rooms", "related_to"),
    ];
    expected_incident_ids.sort();
    assert_eq!(
        store
            .incident_edges("topic:rooms", None)
            .unwrap()
            .into_iter()
            .map(|edge| edge.id)
            .collect::<Vec<_>>(),
        expected_incident_ids
    );

    // Page over `mentions` edges filtered to confidence = 0.91: only the first edge matches.
    let page = store
        .paged_edges(
            Some("mentions"),
            GraphQueryOptions {
                property_filters: vec![GraphPropertyFilter {
                    key: "confidence".to_string(),
                    value: "0.91".to_string(),
                }],
                ..GraphQueryOptions::default()
            },
        )
        .unwrap();
    assert_eq!(page.edges.len(), 1);
    assert_eq!(page.edges[0].id, edge_id);
    // The diagnostics must mention both indexes and the materialized-row path.
    assert!(
        page.page
            .diagnostics
            .iter()
            .any(|diagnostic| diagnostic.contains("idx_graph_edge_properties_key_value_edge")),
        "{:?}",
        page.page.diagnostics
    );
    assert!(
        page.page
            .diagnostics
            .iter()
            .any(|diagnostic| diagnostic.contains("idx_graph_edges_edge_key")),
        "{:?}",
        page.page.diagnostics
    );
    assert!(
        page.page.diagnostics.iter().any(|diagnostic| diagnostic
            .contains("edge property primary filter matched 1 materialized row")),
        "{:?}",
        page.page.diagnostics
    );
    assert!(
        page.page
            .diagnostics
            .iter()
            .any(|diagnostic| diagnostic
                .contains("drives from SQLite materialized property rows")),
        "{:?}",
        page.page.diagnostics
    );

    // One materialized property row per edge that carries `confidence`.
    let property_rows: usize = store
        .conn
        .query_row(
            "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(property_rows, 2);
}
4100
/// Runs the shared projection contract against a fresh in-memory SQLite store.
#[test]
fn graph_projection_round_trips_through_backend_agnostic_store_contract() {
    let store = SqliteGraphStore::in_memory().unwrap();
    assert_projection_store_contract(&store);
}
4106
/// Exercises neighborhood ordering and delete behaviour through the
/// `GraphStore` trait against an in-memory SQLite store.
#[test]
fn graph_store_contract_covers_crud_neighborhood_and_ordering() {
    /// Backend-agnostic body of the test; takes any `GraphStore`.
    fn assert_crud_contract(store: &impl GraphStore) {
        let projection = sample_projection();
        projection.upsert_into(store).unwrap();

        // Depth 2 from the document reaches both topics; nodes are expected in id order.
        let neighborhood = store.neighborhood("doc:livekit", 2, None).unwrap().unwrap();
        assert_eq!(
            neighborhood
                .nodes
                .iter()
                .map(|node| node.id.as_str())
                .collect::<Vec<_>>(),
            vec!["doc:livekit", "topic:egress", "topic:rooms"]
        );
        assert_eq!(
            neighborhood
                .edges
                .iter()
                .map(|edge| (
                    edge.from_id.as_str(),
                    edge.kind.as_str(),
                    edge.to_id.as_str()
                ))
                .collect::<Vec<_>>(),
            vec![
                ("doc:livekit", "mentions", "topic:rooms"),
                ("topic:rooms", "related_to", "topic:egress"),
            ]
        );

        // Removing the second edge must break the only path to `topic:egress`.
        assert_eq!(
            store
                .delete_edge("topic:rooms", "topic:egress", "related_to")
                .unwrap(),
            1
        );
        assert!(
            store
                .shortest_path("doc:livekit", "topic:egress", None)
                .unwrap()
                .is_none()
        );
        // Deleting the node is expected to also remove the edge pointing at it.
        assert_eq!(store.delete_node("topic:rooms").unwrap(), 1);
        assert!(store.node("topic:rooms").unwrap().is_none());
        assert!(
            store
                .outgoing_edges("doc:livekit", None)
                .unwrap()
                .is_empty()
        );
    }

    assert_crud_contract(&SqliteGraphStore::in_memory().unwrap());
}
4162
/// With three neighbors competing for the kept slots, the memory node observed
/// most recently (`observed_at_unix` 1995 against a "now" of 2000 and a
/// half-life of 100 seconds) must survive pruning, while the plain symbol and
/// the stale memory node (observed at 1000) must be dropped.
#[test]
fn sqlite_ranked_neighborhood_prefers_recent_memory_nodes_when_pruning() {
    let store = SqliteGraphStore::in_memory().unwrap();
    store
        .upsert_node(&GraphNode::new("center", "file", "center"))
        .unwrap();
    store
        .upsert_node(&GraphNode::new("aaa-code", "symbol", "code candidate"))
        .unwrap();
    store
        .upsert_node(
            &GraphNode::new("mmm-stale", "memory_event", "stale memory")
                .with_property("provider", "tsift-memory")
                .with_property("observed_at_unix", "1000"),
        )
        .unwrap();
    store
        .upsert_node(
            &GraphNode::new("zzz-fresh", "memory_event", "fresh memory")
                .with_property("provider", "tsift-memory")
                .with_property("observed_at_unix", "1995"),
        )
        .unwrap();
    // All three candidates hang directly off the center via `mentions` edges.
    store
        .upsert_edge(&GraphEdge::new("center", "aaa-code", "mentions"))
        .unwrap();
    store
        .upsert_edge(&GraphEdge::new("center", "mmm-stale", "mentions"))
        .unwrap();
    store
        .upsert_edge(&GraphEdge::new("center", "zzz-fresh", "mentions"))
        .unwrap();

    // NOTE(review): the two `new(1, 1)` arguments are presumably depth and node
    // budget — confirm against `RankedNeighborhoodOptions::new`.
    let options = RankedNeighborhoodOptions::new(1, 1)
        .with_observed_at_now_unix(2000)
        .with_observed_at_half_life_secs(100);
    let result = store
        .ranked_neighborhood("center", &options)
        .unwrap()
        .unwrap();
    let ids: Vec<_> = result.nodes.iter().map(|node| node.id.as_str()).collect();
    assert!(ids.contains(&"center"));
    assert!(
        ids.contains(&"zzz-fresh"),
        "fresh memory node should survive pruning: {ids:?}"
    );
    assert!(!ids.contains(&"aaa-code"));
    assert!(!ids.contains(&"mmm-stale"));
}
4212
// Attaches one "high_signal" neighbour and 24 weak neighbours to a single seed,
// then checks that the high-signal node is still returned when the edge scan
// cap (16) is smaller than the number of incident edges (25).
#[test]
fn sqlite_semantic_seeded_neighborhood_scores_before_sql_edge_cap() {
    let store = SqliteGraphStore::in_memory().unwrap();
    for node in [
        GraphNode::new("seed", "semantic_concept", "graph budget"),
        GraphNode::new("zzz_high", "symbol", "high_signal"),
    ] {
        store.upsert_node(&node).unwrap();
    }
    store
        .upsert_edge(&GraphEdge::new("zzz_high", "seed", "mentions_concept"))
        .unwrap();

    // The weak ids ("aaa_low_NN") sort before "zzz_high"; presumably this is so
    // that a cap applied in id order would drop the high-signal edge.
    (0..24).for_each(|idx| {
        let weak_id = format!("aaa_low_{idx:02}");
        let weak_node = GraphNode::new(weak_id.clone(), "note", format!("low {idx}"));
        store.upsert_node(&weak_node).unwrap();
        store
            .upsert_edge(&GraphEdge::new(weak_id, "seed", "weak_link"))
            .unwrap();
    });

    let options = SemanticSeededNeighborhoodOptions::new(1, 3)
        .with_edge_scan_cap(16)
        .with_node_discovery_cap(9);
    let seeds = ["seed".to_string()];
    let expansion = store
        .semantic_seeded_neighborhood(&seeds, &options)
        .unwrap();
    let ids: Vec<&str> = expansion
        .nodes
        .iter()
        .map(|node| node.id.as_str())
        .collect();

    assert_eq!(ids.len(), 3);
    assert_eq!(ids[0], "seed");
    assert_eq!(ids[1], "zzz_high");
    // 25 incident edges minus the scan cap of 16.
    assert_eq!(expansion.skipped_by_edge_cap, 9);
    assert!(expansion.truncated);
}
4253
// Upserts the sample projection, changes one node's `domain` property, and
// checks that the materialized property rows follow the change and that an
// unchanged re-upsert performs no writes at all.
#[test]
fn sqlite_upsert_projection_batches_rows_and_properties() {
    // Query options that filter on a single `domain` property value.
    let domain_filter = |domain: &str| GraphQueryOptions {
        property_filters: vec![GraphPropertyFilter {
            key: "domain".to_string(),
            value: domain.to_string(),
        }],
        ..GraphQueryOptions::default()
    };

    let mut store = SqliteGraphStore::in_memory().unwrap();
    let mut projection = sample_projection();
    store.upsert_projection(&projection).unwrap();

    let initial_page = store
        .paged_nodes_by_kind("document", domain_filter("livekit"))
        .unwrap();
    assert_eq!(initial_page.nodes[0].id, "doc:livekit");

    // Re-tag the document and upsert again.
    let retagged = GraphNode::new("doc:livekit", "document", "LiveKit guide")
        .with_property("domain", "recording");
    projection.nodes[0] = retagged;
    store.upsert_projection(&projection).unwrap();

    let stale_property_rows: usize = store
        .conn
        .query_row(
            "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain' AND value = 'livekit'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    let retagged_page = store
        .paged_nodes_by_kind("document", domain_filter("recording"))
        .unwrap();
    assert_eq!(stale_property_rows, 0);
    assert_eq!(retagged_page.nodes[0].id, "doc:livekit");

    let confidence_rows: usize = store
        .conn
        .query_row(
            "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(confidence_rows, 1);

    // A third upsert of the identical projection must leave the connection's
    // change counter untouched.
    let baseline_changes = store.conn.total_changes();
    store.upsert_projection(&projection).unwrap();
    assert_eq!(
        store.conn.total_changes(),
        baseline_changes,
        "unchanged projection rows should not rewrite graph rows or materialized properties"
    );
}
4318
// Checks that nodes carrying an `embedding` property get a row in
// `graph_node_semantic_vectors`, that top-candidate search ranks by that
// vector, and that re-upserting a node without the property removes its row.
#[test]
fn sqlite_semantic_top_candidates_use_materialized_vector_table() {
    let store = SqliteGraphStore::in_memory().unwrap();
    let fixtures = [
        GraphNode::new("concept:graph", "semantic_concept", "graph navigation")
            .with_property("embedding_model", "fixture-v1")
            .with_property("embedding", "1.0,0.0"),
        GraphNode::new("concept:sqlite", "semantic_concept", "sqlite search")
            .with_property("embedding", "0.0,1.0"),
        // No embedding: must not contribute a vector row.
        GraphNode::new("entity:skip", "semantic_entity", "skipped"),
    ];
    for node in &fixtures {
        store.upsert_node(node).unwrap();
    }

    // Runs a single-value COUNT(*) query against the store's connection.
    let count_rows = |sql: &str| -> usize {
        store
            .conn
            .query_row(sql, [], |row| row.get(0))
            .unwrap()
    };

    let materialized = count_rows("SELECT COUNT(*) FROM graph_node_semantic_vectors");
    assert_eq!(materialized, 2);

    let candidates = store
        .semantic_top_candidates(&[1.0, 0.0], &["semantic_concept"], 1)
        .unwrap();
    assert_eq!(candidates.len(), 1);
    let best = &candidates[0];
    assert_eq!(best.node.id, "concept:graph");
    assert_eq!(best.score, 1.0);

    // Same node id, but this time without any embedding properties.
    let stripped = GraphNode::new("concept:graph", "semantic_concept", "graph navigation");
    store.upsert_node(&stripped).unwrap();
    let remaining = count_rows(
        "SELECT COUNT(*) FROM graph_node_semantic_vectors WHERE node_id = 'concept:graph'",
    );
    assert_eq!(remaining, 0);
}
4373
// Builds a three-node graph with mixed edge kinds and checks kind-filtered
// outgoing edges, graph counts, edge sampling, and kind-restricted paths.
#[test]
fn sqlite_store_filters_edges_by_kind_and_paths() {
    let store = SqliteGraphStore::in_memory().unwrap();
    for id in ["a", "b", "c"] {
        let node = GraphNode::new(id, "symbol", id);
        store.upsert_node(&node).unwrap();
    }
    for (from, to, kind) in [
        ("a", "b", "calls"),
        ("a", "c", "documents"),
        ("b", "c", "calls"),
    ] {
        store.upsert_edge(&GraphEdge::new(from, to, kind)).unwrap();
    }

    let call_edges = store.outgoing_edges("a", Some("calls")).unwrap();
    assert_eq!(call_edges.len(), 1);
    assert_eq!(call_edges[0].to_id, "b");
    assert_eq!(store.graph_counts().unwrap(), (3, 3));

    let sampled = store.sample_edge(Some("calls")).unwrap().unwrap();
    assert_eq!(sampled.to_id, "b");

    // The direct a -> c edge is a "documents" edge, so the "calls"-only path
    // has to go through b.
    let forward = store
        .shortest_path("a", "c", Some("calls"))
        .unwrap()
        .unwrap();
    assert_eq!(forward.nodes, vec!["a", "b", "c"]);
    assert_eq!(forward.hops, 2);

    // No "calls" edge leads back from c to a.
    let backward = store.shortest_path("c", "a", Some("calls")).unwrap();
    assert!(backward.is_none());
}
4415
// Checks that `edges_between_nodes` returns only edges whose two endpoints are
// both inside the requested node set.
#[test]
fn sqlite_store_batches_edges_between_node_sets() {
    let store = SqliteGraphStore::in_memory().unwrap();
    for id in ["a", "b", "c", "outside"] {
        let node = GraphNode::new(id, "symbol", id);
        store.upsert_node(&node).unwrap();
    }
    for (from, to) in [
        ("a", "b"),
        ("b", "c"),
        ("a", "outside"),
        ("outside", "c"),
    ] {
        store
            .upsert_edge(&GraphEdge::new(from, to, "calls"))
            .unwrap();
    }

    let scoped: BTreeSet<String> = ["a", "b", "c"].into_iter().map(String::from).collect();
    let mut edge_keys = Vec::new();
    for edge in store.edges_between_nodes(&scoped).unwrap() {
        edge_keys.push((edge.from_id, edge.kind, edge.to_id));
    }

    // Edges touching "outside" are excluded.
    let expected: Vec<(String, String, String)> = [("a", "calls", "b"), ("b", "calls", "c")]
        .into_iter()
        .map(|(from, kind, to)| (from.to_string(), kind.to_string(), to.to_string()))
        .collect();
    assert_eq!(edge_keys, expected);
}
4451
// Replaces the "root" projection four times (drop a node, re-add it, drop it
// again) and checks the refresh report, the stored projection version, the
// cached operator stats, and tombstone compaction after each step.
#[test]
fn sqlite_projection_refresh_tracks_versions_watermarks_and_tombstones() {
    // Reads the cached (nodes, edges, tombstone_nodes, tombstone_edges) row
    // for the "root" scope.
    let root_stats = |graph: &SqliteGraphStore| -> (usize, usize, usize, usize) {
        graph
            .conn
            .query_row(
                r#"
                SELECT nodes, edges, tombstone_nodes, tombstone_edges
                FROM graph_operator_stats
                WHERE scope = 'root'
                "#,
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )
            .unwrap()
    };

    let mut store = SqliteGraphStore::in_memory().unwrap();
    let mut projection = sample_projection();
    let projection_meta = GraphNode::new(
        "projection:fixture",
        "projection_meta",
        "fixture projection",
    )
    .with_property("projection_version", "fixture-v1")
    .with_property("content_hash", "hash-a");
    projection.nodes.push(projection_meta);
    store
        .replace_projection_with_version(
            "root",
            &projection,
            Some("fixture-v1"),
            Some(String::from("commit-a")),
        )
        .unwrap();

    // Second refresh: drop the egress topic and every edge pointing at it.
    projection.nodes.retain(|node| node.id != "topic:egress");
    projection.edges.retain(|edge| edge.to_id != "topic:egress");
    let removal = store
        .replace_projection_with_version(
            "root",
            &projection,
            Some("fixture-v2"),
            Some(String::from("commit-b")),
        )
        .unwrap();

    assert_eq!(removal.projection_version, "fixture-v2");
    assert_eq!(removal.source_watermark.as_deref(), Some("commit-b"));
    assert_eq!(removal.tombstoned_nodes, vec!["topic:egress".to_string()]);
    assert_eq!(removal.tombstoned_edges.len(), 1);
    assert_eq!(removal.deleted_nodes, 1);
    assert_eq!(removal.deleted_edges, 1);
    assert_eq!(removal.unchanged_nodes, 3);
    assert_eq!(removal.upserted_nodes, 0);
    assert_eq!(removal.unchanged_properties, 4);
    assert_eq!(removal.upserted_properties, 0);
    assert_eq!(removal.deleted_properties, 0);
    for expected_phase in [
        "sqlite_property_row_staging",
        "sqlite_edge_property_row_staging",
    ] {
        assert!(
            removal
                .phase_timings
                .iter()
                .any(|phase| phase.name == expected_phase),
            "{:?}",
            removal.phase_timings
        );
    }

    let stored_version = store.projection_version("root").unwrap().unwrap();
    assert_eq!(stored_version.projection_version, "fixture-v2");
    assert_eq!(stored_version.source_watermark.as_deref(), Some("commit-b"));
    assert_eq!(root_stats(&store), (3, 1, 1, 1));

    // Third refresh: the egress topic comes back, so its tombstone is pruned.
    projection
        .nodes
        .push(GraphNode::new("topic:egress", "topic", "Egress"));
    let revival = store
        .replace_projection_with_version(
            "root",
            &projection,
            Some("fixture-v3"),
            Some(String::from("commit-c")),
        )
        .unwrap();
    assert_eq!(revival.pruned_tombstones, 1);
    assert_eq!(revival.tombstoned_nodes, Vec::<String>::new());

    // Fourth refresh: drop the topic again, then compact the tombstones away.
    projection.nodes.retain(|node| node.id != "topic:egress");
    store
        .replace_projection_with_version(
            "root",
            &projection,
            Some("fixture-v4"),
            Some(String::from("commit-d")),
        )
        .unwrap();
    assert_eq!(store.compact_storage("root", true).unwrap(), 2);
    assert_eq!(root_stats(&store), (3, 1, 0, 0));
}
4567
// Builds an 80-node "calls" chain plus one direct "mentions" shortcut and
// checks how the hop limit and the edge-kind filter affect path search.
#[test]
fn sqlite_shortest_path_uses_bounded_frontier() {
    let node_id = |idx: i32| format!("node:{idx:02}");

    let store = SqliteGraphStore::in_memory().unwrap();
    for idx in 0..80 {
        let node = GraphNode::new(node_id(idx), "symbol", format!("node {idx:02}"));
        store.upsert_node(&node).unwrap();
    }
    // Chain: node:00 -> node:01 -> ... -> node:79.
    for idx in 0..79 {
        let link = GraphEdge::new(node_id(idx), node_id(idx + 1), "calls");
        store.upsert_edge(&link).unwrap();
    }
    let shortcut = GraphEdge::new("node:00", "node:79", "mentions");
    store.upsert_edge(&shortcut).unwrap();

    // 64 hops are not enough to walk the 79-edge chain.
    let too_short = store
        .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(64))
        .unwrap();
    assert!(too_short.is_none());

    let full_chain = store
        .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(79))
        .unwrap()
        .unwrap();
    assert_eq!(full_chain.hops, 79);
    assert_eq!(full_chain.nodes.first().map(String::as_str), Some("node:00"));
    assert_eq!(full_chain.nodes.last().map(String::as_str), Some("node:79"));

    // Restricting to "mentions" finds the one-hop shortcut.
    let direct = store
        .shortest_path_with_max_hops("node:00", "node:79", Some("mentions"), Some(1))
        .unwrap()
        .unwrap();
    assert_eq!(direct.nodes, vec!["node:00", "node:79"]);
}
4613
// Stores three nodes sharing `ref_id = "refresh"` and checks that evidence
// targets resolve both by "#"-prefixed reference and by the `handle` property.
#[test]
fn sqlite_resolves_evidence_targets_with_indexed_properties() {
    // Every fixture node carries the same `ref_id`.
    let refresh_node = |id: &str, kind: &str, label: &str| {
        GraphNode::new(id, kind, label).with_property("ref_id", "refresh")
    };

    let store = SqliteGraphStore::in_memory().unwrap();
    let fixtures = [
        refresh_node("gbak-refresh", "backlog", "#refresh")
            .with_property("handle", "backlog-handle"),
        refresh_node("gjob-refresh", "job_packet", "do #refresh"),
        refresh_node("gwres-refresh", "worker_result", "completed #refresh"),
    ];
    for node in &fixtures {
        store.upsert_node(node).unwrap();
    }

    // All three kinds are eligible; the backlog node is the one returned.
    let ref_match = store
        .resolve_evidence_target("#refresh", &["backlog", "job_packet", "worker_result"])
        .unwrap()
        .unwrap();
    assert_eq!(ref_match.id, "gbak-refresh");

    let handle_match = store
        .resolve_evidence_target("backlog-handle", &["backlog"])
        .unwrap()
        .unwrap();
    assert_eq!(handle_match.id, "gbak-refresh");
}
4640
// Opens a store on top of a hand-written `user_version = 2` database and
// checks that `SqliteGraphStore::from_connection` brings it to the current
// schema version and populates the materialized property/vector tables from
// the JSON columns of the pre-existing rows.
#[test]
fn sqlite_schema_migration_backfills_materialized_node_properties() {
    let conn = Connection::open_in_memory().unwrap();
    // Legacy fixture: only the base tables exist, and properties live solely in
    // `properties_json`. It holds three nodes (two with a `domain` property,
    // one with an embedding) and one edge with a `confidence` property.
    conn.execute_batch(
        r#"
        PRAGMA user_version = 2;
        CREATE TABLE graph_nodes (
            id TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            label TEXT NOT NULL,
            properties_json TEXT NOT NULL DEFAULT '{}',
            provenance_json TEXT NOT NULL DEFAULT '[]',
            freshness_json TEXT,
            row_hash TEXT,
            source_watermark TEXT
        );
        CREATE INDEX idx_graph_nodes_kind ON graph_nodes(kind);
        CREATE TABLE graph_edges (
            from_id TEXT NOT NULL,
            to_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            properties_json TEXT NOT NULL DEFAULT '{}',
            provenance_json TEXT NOT NULL DEFAULT '[]',
            freshness_json TEXT,
            row_hash TEXT,
            source_watermark TEXT,
            PRIMARY KEY (from_id, to_id, kind)
        );
        CREATE INDEX idx_graph_edges_from_kind ON graph_edges(from_id, kind);
        CREATE INDEX idx_graph_edges_to_kind ON graph_edges(to_id, kind);
        CREATE TABLE graph_projection_versions (
            scope TEXT PRIMARY KEY,
            projection_version TEXT NOT NULL,
            content_hash TEXT,
            source_watermark TEXT,
            observed_at_unix INTEGER NOT NULL
        );
        CREATE TABLE graph_tombstones (
            row_key TEXT PRIMARY KEY,
            row_kind TEXT NOT NULL,
            deleted_at_unix INTEGER NOT NULL
        );
        INSERT INTO graph_nodes
            (id, kind, label, properties_json, provenance_json)
        VALUES
            ('topic:rooms', 'topic', 'Rooms', '{"domain":"livekit"}', '[]'),
            ('topic:egress', 'topic', 'Egress', '{"domain":"recording"}', '[]'),
            ('concept:graph', 'semantic_concept', 'Graph navigation', '{"embedding":"1.0,0.0","embedding_model":"fixture-v1"}', '[]');
        INSERT INTO graph_edges
            (from_id, to_id, kind, properties_json, provenance_json)
        VALUES
            ('topic:rooms', 'topic:egress', 'mentions', '{"confidence":"0.91"}', '[]');
        "#,
    )
    .unwrap();

    // Wrapping the legacy connection is what is expected to run the migration.
    let store = SqliteGraphStore::from_connection(conn).unwrap();
    let version: i64 = store
        .conn
        .pragma_query_value(None, "user_version", |row| row.get(0))
        .unwrap();
    assert_eq!(version, SQLITE_GRAPH_SCHEMA_VERSION);
    // Both topic nodes carried a `domain` property in their JSON column.
    let property_rows: usize = store
        .conn
        .query_row(
            "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(property_rows, 2);
    // The single legacy edge carried a `confidence` property.
    let edge_property_rows: usize = store
        .conn
        .query_row(
            "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(edge_property_rows, 1);
    // Only `concept:graph` had an embedding, tagged with model "fixture-v1".
    let semantic_vector_rows: usize = store
        .conn
        .query_row(
            "SELECT COUNT(*) FROM graph_node_semantic_vectors WHERE model = 'fixture-v1'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(semantic_vector_rows, 1);
    // The legacy edge must be addressable by its stable id after migration and
    // keep its JSON-declared property.
    let edge = store
        .edge(&GraphEdge::stable_id(
            "topic:rooms",
            "topic:egress",
            "mentions",
        ))
        .unwrap()
        .unwrap();
    assert_eq!(edge.properties.get("confidence"), Some(&"0.91".to_string()));

    // A property-filtered page query should find the backfilled row and report
    // the property index by name in its diagnostics.
    let page = store
        .paged_nodes_by_kind(
            "topic",
            GraphQueryOptions {
                property_filters: vec![GraphPropertyFilter {
                    key: "domain".to_string(),
                    value: "livekit".to_string(),
                }],
                ..GraphQueryOptions::default()
            },
        )
        .unwrap();
    assert_eq!(page.nodes[0].id, "topic:rooms");
    assert!(
        page.page
            .diagnostics
            .iter()
            .any(|diagnostic| diagnostic.contains("idx_graph_node_properties_key_value_node")),
        "{:?}",
        page.page.diagnostics
    );
}
4762
// Checks that `reachable_nodes_by_kinds` groups the nodes reachable from a
// start node by kind and pairs each one with the path that reaches it.
#[test]
fn sqlite_store_batches_reachable_nodes_by_kinds() {
    let store = SqliteGraphStore::in_memory().unwrap();
    for (id, kind, label) in [
        ("start", "backlog", "start"),
        ("ctx", "worker_context", "context"),
        ("src", "source_handle", "source"),
        ("sem", "semantic_concept", "concept"),
    ] {
        store.upsert_node(&GraphNode::new(id, kind, label)).unwrap();
    }
    // start -> ctx -> src, and start -> sem.
    for (from, to, kind) in [
        ("start", "ctx", "has_context"),
        ("ctx", "src", "uses_source"),
        ("start", "sem", "mentions_concept"),
    ] {
        store.upsert_edge(&GraphEdge::new(from, to, kind)).unwrap();
    }

    let reachable = store
        .reachable_nodes_by_kinds(
            "start",
            &["worker_context", "source_handle", "semantic_concept"],
            2,
            8,
        )
        .unwrap();

    let context_hit = &reachable["worker_context"][0];
    assert_eq!(context_hit.0.id, "ctx");

    // The source handle is two hops away, via the context node.
    let source_hit = &reachable["source_handle"][0];
    assert_eq!(source_hit.1.nodes, vec!["start", "ctx", "src"]);

    let concept_hit = &reachable["semantic_concept"][0];
    assert_eq!(concept_hit.1.hops, 1);
}
4799
// Replaces a 128-node / 127-edge chain projection with a copy that lacks two
// nodes and their incident edges, then checks the refresh report, the
// surviving rows, the staging temp tables, and the stored watermark.
#[test]
fn sqlite_projection_refresh_handles_bulk_row_replacement() {
    let mut store = SqliteGraphStore::in_memory().unwrap();
    let source = GraphProvenance::new("fixture", "bulk");
    let mut projection = GraphProjection::default();
    // 128 nodes alternating between "symbol" and "file", each with one
    // `ordinal` property, shared provenance, and a per-row content hash.
    for idx in 0..128 {
        projection.nodes.push(
            GraphNode::new(
                format!("node:{idx:03}"),
                if idx % 2 == 0 { "symbol" } else { "file" },
                format!("bulk node {idx:03}"),
            )
            .with_property("ordinal", idx.to_string())
            .with_provenance(source.clone())
            .with_freshness(GraphFreshness::content_hash(format!("node-hash-{idx:03}"))),
        );
    }
    // 127 "next" edges linking node N to node N+1, each with one property.
    for idx in 0..127 {
        projection.edges.push(
            GraphEdge::new(
                format!("node:{idx:03}"),
                format!("node:{:03}", idx + 1),
                "next",
            )
            .with_property("ordinal", idx.to_string())
            .with_provenance(source.clone())
            .with_freshness(GraphFreshness::content_hash(format!("edge-hash-{idx:03}"))),
        );
    }

    store
        .replace_projection_with_version(
            "root",
            &projection,
            Some("bulk-v1"),
            Some("commit-a".to_string()),
        )
        .unwrap();

    // Drop node:000 and node:064 together with every edge touching them:
    // 000->001, 063->064 and 064->065, i.e. three edges.
    projection
        .nodes
        .retain(|node| !node.id.ends_with("000") && !node.id.ends_with("064"));
    projection.edges.retain(|edge| {
        !edge.from_id.ends_with("000")
            && !edge.to_id.ends_with("000")
            && !edge.from_id.ends_with("064")
            && !edge.to_id.ends_with("064")
    });
    let refresh = store
        .replace_projection_with_version(
            "root",
            &projection,
            Some("bulk-v2"),
            Some("commit-b".to_string()),
        )
        .unwrap();

    assert_eq!(store.all_nodes().unwrap().len(), 126);
    assert_eq!(store.all_edges().unwrap().len(), 124);
    assert_eq!(
        refresh.tombstoned_nodes,
        vec!["node:000".to_string(), "node:064".to_string()]
    );
    assert_eq!(refresh.tombstoned_edges.len(), 3);
    assert_eq!(refresh.deleted_nodes, 2);
    assert_eq!(refresh.deleted_edges, 3);
    // Every surviving row is identical to the first refresh, so nothing is
    // upserted; 250 = 126 node properties + 124 edge properties.
    assert_eq!(refresh.unchanged_nodes, 126);
    assert_eq!(refresh.unchanged_edges, 124);
    assert_eq!(refresh.upserted_nodes, 0);
    assert_eq!(refresh.upserted_edges, 0);
    assert_eq!(refresh.unchanged_properties, 250);
    assert_eq!(refresh.upserted_properties, 0);
    assert!(
        refresh
            .phase_timings
            .iter()
            .any(|phase| phase.name == "sqlite_node_staging"
                && phase.detail.contains("126 unchanged skipped")
                && phase.detail.contains("multi-row chunks up to 500 rows")),
        "{:?}",
        refresh.phase_timings
    );
    // NOTE(review): 124 is the unchanged *edge* count, yet this looks for it in
    // a phase named "sqlite_node_staging" — confirm against the phase names the
    // store actually emits whether an edge-staging phase was intended here.
    assert!(
        refresh
            .phase_timings
            .iter()
            .any(|phase| phase.name == "sqlite_node_staging"
                && phase.detail.contains("124 unchanged skipped")),
        "{:?}",
        refresh.phase_timings
    );
    // With no new or changed rows, both property staging temp tables are
    // expected to be empty after the refresh.
    let staged_node_properties: usize = store
        .conn
        .query_row(
            "SELECT COUNT(*) FROM temp.next_graph_node_properties",
            [],
            |row| row.get(0),
        )
        .unwrap();
    let staged_edge_properties: usize = store
        .conn
        .query_row(
            "SELECT COUNT(*) FROM temp.next_graph_edge_properties",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(staged_node_properties, 0);
    assert_eq!(staged_edge_properties, 0);
    // The property staging phases must still be reported, and their detail
    // text must mention the "new/changed" row scope.
    assert!(
        refresh
            .phase_timings
            .iter()
            .any(|phase| phase.name == "sqlite_property_row_staging"
                && phase.detail.contains("new/changed node rows")),
        "{:?}",
        refresh.phase_timings
    );
    assert!(
        refresh
            .phase_timings
            .iter()
            .any(|phase| phase.name == "sqlite_edge_property_row_staging"
                && phase.detail.contains("new/changed edge rows")),
        "{:?}",
        refresh.phase_timings
    );
    // The stored projection version reflects the second refresh's watermark.
    assert_eq!(
        store
            .projection_version("root")
            .unwrap()
            .unwrap()
            .source_watermark
            .as_deref(),
        Some("commit-b")
    );
}
4937
// With the temp-table flag already set, `assert_not_in_temp_table_section`
// must panic.
#[test]
fn sqlite_reentrant_temp_table_guard_panics() {
    let store = SqliteGraphStore::in_memory().unwrap();
    // Simulate being inside a temp-table section.
    store.temp_table_active.set(true);

    // The closure only borrows the store; AssertUnwindSafe lets it cross the
    // catch_unwind boundary.
    let guarded_call = std::panic::AssertUnwindSafe(|| {
        store.assert_not_in_temp_table_section();
    });
    let outcome = std::panic::catch_unwind(guarded_call);
    assert!(outcome.is_err());
}
4947
// After a successful `replace_projection` call the temp-table flag must read
// as cleared.
#[test]
fn sqlite_temp_table_guard_clears_after_method() {
    let mut store = SqliteGraphStore::in_memory().unwrap();
    let empty_projection = GraphProjection {
        nodes: Vec::new(),
        edges: Vec::new(),
    };
    store.replace_projection(&empty_projection).unwrap();

    let still_active = store.temp_table_active.get();
    assert!(!still_active);
}
4958
// Derives an ontology from a small function/module graph and checks the
// type nodes, the relation edges, the instance count, and that feeding the
// ontology back into the store does not change a second derivation.
#[test]
fn derive_ontology_summarizes_types_and_relations() {
    let mut store = SqliteGraphStore::in_memory().unwrap();
    let instances = GraphProjection {
        nodes: vec![
            GraphNode::new("fn:a", "function", "a"),
            GraphNode::new("fn:b", "function", "b"),
            GraphNode::new("mod:m", "module", "m"),
        ],
        edges: vec![
            GraphEdge::new("fn:a", "fn:b", "calls"),
            GraphEdge::new("mod:m", "fn:a", "contains"),
        ],
    };
    store.upsert_projection(&instances).unwrap();

    let ontology = store.derive_ontology().unwrap();

    // One ontology node per instance kind, labelled with that kind.
    let mut type_labels = std::collections::BTreeSet::new();
    for type_node in &ontology.nodes {
        type_labels.insert(type_node.label.clone());
    }
    assert!(type_labels.contains("function"));
    assert!(type_labels.contains("module"));
    assert!(ontology
        .nodes
        .iter()
        .all(|type_node| type_node.kind == "ontology_type"));

    // Relation edges connect type nodes and are named after the instance edge kind.
    let relations: std::collections::BTreeSet<(String, String, String)> = ontology
        .edges
        .iter()
        .map(|edge| (edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone()))
        .collect();
    let has_relation = |from: &str, kind: &str, to: &str| {
        relations.contains(&(from.to_string(), kind.to_string(), to.to_string()))
    };
    assert!(has_relation(
        "ontology_type:function",
        "ontology_relation:calls",
        "ontology_type:function"
    ));
    assert!(has_relation(
        "ontology_type:module",
        "ontology_relation:contains",
        "ontology_type:function"
    ));

    // Two "function" instances were seeded.
    let function_type = ontology
        .nodes
        .iter()
        .find(|type_node| type_node.label == "function")
        .unwrap();
    assert_eq!(function_type.properties.get("instance_count").unwrap(), "2");

    // Store the ontology itself, then derive again: the result must have the
    // same shape and still contain only ontology type nodes.
    store.upsert_projection(&ontology).unwrap();
    let rederived = store.derive_ontology().unwrap();
    assert!(rederived
        .nodes
        .iter()
        .all(|type_node| type_node.kind == "ontology_type"));
    assert_eq!(rederived.nodes.len(), ontology.nodes.len());
    assert_eq!(rederived.edges.len(), ontology.edges.len());
}
5008}