1use anyhow::{Context, Result, bail};
2use rusqlite::{
3 Connection, OpenFlags, OptionalExtension, Row, params_from_iter,
4 types::{Type, Value},
5};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use std::cell::Cell;
8use std::collections::{BTreeMap, BTreeSet};
9use std::ffi::OsString;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14pub use tsift_core::{
15 ConvexEdgeRow, ConvexGraphClient, ConvexGraphStore, ConvexNodeRow, ConvexProjectionRows,
16 ConvexRowsGraphClient, GRAPH_SEMANTIC_VECTOR_DEFAULT_MODEL,
17 GRAPH_SEMANTIC_VECTOR_MODEL_PROPERTY_KEY, GRAPH_SEMANTIC_VECTOR_PROPERTY_KEY, GraphEdge,
18 GraphFreshness, GraphNode, GraphPagedSubgraph, GraphPath, GraphProjection, GraphPropertyFilter,
19 GraphProvenance, GraphQueryOptions, GraphQueryPage, GraphSemanticCandidate, GraphStore,
20 GraphSubgraph, PropertyMode, RankedNeighborhoodOptions, RankedNeighborhoodResult,
21 SQLITE_GRAPH_SCHEMA_VERSION, SemanticSeededNeighborhoodExpansion,
22 SemanticSeededNeighborhoodOptions, TerseGraphEdge, TerseGraphNode, apply_graph_edge_query_page,
23 apply_graph_query_page, graph_edge_id, graph_semantic_cosine,
24 graph_semantic_top_candidates_by_property_scan, parse_graph_semantic_vector_property,
25 shortest_path_using_outgoing, stable_graph_edge_id,
26};
27
28fn row_usize(row: &Row<'_>, idx: usize) -> rusqlite::Result<usize> {
29 let value: i64 = row.get(idx)?;
30 usize::try_from(value).map_err(|_| rusqlite::Error::IntegralValueOutOfRange(idx, value))
31}
32
33fn row_u64(row: &Row<'_>, idx: usize) -> rusqlite::Result<u64> {
34 let value: i64 = row.get(idx)?;
35 u64::try_from(value).map_err(|_| rusqlite::Error::IntegralValueOutOfRange(idx, value))
36}
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
39#[serde(rename_all = "snake_case")]
40pub enum ReadOnlyRecovery {
41 SnapshotFallback,
42 SnapshotFallbackWal,
43}
44
45pub fn read_only_snapshot_recovery(
46 db_path: &Path,
47 err: &anyhow::Error,
48) -> Option<ReadOnlyRecovery> {
49 if !error_mentions_locked_db(err) {
50 return None;
51 }
52 if wal_sidecar_path(db_path).exists() || shared_memory_sidecar_path(db_path).exists() {
53 Some(ReadOnlyRecovery::SnapshotFallbackWal)
54 } else if rollback_journal_path(db_path).exists() {
55 Some(ReadOnlyRecovery::SnapshotFallback)
56 } else {
57 None
58 }
59}
60
/// Returns `db_path` with `suffix` appended to the whole path. The existing extension is kept,
/// not replaced, so `graph.db` + `-wal` becomes `graph.db-wal`.
fn sqlite_sidecar_path(db_path: &Path, suffix: &str) -> PathBuf {
    let mut path = db_path.as_os_str().to_os_string();
    path.push(suffix);
    PathBuf::from(path)
}

/// Path of the rollback journal that sits next to `db_path` (`<db_path>-journal`).
pub fn rollback_journal_path(db_path: &Path) -> PathBuf {
    sqlite_sidecar_path(db_path, "-journal")
}

/// Path of the write-ahead log that sits next to `db_path` (`<db_path>-wal`).
pub fn wal_sidecar_path(db_path: &Path) -> PathBuf {
    sqlite_sidecar_path(db_path, "-wal")
}

/// Path of the WAL shared-memory file that sits next to `db_path` (`<db_path>-shm`).
pub fn shared_memory_sidecar_path(db_path: &Path) -> PathBuf {
    sqlite_sidecar_path(db_path, "-shm")
}
78
79pub fn copy_read_only_snapshot(
80 db_path: &Path,
81 default_stem: &str,
82) -> Result<(PathBuf, Vec<PathBuf>)> {
83 let snapshot_path = snapshot_copy_path(db_path, default_stem);
84 std::fs::copy(db_path, &snapshot_path).with_context(|| {
85 format!(
86 "copying locked db {} to snapshot {}",
87 db_path.display(),
88 snapshot_path.display()
89 )
90 })?;
91 let mut cleanup_paths = vec![snapshot_path.clone()];
92 copy_optional_snapshot_sidecar(
93 &wal_sidecar_path(db_path),
94 &wal_sidecar_path(&snapshot_path),
95 &mut cleanup_paths,
96 )?;
97 copy_optional_snapshot_sidecar(
98 &shared_memory_sidecar_path(db_path),
99 &shared_memory_sidecar_path(&snapshot_path),
100 &mut cleanup_paths,
101 )?;
102 Ok((snapshot_path, cleanup_paths))
103}
104
105pub fn error_mentions_locked_db(err: &anyhow::Error) -> bool {
106 err.chain()
107 .any(|cause| cause.to_string().contains("database is locked"))
108}
109
110fn copy_optional_snapshot_sidecar(
111 source_path: &Path,
112 snapshot_path: &Path,
113 cleanup_paths: &mut Vec<PathBuf>,
114) -> Result<()> {
115 match std::fs::copy(source_path, snapshot_path) {
116 Ok(_) => {
117 cleanup_paths.push(snapshot_path.to_path_buf());
118 Ok(())
119 }
120 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
121 Err(err) => Err(err).with_context(|| {
122 format!(
123 "copying SQLite sidecar {} to snapshot {}",
124 source_path.display(),
125 snapshot_path.display()
126 )
127 }),
128 }
129}
130
/// Builds a path in the system temp directory for a read-only snapshot copy of `db_path`.
///
/// The file name is `tsift-{stem}-{pid}-{nanos}-{seq}.db`. Here `stem` is the UTF-8 file stem
/// of `db_path`, or `default_stem` when there is none or it is not valid UTF-8.
fn snapshot_copy_path(db_path: &Path, default_stem: &str) -> PathBuf {
    // pid plus wall-clock nanos can repeat within one process: two snapshots taken inside the
    // clock's resolution would get the same path and overwrite, then delete, each other's
    // files. A per-process sequence number keeps every name distinct. Relaxed ordering is
    // enough because only the uniqueness of `fetch_add` matters.
    static SNAPSHOT_SEQ: AtomicU64 = AtomicU64::new(0);
    let seq = SNAPSHOT_SEQ.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO)
        .as_nanos();
    let stem = db_path
        .file_stem()
        .and_then(|stem| stem.to_str())
        .unwrap_or(default_stem);
    let mut file_name = OsString::from(format!(
        "tsift-{stem}-{}-{nanos}-{seq}",
        std::process::id()
    ));
    file_name.push(".db");
    std::env::temp_dir().join(file_name)
}
/// WAL auto-checkpoint threshold, in pages, that is set and then verified whenever a
/// writable graph connection is configured.
const SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES: i64 = 4_096;
/// Rows per multi-row `INSERT` when staging a projection.
///
/// At up to 9 bound columns per staged row, one statement binds up to 4,500 parameters.
/// NOTE(review): this assumes the linked SQLite allows more host parameters than the
/// pre-3.32 default limit of 999; confirm for builds that link a system SQLite.
const SQLITE_GRAPH_STAGING_CHUNK_ROWS: usize = 500;
146
/// SQLite-backed graph store.
///
/// Field order matters. Struct fields drop in declaration order, so `conn` is closed before
/// `_snapshot_copy` deletes any snapshot files the connection was reading.
pub struct SqliteGraphStore {
    conn: Connection,
    /// Snapshot files to delete on drop when the store was opened from a temp copy.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Which snapshot fallback (if any) a read-only open used.
    read_only_recovery: Option<ReadOnlyRecovery>,
    /// Set while a temp-table-using method runs; checked by `assert_not_in_temp_table_section`.
    temp_table_active: Cell<bool>,
}
165
/// A read-only connection to a graph database. It may have been opened on a temporary
/// snapshot copy when the live database reported a lock.
///
/// Field order matters. Fields drop in declaration order, so `conn` closes before
/// `_snapshot_copy` deletes the snapshot files it reads from.
pub struct SqliteReadOnlyConnection {
    conn: Connection,
    /// Owns the snapshot files when `conn` was opened on a copy; `None` for a direct open.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Which snapshot fallback produced this connection, if any.
    recovery: Option<ReadOnlyRecovery>,
}
181
/// Process-wide atomic counter, starting at 0.
///
/// NOTE(review): the code that reads it is outside this view. Presumably each BFS call takes
/// a fresh value (e.g. to name per-call temp state); confirm.
static BFS_CALL_ID: AtomicU64 = AtomicU64::new(0);
183
184impl SqliteReadOnlyConnection {
185 pub fn conn(&self) -> &Connection {
186 &self.conn
187 }
188
189 pub fn recovery(&self) -> Option<ReadOnlyRecovery> {
190 self.recovery
191 }
192}
193
/// Owns temporary snapshot files and deletes them when dropped.
struct SnapshotCopyGuard {
    /// Files removed on drop. Removal errors (including already-missing files) are ignored.
    paths: Vec<PathBuf>,
}

impl Drop for SnapshotCopyGuard {
    fn drop(&mut self) {
        // Best effort: cleanup must never panic inside `drop`.
        self.paths.iter().for_each(|path| {
            let _ = std::fs::remove_file(path);
        });
    }
}
205
206fn configure_writable_graph_connection(conn: &Connection, db_path: &Path) -> Result<()> {
207 conn.busy_timeout(Duration::from_secs(5))?;
208 conn.pragma_update(None, "journal_mode", "WAL")?;
209 let mode: String = conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
210 if mode.to_lowercase() != "wal" {
211 bail!(
212 "graph substrate db {} requires WAL mode for concurrent reads, got {}",
213 db_path.display(),
214 mode
215 );
216 }
217 conn.pragma_update(
218 None,
219 "wal_autocheckpoint",
220 SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
221 )?;
222 let checkpoint_pages: i64 =
223 conn.query_row("PRAGMA wal_autocheckpoint", [], |row| row.get(0))?;
224 if checkpoint_pages != SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES {
225 bail!(
226 "graph substrate db {} requires wal_autocheckpoint={}, got {}",
227 db_path.display(),
228 SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
229 checkpoint_pages
230 );
231 }
232 Ok(())
233}
234
235fn sqlite_column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
236 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
237 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
238 for row in rows {
239 if row? == column {
240 return Ok(true);
241 }
242 }
243 Ok(false)
244}
245
246fn sqlite_table_exists(conn: &Connection, table: &str) -> Result<bool> {
247 conn.query_row(
248 r#"
249 SELECT EXISTS(
250 SELECT 1
251 FROM sqlite_master
252 WHERE type = 'table' AND name = ?1
253 )
254 "#,
255 [table],
256 |row| row.get::<_, bool>(0),
257 )
258 .map_err(Into::into)
259}
260
261fn add_column_if_missing(
262 conn: &Connection,
263 table: &str,
264 column: &str,
265 definition: &str,
266) -> Result<()> {
267 if !sqlite_column_exists(conn, table, column)? {
268 conn.execute(
269 &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
270 [],
271 )?;
272 }
273 Ok(())
274}
275
276fn backfill_graph_edge_keys(conn: &Connection) -> Result<()> {
277 if !sqlite_column_exists(conn, "graph_edges", "edge_key")? {
278 return Ok(());
279 }
280 let rows = {
281 let mut stmt = conn.prepare(
282 r#"
283 SELECT from_id, to_id, kind
284 FROM graph_edges
285 WHERE edge_key IS NULL OR edge_key = ''
286 ORDER BY from_id, kind, to_id
287 "#,
288 )?;
289 collect_rows(stmt.query_map([], |row| {
290 Ok((
291 row.get::<_, String>(0)?,
292 row.get::<_, String>(1)?,
293 row.get::<_, String>(2)?,
294 ))
295 })?)?
296 };
297 let mut update = conn.prepare(
298 r#"
299 UPDATE graph_edges
300 SET edge_key = ?4
301 WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3
302 "#,
303 )?;
304 for (from_id, to_id, kind) in rows {
305 update.execute((
306 &from_id,
307 &to_id,
308 &kind,
309 stable_graph_edge_id(&from_id, &to_id, &kind),
310 ))?;
311 }
312 Ok(())
313}
314
/// Applies every schema upgrade step newer than `old_version`, in version order.
///
/// Each step only adds missing structures (guarded by existence checks or `IF NOT EXISTS`),
/// backfills missing values, or deletes and rebuilds a derived table, so replaying a step is
/// harmless. NOTE(review): the steps are not wrapped in a transaction here. The caller
/// (`SqliteGraphStore::from_connection`) bumps `user_version` only after this returns `Ok`,
/// so a failed migration is replayed on the next open.
fn migrate_sqlite_graph_schema(conn: &Connection, old_version: i64) -> Result<()> {
    // v2: `row_hash` and `source_watermark` columns on nodes and edges.
    if old_version < 2 {
        add_column_if_missing(conn, "graph_nodes", "row_hash", "TEXT")?;
        add_column_if_missing(conn, "graph_nodes", "source_watermark", "TEXT")?;
        add_column_if_missing(conn, "graph_edges", "row_hash", "TEXT")?;
        add_column_if_missing(conn, "graph_edges", "source_watermark", "TEXT")?;
    }
    // v3: populate `graph_node_properties` from `graph_nodes.properties_json`.
    if old_version < 3 {
        rebuild_graph_node_properties(conn)?;
    }
    // v4: (kind, label, id) index and the `graph_operator_stats` table.
    if old_version < 4 {
        ensure_sqlite_graph_operator_stats_schema(conn)?;
    }
    // v5: stable edge keys. The order matters: the column must exist and be backfilled before
    // the unique index and the keyed property table are created and rebuilt.
    if old_version < 5 {
        add_column_if_missing(conn, "graph_edges", "edge_key", "TEXT")?;
        backfill_graph_edge_keys(conn)?;
        ensure_sqlite_graph_edge_properties_schema(conn)?;
        rebuild_graph_edge_properties(conn)?;
    }
    // v6: semantic vector side table, rebuilt from node properties.
    if old_version < 6 {
        ensure_sqlite_graph_semantic_vectors_schema(conn)?;
        rebuild_graph_node_semantic_vectors(conn)?;
    }
    Ok(())
}
340
341fn ensure_sqlite_graph_operator_stats_schema(conn: &Connection) -> Result<()> {
342 conn.execute_batch(
343 r#"
344 CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
345 ON graph_nodes(kind, label, id);
346 CREATE TABLE IF NOT EXISTS graph_operator_stats (
347 scope TEXT PRIMARY KEY,
348 nodes INTEGER NOT NULL,
349 edges INTEGER NOT NULL,
350 tombstone_nodes INTEGER NOT NULL,
351 tombstone_edges INTEGER NOT NULL,
352 file_size_bytes INTEGER,
353 freelist_bytes INTEGER,
354 observed_at_unix INTEGER NOT NULL
355 );
356 "#,
357 )?;
358 Ok(())
359}
360
361fn ensure_sqlite_graph_edge_properties_schema(conn: &Connection) -> Result<()> {
362 conn.execute_batch(
363 r#"
364 CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_edge_key
365 ON graph_edges(edge_key);
366 CREATE TABLE IF NOT EXISTS graph_edge_properties (
367 edge_key TEXT NOT NULL,
368 key TEXT NOT NULL,
369 value TEXT NOT NULL,
370 PRIMARY KEY (edge_key, key),
371 FOREIGN KEY (edge_key) REFERENCES graph_edges(edge_key) ON DELETE CASCADE
372 );
373 CREATE INDEX IF NOT EXISTS idx_graph_edge_properties_key_value_edge
374 ON graph_edge_properties(key, value, edge_key);
375 "#,
376 )?;
377 Ok(())
378}
379
380fn ensure_sqlite_graph_semantic_vectors_schema(conn: &Connection) -> Result<()> {
381 conn.execute_batch(
382 r#"
383 CREATE TABLE IF NOT EXISTS graph_node_semantic_vectors (
384 node_id TEXT PRIMARY KEY,
385 kind TEXT NOT NULL,
386 model TEXT NOT NULL,
387 dimensions INTEGER NOT NULL,
388 vector_blob BLOB NOT NULL,
389 FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
390 );
391 CREATE INDEX IF NOT EXISTS idx_graph_node_semantic_vectors_kind_dims
392 ON graph_node_semantic_vectors(kind, dimensions, node_id);
393 "#,
394 )?;
395 Ok(())
396}
397
398fn replace_node_properties(
399 conn: &Connection,
400 node_id: &str,
401 properties: &BTreeMap<String, String>,
402) -> Result<()> {
403 conn.execute(
404 "DELETE FROM graph_node_properties WHERE node_id = ?1",
405 [node_id],
406 )?;
407 let mut insert = conn.prepare(
408 r#"
409 INSERT INTO graph_node_properties (node_id, key, value)
410 VALUES (?1, ?2, ?3)
411 ON CONFLICT(node_id, key) DO UPDATE SET
412 value = excluded.value
413 "#,
414 )?;
415 for (key, value) in properties {
416 insert.execute((node_id, key, value))?;
417 }
418 Ok(())
419}
420
/// A node's semantic vector in its stored form.
struct GraphSemanticVectorRow {
    /// Embedding model name, taken from the node's model property or the default model.
    model: String,
    /// Number of `f64` components encoded in `vector_blob`.
    dimensions: usize,
    /// Components encoded as consecutive little-endian `f64` bytes.
    vector_blob: Vec<u8>,
}
426
427fn graph_semantic_vector_row(
428 properties: &BTreeMap<String, String>,
429) -> Option<GraphSemanticVectorRow> {
430 let vector = properties
431 .get(GRAPH_SEMANTIC_VECTOR_PROPERTY_KEY)
432 .and_then(|value| parse_graph_semantic_vector_property(value))?;
433 Some(GraphSemanticVectorRow {
434 model: properties
435 .get(GRAPH_SEMANTIC_VECTOR_MODEL_PROPERTY_KEY)
436 .cloned()
437 .unwrap_or_else(|| GRAPH_SEMANTIC_VECTOR_DEFAULT_MODEL.to_string()),
438 dimensions: vector.len(),
439 vector_blob: semantic_vector_to_blob(&vector),
440 })
441}
442
/// Encodes `vector` as its components' little-endian bytes, in order (8 bytes per value).
fn semantic_vector_to_blob(vector: &[f64]) -> Vec<u8> {
    vector
        .iter()
        .flat_map(|component| component.to_le_bytes())
        .collect()
}
450
/// Decodes a blob of little-endian `f64` values, as written by `semantic_vector_to_blob`.
///
/// Returns `None` in any of these cases:
/// - `dimensions` is zero;
/// - the blob is not exactly `dimensions * 8` bytes, including when that product overflows
///   `usize`;
/// - any decoded component is NaN or infinite.
fn semantic_vector_from_blob(blob: &[u8], dimensions: usize) -> Option<Vec<f64>> {
    const F64_BYTES: usize = std::mem::size_of::<f64>();
    // `dimensions` comes from a stored row. Checked math stops a corrupt value from
    // overflowing, which would panic in debug builds and could wrap into a false match in
    // release builds.
    if dimensions == 0 || dimensions.checked_mul(F64_BYTES) != Some(blob.len()) {
        return None;
    }
    blob.chunks_exact(F64_BYTES)
        .map(|chunk| {
            let value = f64::from_le_bytes(chunk.try_into().ok()?);
            value.is_finite().then_some(value)
        })
        .collect()
}
465
466fn replace_node_semantic_vector(
467 conn: &Connection,
468 node_id: &str,
469 kind: &str,
470 properties: &BTreeMap<String, String>,
471) -> Result<()> {
472 conn.execute(
473 "DELETE FROM graph_node_semantic_vectors WHERE node_id = ?1",
474 [node_id],
475 )?;
476 let Some(row) = graph_semantic_vector_row(properties) else {
477 return Ok(());
478 };
479 conn.execute(
480 r#"
481 INSERT INTO graph_node_semantic_vectors
482 (node_id, kind, model, dimensions, vector_blob)
483 VALUES (?1, ?2, ?3, ?4, ?5)
484 "#,
485 (
486 node_id,
487 kind,
488 row.model,
489 row.dimensions as i64,
490 row.vector_blob,
491 ),
492 )?;
493 Ok(())
494}
495
496fn replace_edge_properties(
497 conn: &Connection,
498 edge_key: &str,
499 properties: &BTreeMap<String, String>,
500) -> Result<()> {
501 conn.execute(
502 "DELETE FROM graph_edge_properties WHERE edge_key = ?1",
503 [edge_key],
504 )?;
505 let mut insert = conn.prepare(
506 r#"
507 INSERT INTO graph_edge_properties (edge_key, key, value)
508 VALUES (?1, ?2, ?3)
509 ON CONFLICT(edge_key, key) DO UPDATE SET
510 value = excluded.value
511 "#,
512 )?;
513 for (key, value) in properties {
514 insert.execute((edge_key, key, value))?;
515 }
516 Ok(())
517}
518
519fn rebuild_graph_node_properties(conn: &Connection) -> Result<()> {
520 if !sqlite_column_exists(conn, "graph_nodes", "properties_json")? {
521 return Ok(());
522 }
523 conn.execute_batch(
524 r#"
525 DELETE FROM graph_node_properties;
526 INSERT INTO graph_node_properties (node_id, key, value)
527 SELECT graph_nodes.id, json_each.key, CAST(json_each.value AS TEXT)
528 FROM graph_nodes, json_each(graph_nodes.properties_json)
529 WHERE json_each.key IS NOT NULL
530 AND json_each.value IS NOT NULL
531 "#,
532 )?;
533 Ok(())
534}
535
536fn rebuild_graph_node_semantic_vectors(conn: &Connection) -> Result<()> {
537 if !sqlite_column_exists(conn, "graph_nodes", "properties_json")?
538 || !sqlite_table_exists(conn, "graph_node_semantic_vectors")?
539 {
540 return Ok(());
541 }
542 conn.execute("DELETE FROM graph_node_semantic_vectors", [])?;
543 let rows = {
544 let mut stmt = conn.prepare(
545 r#"
546 SELECT id, kind, properties_json
547 FROM graph_nodes
548 WHERE json_extract(properties_json, '$.embedding') IS NOT NULL
549 ORDER BY id
550 "#,
551 )?;
552 collect_rows(stmt.query_map([], |row| {
553 Ok((
554 row.get::<_, String>(0)?,
555 row.get::<_, String>(1)?,
556 row.get::<_, String>(2)?,
557 ))
558 })?)?
559 };
560 for (node_id, kind, properties_json) in rows {
561 let properties: BTreeMap<String, String> = serde_json::from_str(&properties_json)
562 .with_context(|| format!("parsing semantic properties for graph node {node_id}"))?;
563 replace_node_semantic_vector(conn, &node_id, &kind, &properties)?;
564 }
565 Ok(())
566}
567
568fn rebuild_graph_edge_properties(conn: &Connection) -> Result<()> {
569 if !sqlite_column_exists(conn, "graph_edges", "properties_json")?
570 || !sqlite_column_exists(conn, "graph_edges", "edge_key")?
571 {
572 return Ok(());
573 }
574 conn.execute_batch(
575 r#"
576 DELETE FROM graph_edge_properties;
577 INSERT INTO graph_edge_properties (edge_key, key, value)
578 SELECT graph_edges.edge_key, json_each.key, CAST(json_each.value AS TEXT)
579 FROM graph_edges, json_each(graph_edges.properties_json)
580 WHERE graph_edges.edge_key IS NOT NULL
581 AND graph_edges.edge_key <> ''
582 AND json_each.key IS NOT NULL
583 AND json_each.value IS NOT NULL
584 "#,
585 )?;
586 Ok(())
587}
588
589pub fn open_graph_read_only_connection(db_path: &Path) -> Result<SqliteReadOnlyConnection> {
590 let conn = Connection::open_with_flags(
591 db_path,
592 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
593 )
594 .with_context(|| format!("opening graph.db read-only: {}", db_path.display()))?;
595 conn.busy_timeout(Duration::from_secs(5))?;
596 Ok(SqliteReadOnlyConnection {
597 conn,
598 _snapshot_copy: None,
599 recovery: None,
600 })
601}
602
603pub fn open_graph_read_only_connection_resilient(
604 db_path: &Path,
605) -> Result<SqliteReadOnlyConnection> {
606 match open_graph_read_only_connection(db_path).and_then(|connection| {
607 connection
608 .conn
609 .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
610 Ok(connection)
611 }) {
612 Ok(connection) => Ok(connection),
613 Err(err) => match read_only_snapshot_recovery(db_path, &err) {
614 Some(recovery) => open_graph_read_only_snapshot(db_path, recovery),
615 None => Err(err),
616 },
617 }
618}
619
620fn open_graph_read_only_snapshot(
621 db_path: &Path,
622 recovery: ReadOnlyRecovery,
623) -> Result<SqliteReadOnlyConnection> {
624 let (snapshot_path, cleanup_paths) = copy_read_only_snapshot(db_path, "graph")?;
625 let conn = Connection::open_with_flags(
626 &snapshot_path,
627 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
628 )
629 .with_context(|| format!("opening graph.db snapshot {}", snapshot_path.display()))?;
630 conn.busy_timeout(Duration::from_secs(5))?;
631 Ok(SqliteReadOnlyConnection {
632 conn,
633 _snapshot_copy: Some(SnapshotCopyGuard {
634 paths: cleanup_paths,
635 }),
636 recovery: Some(recovery),
637 })
638}
639
/// Timing record for one named phase of a projection refresh.
///
/// Field order is also the serialized field order.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefreshPhase {
    /// Phase identifier, e.g. `wal_checkpoint:ok`.
    pub name: String,
    /// Wall-clock duration of the phase, in microseconds.
    pub duration_micros: u128,
    /// Human-readable detail about the phase outcome.
    pub detail: String,
}
646
/// Summary returned by `SqliteGraphStore::replace_projection_with_version`.
///
/// Field order is also the serialized field order. NOTE(review): most counters are filled in
/// by `replace_projection_with_version_fallible`, which is only partly visible here. Their
/// exact meaning is inferred from the field names; confirm against that function.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefresh {
    /// Scope whose projection was replaced.
    pub scope: String,
    /// Explicit version, else one derived from the nodes, else `"unversioned"`.
    pub projection_version: String,
    pub source_watermark: Option<String>,
    pub tombstoned_nodes: Vec<String>,
    pub tombstoned_edges: Vec<String>,
    pub upserted_nodes: usize,
    pub upserted_edges: usize,
    pub unchanged_nodes: usize,
    pub unchanged_edges: usize,
    pub upserted_properties: usize,
    pub unchanged_properties: usize,
    pub deleted_properties: usize,
    pub deleted_nodes: usize,
    pub deleted_edges: usize,
    pub pruned_tombstones: usize,
    /// Database size before the refresh; `None` if it could not be measured.
    pub file_size_bytes_before: Option<u64>,
    pub file_size_bytes_after: Option<u64>,
    /// Per-phase timings. A trailing `wal_checkpoint:*` phase is appended after the refresh.
    pub phase_timings: Vec<SqliteProjectionRefreshPhase>,
}
668
/// Version metadata for a stored projection.
///
/// Mirrors the `projection_version`, `content_hash`, and `source_watermark` columns of
/// `graph_projection_versions`. Field order is also the serialized field order.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionVersion {
    pub projection_version: String,
    pub content_hash: Option<String>,
    pub source_watermark: Option<String>,
}
675
676fn sqlite_refresh_phase_timing(
677 name: &str,
678 started: Instant,
679 detail: &str,
680) -> SqliteProjectionRefreshPhase {
681 SqliteProjectionRefreshPhase {
682 name: name.to_string(),
683 duration_micros: started.elapsed().as_micros(),
684 detail: detail.to_string(),
685 }
686}
687
/// Builds the `VALUES` list for a multi-row insert.
///
/// Produces `row_count` tuples of `column_count` `?` placeholders, joined by `", "`. For
/// example, `(2, 2)` gives `"(?, ?), (?, ?)"`.
fn sqlite_graph_staging_placeholders(column_count: usize, row_count: usize) -> String {
    let tuple = format!("({})", vec!["?"; column_count].join(", "));
    vec![tuple.as_str(); row_count].join(", ")
}
701
702fn sqlite_stage_all_ids(
703 tx: &rusqlite::Transaction<'_>,
704 nodes: &[GraphNode],
705 edges: &[GraphEdge],
706) -> Result<()> {
707 for chunk in nodes
708 .iter()
709 .map(|n| &n.id)
710 .collect::<Vec<_>>()
711 .chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS)
712 {
713 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
714 let sql = format!(
715 "INSERT OR IGNORE INTO next_graph_all_node_ids (id) VALUES {}",
716 placeholders.join(", ")
717 );
718 let values: Vec<Value> = chunk.iter().map(|id| Value::Text((*id).clone())).collect();
719 tx.execute(&sql, params_from_iter(values.iter()))?;
720 }
721 for chunk in edges
722 .iter()
723 .map(graph_edge_id)
724 .collect::<Vec<_>>()
725 .chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS)
726 {
727 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
728 let sql = format!(
729 "INSERT OR IGNORE INTO next_graph_all_edge_keys (edge_key) VALUES {}",
730 placeholders.join(", ")
731 );
732 let values: Vec<Value> = chunk.iter().map(|ek| Value::Text(ek.to_string())).collect();
733 tx.execute(&sql, params_from_iter(values.iter()))?;
734 }
735 Ok(())
736}
737
738fn sqlite_stage_projection_nodes(
739 tx: &rusqlite::Transaction<'_>,
740 nodes: &[&GraphNode],
741 source_watermark: Option<&str>,
742) -> Result<()> {
743 let mut insert_semantic_vector = tx.prepare(
744 r#"
745 INSERT INTO next_graph_node_semantic_vectors
746 (node_id, kind, model, dimensions, vector_blob)
747 VALUES (?1, ?2, ?3, ?4, ?5)
748 "#,
749 )?;
750 for chunk in nodes.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
751 let sql = format!(
752 r#"
753 INSERT INTO next_graph_nodes
754 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
755 VALUES {}
756 "#,
757 sqlite_graph_staging_placeholders(8, chunk.len())
758 );
759 let mut values = Vec::with_capacity(chunk.len() * 8);
760 for node in chunk {
761 values.push(Value::Text(node.id.clone()));
762 values.push(Value::Text(node.kind.clone()));
763 values.push(Value::Text(node.label.clone()));
764 values.push(Value::Text(to_json(&node.properties)?));
765 values.push(Value::Text(to_json(&node.provenance)?));
766 values.push(
767 optional_to_json(&node.freshness)?
768 .map(Value::Text)
769 .unwrap_or(Value::Null),
770 );
771 values.push(Value::Text(row_hash(node)?));
772 values.push(
773 source_watermark
774 .map(|watermark| Value::Text(watermark.to_string()))
775 .unwrap_or(Value::Null),
776 );
777 }
778 tx.execute(&sql, params_from_iter(values))?;
779 for node in chunk {
780 let Some(row) = graph_semantic_vector_row(&node.properties) else {
781 continue;
782 };
783 insert_semantic_vector.execute((
784 &node.id,
785 &node.kind,
786 row.model,
787 row.dimensions as i64,
788 row.vector_blob,
789 ))?;
790 }
791 }
792 Ok(())
793}
794
795fn sqlite_stage_projection_edges(
796 tx: &rusqlite::Transaction<'_>,
797 edges: &[&GraphEdge],
798 source_watermark: Option<&str>,
799) -> Result<()> {
800 for chunk in edges.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
801 let sql = format!(
802 r#"
803 INSERT INTO next_graph_edges
804 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
805 VALUES {}
806 "#,
807 sqlite_graph_staging_placeholders(9, chunk.len())
808 );
809 let mut values = Vec::with_capacity(chunk.len() * 9);
810 for edge in chunk {
811 values.push(Value::Text(graph_edge_id(edge)));
812 values.push(Value::Text(edge.from_id.clone()));
813 values.push(Value::Text(edge.to_id.clone()));
814 values.push(Value::Text(edge.kind.clone()));
815 values.push(Value::Text(to_json(&edge.properties)?));
816 values.push(Value::Text(to_json(&edge.provenance)?));
817 values.push(
818 optional_to_json(&edge.freshness)?
819 .map(Value::Text)
820 .unwrap_or(Value::Null),
821 );
822 values.push(Value::Text(row_hash(edge)?));
823 values.push(
824 source_watermark
825 .map(|watermark| Value::Text(watermark.to_string()))
826 .unwrap_or(Value::Null),
827 );
828 }
829 tx.execute(&sql, params_from_iter(values))?;
830 }
831 Ok(())
832}
833
834impl SqliteGraphStore {
835 fn assert_not_in_temp_table_section(&self) {
836 if self.temp_table_active.get() {
837 panic!(
838 "SqliteGraphStore: re-entrant temp-table call detected — only one temp-table-using method may be active at a time per connection"
839 );
840 }
841 }
842
843 pub fn open(db_path: &Path) -> Result<Self> {
844 if let Some(parent) = db_path.parent() {
845 std::fs::create_dir_all(parent)
846 .with_context(|| format!("creating graph substrate dir: {}", parent.display()))?;
847 }
848 let conn = Connection::open(db_path)
849 .with_context(|| format!("opening graph substrate db: {}", db_path.display()))?;
850 configure_writable_graph_connection(&conn, db_path)?;
851 Self::from_connection(conn)
852 }
853
854 pub fn in_memory() -> Result<Self> {
855 let conn = Connection::open_in_memory()?;
856 conn.busy_timeout(Duration::from_secs(5))?;
857 Self::from_connection(conn)
858 }
859
860 pub fn open_read_only(db_path: &Path) -> Result<Self> {
861 let connection = open_graph_read_only_connection(db_path)?;
862 Self::from_read_only_connection(connection)
863 }
864
865 pub fn open_read_only_resilient(db_path: &Path) -> Result<Self> {
866 let connection = open_graph_read_only_connection_resilient(db_path)?;
867 Self::from_read_only_connection(connection)
868 }
869
870 pub fn read_only_recovery(&self) -> Option<ReadOnlyRecovery> {
871 self.read_only_recovery
872 }
873
874 pub fn has_user_triggers(&self) -> Result<bool> {
875 self.conn
876 .query_row(
877 r#"
878 SELECT EXISTS(
879 SELECT 1
880 FROM sqlite_master
881 WHERE type = 'trigger'
882 AND name NOT LIKE 'sqlite_%'
883 )
884 "#,
885 [],
886 |row| row.get::<_, bool>(0),
887 )
888 .map_err(Into::into)
889 }
890
    /// Wraps a connection as a writable store, creating and upgrading the schema as needed.
    ///
    /// Steps, in order:
    /// 1. Enable foreign keys.
    /// 2. Reject databases whose `user_version` is newer than `SQLITE_GRAPH_SCHEMA_VERSION`.
    /// 3. Create any missing tables and indexes of the current layout.
    /// 4. If the stored version is older, run `migrate_sqlite_graph_schema` and record the
    ///    current version.
    ///
    /// # Errors
    /// Fails on a too-new schema version, or on any SQL error during setup or migration.
    fn from_connection(conn: Connection) -> Result<Self> {
        // Needed for the `ON DELETE CASCADE` foreign keys declared below to be enforced.
        conn.pragma_update(None, "foreign_keys", "ON")?;
        let user_version: i64 =
            conn.pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
            bail!(
                "graph.db schema version {} is newer than supported version {}",
                user_version,
                SQLITE_GRAPH_SCHEMA_VERSION
            );
        }
        // `IF NOT EXISTS` leaves tables from older versions untouched. Columns they lack, the
        // unique `edge_key` index, and `graph_edge_properties` come from the migration below.
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS graph_nodes (
                id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                label TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT
            );
            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind
                ON graph_nodes(kind);
            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
                ON graph_nodes(kind, label, id);

            CREATE TABLE IF NOT EXISTS graph_edges (
                edge_key TEXT NOT NULL UNIQUE,
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT,
                PRIMARY KEY (from_id, to_id, kind),
                FOREIGN KEY (from_id) REFERENCES graph_nodes(id) ON DELETE CASCADE,
                FOREIGN KEY (to_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_edges_from_kind
                ON graph_edges(from_id, kind);
            CREATE INDEX IF NOT EXISTS idx_graph_edges_to_kind
                ON graph_edges(to_id, kind);

            CREATE TABLE IF NOT EXISTS graph_projection_versions (
                scope TEXT PRIMARY KEY,
                projection_version TEXT NOT NULL,
                content_hash TEXT,
                source_watermark TEXT,
                observed_at_unix INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS graph_tombstones (
                row_key TEXT PRIMARY KEY,
                row_kind TEXT NOT NULL,
                deleted_at_unix INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS graph_node_properties (
                node_id TEXT NOT NULL,
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                PRIMARY KEY (node_id, key),
                FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_node_properties_key_value_node
                ON graph_node_properties(key, value, node_id);

            CREATE TABLE IF NOT EXISTS graph_node_semantic_vectors (
                node_id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                model TEXT NOT NULL,
                dimensions INTEGER NOT NULL,
                vector_blob BLOB NOT NULL,
                FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_node_semantic_vectors_kind_dims
                ON graph_node_semantic_vectors(kind, dimensions, node_id);

            CREATE TABLE IF NOT EXISTS graph_operator_stats (
                scope TEXT PRIMARY KEY,
                nodes INTEGER NOT NULL,
                edges INTEGER NOT NULL,
                tombstone_nodes INTEGER NOT NULL,
                tombstone_edges INTEGER NOT NULL,
                file_size_bytes INTEGER,
                freelist_bytes INTEGER,
                observed_at_unix INTEGER NOT NULL
            );
            "#,
        )?;
        // Bump `user_version` only after every migration step has succeeded.
        if user_version < SQLITE_GRAPH_SCHEMA_VERSION {
            migrate_sqlite_graph_schema(&conn, user_version)?;
            conn.pragma_update(None, "user_version", SQLITE_GRAPH_SCHEMA_VERSION)?;
        }
        Ok(Self {
            conn,
            _snapshot_copy: None,
            read_only_recovery: None,
            temp_table_active: Cell::new(false),
        })
    }
996
997 fn from_read_only_connection(connection: SqliteReadOnlyConnection) -> Result<Self> {
998 connection.conn.pragma_update(None, "foreign_keys", "ON")?;
999 let user_version: i64 =
1000 connection
1001 .conn
1002 .pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
1003 if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
1004 bail!(
1005 "graph.db schema version {} is newer than supported version {}",
1006 user_version,
1007 SQLITE_GRAPH_SCHEMA_VERSION
1008 );
1009 }
1010 connection
1011 .conn
1012 .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
1013 Ok(Self {
1014 conn: connection.conn,
1015 _snapshot_copy: connection._snapshot_copy,
1016 read_only_recovery: connection.recovery,
1017 temp_table_active: Cell::new(false),
1018 })
1019 }
1020
1021 pub fn replace_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1022 self.replace_projection_with_version("root", projection, None, None)
1023 .map(|_| ())
1024 }
1025
1026 pub fn replace_projection_with_version(
1027 &mut self,
1028 scope: impl Into<String>,
1029 projection: &GraphProjection,
1030 projection_version: Option<&str>,
1031 source_watermark: Option<String>,
1032 ) -> Result<SqliteProjectionRefresh> {
1033 self.assert_not_in_temp_table_section();
1034 self.temp_table_active.set(true);
1035 let scope = scope.into();
1036 let mut result = self.replace_projection_with_version_fallible(
1037 scope,
1038 projection,
1039 projection_version,
1040 source_watermark,
1041 );
1042 self.temp_table_active.set(false);
1043 if let Ok(ref mut refresh) = result {
1044 let total_rows = refresh.upserted_nodes + refresh.upserted_edges;
1045 let autocheckpoint = if total_rows > 10000 {
1046 8192
1047 } else if total_rows > 1000 {
1048 4096
1049 } else {
1050 2048
1051 };
1052 let _ = self
1053 .conn
1054 .pragma_update(None, "wal_autocheckpoint", autocheckpoint);
1055 let checkpoint_started = Instant::now();
1063 let checkpoint = self
1064 .conn
1065 .query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
1066 Ok((
1067 row.get::<_, i64>(0)?,
1068 row.get::<_, i64>(1)?,
1069 row.get::<_, i64>(2)?,
1070 ))
1071 });
1072 let (name, detail) = match checkpoint {
1073 Ok((0, _log, _checkpointed)) => (
1074 "wal_checkpoint:ok",
1075 "wal_checkpoint(TRUNCATE) truncated the WAL".to_string(),
1076 ),
1077 Ok((_busy, log, checkpointed)) => (
1078 "wal_checkpoint:busy",
1079 format!(
1080 "wal_checkpoint(TRUNCATE) was blocked by concurrent readers \
1081 ({checkpointed}/{log} frames checkpointed, WAL not truncated); \
1082 the -wal file may grow until readers release and a writer truncates it"
1083 ),
1084 ),
1085 Err(err) => (
1086 "wal_checkpoint:error",
1087 format!(
1088 "wal_checkpoint(TRUNCATE) failed: {err}; \
1089 the -wal file may grow until a writer can checkpoint it"
1090 ),
1091 ),
1092 };
1093 refresh.phase_timings.push(SqliteProjectionRefreshPhase {
1094 name: name.to_string(),
1095 duration_micros: checkpoint_started.elapsed().as_micros(),
1096 detail,
1097 });
1098 }
1099 result
1100 }
1101
1102 fn replace_projection_with_version_fallible(
1103 &mut self,
1104 scope: String,
1105 projection: &GraphProjection,
1106 projection_version: Option<&str>,
1107 source_watermark: Option<String>,
1108 ) -> Result<SqliteProjectionRefresh> {
1109 let projection_version = projection_version
1110 .map(str::to_string)
1111 .or_else(|| projection_version_from_nodes(&projection.nodes))
1112 .unwrap_or_else(|| "unversioned".to_string());
1113 let projection_hash = projection_hash_from_nodes(&projection.nodes);
1114 let observed_at_unix = unix_now();
1115 let file_size_bytes_before = sqlite_database_size_bytes(&self.conn).ok();
1116 let force_refresh_writes = self.has_user_triggers().unwrap_or(true);
1117 let mut phase_timings = Vec::new();
1118
1119 let tx = self.conn.transaction()?;
1120 let started = Instant::now();
1121 tx.execute_batch(
1122 r#"
1123 CREATE TEMP TABLE IF NOT EXISTS next_graph_nodes (
1124 id TEXT PRIMARY KEY,
1125 kind TEXT NOT NULL,
1126 label TEXT NOT NULL,
1127 properties_json TEXT NOT NULL,
1128 provenance_json TEXT NOT NULL,
1129 freshness_json TEXT,
1130 row_hash TEXT NOT NULL,
1131 source_watermark TEXT
1132 );
1133 CREATE TEMP TABLE IF NOT EXISTS next_graph_edges (
1134 edge_key TEXT PRIMARY KEY,
1135 from_id TEXT NOT NULL,
1136 to_id TEXT NOT NULL,
1137 kind TEXT NOT NULL,
1138 properties_json TEXT NOT NULL,
1139 provenance_json TEXT NOT NULL,
1140 freshness_json TEXT,
1141 row_hash TEXT NOT NULL,
1142 source_watermark TEXT
1143 );
1144 CREATE INDEX IF NOT EXISTS temp.idx_next_graph_edges_from_to_kind
1145 ON next_graph_edges(from_id, to_id, kind);
1146 CREATE TEMP TABLE IF NOT EXISTS next_graph_node_properties (
1147 node_id TEXT NOT NULL,
1148 key TEXT NOT NULL,
1149 value TEXT NOT NULL,
1150 PRIMARY KEY (node_id, key)
1151 );
1152 CREATE TEMP TABLE IF NOT EXISTS next_graph_node_semantic_vectors (
1153 node_id TEXT PRIMARY KEY,
1154 kind TEXT NOT NULL,
1155 model TEXT NOT NULL,
1156 dimensions INTEGER NOT NULL,
1157 vector_blob BLOB NOT NULL
1158 );
1159 CREATE TEMP TABLE IF NOT EXISTS next_graph_edge_properties (
1160 edge_key TEXT NOT NULL,
1161 key TEXT NOT NULL,
1162 value TEXT NOT NULL,
1163 PRIMARY KEY (edge_key, key)
1164 );
1165 CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_nodes (
1166 id TEXT PRIMARY KEY
1167 );
1168 CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_edges (
1169 edge_key TEXT PRIMARY KEY
1170 );
1171 CREATE TEMP TABLE IF NOT EXISTS next_graph_all_node_ids (
1172 id TEXT PRIMARY KEY
1173 );
1174 CREATE TEMP TABLE IF NOT EXISTS next_graph_all_edge_keys (
1175 edge_key TEXT PRIMARY KEY
1176 );
1177 DELETE FROM next_graph_nodes;
1178 DELETE FROM next_graph_edges;
1179 DELETE FROM next_graph_node_properties;
1180 DELETE FROM next_graph_node_semantic_vectors;
1181 DELETE FROM next_graph_edge_properties;
1182 DELETE FROM next_graph_changed_nodes;
1183 DELETE FROM next_graph_changed_edges;
1184 DELETE FROM next_graph_all_node_ids;
1185 DELETE FROM next_graph_all_edge_keys;
1186 "#,
1187 )?;
1188 phase_timings.push(sqlite_refresh_phase_timing(
1189 "sqlite_temp_table_prepare",
1190 started,
1191 "create and clear refresh staging tables before row loading",
1192 ));
1193 let (changed_nodes, changed_edges, skipped_nodes, skipped_edges) = if force_refresh_writes {
1194 (
1195 projection.nodes.iter().collect(),
1196 projection.edges.iter().collect(),
1197 0usize,
1198 0usize,
1199 )
1200 } else {
1201 let existing_node_hashes: BTreeMap<String, String> = {
1202 let mut stmt =
1203 tx.prepare("SELECT id, row_hash FROM graph_nodes WHERE row_hash IS NOT NULL")?;
1204 let rows = stmt.query_map([], |row| {
1205 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1206 })?;
1207 collect_rows(rows)?.into_iter().collect()
1208 };
1209 let existing_edge_hashes: BTreeMap<String, String> = {
1210 let mut stmt = tx.prepare(
1211 "SELECT edge_key, row_hash FROM graph_edges WHERE row_hash IS NOT NULL",
1212 )?;
1213 let rows = stmt.query_map([], |row| {
1214 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1215 })?;
1216 collect_rows(rows)?.into_iter().collect()
1217 };
1218 let cn: Vec<&GraphNode> = projection
1219 .nodes
1220 .iter()
1221 .filter(|n| {
1222 let hash = row_hash(*n).ok();
1223 hash.as_ref()
1224 .is_none_or(|h| existing_node_hashes.get(&n.id) != Some(h))
1225 })
1226 .collect();
1227 let ce: Vec<&GraphEdge> = projection
1228 .edges
1229 .iter()
1230 .filter(|e| {
1231 let hash = row_hash(*e).ok();
1232 let ek = graph_edge_id(e);
1233 hash.as_ref()
1234 .is_none_or(|h| existing_edge_hashes.get(&ek) != Some(h))
1235 })
1236 .collect();
1237 let sn = projection.nodes.len() - cn.len();
1238 let se = projection.edges.len() - ce.len();
1239 (cn, ce, sn, se)
1240 };
1241 {
1242 let started = Instant::now();
1243 sqlite_stage_all_ids(&tx, &projection.nodes, &projection.edges)?;
1244 sqlite_stage_projection_nodes(&tx, &changed_nodes, source_watermark.as_deref())?;
1245 sqlite_stage_projection_edges(&tx, &changed_edges, source_watermark.as_deref())?;
1246 phase_timings.push(sqlite_refresh_phase_timing(
1247 "sqlite_node_staging",
1248 started,
1249 &format!(
1250 "pre-filtered staging: {} nodes ({} unchanged skipped), {} edges ({} unchanged skipped) into temp tables using multi-row chunks up to {} rows",
1251 changed_nodes.len(),
1252 skipped_nodes,
1253 changed_edges.len(),
1254 skipped_edges,
1255 SQLITE_GRAPH_STAGING_CHUNK_ROWS
1256 ),
1257 ));
1258 }
1259 {
1260 let started = Instant::now();
1261 let changed_nodes_sql = if force_refresh_writes {
1262 r#"
1263 INSERT INTO next_graph_changed_nodes (id)
1264 SELECT id
1265 FROM next_graph_nodes
1266 "#
1267 } else {
1268 r#"
1269 INSERT INTO next_graph_changed_nodes (id)
1270 SELECT n.id
1271 FROM next_graph_nodes n
1272 LEFT JOIN graph_nodes g ON g.id = n.id
1273 WHERE g.id IS NULL OR g.row_hash IS NOT n.row_hash
1274 "#
1275 };
1276 tx.execute(changed_nodes_sql, [])?;
1277 tx.execute_batch(
1278 r#"
1279 INSERT INTO next_graph_node_properties (node_id, key, value)
1280 SELECT n.id, json_each.key, CAST(json_each.value AS TEXT)
1281 FROM next_graph_nodes n
1282 JOIN next_graph_changed_nodes c ON c.id = n.id,
1283 json_each(n.properties_json)
1284 WHERE json_each.key IS NOT NULL
1285 AND json_each.value IS NOT NULL
1286 "#,
1287 )?;
1288 phase_timings.push(sqlite_refresh_phase_timing(
1289 "sqlite_property_row_staging",
1290 started,
1291 "derive materialized node property rows only for new/changed node rows; unchanged row-hash owners reuse existing property rows",
1292 ));
1293 }
1294 {
1295 let started = Instant::now();
1296 let changed_edges_sql = if force_refresh_writes {
1297 r#"
1298 INSERT INTO next_graph_changed_edges (edge_key)
1299 SELECT edge_key
1300 FROM next_graph_edges
1301 "#
1302 } else {
1303 r#"
1304 INSERT INTO next_graph_changed_edges (edge_key)
1305 SELECT n.edge_key
1306 FROM next_graph_edges n
1307 LEFT JOIN graph_edges g ON g.edge_key = n.edge_key
1308 WHERE g.edge_key IS NULL OR g.row_hash IS NOT n.row_hash
1309 "#
1310 };
1311 tx.execute(changed_edges_sql, [])?;
1312 tx.execute_batch(
1313 r#"
1314 INSERT INTO next_graph_edge_properties (edge_key, key, value)
1315 SELECT e.edge_key, json_each.key, CAST(json_each.value AS TEXT)
1316 FROM next_graph_edges e
1317 JOIN next_graph_changed_edges c ON c.edge_key = e.edge_key,
1318 json_each(e.properties_json)
1319 WHERE json_each.key IS NOT NULL
1320 AND json_each.value IS NOT NULL
1321 "#,
1322 )?;
1323 phase_timings.push(sqlite_refresh_phase_timing(
1324 "sqlite_edge_property_row_staging",
1325 started,
1326 "derive materialized edge property rows only for new/changed edge rows; unchanged row-hash owners reuse existing property rows",
1327 ));
1328 }
1329
1330 let delta_started = Instant::now();
1331 let tombstoned_nodes = {
1332 let sql = if force_refresh_writes {
1333 r#"
1334 SELECT g.id
1335 FROM graph_nodes g
1336 LEFT JOIN next_graph_nodes n ON n.id = g.id
1337 WHERE n.id IS NULL
1338 ORDER BY g.id
1339 "#
1340 } else {
1341 r#"
1342 SELECT g.id
1343 FROM graph_nodes g
1344 LEFT JOIN next_graph_all_node_ids n ON n.id = g.id
1345 WHERE n.id IS NULL
1346 ORDER BY g.id
1347 "#
1348 };
1349 let mut stmt = tx.prepare(sql)?;
1350 collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1351 };
1352 let tombstoned_edges = {
1353 let sql = if force_refresh_writes {
1354 r#"
1355 SELECT g.edge_key
1356 FROM graph_edges g
1357 LEFT JOIN next_graph_edges n
1358 ON n.edge_key = g.edge_key
1359 WHERE n.edge_key IS NULL
1360 ORDER BY g.edge_key
1361 "#
1362 } else {
1363 r#"
1364 SELECT g.edge_key
1365 FROM graph_edges g
1366 LEFT JOIN next_graph_all_edge_keys n
1367 ON n.edge_key = g.edge_key
1368 WHERE n.edge_key IS NULL
1369 ORDER BY g.edge_key
1370 "#
1371 };
1372 let mut stmt = tx.prepare(sql)?;
1373 collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1374 };
1375 let unchanged_nodes: usize = if force_refresh_writes {
1376 tx.query_row(
1377 r#"
1378 SELECT COUNT(*)
1379 FROM next_graph_nodes n
1380 JOIN graph_nodes g ON g.id = n.id
1381 WHERE g.row_hash = n.row_hash
1382 "#,
1383 [],
1384 |row| row_usize(row, 0),
1385 )?
1386 } else {
1387 skipped_nodes
1388 };
1389 let unchanged_edges: usize = if force_refresh_writes {
1390 tx.query_row(
1391 r#"
1392 SELECT COUNT(*)
1393 FROM next_graph_edges n
1394 JOIN graph_edges g
1395 ON g.edge_key = n.edge_key
1396 WHERE g.row_hash = n.row_hash
1397 "#,
1398 [],
1399 |row| row_usize(row, 0),
1400 )?
1401 } else {
1402 skipped_edges
1403 };
1404 let reused_owner_node_properties: usize = if force_refresh_writes {
1405 tx.query_row(
1406 r#"
1407 SELECT COUNT(*)
1408 FROM graph_node_properties g
1409 JOIN next_graph_nodes n ON n.id = g.node_id
1410 LEFT JOIN next_graph_changed_nodes c ON c.id = n.id
1411 WHERE c.id IS NULL
1412 "#,
1413 [],
1414 |row| row_usize(row, 0),
1415 )?
1416 } else {
1417 tx.query_row(
1418 r#"
1419 SELECT COUNT(*)
1420 FROM graph_node_properties g
1421 JOIN next_graph_all_node_ids a ON a.id = g.node_id
1422 LEFT JOIN next_graph_changed_nodes c ON c.id = a.id
1423 WHERE c.id IS NULL
1424 "#,
1425 [],
1426 |row| row_usize(row, 0),
1427 )?
1428 };
1429 let reused_owner_edge_properties: usize = if force_refresh_writes {
1430 tx.query_row(
1431 r#"
1432 SELECT COUNT(*)
1433 FROM graph_edge_properties g
1434 JOIN next_graph_edges n ON n.edge_key = g.edge_key
1435 LEFT JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1436 WHERE c.edge_key IS NULL
1437 "#,
1438 [],
1439 |row| row_usize(row, 0),
1440 )?
1441 } else {
1442 tx.query_row(
1443 r#"
1444 SELECT COUNT(*)
1445 FROM graph_edge_properties g
1446 JOIN next_graph_all_edge_keys a ON a.edge_key = g.edge_key
1447 LEFT JOIN next_graph_changed_edges c ON c.edge_key = a.edge_key
1448 WHERE c.edge_key IS NULL
1449 "#,
1450 [],
1451 |row| row_usize(row, 0),
1452 )?
1453 };
1454 let unchanged_changed_node_properties: usize = tx.query_row(
1455 r#"
1456 SELECT COUNT(*)
1457 FROM next_graph_node_properties n
1458 JOIN graph_node_properties g
1459 ON g.node_id = n.node_id AND g.key = n.key
1460 WHERE g.value = n.value
1461 "#,
1462 [],
1463 |row| row_usize(row, 0),
1464 )?;
1465 let unchanged_changed_edge_properties: usize = tx.query_row(
1466 r#"
1467 SELECT COUNT(*)
1468 FROM next_graph_edge_properties n
1469 JOIN graph_edge_properties g
1470 ON g.edge_key = n.edge_key AND g.key = n.key
1471 WHERE g.value = n.value
1472 "#,
1473 [],
1474 |row| row_usize(row, 0),
1475 )?;
1476 let unchanged_properties = reused_owner_node_properties
1477 + reused_owner_edge_properties
1478 + unchanged_changed_node_properties
1479 + unchanged_changed_edge_properties;
1480
1481 let deleted_edges = if force_refresh_writes {
1482 tx.execute(
1483 r#"
1484 DELETE FROM graph_edges
1485 WHERE NOT EXISTS (
1486 SELECT 1
1487 FROM next_graph_edges n
1488 WHERE n.edge_key = graph_edges.edge_key
1489 )
1490 "#,
1491 [],
1492 )?
1493 } else {
1494 tx.execute(
1495 r#"
1496 DELETE FROM graph_edges
1497 WHERE NOT EXISTS (
1498 SELECT 1
1499 FROM next_graph_all_edge_keys n
1500 WHERE n.edge_key = graph_edges.edge_key
1501 )
1502 "#,
1503 [],
1504 )?
1505 };
1506 let deleted_nodes = if force_refresh_writes {
1507 tx.execute(
1508 r#"
1509 DELETE FROM graph_nodes
1510 WHERE NOT EXISTS (
1511 SELECT 1
1512 FROM next_graph_nodes n
1513 WHERE n.id = graph_nodes.id
1514 )
1515 "#,
1516 [],
1517 )?
1518 } else {
1519 tx.execute(
1520 r#"
1521 DELETE FROM graph_nodes
1522 WHERE NOT EXISTS (
1523 SELECT 1
1524 FROM next_graph_all_node_ids n
1525 WHERE n.id = graph_nodes.id
1526 )
1527 "#,
1528 [],
1529 )?
1530 };
1531
1532 let upsert_nodes_sql = r#"
1533 INSERT INTO graph_nodes
1534 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1535 SELECT
1536 n.id,
1537 n.kind,
1538 n.label,
1539 n.properties_json,
1540 n.provenance_json,
1541 n.freshness_json,
1542 n.row_hash,
1543 n.source_watermark
1544 FROM next_graph_nodes n
1545 JOIN next_graph_changed_nodes c ON c.id = n.id
1546 WHERE true
1547 ON CONFLICT(id) DO UPDATE SET
1548 kind = excluded.kind,
1549 label = excluded.label,
1550 properties_json = excluded.properties_json,
1551 provenance_json = excluded.provenance_json,
1552 freshness_json = excluded.freshness_json,
1553 row_hash = excluded.row_hash,
1554 source_watermark = excluded.source_watermark
1555 WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1556 "#;
1557 tx.execute(upsert_nodes_sql, [])?;
1558 tx.execute(
1559 r#"
1560 DELETE FROM graph_node_semantic_vectors
1561 WHERE EXISTS (
1562 SELECT 1
1563 FROM next_graph_changed_nodes c
1564 WHERE c.id = graph_node_semantic_vectors.node_id
1565 )
1566 AND NOT EXISTS (
1567 SELECT 1
1568 FROM next_graph_node_semantic_vectors n
1569 WHERE n.node_id = graph_node_semantic_vectors.node_id
1570 )
1571 "#,
1572 [],
1573 )?;
1574 tx.execute(
1575 r#"
1576 INSERT INTO graph_node_semantic_vectors
1577 (node_id, kind, model, dimensions, vector_blob)
1578 SELECT n.node_id, n.kind, n.model, n.dimensions, n.vector_blob
1579 FROM next_graph_node_semantic_vectors n
1580 WHERE true
1581 ON CONFLICT(node_id) DO UPDATE SET
1582 kind = excluded.kind,
1583 model = excluded.model,
1584 dimensions = excluded.dimensions,
1585 vector_blob = excluded.vector_blob
1586 "#,
1587 [],
1588 )?;
1589 let upsert_edges_sql = r#"
1590 INSERT INTO graph_edges
1591 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1592 SELECT
1593 n.edge_key,
1594 n.from_id,
1595 n.to_id,
1596 n.kind,
1597 n.properties_json,
1598 n.provenance_json,
1599 n.freshness_json,
1600 n.row_hash,
1601 n.source_watermark
1602 FROM next_graph_edges n
1603 JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1604 WHERE true
1605 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1606 edge_key = excluded.edge_key,
1607 properties_json = excluded.properties_json,
1608 provenance_json = excluded.provenance_json,
1609 freshness_json = excluded.freshness_json,
1610 row_hash = excluded.row_hash,
1611 source_watermark = excluded.source_watermark
1612 WHERE graph_edges.row_hash IS NOT excluded.row_hash
1613 "#;
1614 tx.execute(upsert_edges_sql, [])?;
1615 let deleted_node_properties = tx.execute(
1616 r#"
1617 DELETE FROM graph_node_properties
1618 WHERE EXISTS (
1619 SELECT 1
1620 FROM next_graph_changed_nodes c
1621 WHERE c.id = graph_node_properties.node_id
1622 )
1623 AND NOT EXISTS (
1624 SELECT 1
1625 FROM next_graph_node_properties n
1626 WHERE n.node_id = graph_node_properties.node_id
1627 AND n.key = graph_node_properties.key
1628 )
1629 "#,
1630 [],
1631 )?;
1632 let deleted_edge_properties = tx.execute(
1633 r#"
1634 DELETE FROM graph_edge_properties
1635 WHERE EXISTS (
1636 SELECT 1
1637 FROM next_graph_changed_edges c
1638 WHERE c.edge_key = graph_edge_properties.edge_key
1639 )
1640 AND NOT EXISTS (
1641 SELECT 1
1642 FROM next_graph_edge_properties n
1643 WHERE n.edge_key = graph_edge_properties.edge_key
1644 AND n.key = graph_edge_properties.key
1645 )
1646 "#,
1647 [],
1648 )?;
1649 let deleted_properties = deleted_node_properties + deleted_edge_properties;
1650 let upsert_properties_sql = r#"
1651 INSERT INTO graph_node_properties (node_id, key, value)
1652 SELECT n.node_id, n.key, n.value
1653 FROM next_graph_node_properties n
1654 LEFT JOIN graph_node_properties g
1655 ON g.node_id = n.node_id AND g.key = n.key
1656 WHERE g.node_id IS NULL OR g.value IS NOT n.value
1657 ON CONFLICT(node_id, key) DO UPDATE SET
1658 value = excluded.value
1659 WHERE graph_node_properties.value IS NOT excluded.value
1660 "#;
1661 let upserted_node_properties = tx.execute(upsert_properties_sql, [])?;
1662 let upsert_edge_properties_sql = r#"
1663 INSERT INTO graph_edge_properties (edge_key, key, value)
1664 SELECT n.edge_key, n.key, n.value
1665 FROM next_graph_edge_properties n
1666 LEFT JOIN graph_edge_properties g
1667 ON g.edge_key = n.edge_key AND g.key = n.key
1668 WHERE g.edge_key IS NULL OR g.value IS NOT n.value
1669 ON CONFLICT(edge_key, key) DO UPDATE SET
1670 value = excluded.value
1671 WHERE graph_edge_properties.value IS NOT excluded.value
1672 "#;
1673 let upserted_edge_properties = tx.execute(upsert_edge_properties_sql, [])?;
1674 let upserted_properties = upserted_node_properties + upserted_edge_properties;
1675 tx.execute(
1676 r#"
1677 INSERT INTO graph_projection_versions
1678 (scope, projection_version, content_hash, source_watermark, observed_at_unix)
1679 VALUES (?1, ?2, ?3, ?4, ?5)
1680 ON CONFLICT(scope) DO UPDATE SET
1681 projection_version = excluded.projection_version,
1682 content_hash = excluded.content_hash,
1683 source_watermark = excluded.source_watermark,
1684 observed_at_unix = excluded.observed_at_unix
1685 "#,
1686 (
1687 &scope,
1688 &projection_version,
1689 &projection_hash,
1690 &source_watermark,
1691 observed_at_unix,
1692 ),
1693 )?;
1694 let pruned_node_tombstones = tx.execute(
1695 r#"
1696 DELETE FROM graph_tombstones
1697 WHERE row_kind = 'node'
1698 AND EXISTS (
1699 SELECT 1
1700 FROM next_graph_nodes n
1701 WHERE n.id = substr(graph_tombstones.row_key, 6)
1702 )
1703 "#,
1704 [],
1705 )?;
1706 let pruned_edge_tombstones = tx.execute(
1707 r#"
1708 DELETE FROM graph_tombstones
1709 WHERE row_kind = 'edge'
1710 AND EXISTS (
1711 SELECT 1
1712 FROM next_graph_edges n
1713 WHERE n.edge_key = substr(graph_tombstones.row_key, 6)
1714 )
1715 "#,
1716 [],
1717 )?;
1718 {
1719 let mut insert_node_tombstone = tx.prepare(
1720 r#"
1721 INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1722 VALUES (?1, 'node', ?2)
1723 ON CONFLICT(row_key) DO UPDATE SET
1724 row_kind = excluded.row_kind,
1725 deleted_at_unix = excluded.deleted_at_unix
1726 "#,
1727 )?;
1728 for id in &tombstoned_nodes {
1729 insert_node_tombstone.execute((format!("node:{id}"), observed_at_unix))?;
1730 }
1731 }
1732 {
1733 let mut insert_edge_tombstone = tx.prepare(
1734 r#"
1735 INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1736 VALUES (?1, 'edge', ?2)
1737 ON CONFLICT(row_key) DO UPDATE SET
1738 row_kind = excluded.row_kind,
1739 deleted_at_unix = excluded.deleted_at_unix
1740 "#,
1741 )?;
1742 for key in &tombstoned_edges {
1743 insert_edge_tombstone.execute((format!("edge:{key}"), observed_at_unix))?;
1744 }
1745 }
1746 let tombstone_node_count: usize = tx.query_row(
1747 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1748 [],
1749 |row| row_usize(row, 0),
1750 )?;
1751 let tombstone_edge_count: usize = tx.query_row(
1752 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1753 [],
1754 |row| row_usize(row, 0),
1755 )?;
1756 tx.execute(
1757 r#"
1758 INSERT INTO graph_operator_stats
1759 (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1760 VALUES (?1, ?2, ?3, ?4, ?5, NULL, NULL, ?6)
1761 ON CONFLICT(scope) DO UPDATE SET
1762 nodes = excluded.nodes,
1763 edges = excluded.edges,
1764 tombstone_nodes = excluded.tombstone_nodes,
1765 tombstone_edges = excluded.tombstone_edges,
1766 file_size_bytes = excluded.file_size_bytes,
1767 freelist_bytes = excluded.freelist_bytes,
1768 observed_at_unix = excluded.observed_at_unix
1769 "#,
1770 (
1771 &scope,
1772 projection.nodes.len() as i64,
1773 projection.edges.len() as i64,
1774 tombstone_node_count as i64,
1775 tombstone_edge_count as i64,
1776 observed_at_unix,
1777 ),
1778 )?;
1779 phase_timings.push(sqlite_refresh_phase_timing(
1780 "sqlite_delta_write",
1781 delta_started,
1782 "apply row/property deltas, projection metadata, tombstones, and cached operator counts",
1783 ));
1784 let commit_started = Instant::now();
1785 tx.commit()?;
1786 phase_timings.push(sqlite_refresh_phase_timing(
1787 "sqlite_commit",
1788 commit_started,
1789 "commit refresh transaction and publish old-or-new graph visibility",
1790 ));
1791 let file_size_bytes_after = sqlite_database_size_bytes(&self.conn).ok();
1792 let freelist_bytes_after = sqlite_database_freelist_bytes(&self.conn).ok();
1793 let stats_started = Instant::now();
1794 self.conn.execute(
1795 r#"
1796 UPDATE graph_operator_stats
1797 SET file_size_bytes = ?2,
1798 freelist_bytes = ?3,
1799 observed_at_unix = ?4
1800 WHERE scope = ?1
1801 "#,
1802 (
1803 &scope,
1804 file_size_bytes_after.map(|value| value as i64),
1805 freelist_bytes_after.map(|value| value as i64),
1806 unix_now(),
1807 ),
1808 )?;
1809 phase_timings.push(sqlite_refresh_phase_timing(
1810 "sqlite_stats_cache_update",
1811 stats_started,
1812 "persist post-commit file and freelist proof for status/doctor",
1813 ));
1814 Ok(SqliteProjectionRefresh {
1815 scope,
1816 projection_version,
1817 source_watermark,
1818 upserted_nodes: projection.nodes.len().saturating_sub(unchanged_nodes),
1819 upserted_edges: projection.edges.len().saturating_sub(unchanged_edges),
1820 unchanged_nodes,
1821 unchanged_edges,
1822 upserted_properties,
1823 unchanged_properties,
1824 deleted_properties,
1825 deleted_nodes,
1826 deleted_edges,
1827 pruned_tombstones: pruned_node_tombstones + pruned_edge_tombstones,
1828 file_size_bytes_before,
1829 file_size_bytes_after,
1830 tombstoned_nodes,
1831 tombstoned_edges,
1832 phase_timings,
1833 })
1834 }
1835
1836 pub fn derive_ontology(&self) -> Result<GraphProjection> {
1845 let mut projection = GraphProjection::default();
1846
1847 let mut node_stmt = self.conn.prepare(
1848 "SELECT kind, COUNT(*) FROM graph_nodes \
1849 WHERE kind != 'ontology_type' \
1850 GROUP BY kind ORDER BY kind",
1851 )?;
1852 let node_rows = node_stmt.query_map([], |row| {
1853 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
1854 })?;
1855 for row in node_rows {
1856 let (kind, count) = row?;
1857 projection.nodes.push(
1858 GraphNode::new(format!("ontology_type:{kind}"), "ontology_type", &kind)
1859 .with_property("type_kind", &kind)
1860 .with_property("instance_count", count.to_string())
1861 .with_provenance(GraphProvenance::new("tsift-ontology", &kind)),
1862 );
1863 }
1864
1865 let mut rel_stmt = self.conn.prepare(
1866 "SELECT n1.kind, e.kind, n2.kind, COUNT(*) \
1867 FROM graph_edges e \
1868 JOIN graph_nodes n1 ON e.from_id = n1.id \
1869 JOIN graph_nodes n2 ON e.to_id = n2.id \
1870 WHERE e.kind NOT LIKE 'ontology_relation:%' \
1871 AND n1.kind != 'ontology_type' AND n2.kind != 'ontology_type' \
1872 GROUP BY n1.kind, e.kind, n2.kind \
1873 ORDER BY n1.kind, e.kind, n2.kind",
1874 )?;
1875 let rel_rows = rel_stmt.query_map([], |row| {
1876 Ok((
1877 row.get::<_, String>(0)?,
1878 row.get::<_, String>(1)?,
1879 row.get::<_, String>(2)?,
1880 row.get::<_, i64>(3)?,
1881 ))
1882 })?;
1883 for row in rel_rows {
1884 let (from_kind, edge_kind, to_kind, count) = row?;
1885 projection.edges.push(
1886 GraphEdge::new(
1887 format!("ontology_type:{from_kind}"),
1888 format!("ontology_type:{to_kind}"),
1889 format!("ontology_relation:{edge_kind}"),
1890 )
1891 .with_property("edge_kind", &edge_kind)
1892 .with_property("instance_count", count.to_string())
1893 .with_provenance(GraphProvenance::new("tsift-ontology", &edge_kind)),
1894 );
1895 }
1896
1897 Ok(projection)
1898 }
1899
1900 pub fn upsert_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1901 let tx = self.conn.transaction()?;
1902 {
1903 let mut insert_node = tx.prepare(
1904 r#"
1905 INSERT INTO graph_nodes
1906 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1907 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1908 ON CONFLICT(id) DO UPDATE SET
1909 kind = excluded.kind,
1910 label = excluded.label,
1911 properties_json = excluded.properties_json,
1912 provenance_json = excluded.provenance_json,
1913 freshness_json = excluded.freshness_json,
1914 row_hash = excluded.row_hash,
1915 source_watermark = excluded.source_watermark
1916 WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1917 OR graph_nodes.source_watermark IS NOT excluded.source_watermark
1918 "#,
1919 )?;
1920 let mut delete_properties =
1921 tx.prepare("DELETE FROM graph_node_properties WHERE node_id = ?1")?;
1922 let mut insert_property = tx.prepare(
1923 r#"
1924 INSERT INTO graph_node_properties (node_id, key, value)
1925 VALUES (?1, ?2, ?3)
1926 "#,
1927 )?;
1928 for node in &projection.nodes {
1929 let changed = insert_node.execute((
1930 &node.id,
1931 &node.kind,
1932 &node.label,
1933 to_json(&node.properties)?,
1934 to_json(&node.provenance)?,
1935 optional_to_json(&node.freshness)?,
1936 row_hash(node)?,
1937 ))?;
1938 if changed > 0 {
1939 delete_properties.execute([&node.id])?;
1940 for (key, value) in &node.properties {
1941 insert_property.execute((&node.id, key, value))?;
1942 }
1943 replace_node_semantic_vector(&tx, &node.id, &node.kind, &node.properties)?;
1944 }
1945 }
1946 }
1947 {
1948 let mut insert_edge = tx.prepare(
1949 r#"
1950 INSERT INTO graph_edges
1951 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1952 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1953 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1954 edge_key = excluded.edge_key,
1955 properties_json = excluded.properties_json,
1956 provenance_json = excluded.provenance_json,
1957 freshness_json = excluded.freshness_json,
1958 row_hash = excluded.row_hash,
1959 source_watermark = excluded.source_watermark
1960 WHERE graph_edges.row_hash IS NOT excluded.row_hash
1961 OR graph_edges.source_watermark IS NOT excluded.source_watermark
1962 "#,
1963 )?;
1964 let mut delete_properties =
1965 tx.prepare("DELETE FROM graph_edge_properties WHERE edge_key = ?1")?;
1966 let mut insert_property = tx.prepare(
1967 r#"
1968 INSERT INTO graph_edge_properties (edge_key, key, value)
1969 VALUES (?1, ?2, ?3)
1970 "#,
1971 )?;
1972 for edge in &projection.edges {
1973 let edge_key = graph_edge_id(edge);
1974 let changed = insert_edge.execute((
1975 &edge_key,
1976 &edge.from_id,
1977 &edge.to_id,
1978 &edge.kind,
1979 to_json(&edge.properties)?,
1980 to_json(&edge.provenance)?,
1981 optional_to_json(&edge.freshness)?,
1982 row_hash(edge)?,
1983 ))?;
1984 if changed > 0 {
1985 delete_properties.execute([&edge_key])?;
1986 for (key, value) in &edge.properties {
1987 insert_property.execute((&edge_key, key, value))?;
1988 }
1989 }
1990 }
1991 }
1992 tx.commit()?;
1993 Ok(())
1994 }
1995
1996 pub fn delete_source_projection(&mut self, source_ref: &str, provider: &str) -> Result<usize> {
2004 let deleted = self.conn.execute(
2005 r#"
2006 DELETE FROM graph_nodes
2007 WHERE id IN (
2008 SELECT node_id FROM graph_node_properties
2009 WHERE key = 'source_ref' AND value = ?1
2010 )
2011 AND id IN (
2012 SELECT node_id FROM graph_node_properties
2013 WHERE key = 'provider' AND value = ?2
2014 )
2015 "#,
2016 (source_ref, provider),
2017 )?;
2018 Ok(deleted)
2019 }
2020
2021 pub fn link_nodes_by_shared_property(
2029 &mut self,
2030 kind: &str,
2031 id_key: &str,
2032 id_prefix: &str,
2033 edge_kind: &str,
2034 edge_properties: &[(&str, &str)],
2035 ) -> Result<usize> {
2036 let mut groups: BTreeMap<String, Vec<String>> = BTreeMap::new();
2039 {
2040 let mut stmt = self.conn.prepare(
2041 "SELECT p.value, n.id \
2042 FROM graph_nodes n \
2043 JOIN graph_node_properties p ON p.node_id = n.id \
2044 WHERE n.kind = ?1 AND p.key = ?2 AND p.value LIKE ?3 || '%' \
2045 ORDER BY p.value, n.id",
2046 )?;
2047 let rows = stmt.query_map((kind, id_key, id_prefix), |row| {
2048 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2049 })?;
2050 for row in rows {
2051 let (shared, node_id) = row?;
2052 groups.entry(shared).or_default().push(node_id);
2053 }
2054 }
2055
2056 let mut edges: Vec<GraphEdge> = Vec::new();
2057 for members in groups.values() {
2058 let Some((rep, rest)) = members.split_first() else {
2059 continue;
2060 };
2061 for member in rest {
2062 let mut edge = GraphEdge::new(member.clone(), rep.clone(), edge_kind);
2063 for (key, value) in edge_properties {
2064 edge = edge.with_property(*key, *value);
2065 }
2066 edges.push(edge);
2067 }
2068 }
2069
2070 let count = edges.len();
2071 if count > 0 {
2072 self.upsert_projection(&GraphProjection {
2073 nodes: Vec::new(),
2074 edges,
2075 })?;
2076 }
2077 Ok(count)
2078 }
2079
2080 pub fn projection_version(&self, scope: &str) -> Result<Option<SqliteProjectionVersion>> {
2081 self.conn
2082 .query_row(
2083 r#"
2084 SELECT projection_version, content_hash, source_watermark
2085 FROM graph_projection_versions
2086 WHERE scope = ?1
2087 "#,
2088 [scope],
2089 |row| {
2090 Ok(SqliteProjectionVersion {
2091 projection_version: row.get(0)?,
2092 content_hash: row.get(1)?,
2093 source_watermark: row.get(2)?,
2094 })
2095 },
2096 )
2097 .optional()
2098 .map_err(Into::into)
2099 }
2100
2101 pub fn update_projection_source_watermark(
2102 &mut self,
2103 scope: &str,
2104 source_watermark: Option<String>,
2105 ) -> Result<()> {
2106 self.conn.execute(
2107 r#"
2108 UPDATE graph_projection_versions
2109 SET source_watermark = ?2
2110 WHERE scope = ?1
2111 "#,
2112 (scope, source_watermark),
2113 )?;
2114 Ok(())
2115 }
2116
2117 pub fn compact_storage(&mut self, scope: &str, prune_tombstones: bool) -> Result<usize> {
2118 let pruned_tombstones = if prune_tombstones {
2119 self.conn.execute("DELETE FROM graph_tombstones", [])?
2120 } else {
2121 0
2122 };
2123 self.conn.execute_batch(
2124 r#"
2125 PRAGMA wal_checkpoint(TRUNCATE);
2126 VACUUM;
2127 "#,
2128 )?;
2129 let nodes = self
2130 .conn
2131 .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
2132 row.get::<_, i64>(0)
2133 })?;
2134 let edges = self
2135 .conn
2136 .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2137 row.get::<_, i64>(0)
2138 })?;
2139 let tombstone_nodes = self.conn.query_row(
2140 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
2141 [],
2142 |row| row.get::<_, i64>(0),
2143 )?;
2144 let tombstone_edges = self.conn.query_row(
2145 "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
2146 [],
2147 |row| row.get::<_, i64>(0),
2148 )?;
2149 let file_size_bytes = sqlite_database_size_bytes(&self.conn)
2150 .ok()
2151 .map(|value| value as i64);
2152 let freelist_bytes = sqlite_database_freelist_bytes(&self.conn)
2153 .ok()
2154 .map(|value| value as i64);
2155 self.conn.execute(
2156 r#"
2157 INSERT INTO graph_operator_stats
2158 (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
2159 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s', 'now'))
2160 ON CONFLICT(scope) DO UPDATE SET
2161 nodes = excluded.nodes,
2162 edges = excluded.edges,
2163 tombstone_nodes = excluded.tombstone_nodes,
2164 tombstone_edges = excluded.tombstone_edges,
2165 file_size_bytes = excluded.file_size_bytes,
2166 freelist_bytes = excluded.freelist_bytes,
2167 observed_at_unix = excluded.observed_at_unix
2168 "#,
2169 (
2170 scope,
2171 nodes,
2172 edges,
2173 tombstone_nodes,
2174 tombstone_edges,
2175 file_size_bytes,
2176 freelist_bytes,
2177 ),
2178 )?;
2179 Ok(pruned_tombstones)
2180 }
2181
2182 fn edges_between_nodes_inline(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
2183 let placeholders: Vec<&str> = node_ids.iter().map(|_| "?").collect();
2184 let in_clause = placeholders.join(", ");
2185 let sql = format!(
2186 "SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json \
2187 FROM graph_edges e \
2188 WHERE e.from_id IN ({in_clause}) \
2189 AND e.to_id IN ({in_clause}) \
2190 ORDER BY e.from_id, e.kind, e.to_id"
2191 );
2192 let values: Vec<Value> = node_ids
2193 .iter()
2194 .chain(node_ids.iter())
2195 .map(|id| Value::Text(id.clone()))
2196 .collect();
2197 let mut stmt = self.conn.prepare(&sql)?;
2198 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2199 }
2200}
2201
2202fn sqlite_query_plan(conn: &Connection, sql: &str, values: &[Value]) -> Result<Vec<String>> {
2203 let mut stmt = conn.prepare(&format!("EXPLAIN QUERY PLAN {sql}"))?;
2204 collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2205 row.get::<_, String>(3)
2206 })?)
2207}
2208
/// Renders human-readable diagnostics for a SQLite query plan.
///
/// The first entry always echoes the whole plan, with rows joined by ` | `. It is
/// followed by one entry per `expected_indexes` element, reporting whether any
/// plan row mentions that index name. The check is a plain substring match.
fn sqlite_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<String> {
    let header = format!("sqlite query pushdown active; plan: {}", plan.join(" | "));
    let index_reports = expected_indexes.iter().map(|&index| {
        if plan.iter().any(|row| row.contains(index)) {
            format!("sqlite query plan uses {index}")
        } else {
            format!(
                "sqlite query plan did not report {index}; inspect before changing graph property indexes"
            )
        }
    });
    std::iter::once(header).chain(index_reports).collect()
}
2225
/// Compact, machine-oriented form of a query-plan diagnostic.
///
/// `terse_query_plan_diagnostics` produces three codes: `"plan_active"` (with no
/// index), and `"idx_ok"` / `"idx_missing"` (both carrying the checked index name).
///
/// NOTE(review): `code` is `&'static str`, so the derived `Deserialize` borrows it
/// from the input and can only deserialize from `'static` data. Confirm this
/// derive is actually needed.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TerseDiagnostic {
    /// Short diagnostic code (see the struct docs for the values produced here).
    pub code: &'static str,
    /// Index name the code refers to; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index: Option<String>,
}
2232
2233#[allow(dead_code)]
2234fn terse_query_plan_diagnostics(
2235 plan: &[String],
2236 expected_indexes: &[&str],
2237) -> Vec<TerseDiagnostic> {
2238 let mut diagnostics = vec![TerseDiagnostic {
2239 code: "plan_active",
2240 index: None,
2241 }];
2242 for expected_index in expected_indexes {
2243 if plan.iter().any(|row| row.contains(expected_index)) {
2244 diagnostics.push(TerseDiagnostic {
2245 code: "idx_ok",
2246 index: Some(expected_index.to_string()),
2247 });
2248 } else {
2249 diagnostics.push(TerseDiagnostic {
2250 code: "idx_missing",
2251 index: Some(expected_index.to_string()),
2252 });
2253 }
2254 }
2255 diagnostics
2256}
2257
2258fn push_sqlite_property_filter_exists(
2259 sql: &mut String,
2260 values: &mut Vec<Value>,
2261 node_alias: &str,
2262 filters: &[GraphPropertyFilter],
2263) {
2264 for (index, filter) in filters.iter().enumerate() {
2265 sql.push_str(&format!(
2266 r#"
2267 AND EXISTS (
2268 SELECT 1
2269 FROM graph_node_properties p{index} INDEXED BY idx_graph_node_properties_key_value_node
2270 WHERE p{index}.node_id = {node_alias}.id
2271 AND p{index}.key = ?
2272 AND p{index}.value = ?
2273 )
2274 "#
2275 ));
2276 values.push(Value::Text(filter.key.clone()));
2277 values.push(Value::Text(filter.value.clone()));
2278 }
2279}
2280
2281fn push_sqlite_edge_property_filter_exists(
2282 sql: &mut String,
2283 values: &mut Vec<Value>,
2284 edge_alias: &str,
2285 filters: &[GraphPropertyFilter],
2286) {
2287 for (index, filter) in filters.iter().enumerate() {
2288 sql.push_str(&format!(
2289 r#"
2290 AND EXISTS (
2291 SELECT 1
2292 FROM graph_edge_properties ep{index} INDEXED BY idx_graph_edge_properties_key_value_edge
2293 WHERE ep{index}.edge_key = {edge_alias}.edge_key
2294 AND ep{index}.key = ?
2295 AND ep{index}.value = ?
2296 )
2297 "#
2298 ));
2299 values.push(Value::Text(filter.key.clone()));
2300 values.push(Value::Text(filter.value.clone()));
2301 }
2302}
2303
/// Parameters for one `SELECT` branch of the incident-edge `UNION` query that
/// `push_sqlite_incident_edge_branch` appends.
struct SqliteIncidentEdgeBranch<'a> {
    /// Index forced on `graph_edges` via `INDEXED BY` (spliced into the SQL text).
    index_name: &'a str,
    /// Endpoint column compared with `node_id`; callers here pass `from_id` or
    /// `to_id` (spliced into the SQL text).
    endpoint_column: &'a str,
    /// Node whose incident edges are selected; bound as a parameter.
    node_id: &'a str,
    /// Optional exact edge-kind filter.
    kind: Option<&'a str>,
    /// Edge property key/value filters, each applied as an `EXISTS` probe.
    filters: &'a [GraphPropertyFilter],
    /// Exclusive lower bound on `edge_key` for keyset paging.
    cursor: Option<&'a str>,
}
2312
2313fn push_sqlite_incident_edge_branch(
2314 sql: &mut String,
2315 values: &mut Vec<Value>,
2316 branch: SqliteIncidentEdgeBranch<'_>,
2317) {
2318 sql.push_str(&format!(
2319 r#"
2320 SELECT
2321 e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2322 FROM graph_edges e INDEXED BY {index_name}
2323 WHERE e.{endpoint_column} = ?
2324 "#,
2325 index_name = branch.index_name,
2326 endpoint_column = branch.endpoint_column,
2327 ));
2328 values.push(Value::Text(branch.node_id.to_string()));
2329 if let Some(kind) = branch.kind {
2330 sql.push_str(" AND e.kind = ?");
2331 values.push(Value::Text(kind.to_string()));
2332 }
2333 push_sqlite_edge_property_filter_exists(sql, values, "e", branch.filters);
2334 if let Some(cursor) = branch.cursor {
2335 sql.push_str(" AND e.edge_key > ?");
2336 values.push(Value::Text(cursor.to_string()));
2337 }
2338}
2339
2340fn sqlite_incident_edges_union_query(
2341 node_id: &str,
2342 kind: Option<&str>,
2343 filters: &[GraphPropertyFilter],
2344 cursor: Option<&str>,
2345 limit: Option<usize>,
2346) -> (String, Vec<Value>) {
2347 let mut sql = String::from(
2348 r#"
2349 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2350 FROM (
2351 "#,
2352 );
2353 let mut values = Vec::new();
2354 push_sqlite_incident_edge_branch(
2355 &mut sql,
2356 &mut values,
2357 SqliteIncidentEdgeBranch {
2358 index_name: "idx_graph_edges_from_kind",
2359 endpoint_column: "from_id",
2360 node_id,
2361 kind,
2362 filters,
2363 cursor,
2364 },
2365 );
2366 sql.push_str(" UNION ");
2367 push_sqlite_incident_edge_branch(
2368 &mut sql,
2369 &mut values,
2370 SqliteIncidentEdgeBranch {
2371 index_name: "idx_graph_edges_to_kind",
2372 endpoint_column: "to_id",
2373 node_id,
2374 kind,
2375 filters,
2376 cursor,
2377 },
2378 );
2379 sql.push_str(
2380 r#"
2381 ) e
2382 ORDER BY e.edge_key
2383 "#,
2384 );
2385 if let Some(limit) = limit {
2386 sql.push_str(" LIMIT ?");
2387 values.push(Value::Integer(limit.saturating_add(1) as i64));
2388 }
2389 (sql, values)
2390}
2391
/// Builds the SQL scoring expression used to rank candidate edges during a
/// semantic-seeded neighborhood expansion.
///
/// The expression is the sum of three parts:
/// 1. A base weight chosen from `<edge_alias>.kind`. Exact kind names are checked
///    first, then the substring patterns `%community%` (200) and
///    `%semantic%` / `%concept%` / `%entity%` (240). Anything else scores 80.
/// 2. `direction_bonus`, spliced in verbatim as a parenthesized SQL sub-expression.
/// 3. A small per-kind bonus; unlisted kinds add 0.
///
/// `edge_alias` and `direction_bonus` are interpolated directly into the SQL, so
/// they must be trusted, code-supplied fragments and never user input.
///
/// The base weight must be a *searched* `CASE WHEN <predicate>` so the `LIKE` arms
/// act as real predicates. In the simple `CASE <expr> WHEN <value>` form, an arm
/// such as `WHEN kind LIKE '%community%'` compares the kind text with the 0/1
/// result of the `LIKE`, so it can never match. Arm order is unchanged: exact
/// names still win over the pattern arms.
fn sqlite_semantic_seeded_edge_score_expr(edge_alias: &str, direction_bonus: &str) -> String {
    let kind = format!("{edge_alias}.kind");
    format!(
        "(CASE \
         WHEN {kind} = 'semantic_relation' THEN 340 \
         WHEN {kind} IN ('mentions_entity', 'mentions_concept', 'tagged_entity', \
         'tagged_concept', 'related_concept') THEN 280 \
         WHEN {kind} = 'mentions' THEN 220 \
         WHEN {kind} = 'calls' THEN 200 \
         WHEN {kind} IN ('requests_context', 'scopes_context', 'scopes_source', \
         'explains_result') THEN 180 \
         WHEN {kind} IN ('defines', 'contains', 'belongs_to') THEN 120 \
         WHEN {kind} LIKE '%community%' THEN 200 \
         WHEN {kind} LIKE '%semantic%' \
         OR {kind} LIKE '%concept%' \
         OR {kind} LIKE '%entity%' THEN 240 \
         ELSE 80 END) \
         + ({direction_bonus}) \
         + (CASE {kind} \
         WHEN 'mentions_concept' THEN 30 \
         WHEN 'mentions_entity' THEN 30 \
         WHEN 'tagged_concept' THEN 30 \
         WHEN 'tagged_entity' THEN 30 \
         WHEN 'related_concept' THEN 30 \
         WHEN 'semantic_relation' THEN 28 \
         WHEN 'calls' THEN 24 \
         WHEN 'mentions' THEN 22 \
         WHEN 'requests_context' THEN 18 \
         WHEN 'scopes_context' THEN 18 \
         WHEN 'scopes_source' THEN 18 \
         WHEN 'explains_result' THEN 18 \
         WHEN 'defines' THEN 12 \
         WHEN 'contains' THEN 12 \
         WHEN 'belongs_to' THEN 12 \
         ELSE 0 END)"
    )
}
2435
2436impl GraphStore for SqliteGraphStore {
2437 fn upsert_node(&self, node: &GraphNode) -> Result<()> {
2438 self.conn.execute(
2439 r#"
2440 INSERT INTO graph_nodes
2441 (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
2442 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
2443 ON CONFLICT(id) DO UPDATE SET
2444 kind = excluded.kind,
2445 label = excluded.label,
2446 properties_json = excluded.properties_json,
2447 provenance_json = excluded.provenance_json,
2448 freshness_json = excluded.freshness_json,
2449 row_hash = excluded.row_hash,
2450 source_watermark = excluded.source_watermark
2451 "#,
2452 (
2453 &node.id,
2454 &node.kind,
2455 &node.label,
2456 to_json(&node.properties)?,
2457 to_json(&node.provenance)?,
2458 optional_to_json(&node.freshness)?,
2459 row_hash(node)?,
2460 ),
2461 )?;
2462 replace_node_properties(&self.conn, &node.id, &node.properties)?;
2463 replace_node_semantic_vector(&self.conn, &node.id, &node.kind, &node.properties)?;
2464 Ok(())
2465 }
2466
2467 fn upsert_edge(&self, edge: &GraphEdge) -> Result<()> {
2468 let edge_key = graph_edge_id(edge);
2469 self.conn.execute(
2470 r#"
2471 INSERT INTO graph_edges
2472 (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
2473 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
2474 ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
2475 edge_key = excluded.edge_key,
2476 properties_json = excluded.properties_json,
2477 provenance_json = excluded.provenance_json,
2478 freshness_json = excluded.freshness_json,
2479 row_hash = excluded.row_hash,
2480 source_watermark = excluded.source_watermark
2481 "#,
2482 (
2483 &edge_key,
2484 &edge.from_id,
2485 &edge.to_id,
2486 &edge.kind,
2487 to_json(&edge.properties)?,
2488 to_json(&edge.provenance)?,
2489 optional_to_json(&edge.freshness)?,
2490 row_hash(edge)?,
2491 ),
2492 )?;
2493 replace_edge_properties(&self.conn, &edge_key, &edge.properties)?;
2494 Ok(())
2495 }
2496
2497 fn delete_node(&self, id: &str) -> Result<usize> {
2498 self.conn
2499 .execute("DELETE FROM graph_nodes WHERE id = ?1", [id])
2500 .map_err(Into::into)
2501 }
2502
2503 fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize> {
2504 self.conn
2505 .execute(
2506 "DELETE FROM graph_edges WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3",
2507 (from_id, to_id, kind),
2508 )
2509 .map_err(Into::into)
2510 }
2511
2512 fn node(&self, id: &str) -> Result<Option<GraphNode>> {
2513 self.conn
2514 .query_row(
2515 r#"
2516SELECT id, kind, label, properties_json, provenance_json, freshness_json
2517 FROM graph_nodes
2518 WHERE id = ?1
2519 "#,
2520 [id],
2521 node_from_row,
2522 )
2523 .optional()
2524 .map_err(Into::into)
2525 }
2526
2527 fn nodes_by_ids(&self, ids: &[String]) -> Result<Vec<GraphNode>> {
2528 let unique_ids = ids.iter().cloned().collect::<BTreeSet<_>>();
2529 if unique_ids.is_empty() {
2530 return Ok(Vec::new());
2531 }
2532
2533 let mut nodes = Vec::new();
2534 let id_refs = unique_ids.iter().collect::<Vec<_>>();
2535 for chunk in id_refs.chunks(450) {
2536 let placeholders = chunk.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
2537 let sql = format!(
2538 "SELECT id, kind, label, properties_json, provenance_json, freshness_json \
2539FROM graph_nodes \
2540WHERE id IN ({placeholders}) \
2541ORDER BY id"
2542 );
2543 let values = chunk
2544 .iter()
2545 .map(|id| Value::Text((*id).clone()))
2546 .collect::<Vec<_>>();
2547 let mut stmt = self.conn.prepare(&sql)?;
2548 nodes.extend(collect_rows(
2549 stmt.query_map(params_from_iter(values.iter()), node_from_row)?,
2550 )?);
2551 }
2552 nodes.sort_by(|left, right| left.id.cmp(&right.id));
2553 Ok(nodes)
2554 }
2555
2556 fn all_nodes(&self) -> Result<Vec<GraphNode>> {
2557 let mut stmt = self.conn.prepare(
2558 r#"
2559SELECT id, kind, label, properties_json, provenance_json, freshness_json
2560 FROM graph_nodes
2561 ORDER BY id
2562 "#,
2563 )?;
2564 collect_rows(stmt.query_map([], node_from_row)?)
2565 }
2566
2567 fn all_edges(&self) -> Result<Vec<GraphEdge>> {
2568 let mut stmt = self.conn.prepare(
2569 r#"
2570 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2571 FROM graph_edges
2572 ORDER BY from_id, kind, to_id
2573 "#,
2574 )?;
2575 collect_rows(stmt.query_map([], edge_from_row)?)
2576 }
2577
2578 fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
2579 self.conn
2580 .query_row(
2581 r#"
2582 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2583 FROM graph_edges INDEXED BY idx_graph_edges_edge_key
2584 WHERE edge_key = ?1
2585 "#,
2586 [edge_id],
2587 edge_from_row,
2588 )
2589 .optional()
2590 .map_err(Into::into)
2591 }
2592
2593 fn graph_counts(&self) -> Result<(usize, usize)> {
2594 let nodes = self
2595 .conn
2596 .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
2597 row_usize(row, 0)
2598 })?;
2599 let edges = self
2600 .conn
2601 .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2602 row_usize(row, 0)
2603 })?;
2604 Ok((nodes, edges))
2605 }
2606
2607 fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
2608 match kind {
2609 Some(kind) => self
2610 .conn
2611 .query_row(
2612 r#"
2613 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2614 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2615 WHERE from_id <> to_id AND kind = ?1
2616 ORDER BY from_id, kind, to_id
2617 LIMIT 1
2618 "#,
2619 [kind],
2620 edge_from_row,
2621 )
2622 .optional()
2623 .map_err(Into::into),
2624 None => self
2625 .conn
2626 .query_row(
2627 r#"
2628 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2629 FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2630 WHERE from_id <> to_id
2631 ORDER BY from_id, kind, to_id
2632 LIMIT 1
2633 "#,
2634 [],
2635 edge_from_row,
2636 )
2637 .optional()
2638 .map_err(Into::into),
2639 }
2640 }
2641
2642 fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
2643 self.conn
2644 .query_row(
2645 r#"
2646 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2647 ep.key, ep.value
2648 FROM graph_edge_properties ep INDEXED BY idx_graph_edge_properties_key_value_edge
2649 JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2650 ON e.edge_key = ep.edge_key
2651 WHERE e.from_id <> e.to_id
2652 ORDER BY ep.key, ep.value, ep.edge_key
2653 LIMIT 1
2654 "#,
2655 [],
2656 |row| {
2657 Ok((
2658 edge_from_row(row)?,
2659 GraphPropertyFilter {
2660 key: row.get(7)?,
2661 value: row.get(8)?,
2662 },
2663 ))
2664 },
2665 )
2666 .optional()
2667 .map_err(Into::into)
2668 }
2669
2670 fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>> {
2671 let mut stmt = self.conn.prepare(
2672 r#"
2673 SELECT id, kind, label, properties_json, provenance_json, freshness_json
2674 FROM graph_nodes
2675 WHERE kind = ?1
2676 ORDER BY id
2677 "#,
2678 )?;
2679 collect_rows(stmt.query_map([kind], node_from_row)?)
2680 }
2681
2682 fn semantic_top_candidates(
2683 &self,
2684 query_vector: &[f64],
2685 kinds: &[&str],
2686 limit: usize,
2687 ) -> Result<Vec<GraphSemanticCandidate>> {
2688 if query_vector.is_empty() || kinds.is_empty() {
2689 return Ok(Vec::new());
2690 }
2691 if !sqlite_table_exists(&self.conn, "graph_node_semantic_vectors")? {
2692 return graph_semantic_top_candidates_by_property_scan(
2693 self,
2694 query_vector,
2695 kinds,
2696 limit,
2697 );
2698 }
2699
2700 let unique_kinds = kinds.iter().copied().collect::<BTreeSet<_>>();
2701 if unique_kinds.is_empty() {
2702 return Ok(Vec::new());
2703 }
2704 let kind_placeholders = unique_kinds
2705 .iter()
2706 .map(|_| "?")
2707 .collect::<Vec<_>>()
2708 .join(", ");
2709 let sql = format!(
2710 r#"
2711 SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
2712 graph_node_semantic_vectors.vector_blob,
2713 graph_node_semantic_vectors.dimensions
2714 FROM graph_node_semantic_vectors INDEXED BY idx_graph_node_semantic_vectors_kind_dims
2715 JOIN graph_nodes n ON n.id = graph_node_semantic_vectors.node_id
2716 WHERE graph_node_semantic_vectors.dimensions = ?
2717 AND graph_node_semantic_vectors.kind IN ({kind_placeholders})
2718 ORDER BY graph_node_semantic_vectors.kind, n.label, n.id
2719 "#
2720 );
2721 let mut values = vec![Value::Integer(query_vector.len() as i64)];
2722 values.extend(
2723 unique_kinds
2724 .into_iter()
2725 .map(|kind| Value::Text(kind.to_string())),
2726 );
2727 let rows = {
2728 let mut stmt = self.conn.prepare(&sql)?;
2729 collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2730 Ok((
2731 node_from_row_at(row, 0)?,
2732 row.get::<_, Vec<u8>>(6)?,
2733 row.get::<_, i64>(7)?,
2734 ))
2735 })?)?
2736 };
2737
2738 let mut candidates = rows
2739 .into_iter()
2740 .filter_map(|(node, blob, dimensions)| {
2741 let dimensions = usize::try_from(dimensions).ok()?;
2742 let vector = semantic_vector_from_blob(&blob, dimensions)?;
2743 Some(GraphSemanticCandidate {
2744 score: graph_semantic_cosine(query_vector, &vector),
2745 node,
2746 })
2747 })
2748 .collect::<Vec<_>>();
2749 candidates.sort_by(|left, right| {
2750 right
2751 .score
2752 .partial_cmp(&left.score)
2753 .unwrap_or(std::cmp::Ordering::Equal)
2754 .then_with(|| left.node.kind.cmp(&right.node.kind))
2755 .then_with(|| left.node.label.cmp(&right.node.label))
2756 .then_with(|| left.node.id.cmp(&right.node.id))
2757 });
2758 if limit > 0 && candidates.len() > limit {
2759 candidates.truncate(limit);
2760 }
2761 Ok(candidates)
2762 }
2763
2764 fn paged_nodes_by_kind(
2765 &self,
2766 kind: &str,
2767 options: GraphQueryOptions,
2768 ) -> Result<GraphPagedSubgraph> {
2769 let mut sql = String::from(
2770 r#"
2771 SELECT id, kind, label, properties_json, provenance_json, freshness_json
2772 FROM graph_nodes
2773 WHERE kind = ?
2774 "#,
2775 );
2776 let mut values = vec![Value::Text(kind.to_string())];
2777 push_sqlite_property_filter_exists(
2778 &mut sql,
2779 &mut values,
2780 "graph_nodes",
2781 &options.property_filters,
2782 );
2783 if let Some(cursor) = &options.cursor {
2784 sql.push_str(" AND id > ?");
2785 values.push(Value::Text(cursor.clone()));
2786 }
2787 sql.push_str(" ORDER BY id");
2788 if let Some(limit) = options.limit {
2789 sql.push_str(" LIMIT ?");
2790 values.push(Value::Integer(limit.saturating_add(1) as i64));
2791 }
2792
2793 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2794 let mut stmt = self.conn.prepare(&sql)?;
2795 let mut nodes =
2796 collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)?;
2797 let before_limit = nodes.len();
2798 let mut next_cursor = None;
2799 if let Some(limit) = options.limit
2800 && nodes.len() > limit
2801 {
2802 next_cursor = nodes
2803 .get(limit.saturating_sub(1))
2804 .map(|node| node.id.clone());
2805 nodes.truncate(limit);
2806 }
2807 let expected_indexes = if options.property_filters.is_empty() {
2808 vec!["idx_graph_nodes_kind"]
2809 } else {
2810 vec![
2811 "idx_graph_nodes_kind",
2812 "idx_graph_node_properties_key_value_node",
2813 ]
2814 };
2815 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2816 if !options.property_filters.is_empty() {
2817 diagnostics.push(
2818 "property filters were evaluated by SQLite materialized property rows before paging"
2819 .to_string(),
2820 );
2821 }
2822 if options.cursor.is_some() {
2823 diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
2824 }
2825 if next_cursor.is_some() {
2826 diagnostics.push(
2827 "result was truncated; pass page.next_cursor as --cursor for the next page"
2828 .to_string(),
2829 );
2830 }
2831 Ok(GraphPagedSubgraph {
2832 page: GraphQueryPage {
2833 cursor: options.cursor,
2834 limit: options.limit,
2835 next_cursor,
2836 returned_nodes: nodes.len(),
2837 returned_edges: 0,
2838 truncated: options.limit.is_some_and(|limit| before_limit > limit),
2839 diagnostics,
2840 },
2841 nodes,
2842 edges: Vec::new(),
2843 })
2844 }
2845
2846 fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2847 match kind {
2848 Some(kind) => {
2849 let mut stmt = self.conn.prepare(
2850 r#"
2851 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2852 FROM graph_edges
2853 WHERE from_id = ?1 AND kind = ?2
2854 ORDER BY to_id, kind
2855 "#,
2856 )?;
2857 collect_rows(stmt.query_map((from_id, kind), edge_from_row)?)
2858 }
2859 None => {
2860 let mut stmt = self.conn.prepare(
2861 r#"
2862 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2863 FROM graph_edges
2864 WHERE from_id = ?1
2865 ORDER BY to_id, kind
2866 "#,
2867 )?;
2868 collect_rows(stmt.query_map([from_id], edge_from_row)?)
2869 }
2870 }
2871 }
2872
2873 fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2874 let (sql, values) = sqlite_incident_edges_union_query(node_id, kind, &[], None, None);
2875 let mut stmt = self.conn.prepare(&sql)?;
2876 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2877 }
2878
2879 fn semantic_seeded_expansion_edges(
2880 &self,
2881 current_id: &str,
2882 options: &SemanticSeededNeighborhoodOptions,
2883 ) -> Result<SemanticSeededNeighborhoodExpansion> {
2884 let from_score = sqlite_semantic_seeded_edge_score_expr("e", "8");
2885 let to_score = sqlite_semantic_seeded_edge_score_expr(
2886 "e",
2887 "CASE WHEN e.from_id = e.to_id THEN 8 ELSE 4 END",
2888 );
2889 let limit_clause = if options.edge_scan_cap > 0 {
2890 "LIMIT ?"
2891 } else {
2892 ""
2893 };
2894 let sql = format!(
2895 r#"
2896WITH candidate_edges AS (
2897 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2898 {from_score} AS score
2899 FROM graph_edges e INDEXED BY idx_graph_edges_from_kind
2900 WHERE e.from_id = ?
2901 UNION ALL
2902 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2903 {to_score} AS score
2904 FROM graph_edges e INDEXED BY idx_graph_edges_to_kind
2905 WHERE e.to_id = ?
2906),
2907ranked_edges AS (
2908 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json,
2909 MAX(score) AS score
2910 FROM candidate_edges
2911 GROUP BY edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2912),
2913limited_edges AS (
2914 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json,
2915 score, COUNT(*) OVER () AS total_edges
2916 FROM ranked_edges
2917 ORDER BY score DESC, edge_key ASC
2918 {limit_clause}
2919)
2920SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, total_edges
2921FROM limited_edges
2922ORDER BY score DESC, edge_key ASC
2923"#
2924 );
2925 let mut values = vec![
2926 Value::Text(current_id.to_string()),
2927 Value::Text(current_id.to_string()),
2928 ];
2929 if options.edge_scan_cap > 0 {
2930 values.push(Value::Integer(
2931 options
2932 .edge_scan_cap
2933 .saturating_add(1)
2934 .min(i64::MAX as usize) as i64,
2935 ));
2936 }
2937 let mut stmt = self.conn.prepare(&sql)?;
2938 let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2939 Ok((edge_from_row_at(row, 0)?, row.get::<_, i64>(7)? as usize))
2940 })?)?;
2941 let total_candidates = rows.first().map(|(_, total)| *total).unwrap_or(0);
2942 let mut edges = rows.into_iter().map(|(edge, _)| edge).collect::<Vec<_>>();
2943 let mut skipped_by_edge_cap = 0usize;
2944 if options.edge_scan_cap > 0 && total_candidates > options.edge_scan_cap {
2945 skipped_by_edge_cap = total_candidates - options.edge_scan_cap;
2946 edges.truncate(options.edge_scan_cap);
2947 }
2948 Ok(SemanticSeededNeighborhoodExpansion {
2949 edges,
2950 skipped_by_edge_cap,
2951 })
2952 }
2953
2954 fn paged_edges(
2955 &self,
2956 kind: Option<&str>,
2957 options: GraphQueryOptions,
2958 ) -> Result<GraphPagedSubgraph> {
2959 let primary_property_filter = options.property_filters.first();
2960 let mut values = Vec::new();
2961 let mut sql = if let Some(filter) = primary_property_filter {
2962 values.push(Value::Text(filter.key.clone()));
2963 values.push(Value::Text(filter.value.clone()));
2964 String::from(
2965 r#"
2966 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2967 FROM graph_edge_properties ep0 INDEXED BY idx_graph_edge_properties_key_value_edge
2968 JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2969 ON e.edge_key = ep0.edge_key
2970 WHERE ep0.key = ?
2971 AND ep0.value = ?
2972 "#,
2973 )
2974 } else {
2975 String::from(
2976 r#"
2977 SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2978 FROM graph_edges e
2979 WHERE 1 = 1
2980 "#,
2981 )
2982 };
2983 if let Some(kind) = kind {
2984 sql.push_str(" AND e.kind = ?");
2985 values.push(Value::Text(kind.to_string()));
2986 }
2987 push_sqlite_edge_property_filter_exists(
2988 &mut sql,
2989 &mut values,
2990 "e",
2991 if primary_property_filter.is_some() {
2992 &options.property_filters[1..]
2993 } else {
2994 &options.property_filters
2995 },
2996 );
2997 if let Some(cursor) = &options.cursor {
2998 if primary_property_filter.is_some() {
2999 sql.push_str(" AND ep0.edge_key > ?");
3000 } else {
3001 sql.push_str(" AND e.edge_key > ?");
3002 }
3003 values.push(Value::Text(cursor.clone()));
3004 }
3005 if primary_property_filter.is_some() {
3006 sql.push_str(" ORDER BY ep0.edge_key");
3007 } else {
3008 sql.push_str(" ORDER BY e.edge_key");
3009 }
3010 if let Some(limit) = options.limit {
3011 sql.push_str(" LIMIT ?");
3012 values.push(Value::Integer(limit.saturating_add(1) as i64));
3013 }
3014
3015 let primary_property_row_count = if let Some(filter) = primary_property_filter {
3016 Some(self.conn.query_row(
3017 r#"
3018 SELECT COUNT(*)
3019 FROM graph_edge_properties INDEXED BY idx_graph_edge_properties_key_value_edge
3020 WHERE key = ?1 AND value = ?2
3021 "#,
3022 (&filter.key, &filter.value),
3023 |row| row_usize(row, 0),
3024 )?)
3025 } else {
3026 None
3027 };
3028 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
3029 let mut stmt = self.conn.prepare(&sql)?;
3030 let mut edges =
3031 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
3032 let before_limit = edges.len();
3033 let mut next_cursor = None;
3034 if let Some(limit) = options.limit
3035 && edges.len() > limit
3036 {
3037 next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
3038 edges.truncate(limit);
3039 }
3040 let expected_indexes = if options.property_filters.is_empty() {
3041 vec!["idx_graph_edges_edge_key"]
3042 } else {
3043 vec![
3044 "idx_graph_edge_properties_key_value_edge",
3045 "idx_graph_edges_edge_key",
3046 ]
3047 };
3048 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
3049 if !options.property_filters.is_empty() {
3050 if let Some(row_count) = primary_property_row_count {
3051 diagnostics.push(format!(
3052 "edge property primary filter matched {row_count} materialized row(s) before edge-kind/cursor paging"
3053 ));
3054 }
3055 diagnostics.push(
3056 "edge property scan drives from SQLite materialized property rows before joining graph_edges"
3057 .to_string(),
3058 );
3059 }
3060 if options.cursor.is_some() {
3061 diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
3062 }
3063 if next_cursor.is_some() {
3064 diagnostics.push(
3065 "result was truncated; pass page.next_cursor as --cursor for the next page"
3066 .to_string(),
3067 );
3068 }
3069 Ok(GraphPagedSubgraph {
3070 page: GraphQueryPage {
3071 cursor: options.cursor,
3072 limit: options.limit,
3073 next_cursor,
3074 returned_nodes: 0,
3075 returned_edges: edges.len(),
3076 truncated: options.limit.is_some_and(|limit| before_limit > limit),
3077 diagnostics,
3078 },
3079 nodes: Vec::new(),
3080 edges,
3081 })
3082 }
3083
3084 fn paged_incident_edges(
3085 &self,
3086 node_id: &str,
3087 kind: Option<&str>,
3088 options: GraphQueryOptions,
3089 ) -> Result<GraphPagedSubgraph> {
3090 let (sql, values) = sqlite_incident_edges_union_query(
3091 node_id,
3092 kind,
3093 &options.property_filters,
3094 options.cursor.as_deref(),
3095 options.limit,
3096 );
3097 let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
3098 let mut stmt = self.conn.prepare(&sql)?;
3099 let mut edges =
3100 collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
3101 let before_limit = edges.len();
3102 let mut next_cursor = None;
3103 if let Some(limit) = options.limit
3104 && edges.len() > limit
3105 {
3106 next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
3107 edges.truncate(limit);
3108 }
3109 let expected_indexes = if options.property_filters.is_empty() {
3110 vec!["idx_graph_edges_from_kind", "idx_graph_edges_to_kind"]
3111 } else {
3112 vec![
3113 "idx_graph_edges_from_kind",
3114 "idx_graph_edges_to_kind",
3115 "idx_graph_edge_properties_key_value_edge",
3116 ]
3117 };
3118 let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
3119 diagnostics.push(
3120 "incident edge scan uses UNION over from_id/to_id index probes instead of an OR predicate"
3121 .to_string(),
3122 );
3123 if !options.property_filters.is_empty() {
3124 diagnostics.push(
3125 "edge property filters were evaluated by SQLite materialized property rows before paging"
3126 .to_string(),
3127 );
3128 }
3129 if options.cursor.is_some() {
3130 diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
3131 }
3132 if next_cursor.is_some() {
3133 diagnostics.push(
3134 "result was truncated; pass page.next_cursor as --cursor for the next page"
3135 .to_string(),
3136 );
3137 }
3138 Ok(GraphPagedSubgraph {
3139 page: GraphQueryPage {
3140 cursor: options.cursor,
3141 limit: options.limit,
3142 next_cursor,
3143 returned_nodes: 0,
3144 returned_edges: edges.len(),
3145 truncated: options.limit.is_some_and(|limit| before_limit > limit),
3146 diagnostics,
3147 },
3148 nodes: Vec::new(),
3149 edges,
3150 })
3151 }
3152
3153 fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
3154 if node_ids.is_empty() {
3155 return Ok(Vec::new());
3156 }
3157 if node_ids.len() <= 20 {
3158 return self.edges_between_nodes_inline(node_ids);
3159 }
3160 self.assert_not_in_temp_table_section();
3161 self.temp_table_active.set(true);
3162 let result = (|| -> Result<Vec<GraphEdge>> {
3163 let tx = self.conn.unchecked_transaction()?;
3164 tx.execute_batch(
3165 r#"
3166 CREATE TEMP TABLE IF NOT EXISTS _edges_between_ids (id TEXT PRIMARY KEY);
3167 DELETE FROM _edges_between_ids;
3168 "#,
3169 )?;
3170 for chunk in node_ids.iter().collect::<Vec<_>>().chunks(450) {
3171 let row_placeholders: Vec<String> =
3172 chunk.iter().map(|_| "(?)".to_string()).collect();
3173 let placeholders = row_placeholders.join(", ");
3174 let sql =
3175 format!("INSERT OR IGNORE INTO _edges_between_ids (id) VALUES {placeholders}");
3176 let values: Vec<Value> =
3177 chunk.iter().map(|id| Value::Text((*id).clone())).collect();
3178 tx.execute(&sql, params_from_iter(values.iter()))?;
3179 }
3180 let edges = {
3181 let mut stmt = tx.prepare(
3182 r#"
3183 SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
3184 FROM graph_edges e
3185 WHERE EXISTS (SELECT 1 FROM _edges_between_ids f WHERE f.id = e.from_id)
3186 AND EXISTS (SELECT 1 FROM _edges_between_ids t WHERE t.id = e.to_id)
3187 ORDER BY e.from_id, e.kind, e.to_id
3188 "#,
3189 )?;
3190 collect_rows(stmt.query_map([], edge_from_row)?)?
3191 };
3192 tx.finish()?;
3193 Ok(edges)
3194 })();
3195 self.temp_table_active.set(false);
3196 result
3197 }
3198
3199 fn ranked_neighborhood(
3200 &self,
3201 center_id: &str,
3202 options: &RankedNeighborhoodOptions,
3203 ) -> Result<Option<RankedNeighborhoodResult>> {
3204 if self.node(center_id)?.is_none() {
3205 return Ok(None);
3206 }
3207 let center = self.node(center_id)?.unwrap();
3208
3209 let base_score_expr = match options.scoring {
3210 tsift_core::NeighborhoodScoring::BreadthFirst => {
3211 "MAX(0, 120 - (walk.depth * 18))".to_string()
3212 }
3213 tsift_core::NeighborhoodScoring::EdgeKindWeighted => {
3214 "MAX(0, 120 - (walk.depth * 18)) + CASE walk.edge_kind \
3215 WHEN 'semantic_relation' THEN 34 \
3216 WHEN 'mentions_entity' THEN 28 \
3217 WHEN 'mentions_concept' THEN 28 \
3218 WHEN 'tagged_entity' THEN 28 \
3219 WHEN 'tagged_concept' THEN 28 \
3220 WHEN 'related_concept' THEN 28 \
3221 WHEN 'mentions' THEN 22 \
3222 WHEN 'calls' THEN 20 \
3223 WHEN 'requests_context' THEN 18 \
3224 WHEN 'scopes_context' THEN 18 \
3225 WHEN 'scopes_source' THEN 18 \
3226 WHEN 'explains_result' THEN 18 \
3227 WHEN 'defines' THEN 12 \
3228 WHEN 'contains' THEN 12 \
3229 WHEN 'belongs_to' THEN 12 \
3230 ELSE 8 END".to_string()
3231 }
3232 tsift_core::NeighborhoodScoring::DegreeWeighted => {
3233 "MAX(0, 120 - (walk.depth * 18)) + CASE \
3234 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 3 THEN 20 \
3235 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 10 THEN 10 \
3236 ELSE 0 END"
3237 .to_string()
3238 }
3239 };
3240 let now_unix = options.observed_at_now_unix.unwrap_or_else(unix_now);
3241 let observed_at_half_life_secs = options.observed_at_half_life_secs.max(1);
3242 let observed_at_weight = options.observed_at_weight.max(0);
3243 let memory_node_boost = options.memory_node_boost.max(0);
3244 let observed_at_value = "COALESCE(\
3245 CAST(json_extract(n_score.freshness_json, '$.observed_at_unix') AS INTEGER), \
3246 CAST(json_extract(n_score.properties_json, '$.observed_at_unix') AS INTEGER), \
3247 CAST(json_extract(n_score.properties_json, '$.max_observed_at_unix') AS INTEGER)\
3248 )";
3249 let observed_at_expr = if observed_at_weight == 0 {
3250 "0".to_string()
3251 } else {
3252 format!(
3253 "CASE \
3254 WHEN {observed_at_value} IS NULL THEN 0 \
3255 WHEN ({now_unix} - {observed_at_value}) < {observed_at_half_life_secs} THEN {observed_at_weight} \
3256 WHEN ({now_unix} - {observed_at_value}) < ({observed_at_half_life_secs} * 2) THEN {observed_at_weight} / 2 \
3257 WHEN ({now_unix} - {observed_at_value}) < ({observed_at_half_life_secs} * 4) THEN {observed_at_weight} / 4 \
3258 ELSE 0 END"
3259 )
3260 };
3261 let confidence_value =
3262 "CAST(json_extract(n_score.properties_json, '$.confidence') AS REAL)";
3263 let memory_signal_expr = if memory_node_boost == 0 {
3264 "0".to_string()
3265 } else {
3266 format!(
3267 "(CASE \
3268 WHEN n_score.kind = 'memory_projection' THEN 0 \
3269 WHEN n_score.kind IN ('finding', 'decision', 'memory_event') THEN {memory_node_boost} \
3270 WHEN n_score.kind IN ('note', 'memory_session') THEN {memory_node_boost} / 2 \
3271 WHEN n_score.kind IN ('source_handle', 'semantic_concept', 'semantic_vector_handle') \
3272 AND json_extract(n_score.properties_json, '$.provider') = 'tsift-memory' THEN {memory_node_boost} / 2 \
3273 WHEN n_score.kind LIKE 'memory_%' THEN {memory_node_boost} \
3274 WHEN json_extract(n_score.properties_json, '$.provider') = 'tsift-memory' THEN {memory_node_boost} / 2 \
3275 ELSE 0 END) \
3276 + (CASE \
3277 WHEN json_extract(n_score.properties_json, '$.confidence') IS NULL THEN 0 \
3278 WHEN {confidence_value} <= 0.0 THEN 0 \
3279 WHEN {confidence_value} >= 1.0 THEN {memory_node_boost} \
3280 ELSE CAST(ROUND({memory_node_boost} * {confidence_value}) AS INTEGER) END)"
3281 )
3282 };
3283 let score_expr =
3284 format!("({base_score_expr}) + ({observed_at_expr}) + ({memory_signal_expr})");
3285
3286 let use_degree_cache = matches!(
3287 options.scoring,
3288 tsift_core::NeighborhoodScoring::DegreeWeighted
3289 );
3290 let degree_cte = if use_degree_cache {
3291 "degree_cache AS ( \
3292 SELECT id, (SELECT COUNT(*) FROM graph_edges e WHERE e.from_id = n.id OR e.to_id = n.id) AS degree \
3293 FROM graph_nodes n), "
3294 } else {
3295 ""
3296 };
3297 let mut sql = format!(
3298 r#"
3299WITH RECURSIVE {degree_cte}walk(id, depth, edge_kind, score) AS (
3300SELECT ?, 0, '', ?
3301UNION
3302SELECT e.to_id, walk.depth + 1, e.kind,
3303"#,
3304 );
3305 sql.push_str(&format!(" {}\n", score_expr));
3306 sql.push_str(
3307 r#"
3308FROM walk
3309JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3310ON e.from_id = walk.id
3311JOIN graph_nodes n_score ON n_score.id = e.to_id
3312WHERE walk.depth < ?
3313"#,
3314 );
3315 let mut values = vec![
3316 Value::Text(center_id.to_string()),
3317 Value::Integer(i64::MAX),
3318 Value::Integer(options.depth as i64),
3319 ];
3320 if let Some(kind) = &options.edge_kind {
3321 sql.push_str(" AND e.kind = ?");
3322 values.push(Value::Text(kind.clone()));
3323 }
3324 sql.push_str(
3325 r#"
3326 ),
3327scored_nodes AS (
3328SELECT walk.id, MAX(walk.score) AS score,
3329n.kind AS node_kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3330FROM walk
3331JOIN graph_nodes n ON n.id = walk.id
3332GROUP BY walk.id
3333 ),
3334 ranked AS (
3335 SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
3336 FROM scored_nodes
3337 ORDER BY score DESC, id ASC
3338 ),
3339 kept AS (
3340 SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
3341 FROM ranked
3342 LIMIT ?
3343 ),
3344 total AS (
3345 SELECT COUNT(*) AS cnt FROM scored_nodes
3346 )
3347 SELECT
3348 'meta' AS row_type,
3349 (SELECT cnt FROM total) AS total_discovered,
3350 0 AS node_id, '' AS node_kind, '' AS node_label,
3351 '' AS node_props, '' AS node_prov, '' AS node_fresh,
3352 '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
3353 '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
3354 UNION ALL
3355 SELECT
3356 'node' AS row_type,
3357 0 AS total_discovered,
3358 k.id, k.node_kind, k.label, k.properties_json, k.provenance_json, k.freshness_json,
3359 '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
3360 '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
3361 FROM kept k
3362 UNION ALL
3363 SELECT
3364 'edge' AS row_type,
3365 0 AS total_discovered,
3366 '' AS node_id, '' AS node_kind, '' AS node_label,
3367 '' AS node_props, '' AS node_prov, '' AS node_fresh,
3368 e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
3369 FROM graph_edges e
3370 WHERE EXISTS (SELECT 1 FROM kept k WHERE k.id = e.from_id)
3371 AND EXISTS (SELECT 1 FROM kept k2 WHERE k2.id = e.to_id)
3372"#,
3373 );
3374 values.push(Value::Integer(options.max_nodes.saturating_add(1) as i64));
3375
3376 let mut stmt = self.conn.prepare(&sql)?;
3377 let mut nodes = vec![center.clone()];
3378 let mut edges = Vec::new();
3379 let mut total_discovered = 0usize;
3380
3381 let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
3382 let row_type: String = row.get(0)?;
3383 match row_type.as_str() {
3384 "meta" => Ok(QueryResult::Meta {
3385 total: row.get::<_, i64>(1)? as usize,
3386 }),
3387 "node" => Ok(QueryResult::Node(node_from_row_at(row, 2)?)),
3388 "edge" => Ok(QueryResult::Edge(edge_from_row_at(row, 8)?)),
3389 _ => Err(rusqlite::Error::InvalidQuery),
3390 }
3391 })?;
3392 for row_result in rows {
3393 match row_result? {
3394 QueryResult::Meta { total } => {
3395 total_discovered = total;
3396 }
3397 QueryResult::Node(node) => {
3398 if node.id != center_id {
3399 nodes.push(node);
3400 }
3401 }
3402 QueryResult::Edge(edge) => {
3403 edges.push(edge);
3404 }
3405 }
3406 }
3407
3408 let total_discovered = total_discovered.max(nodes.len());
3409 let pruned_count = total_discovered.saturating_sub(nodes.len());
3410
3411 match options.property_mode {
3412 PropertyMode::Full => {}
3413 PropertyMode::Omit => {
3414 for n in &mut nodes {
3415 n.properties.clear();
3416 }
3417 for e in &mut edges {
3418 e.properties.clear();
3419 }
3420 }
3421 PropertyMode::Sample => {
3422 let mut seen_kinds = std::collections::BTreeSet::new();
3423 for n in &mut nodes {
3424 if !seen_kinds.contains(&n.kind) {
3425 seen_kinds.insert(n.kind.clone());
3426 } else {
3427 n.properties.clear();
3428 }
3429 }
3430 for e in &mut edges {
3431 e.properties.clear();
3432 }
3433 }
3434 }
3435
3436 Ok(Some(RankedNeighborhoodResult {
3437 nodes,
3438 edges,
3439 pruned_count,
3440 total_discovered,
3441 }))
3442 }
3443
3444 fn neighborhood(
3445 &self,
3446 center_id: &str,
3447 depth: usize,
3448 kind: Option<&str>,
3449 ) -> Result<Option<GraphSubgraph>> {
3450 self.paged_neighborhood(center_id, depth, kind, GraphQueryOptions::default())
3451 .map(|result| {
3452 result.map(|result| {
3453 GraphSubgraph {
3454 nodes: result.nodes,
3455 edges: result.edges,
3456 }
3457 .sorted()
3458 })
3459 })
3460 }
3461
/// Returns one id-ordered page of the nodes reachable from `center_id`, plus the
/// traversed edges whose endpoints are both on that page.
///
/// Reachability follows outgoing edges for at most `depth` hops, restricted to
/// edges of `kind` when one is given. Returns `Ok(None)` when `center_id` is not a
/// stored node.
///
/// `options.property_filters` and the exclusive `options.cursor` are applied in SQL
/// before paging. `options.limit` caps the page size; one extra row is fetched so
/// truncation can be detected, and `next_cursor` is then set to the last returned
/// id. The page also carries query-plan diagnostics.
fn paged_neighborhood(
    &self,
    center_id: &str,
    depth: usize,
    kind: Option<&str>,
    options: GraphQueryOptions,
) -> Result<Option<GraphPagedSubgraph>> {
    if self.node(center_id)?.is_none() {
        return Ok(None);
    }
    // `walk` yields (node id, depth) pairs reachable from the center. `UNION`
    // drops duplicate pairs, and `walk.depth < ?` bounds the recursion.
    let mut sql = String::from(
        r#"
    WITH RECURSIVE walk(id, depth) AS (
        SELECT ?, 0
        UNION
        SELECT e.to_id, walk.depth + 1
        FROM walk
        JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
          ON e.from_id = walk.id
        WHERE walk.depth < ?
    "#,
    );
    // Bind values are pushed in the same order as their `?` placeholders in `sql`.
    let mut values = vec![
        Value::Text(center_id.to_string()),
        Value::Integer(depth as i64),
    ];
    if let Some(kind) = kind {
        sql.push_str(" AND e.kind = ?");
        values.push(Value::Text(kind.to_string()));
    }
    sql.push_str(
        r#"
    ),
    filtered_nodes AS (
        SELECT DISTINCT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
        FROM walk
        JOIN graph_nodes n ON n.id = walk.id
        WHERE 1 = 1
    "#,
    );
    push_sqlite_property_filter_exists(&mut sql, &mut values, "n", &options.property_filters);
    if let Some(cursor) = &options.cursor {
        sql.push_str(" AND n.id > ?");
        values.push(Value::Text(cursor.clone()));
    }
    sql.push_str(
        r#"
    ),
    page_nodes AS (
        SELECT id, kind, label, properties_json, provenance_json, freshness_json
        FROM filtered_nodes
        ORDER BY id
    "#,
    );
    // Fetch one row beyond the limit so truncation can be detected below.
    if let Some(limit) = options.limit {
        sql.push_str(" LIMIT ?");
        values.push(Value::Integer(limit.saturating_add(1) as i64));
    }
    // `walk_edges` repeats the walk's edge predicate (depth bound and optional kind)
    // so only edges the traversal could have followed are candidates.
    sql.push_str(
        r#"
    ),
    walk_edges AS (
        SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
        FROM walk
        JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
          ON e.from_id = walk.id
        WHERE walk.depth < ?
    "#,
    );
    values.push(Value::Integer(depth as i64));
    if let Some(kind) = kind {
        sql.push_str(" AND e.kind = ?");
        values.push(Value::Text(kind.to_string()));
    }
    // Nodes and edges come back in one result set, tagged by `row_type`. Node
    // columns start at index 1 and edge columns at index 7.
    sql.push_str(
        r#"
    )
    SELECT
        'node' AS row_type,
        p.id, p.kind, p.label, p.properties_json, p.provenance_json, p.freshness_json,
        NULL AS edge_key, NULL AS from_id, NULL AS to_id, NULL AS edge_kind,
        NULL AS edge_properties_json, NULL AS edge_provenance_json, NULL AS edge_freshness_json
    FROM page_nodes p
    UNION ALL
    SELECT DISTINCT
        'edge' AS row_type,
        NULL AS id, NULL AS kind, NULL AS label, NULL AS properties_json,
        NULL AS provenance_json, NULL AS freshness_json,
        e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
    FROM walk_edges e
    WHERE e.from_id IN (SELECT id FROM page_nodes)
      AND e.to_id IN (SELECT id FROM page_nodes)
    "#,
    );

    // Capture the query plan for the index diagnostics reported on the page.
    let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
    let mut stmt = self.conn.prepare(&sql)?;
    let mut nodes = Vec::new();
    let mut edges = Vec::new();
    let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
        let row_type: String = row.get(0)?;
        match row_type.as_str() {
            "node" => Ok((Some(node_from_row_at(row, 1)?), None)),
            "edge" => Ok((None, Some(edge_from_row_at(row, 7)?))),
            _ => Err(rusqlite::Error::InvalidQuery),
        }
    })?;
    for row in rows {
        let (node, edge) = row?;
        if let Some(node) = node {
            nodes.push(node);
        }
        if let Some(edge) = edge {
            edges.push(edge);
        }
    }
    nodes.sort_by(|left, right| left.id.cmp(&right.id));
    let before_limit = nodes.len();
    let mut next_cursor = None;
    // A surplus row means the page was truncated. The cursor is the last id kept
    // on this page, because the cursor filter is exclusive.
    // NOTE(review): with `limit == Some(0)`, the cursor becomes the id of the single
    // fetched node even though no node is returned, so following it would skip
    // that node. Confirm whether limit 0 is a supported input.
    if let Some(limit) = options.limit
        && nodes.len() > limit
    {
        next_cursor = nodes
            .get(limit.saturating_sub(1))
            .map(|node| node.id.clone());
        nodes.truncate(limit);
    }
    // Drop edges that touch the surplus node removed by truncation.
    let node_ids = nodes
        .iter()
        .map(|node| node.id.as_str())
        .collect::<BTreeSet<_>>();
    edges.retain(|edge| {
        node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
    });
    edges.sort_by(|left, right| {
        left.from_id
            .cmp(&right.from_id)
            .then(left.kind.cmp(&right.kind))
            .then(left.to_id.cmp(&right.to_id))
    });
    let expected_indexes = if options.property_filters.is_empty() {
        vec!["idx_graph_edges_from_kind"]
    } else {
        vec![
            "idx_graph_edges_from_kind",
            "idx_graph_node_properties_key_value_node",
        ]
    };
    let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
    diagnostics.push(
        "neighborhood nodes and page edges share one recursive reachable-set CTE".to_string(),
    );
    if !options.property_filters.is_empty() {
        diagnostics.push(
            "property filters were evaluated by SQLite materialized property rows before paging"
                .to_string(),
        );
    }
    if options.cursor.is_some() {
        diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
    }
    if next_cursor.is_some() {
        diagnostics.push(
            "result was truncated; pass page.next_cursor as --cursor for the next page"
                .to_string(),
        );
    }
    Ok(Some(GraphPagedSubgraph {
        page: GraphQueryPage {
            cursor: options.cursor,
            limit: options.limit,
            next_cursor,
            returned_nodes: nodes.len(),
            returned_edges: edges.len(),
            truncated: options.limit.is_some_and(|limit| before_limit > limit),
            diagnostics,
        },
        nodes,
        edges,
    }))
}
3643
3644 fn shortest_path(
3645 &self,
3646 from_id: &str,
3647 to_id: &str,
3648 kind: Option<&str>,
3649 ) -> Result<Option<GraphPath>> {
3650 self.shortest_path_with_max_hops(from_id, to_id, kind, None)
3651 }
3652
3653 fn shortest_path_with_max_hops(
3654 &self,
3655 from_id: &str,
3656 to_id: &str,
3657 kind: Option<&str>,
3658 max_hops: Option<usize>,
3659 ) -> Result<Option<GraphPath>> {
3660 if from_id == to_id {
3661 return Ok(Some(GraphPath {
3662 nodes: vec![from_id.to_string()],
3663 hops: 0,
3664 }));
3665 }
3666 let hop_limit = max_hops.unwrap_or(usize::MAX);
3667 if hop_limit == 0 {
3668 return Ok(None);
3669 }
3670
3671 self.assert_not_in_temp_table_section();
3672 self.temp_table_active.set(true);
3673 let result = (|| -> Result<Option<GraphPath>> {
3674 let call_id = BFS_CALL_ID.fetch_add(1, Ordering::Relaxed);
3675 let tbl = format!("_tsift_frontier_{call_id}");
3676
3677 let mut visited = BTreeSet::from([from_id.to_string()]);
3678 let mut parent =
3679 BTreeMap::<String, String>::from([(from_id.to_string(), String::new())]);
3680 let mut frontier = vec![from_id.to_string()];
3681 self.conn.execute_batch(&format!(
3682 r#"CREATE TEMP TABLE IF NOT EXISTS "{tbl}" (id TEXT PRIMARY KEY);
3683 DELETE FROM "{tbl}";"#,
3684 ))?;
3685 let select_sql = if kind.is_some() {
3686 format!(
3687 r#"SELECT e.from_id, e.to_id
3688 FROM "{tbl}" f
3689 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3690 ON e.from_id = f.id
3691 WHERE e.kind = ?
3692 ORDER BY e.from_id, e.to_id, e.kind"#,
3693 )
3694 } else {
3695 format!(
3696 r#"SELECT e.from_id, e.to_id
3697 FROM "{tbl}" f
3698 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3699 ON e.from_id = f.id
3700 ORDER BY e.from_id, e.to_id, e.kind"#,
3701 )
3702 };
3703 let insert_sql = format!(r#"INSERT OR IGNORE INTO "{tbl}" (id) VALUES (?)"#);
3704 let delete_sql = format!(r#"DELETE FROM "{tbl}""#);
3705 let drop_sql = format!(r#"DROP TABLE IF EXISTS "{tbl}""#);
3706 let mut frontier_select_stmt = self.conn.prepare(&select_sql)?;
3707 let mut frontier_insert_stmt = self.conn.prepare(&insert_sql)?;
3708 let mut found_path: Option<GraphPath> = None;
3709 for _depth in 0..hop_limit {
3710 if frontier.is_empty() {
3711 break;
3712 }
3713 self.conn.execute(&delete_sql, [])?;
3714 for id in &frontier {
3715 frontier_insert_stmt.execute([id.as_str()])?;
3716 }
3717 let mut next_frontier = BTreeSet::new();
3718 let rows = if let Some(kind) = kind {
3719 collect_rows(frontier_select_stmt.query_map([kind], |row| {
3720 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3721 })?)?
3722 } else {
3723 collect_rows(frontier_select_stmt.query_map([], |row| {
3724 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3725 })?)?
3726 };
3727 for (from, next) in rows {
3728 if !visited.insert(next.clone()) {
3729 continue;
3730 }
3731 parent.insert(next.clone(), from);
3732 if next == to_id {
3733 let mut nodes = vec![to_id.to_string()];
3734 let mut cursor = to_id;
3735 while let Some(previous) = parent.get(cursor) {
3736 if previous.is_empty() {
3737 break;
3738 }
3739 nodes.push(previous.clone());
3740 cursor = previous;
3741 }
3742 nodes.reverse();
3743 found_path = Some(GraphPath {
3744 hops: nodes.len().saturating_sub(1),
3745 nodes,
3746 });
3747 break;
3748 }
3749 next_frontier.insert(next);
3750 }
3751 if found_path.is_some() {
3752 break;
3753 }
3754 frontier = next_frontier.into_iter().collect();
3755 }
3756 let _ = self.conn.execute_batch(&drop_sql);
3757 Ok(found_path)
3758 })();
3759 self.temp_table_active.set(false);
3760 result
3761 }
3762
3763 fn reachable_nodes_by_kind(
3764 &self,
3765 from_id: &str,
3766 kind: &str,
3767 depth: usize,
3768 limit: usize,
3769 ) -> Result<Vec<(GraphNode, GraphPath)>> {
3770 Ok(self
3771 .reachable_nodes_by_kinds(from_id, &[kind], depth, limit)?
3772 .remove(kind)
3773 .unwrap_or_default())
3774 }
3775
3776 fn reachable_nodes_by_kinds(
3777 &self,
3778 from_id: &str,
3779 kinds: &[&str],
3780 depth: usize,
3781 limit: usize,
3782 ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
3783 let mut requested = kinds
3784 .iter()
3785 .map(|kind| (*kind).to_string())
3786 .collect::<BTreeSet<_>>()
3787 .into_iter()
3788 .collect::<Vec<_>>();
3789 let mut results = requested
3790 .iter()
3791 .map(|kind| (kind.clone(), Vec::new()))
3792 .collect::<BTreeMap<_, _>>();
3793 if requested.is_empty() {
3794 return Ok(results);
3795 }
3796 requested.sort();
3797 let placeholders = std::iter::repeat_n("?", requested.len())
3798 .collect::<Vec<_>>()
3799 .join(", ");
3800 let mut sql = format!(
3801 r#"
3802 WITH RECURSIVE walk(id, depth, path) AS (
3803 SELECT ?, 0, char(31) || ? || char(31)
3804 UNION ALL
3805 SELECT e.to_id, walk.depth + 1, walk.path || e.to_id || char(31)
3806 FROM walk
3807 JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3808 ON e.from_id = walk.id
3809 WHERE walk.depth < ?
3810 AND instr(walk.path, char(31) || e.to_id || char(31)) = 0
3811 ),
3812 ranked AS (
3813 SELECT
3814 n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
3815 walk.path, walk.depth,
3816 ROW_NUMBER() OVER (PARTITION BY n.kind, n.id ORDER BY walk.depth, n.label, n.id) AS rn
3817 FROM walk
3818 JOIN graph_nodes n ON n.id = walk.id
3819 WHERE n.kind IN ({placeholders}) AND n.id <> ?
3820 ),
3821 kind_ranked AS (
3822 SELECT *,
3823 ROW_NUMBER() OVER (PARTITION BY kind ORDER BY depth, label, id) AS kind_rank
3824 FROM ranked
3825 WHERE rn = 1
3826 )
3827 SELECT id, kind, label, properties_json, provenance_json, freshness_json, path, depth
3828 FROM kind_ranked
3829 "#,
3830 );
3831 let mut values = vec![
3832 Value::Text(from_id.to_string()),
3833 Value::Text(from_id.to_string()),
3834 Value::Integer(depth as i64),
3835 ];
3836 values.extend(requested.iter().cloned().map(Value::Text));
3837 values.push(Value::Text(from_id.to_string()));
3838 if limit > 0 && limit != usize::MAX {
3839 sql.push_str(" WHERE kind_rank <= ?");
3840 values.push(Value::Integer(limit as i64));
3841 }
3842 sql.push_str(" ORDER BY kind, depth, label, id");
3843 let mut stmt = self.conn.prepare(&sql)?;
3844 let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
3845 let node = node_from_row(row)?;
3846 let path: String = row.get(6)?;
3847 let hops = row_usize(row, 7)?;
3848 Ok((
3849 node,
3850 GraphPath {
3851 nodes: path
3852 .split('\u{1f}')
3853 .filter(|part| !part.is_empty())
3854 .map(str::to_string)
3855 .collect(),
3856 hops,
3857 },
3858 ))
3859 })?)?;
3860 for (node, path) in rows {
3861 results
3862 .entry(node.kind.clone())
3863 .or_default()
3864 .push((node, path));
3865 }
3866 Ok(results)
3867 }
3868
3869 fn evidence_target_candidates(
3870 &self,
3871 target: &str,
3872 kinds: &[&str],
3873 preferred_path: Option<&str>,
3874 ) -> Result<Vec<GraphNode>> {
3875 if kinds.is_empty() {
3876 return Ok(Vec::new());
3877 }
3878
3879 let normalized = target.trim().trim_start_matches('#');
3880 let kind_placeholders = std::iter::repeat_n("?", kinds.len())
3881 .collect::<Vec<_>>()
3882 .join(", ");
3883 let kind_rank = kinds
3884 .iter()
3885 .enumerate()
3886 .map(|(rank, _)| format!("WHEN ? THEN {rank}"))
3887 .collect::<Vec<_>>()
3888 .join(" ");
3889 let path_filter = if preferred_path.is_some() {
3890 r#"
3891AND EXISTS (
3892 SELECT 1
3893 FROM graph_node_properties p_path INDEXED BY idx_graph_node_properties_key_value_node
3894 WHERE p_path.node_id = n.id
3895 AND p_path.key = 'path'
3896 AND p_path.value = ?
3897)
3898"#
3899 } else {
3900 ""
3901 };
3902 let sql = format!(
3903 r#"
3904SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3905FROM graph_nodes n
3906WHERE n.kind IN ({kind_placeholders})
3907 AND (
3908 EXISTS (
3909 SELECT 1
3910 FROM graph_node_properties p_handle INDEXED BY idx_graph_node_properties_key_value_node
3911 WHERE p_handle.node_id = n.id
3912 AND p_handle.key = 'handle'
3913 AND p_handle.value = ?
3914 )
3915 OR EXISTS (
3916 SELECT 1
3917 FROM graph_node_properties p_ref INDEXED BY idx_graph_node_properties_key_value_node
3918 WHERE p_ref.node_id = n.id
3919 AND p_ref.key = 'ref_id'
3920 AND p_ref.value = ?
3921)
3922OR n.label = ?
3923OR n.label = ?
3924)
3925{path_filter}
3926ORDER BY CASE n.kind {kind_rank} ELSE 999 END, n.id
3927"#
3928 );
3929 let mut values = kinds
3930 .iter()
3931 .map(|kind| Value::Text((*kind).to_string()))
3932 .collect::<Vec<_>>();
3933 values.push(Value::Text(target.to_string()));
3934 values.push(Value::Text(normalized.to_string()));
3935 values.push(Value::Text(target.to_string()));
3936 values.push(Value::Text(format!("#{normalized}")));
3937 if let Some(path) = preferred_path {
3938 values.push(Value::Text(path.to_string()));
3939 }
3940 values.extend(kinds.iter().map(|kind| Value::Text((*kind).to_string())));
3941 let mut stmt = self.conn.prepare(&sql)?;
3942 collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)
3943 }
3944
3945 fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
3946 if let Some(node) = self.node(target)? {
3947 return Ok(Some(node));
3948 }
3949 Ok(self
3950 .evidence_target_candidates(target, kinds, None)?
3951 .into_iter()
3952 .next())
3953 }
3954}
3955
3956fn to_json<T: Serialize>(value: &T) -> Result<String> {
3957 serde_json::to_string(value).map_err(Into::into)
3958}
3959
3960fn row_hash<T: Serialize>(value: &T) -> Result<String> {
3961 let payload = serde_json::to_vec(value)?;
3962 Ok(blake3::hash(&payload).to_hex().to_string())
3963}
3964
3965fn optional_to_json<T: Serialize>(value: &Option<T>) -> Result<Option<String>> {
3966 value.as_ref().map(to_json).transpose()
3967}
3968
3969fn collect_rows<T>(
3970 rows: impl Iterator<Item = std::result::Result<T, rusqlite::Error>>,
3971) -> Result<Vec<T>> {
3972 rows.collect::<std::result::Result<Vec<_>, _>>()
3973 .map_err(Into::into)
3974}
3975
/// One decoded row of a union query whose first column is a `row_type` tag. In
/// `ranked_neighborhood` the tags are `'meta'`, `'node'` and `'edge'`.
enum QueryResult {
    /// Count of distinct nodes the walk discovered before the node limit.
    Meta { total: usize },
    /// A kept node.
    Node(GraphNode),
    /// An edge whose endpoints are both kept nodes.
    Edge(GraphEdge),
}
3981
3982fn node_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphNode> {
3983 let properties_col = offset + 3;
3984 let provenance_col = offset + 4;
3985 let freshness_col = offset + 5;
3986 let properties_json: String = row.get(properties_col)?;
3987 let provenance_json: String = row.get(provenance_col)?;
3988 let freshness_json: Option<String> = row.get(freshness_col)?;
3989 Ok(GraphNode {
3990 id: row.get(offset)?,
3991 kind: row.get(offset + 1)?,
3992 label: row.get(offset + 2)?,
3993 properties: from_json(properties_col, &properties_json)?,
3994 provenance: from_json(provenance_col, &provenance_json)?,
3995 freshness: optional_from_json(freshness_col, freshness_json)?,
3996 })
3997}
3998
3999fn node_from_row(row: &Row<'_>) -> rusqlite::Result<GraphNode> {
4000 node_from_row_at(row, 0)
4001}
4002
4003fn edge_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphEdge> {
4004 let properties_col = offset + 4;
4005 let provenance_col = offset + 5;
4006 let freshness_col = offset + 6;
4007 let properties_json: String = row.get(properties_col)?;
4008 let provenance_json: String = row.get(provenance_col)?;
4009 let freshness_json: Option<String> = row.get(freshness_col)?;
4010 Ok(GraphEdge {
4011 id: row.get(offset)?,
4012 from_id: row.get(offset + 1)?,
4013 to_id: row.get(offset + 2)?,
4014 kind: row.get(offset + 3)?,
4015 properties: from_json(properties_col, &properties_json)?,
4016 provenance: from_json(provenance_col, &provenance_json)?,
4017 freshness: optional_from_json(freshness_col, freshness_json)?,
4018 })
4019}
4020
4021fn edge_from_row(row: &Row<'_>) -> rusqlite::Result<GraphEdge> {
4022 edge_from_row_at(row, 0)
4023}
4024
4025fn from_json<T: DeserializeOwned>(column: usize, raw: &str) -> rusqlite::Result<T> {
4026 serde_json::from_str(raw)
4027 .map_err(|err| rusqlite::Error::FromSqlConversionFailure(column, Type::Text, Box::new(err)))
4028}
4029
4030fn optional_from_json<T: DeserializeOwned>(
4031 column: usize,
4032 raw: Option<String>,
4033) -> rusqlite::Result<Option<T>> {
4034 raw.map(|value| from_json(column, &value)).transpose()
4035}
4036
4037fn projection_version_from_nodes(nodes: &[GraphNode]) -> Option<String> {
4038 nodes
4039 .iter()
4040 .find(|node| node.kind == "projection_meta")
4041 .and_then(|node| node.properties.get("projection_version").cloned())
4042}
4043
4044fn projection_hash_from_nodes(nodes: &[GraphNode]) -> Option<String> {
4045 nodes
4046 .iter()
4047 .find(|node| node.kind == "projection_meta")
4048 .and_then(|node| node.properties.get("content_hash").cloned())
4049}
4050
/// Current wall-clock time in whole seconds since the Unix epoch. Returns 0 if the
/// system clock reports a time before the epoch.
fn unix_now() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
4057
4058fn sqlite_database_size_bytes(conn: &Connection) -> Result<u64> {
4059 let page_count = conn.query_row("PRAGMA page_count", [], |row| row_u64(row, 0))?;
4060 let page_size = conn.query_row("PRAGMA page_size", [], |row| row_u64(row, 0))?;
4061 Ok(page_count.saturating_mul(page_size))
4062}
4063
4064fn sqlite_database_freelist_bytes(conn: &Connection) -> Result<u64> {
4065 let freelist_count = conn.query_row("PRAGMA freelist_count", [], |row| row_u64(row, 0))?;
4066 let page_size = conn.query_row("PRAGMA page_size", [], |row| row_u64(row, 0))?;
4067 Ok(freelist_count.saturating_mul(page_size))
4068}
4069
4070#[cfg(test)]
4071mod tests {
4072 use super::*;
4073
/// Provenance fixture shared by the sample node and edge builders in this module.
fn sample_provenance() -> GraphProvenance {
    GraphProvenance::new("fixture", "src/lib.rs:1").with_content_hash("hash-1")
}
4077
/// Small fixture graph: a chain `doc:livekit -mentions-> topic:rooms -related_to->
/// topic:egress`. The document node and the `mentions` edge carry a property,
/// provenance and freshness.
fn sample_projection() -> GraphProjection {
    let source = sample_provenance();
    GraphProjection {
        nodes: vec![
            GraphNode::new("doc:livekit", "document", "LiveKit guide")
                .with_property("domain", "livekit")
                .with_provenance(source.clone())
                .with_freshness(GraphFreshness::content_hash("node-hash")),
            GraphNode::new("topic:rooms", "topic", "Rooms"),
            GraphNode::new("topic:egress", "topic", "Egress"),
        ],
        edges: vec![
            GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
                .with_property("confidence", "0.91")
                .with_provenance(source.clone())
                .with_freshness(GraphFreshness::content_hash("edge-hash")),
            GraphEdge::new("topic:rooms", "topic:egress", "related_to").with_provenance(source),
        ],
    }
}
4098
/// Loads `sample_projection` into `store`, then checks node lookup, kind listing
/// ordered by id, filtered outgoing edges with properties, and a two-hop shortest
/// path.
fn assert_projection_store_contract(store: &impl GraphStore) {
    let projection = sample_projection();
    projection.upsert_into(store).unwrap();

    assert_eq!(
        store.node("doc:livekit").unwrap(),
        projection
            .nodes
            .iter()
            .find(|node| node.id == "doc:livekit")
            .cloned()
    );
    // Expected in id order ("topic:egress" < "topic:rooms"), not insertion order.
    assert_eq!(
        store.nodes_by_kind("topic").unwrap(),
        vec![
            GraphNode::new("topic:egress", "topic", "Egress"),
            GraphNode::new("topic:rooms", "topic", "Rooms"),
        ]
    );

    let mentions = store
        .outgoing_edges("doc:livekit", Some("mentions"))
        .unwrap();
    assert_eq!(mentions.len(), 1);
    assert_eq!(mentions[0].to_id, "topic:rooms");
    assert_eq!(
        mentions[0].properties.get("confidence"),
        Some(&"0.91".into())
    );

    let path = store
        .shortest_path("doc:livekit", "topic:egress", None)
        .unwrap()
        .unwrap();
    assert_eq!(
        path.nodes,
        vec!["doc:livekit", "topic:rooms", "topic:egress"]
    );
}
4138
/// Nodes and edges written individually to an in-memory store read back unchanged,
/// including properties, provenance and freshness.
#[test]
fn sqlite_store_round_trips_generic_nodes_edges() {
    let store = SqliteGraphStore::in_memory().unwrap();
    let source = sample_provenance();
    let node = GraphNode::new("doc:livekit", "document", "LiveKit guide")
        .with_property("domain", "livekit")
        .with_provenance(source.clone())
        .with_freshness(GraphFreshness::content_hash("node-hash"));
    let topic = GraphNode::new("topic:rooms", "topic", "Rooms");
    let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
        .with_property("confidence", "0.91")
        .with_provenance(source)
        .with_freshness(GraphFreshness::content_hash("edge-hash"));

    store.upsert_node(&node).unwrap();
    store.upsert_node(&topic).unwrap();
    store.upsert_edge(&edge).unwrap();

    assert_eq!(store.node("doc:livekit").unwrap(), Some(node));
    assert_eq!(store.nodes_by_kind("topic").unwrap(), vec![topic]);
    assert_eq!(store.all_nodes().unwrap().len(), 2);
    assert_eq!(store.all_edges().unwrap().len(), 1);
    assert_eq!(
        store
            .outgoing_edges("doc:livekit", Some("mentions"))
            .unwrap(),
        vec![edge]
    );
}
4168
/// Covers several edge-level behaviours:
/// - edges are addressable by id;
/// - incident-edge lookup returns both directions ordered by id;
/// - a property-filtered `paged_edges` query is driven by the materialized
///   `graph_edge_properties` rows, and its diagnostics say so;
/// - one property row is materialized per edge.
#[test]
fn sqlite_materializes_edge_properties_and_scans_first_class_edges() {
    let store = SqliteGraphStore::in_memory().unwrap();
    for node in [
        GraphNode::new("doc:livekit", "document", "LiveKit guide"),
        GraphNode::new("topic:rooms", "topic", "Rooms"),
        GraphNode::new("topic:egress", "topic", "Egress"),
    ] {
        store.upsert_node(&node).unwrap();
    }
    let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
        .with_property("confidence", "0.91");
    let edge_id = edge.id.clone();
    store.upsert_edge(&edge).unwrap();
    store
        .upsert_edge(
            &GraphEdge::new("topic:egress", "topic:rooms", "related_to")
                .with_property("confidence", "0.42"),
        )
        .unwrap();

    assert_eq!(store.edge(&edge_id).unwrap(), Some(edge));
    // topic:rooms is the target of both edges; the result is expected in id order.
    let mut expected_incident_ids = vec![
        GraphEdge::stable_id("doc:livekit", "topic:rooms", "mentions"),
        GraphEdge::stable_id("topic:egress", "topic:rooms", "related_to"),
    ];
    expected_incident_ids.sort();
    assert_eq!(
        store
            .incident_edges("topic:rooms", None)
            .unwrap()
            .into_iter()
            .map(|edge| edge.id)
            .collect::<Vec<_>>(),
        expected_incident_ids
    );

    let page = store
        .paged_edges(
            Some("mentions"),
            GraphQueryOptions {
                property_filters: vec![GraphPropertyFilter {
                    key: "confidence".to_string(),
                    value: "0.91".to_string(),
                }],
                ..GraphQueryOptions::default()
            },
        )
        .unwrap();
    assert_eq!(page.edges.len(), 1);
    assert_eq!(page.edges[0].id, edge_id);
    // The diagnostics must show the property index and edge-key index were used.
    assert!(
        page.page
            .diagnostics
            .iter()
            .any(|diagnostic| diagnostic.contains("idx_graph_edge_properties_key_value_edge")),
        "{:?}",
        page.page.diagnostics
    );
    assert!(
        page.page
            .diagnostics
            .iter()
            .any(|diagnostic| diagnostic.contains("idx_graph_edges_edge_key")),
        "{:?}",
        page.page.diagnostics
    );
    assert!(
        page.page.diagnostics.iter().any(|diagnostic| diagnostic
            .contains("edge property primary filter matched 1 materialized row")),
        "{:?}",
        page.page.diagnostics
    );
    assert!(
        page.page
            .diagnostics
            .iter()
            .any(|diagnostic| diagnostic
                .contains("drives from SQLite materialized property rows")),
        "{:?}",
        page.page.diagnostics
    );

    // One `confidence` row per upserted edge.
    let property_rows: usize = store
        .conn
        .query_row(
            "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
            [],
            |row| row_usize(row, 0),
        )
        .unwrap();
    assert_eq!(property_rows, 2);
}
4262
4263 #[test]
4264 fn graph_projection_round_trips_through_backend_agnostic_store_contract() {
4265 let sqlite = SqliteGraphStore::in_memory().unwrap();
4266 assert_projection_store_contract(&sqlite);
4267 }
4268
4269 #[test]
4270 fn graph_store_contract_covers_crud_neighborhood_and_ordering() {
4271 fn assert_crud_contract(store: &impl GraphStore) {
4272 let projection = sample_projection();
4273 projection.upsert_into(store).unwrap();
4274
4275 let neighborhood = store.neighborhood("doc:livekit", 2, None).unwrap().unwrap();
4276 assert_eq!(
4277 neighborhood
4278 .nodes
4279 .iter()
4280 .map(|node| node.id.as_str())
4281 .collect::<Vec<_>>(),
4282 vec!["doc:livekit", "topic:egress", "topic:rooms"]
4283 );
4284 assert_eq!(
4285 neighborhood
4286 .edges
4287 .iter()
4288 .map(|edge| (
4289 edge.from_id.as_str(),
4290 edge.kind.as_str(),
4291 edge.to_id.as_str()
4292 ))
4293 .collect::<Vec<_>>(),
4294 vec![
4295 ("doc:livekit", "mentions", "topic:rooms"),
4296 ("topic:rooms", "related_to", "topic:egress"),
4297 ]
4298 );
4299
4300 assert_eq!(
4301 store
4302 .delete_edge("topic:rooms", "topic:egress", "related_to")
4303 .unwrap(),
4304 1
4305 );
4306 assert!(
4307 store
4308 .shortest_path("doc:livekit", "topic:egress", None)
4309 .unwrap()
4310 .is_none()
4311 );
4312 assert_eq!(store.delete_node("topic:rooms").unwrap(), 1);
4313 assert!(store.node("topic:rooms").unwrap().is_none());
4314 assert!(
4315 store
4316 .outgoing_edges("doc:livekit", None)
4317 .unwrap()
4318 .is_empty()
4319 );
4320 }
4321
4322 assert_crud_contract(&SqliteGraphStore::in_memory().unwrap());
4323 }
4324
4325 #[test]
4326 fn sqlite_ranked_neighborhood_prefers_recent_memory_nodes_when_pruning() {
4327 let store = SqliteGraphStore::in_memory().unwrap();
4328 store
4329 .upsert_node(&GraphNode::new("center", "file", "center"))
4330 .unwrap();
4331 store
4332 .upsert_node(&GraphNode::new("aaa-code", "symbol", "code candidate"))
4333 .unwrap();
4334 store
4335 .upsert_node(
4336 &GraphNode::new("mmm-stale", "memory_event", "stale memory")
4337 .with_property("provider", "tsift-memory")
4338 .with_property("observed_at_unix", "1000"),
4339 )
4340 .unwrap();
4341 store
4342 .upsert_node(
4343 &GraphNode::new("zzz-fresh", "memory_event", "fresh memory")
4344 .with_property("provider", "tsift-memory")
4345 .with_property("observed_at_unix", "1995"),
4346 )
4347 .unwrap();
4348 store
4349 .upsert_edge(&GraphEdge::new("center", "aaa-code", "mentions"))
4350 .unwrap();
4351 store
4352 .upsert_edge(&GraphEdge::new("center", "mmm-stale", "mentions"))
4353 .unwrap();
4354 store
4355 .upsert_edge(&GraphEdge::new("center", "zzz-fresh", "mentions"))
4356 .unwrap();
4357
4358 let options = RankedNeighborhoodOptions::new(1, 1)
4359 .with_observed_at_now_unix(2000)
4360 .with_observed_at_half_life_secs(100);
4361 let result = store
4362 .ranked_neighborhood("center", &options)
4363 .unwrap()
4364 .unwrap();
4365 let ids: Vec<_> = result.nodes.iter().map(|node| node.id.as_str()).collect();
4366 assert!(ids.contains(&"center"));
4367 assert!(
4368 ids.contains(&"zzz-fresh"),
4369 "fresh memory node should survive pruning: {ids:?}"
4370 );
4371 assert!(!ids.contains(&"aaa-code"));
4372 assert!(!ids.contains(&"mmm-stale"));
4373 }
4374
4375 #[test]
4376 fn sqlite_semantic_seeded_neighborhood_scores_before_sql_edge_cap() {
4377 let store = SqliteGraphStore::in_memory().unwrap();
4378 store
4379 .upsert_node(&GraphNode::new("seed", "semantic_concept", "graph budget"))
4380 .unwrap();
4381 store
4382 .upsert_node(&GraphNode::new("zzz_high", "symbol", "high_signal"))
4383 .unwrap();
4384 store
4385 .upsert_edge(&GraphEdge::new("zzz_high", "seed", "mentions_concept"))
4386 .unwrap();
4387 for idx in 0..24 {
4388 let id = format!("aaa_low_{idx:02}");
4389 store
4390 .upsert_node(&GraphNode::new(id.clone(), "note", format!("low {idx}")))
4391 .unwrap();
4392 store
4393 .upsert_edge(&GraphEdge::new(id, "seed", "weak_link"))
4394 .unwrap();
4395 }
4396
4397 let options = SemanticSeededNeighborhoodOptions::new(1, 3)
4398 .with_edge_scan_cap(16)
4399 .with_node_discovery_cap(9);
4400 let result = store
4401 .semantic_seeded_neighborhood(&["seed".to_string()], &options)
4402 .unwrap();
4403 let ids = result
4404 .nodes
4405 .iter()
4406 .map(|node| node.id.as_str())
4407 .collect::<Vec<_>>();
4408
4409 assert_eq!(ids.len(), 3);
4410 assert_eq!(ids[0], "seed");
4411 assert_eq!(ids[1], "zzz_high");
4412 assert_eq!(result.skipped_by_edge_cap, 9);
4413 assert!(result.truncated);
4414 }
4415
4416 #[test]
4417 fn sqlite_upsert_projection_batches_rows_and_properties() {
4418 let mut store = SqliteGraphStore::in_memory().unwrap();
4419 let mut projection = sample_projection();
4420 store.upsert_projection(&projection).unwrap();
4421
4422 let page = store
4423 .paged_nodes_by_kind(
4424 "document",
4425 GraphQueryOptions {
4426 property_filters: vec![GraphPropertyFilter {
4427 key: "domain".to_string(),
4428 value: "livekit".to_string(),
4429 }],
4430 ..GraphQueryOptions::default()
4431 },
4432 )
4433 .unwrap();
4434 assert_eq!(page.nodes[0].id, "doc:livekit");
4435
4436 projection.nodes[0] = GraphNode::new("doc:livekit", "document", "LiveKit guide")
4437 .with_property("domain", "recording");
4438 store.upsert_projection(&projection).unwrap();
4439
4440 let old_property_count: usize = store
4441 .conn
4442 .query_row(
4443 "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain' AND value = 'livekit'",
4444 [],
4445 |row| row_usize(row, 0),
4446 )
4447 .unwrap();
4448 let updated_page = store
4449 .paged_nodes_by_kind(
4450 "document",
4451 GraphQueryOptions {
4452 property_filters: vec![GraphPropertyFilter {
4453 key: "domain".to_string(),
4454 value: "recording".to_string(),
4455 }],
4456 ..GraphQueryOptions::default()
4457 },
4458 )
4459 .unwrap();
4460 assert_eq!(old_property_count, 0);
4461 assert_eq!(updated_page.nodes[0].id, "doc:livekit");
4462 let edge_property_count: usize = store
4463 .conn
4464 .query_row(
4465 "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
4466 [],
4467 |row| row_usize(row, 0),
4468 )
4469 .unwrap();
4470 assert_eq!(edge_property_count, 1);
4471
4472 let changes_before = store.conn.total_changes();
4473 store.upsert_projection(&projection).unwrap();
4474 assert_eq!(
4475 store.conn.total_changes(),
4476 changes_before,
4477 "unchanged projection rows should not rewrite graph rows or materialized properties"
4478 );
4479 }
4480
4481 #[test]
4482 fn sqlite_semantic_top_candidates_use_materialized_vector_table() {
4483 let store = SqliteGraphStore::in_memory().unwrap();
4484 store
4485 .upsert_node(
4486 &GraphNode::new("concept:graph", "semantic_concept", "graph navigation")
4487 .with_property("embedding_model", "fixture-v1")
4488 .with_property("embedding", "1.0,0.0"),
4489 )
4490 .unwrap();
4491 store
4492 .upsert_node(
4493 &GraphNode::new("concept:sqlite", "semantic_concept", "sqlite search")
4494 .with_property("embedding", "0.0,1.0"),
4495 )
4496 .unwrap();
4497 store
4498 .upsert_node(&GraphNode::new("entity:skip", "semantic_entity", "skipped"))
4499 .unwrap();
4500
4501 let vector_rows: usize = store
4502 .conn
4503 .query_row(
4504 "SELECT COUNT(*) FROM graph_node_semantic_vectors",
4505 [],
4506 |row| row_usize(row, 0),
4507 )
4508 .unwrap();
4509 assert_eq!(vector_rows, 2);
4510
4511 let candidates = store
4512 .semantic_top_candidates(&[1.0, 0.0], &["semantic_concept"], 1)
4513 .unwrap();
4514 assert_eq!(candidates.len(), 1);
4515 assert_eq!(candidates[0].node.id, "concept:graph");
4516 assert_eq!(candidates[0].score, 1.0);
4517
4518 store
4519 .upsert_node(&GraphNode::new(
4520 "concept:graph",
4521 "semantic_concept",
4522 "graph navigation",
4523 ))
4524 .unwrap();
4525 let vector_rows_after_update: usize = store
4526 .conn
4527 .query_row(
4528 "SELECT COUNT(*) FROM graph_node_semantic_vectors WHERE node_id = 'concept:graph'",
4529 [],
4530 |row| row_usize(row, 0),
4531 )
4532 .unwrap();
4533 assert_eq!(vector_rows_after_update, 0);
4534 }
4535
4536 #[test]
4537 fn sqlite_store_filters_edges_by_kind_and_paths() {
4538 let store = SqliteGraphStore::in_memory().unwrap();
4539 for id in ["a", "b", "c"] {
4540 store
4541 .upsert_node(&GraphNode::new(id, "symbol", id))
4542 .unwrap();
4543 }
4544 store
4545 .upsert_edge(&GraphEdge::new("a", "b", "calls"))
4546 .unwrap();
4547 store
4548 .upsert_edge(&GraphEdge::new("a", "c", "documents"))
4549 .unwrap();
4550 store
4551 .upsert_edge(&GraphEdge::new("b", "c", "calls"))
4552 .unwrap();
4553
4554 let calls = store.outgoing_edges("a", Some("calls")).unwrap();
4555 assert_eq!(calls.len(), 1);
4556 assert_eq!(calls[0].to_id, "b");
4557 assert_eq!(store.graph_counts().unwrap(), (3, 3));
4558 assert_eq!(
4559 store.sample_edge(Some("calls")).unwrap().unwrap().to_id,
4560 "b"
4561 );
4562
4563 let path = store
4564 .shortest_path("a", "c", Some("calls"))
4565 .unwrap()
4566 .unwrap();
4567 assert_eq!(path.nodes, vec!["a", "b", "c"]);
4568 assert_eq!(path.hops, 2);
4569
4570 assert!(
4571 store
4572 .shortest_path("c", "a", Some("calls"))
4573 .unwrap()
4574 .is_none()
4575 );
4576 }
4577
4578 #[test]
4579 fn sqlite_store_batches_edges_between_node_sets() {
4580 let store = SqliteGraphStore::in_memory().unwrap();
4581 for id in ["a", "b", "c", "outside"] {
4582 store
4583 .upsert_node(&GraphNode::new(id, "symbol", id))
4584 .unwrap();
4585 }
4586 for edge in [
4587 GraphEdge::new("a", "b", "calls"),
4588 GraphEdge::new("b", "c", "calls"),
4589 GraphEdge::new("a", "outside", "calls"),
4590 GraphEdge::new("outside", "c", "calls"),
4591 ] {
4592 store.upsert_edge(&edge).unwrap();
4593 }
4594
4595 let scoped = ["a".to_string(), "b".to_string(), "c".to_string()]
4596 .into_iter()
4597 .collect::<BTreeSet<_>>();
4598 let edge_keys = store
4599 .edges_between_nodes(&scoped)
4600 .unwrap()
4601 .into_iter()
4602 .map(|edge| (edge.from_id, edge.kind, edge.to_id))
4603 .collect::<Vec<_>>();
4604
4605 assert_eq!(
4606 edge_keys,
4607 vec![
4608 ("a".to_string(), "calls".to_string(), "b".to_string()),
4609 ("b".to_string(), "calls".to_string(), "c".to_string()),
4610 ]
4611 );
4612 }
4613
4614 #[test]
4615 fn wal_checkpoint_outcome_is_recorded_not_swallowed() {
4616 let dir = tempfile::tempdir().unwrap();
4617 let db_path = dir.path().join("graph.db");
4618 let mut store = SqliteGraphStore::open(&db_path).unwrap();
4619 let refresh = store
4620 .replace_projection_with_version("root", &sample_projection(), Some("v1"), None)
4621 .unwrap();
4622 assert!(
4625 refresh
4626 .phase_timings
4627 .iter()
4628 .any(|phase| phase.name == "wal_checkpoint:ok"),
4629 "expected a wal_checkpoint:ok phase: {:?}",
4630 refresh
4631 .phase_timings
4632 .iter()
4633 .map(|phase| &phase.name)
4634 .collect::<Vec<_>>()
4635 );
4636 }
4637
4638 #[test]
4639 fn wal_checkpoint_records_busy_when_a_reader_blocks_truncate() {
4640 let dir = tempfile::tempdir().unwrap();
4641 let db_path = dir.path().join("graph.db");
4642 let mut store = SqliteGraphStore::open(&db_path).unwrap();
4643 store
4644 .replace_projection_with_version("root", &sample_projection(), Some("v1"), None)
4645 .unwrap();
4646
4647 let reader = rusqlite::Connection::open(&db_path).unwrap();
4651 reader.execute_batch("BEGIN").unwrap();
4652 let _pinned: i64 = reader
4653 .query_row("SELECT count(*) FROM graph_operator_stats", [], |row| {
4654 row.get(0)
4655 })
4656 .unwrap();
4657
4658 let mut projection = sample_projection();
4659 projection
4660 .nodes
4661 .push(GraphNode::new("topic:extra", "topic", "Extra"));
4662 let refresh = store
4663 .replace_projection_with_version("root", &projection, Some("v2"), None)
4664 .unwrap();
4665
4666 assert!(
4667 refresh
4668 .phase_timings
4669 .iter()
4670 .any(|phase| phase.name == "wal_checkpoint:busy"),
4671 "expected a wal_checkpoint:busy phase while a reader holds the WAL: {:?}",
4672 refresh
4673 .phase_timings
4674 .iter()
4675 .map(|phase| &phase.name)
4676 .collect::<Vec<_>>()
4677 );
4678 drop(reader);
4679 }
4680
    // Drives the "root" scope through four versioned refreshes (v1 load, v2 removal,
    // v3 re-add, v4 removal + compaction) and checks the refresh report, the stored
    // projection version/watermark, and the cached counts in `graph_operator_stats`.
    #[test]
    fn sqlite_projection_refresh_tracks_versions_watermarks_and_tombstones() {
        let mut store = SqliteGraphStore::in_memory().unwrap();
        let mut projection = sample_projection();
        // Add a `projection_meta` node carrying a version and content hash on top of
        // the rows `sample_projection()` provides.
        projection.nodes.push(
            GraphNode::new(
                "projection:fixture",
                "projection_meta",
                "fixture projection",
            )
            .with_property("projection_version", "fixture-v1")
            .with_property("content_hash", "hash-a"),
        );
        // v1: initial load at source watermark `commit-a`.
        store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v1"),
                Some("commit-a".to_string()),
            )
            .unwrap();

        // v2: drop `topic:egress` and every edge pointing at it.
        projection.nodes.retain(|node| node.id != "topic:egress");
        projection.edges.retain(|edge| edge.to_id != "topic:egress");
        let refresh = store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v2"),
                Some("commit-b".to_string()),
            )
            .unwrap();

        // The report carries the new version/watermark and tombstones exactly the
        // removed node and edge. The three surviving nodes and their properties are
        // all skipped as unchanged.
        assert_eq!(refresh.projection_version, "fixture-v2");
        assert_eq!(refresh.source_watermark.as_deref(), Some("commit-b"));
        assert_eq!(refresh.tombstoned_nodes, vec!["topic:egress".to_string()]);
        assert_eq!(refresh.tombstoned_edges.len(), 1);
        assert_eq!(refresh.deleted_nodes, 1);
        assert_eq!(refresh.deleted_edges, 1);
        assert_eq!(refresh.unchanged_nodes, 3);
        assert_eq!(refresh.upserted_nodes, 0);
        assert_eq!(refresh.unchanged_properties, 4);
        assert_eq!(refresh.upserted_properties, 0);
        assert_eq!(refresh.deleted_properties, 0);
        // Node and edge property-row staging are each reported as their own phase.
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_property_row_staging"),
            "{:?}",
            refresh.phase_timings
        );
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_edge_property_row_staging"),
            "{:?}",
            refresh.phase_timings
        );
        // The stored version row must match the v2 refresh.
        let version = store.projection_version("root").unwrap().unwrap();
        assert_eq!(version.projection_version, "fixture-v2");
        assert_eq!(version.source_watermark.as_deref(), Some("commit-b"));
        // Cached counts for "root": (nodes, edges, tombstone_nodes, tombstone_edges).
        let cached_counts: (usize, usize, usize, usize) = store
            .conn
            .query_row(
                r#"
                SELECT nodes, edges, tombstone_nodes, tombstone_edges
                FROM graph_operator_stats
                WHERE scope = 'root'
                "#,
                [],
                |row| {
                    Ok((
                        row_usize(row, 0)?,
                        row_usize(row, 1)?,
                        row_usize(row, 2)?,
                        row_usize(row, 3)?,
                    ))
                },
            )
            .unwrap();
        assert_eq!(cached_counts, (3, 1, 1, 1));

        // v3: re-add `topic:egress` (its edge stays removed). One tombstone is
        // pruned and nothing new is tombstoned.
        projection
            .nodes
            .push(GraphNode::new("topic:egress", "topic", "Egress"));
        let refresh = store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v3"),
                Some("commit-c".to_string()),
            )
            .unwrap();
        assert_eq!(refresh.pruned_tombstones, 1);
        assert_eq!(refresh.tombstoned_nodes, Vec::<String>::new());

        // v4: remove `topic:egress` again, then compact the scope's storage.
        projection.nodes.retain(|node| node.id != "topic:egress");
        store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v4"),
                Some("commit-d".to_string()),
            )
            .unwrap();
        // NOTE(review): 2 is presumably the number of tombstone rows purged by
        // `compact_storage("root", true)` — confirm against its documentation.
        assert_eq!(store.compact_storage("root", true).unwrap(), 2);
        let cached_counts: (usize, usize, usize, usize) = store
            .conn
            .query_row(
                r#"
                SELECT nodes, edges, tombstone_nodes, tombstone_edges
                FROM graph_operator_stats
                WHERE scope = 'root'
                "#,
                [],
                |row| {
                    Ok((
                        row_usize(row, 0)?,
                        row_usize(row, 1)?,
                        row_usize(row, 2)?,
                        row_usize(row, 3)?,
                    ))
                },
            )
            .unwrap();
        // After compaction the cached tombstone counts are back to zero.
        assert_eq!(cached_counts, (3, 1, 0, 0));
    }
4810
4811 #[test]
4812 fn sqlite_shortest_path_uses_bounded_frontier() {
4813 let store = SqliteGraphStore::in_memory().unwrap();
4814 for idx in 0..80 {
4815 store
4816 .upsert_node(&GraphNode::new(
4817 format!("node:{idx:02}"),
4818 "symbol",
4819 format!("node {idx:02}"),
4820 ))
4821 .unwrap();
4822 }
4823 for idx in 0..79 {
4824 store
4825 .upsert_edge(&GraphEdge::new(
4826 format!("node:{idx:02}"),
4827 format!("node:{:02}", idx + 1),
4828 "calls",
4829 ))
4830 .unwrap();
4831 }
4832 store
4833 .upsert_edge(&GraphEdge::new("node:00", "node:79", "mentions"))
4834 .unwrap();
4835
4836 assert!(
4837 store
4838 .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(64))
4839 .unwrap()
4840 .is_none()
4841 );
4842 let path = store
4843 .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(79))
4844 .unwrap()
4845 .unwrap();
4846 assert_eq!(path.hops, 79);
4847 assert_eq!(path.nodes.first().map(String::as_str), Some("node:00"));
4848 assert_eq!(path.nodes.last().map(String::as_str), Some("node:79"));
4849
4850 let direct = store
4851 .shortest_path_with_max_hops("node:00", "node:79", Some("mentions"), Some(1))
4852 .unwrap()
4853 .unwrap();
4854 assert_eq!(direct.nodes, vec!["node:00", "node:79"]);
4855 }
4856
4857 #[test]
4858 fn sqlite_resolves_evidence_targets_with_indexed_properties() {
4859 let store = SqliteGraphStore::in_memory().unwrap();
4860 for node in [
4861 GraphNode::new("gbak-refresh", "backlog", "#refresh")
4862 .with_property("ref_id", "refresh")
4863 .with_property("path", "tasks/current.md")
4864 .with_property("handle", "backlog-handle"),
4865 GraphNode::new("gbak-zrefresh", "backlog", "#refresh")
4866 .with_property("ref_id", "refresh")
4867 .with_property("path", "tasks/other.md"),
4868 GraphNode::new("gjob-refresh", "job_packet", "do #refresh")
4869 .with_property("ref_id", "refresh"),
4870 GraphNode::new("gwres-refresh", "worker_result", "completed #refresh")
4871 .with_property("ref_id", "refresh"),
4872 ] {
4873 store.upsert_node(&node).unwrap();
4874 }
4875
4876 let by_ref = store
4877 .resolve_evidence_target("#refresh", &["backlog", "job_packet", "worker_result"])
4878 .unwrap()
4879 .unwrap();
4880 assert_eq!(by_ref.id, "gbak-refresh");
4881 let by_handle = store
4882 .resolve_evidence_target("backlog-handle", &["backlog"])
4883 .unwrap()
4884 .unwrap();
4885 assert_eq!(by_handle.id, "gbak-refresh");
4886 let by_path = store
4887 .evidence_target_candidates("#refresh", &["backlog"], Some("tasks/other.md"))
4888 .unwrap();
4889 assert_eq!(by_path.len(), 1);
4890 assert_eq!(by_path[0].id, "gbak-zrefresh");
4891 }
4892
    // Opens a hand-built schema at `user_version = 2` and checks that migration
    // upgrades it to SQLITE_GRAPH_SCHEMA_VERSION and backfills the materialized
    // property and semantic-vector tables from the rows' `properties_json`.
    #[test]
    fn sqlite_schema_migration_backfills_materialized_node_properties() {
        let conn = Connection::open_in_memory().unwrap();
        // This legacy fixture creates only graph_nodes, graph_edges,
        // graph_projection_versions and graph_tombstones. None of the tables
        // queried below (graph_node_properties, graph_edge_properties,
        // graph_node_semantic_vectors) exist yet; properties live only in
        // `properties_json`.
        conn.execute_batch(
            r#"
            PRAGMA user_version = 2;
            CREATE TABLE graph_nodes (
                id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                label TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT
            );
            CREATE INDEX idx_graph_nodes_kind ON graph_nodes(kind);
            CREATE TABLE graph_edges (
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT,
                PRIMARY KEY (from_id, to_id, kind)
            );
            CREATE INDEX idx_graph_edges_from_kind ON graph_edges(from_id, kind);
            CREATE INDEX idx_graph_edges_to_kind ON graph_edges(to_id, kind);
            CREATE TABLE graph_projection_versions (
                scope TEXT PRIMARY KEY,
                projection_version TEXT NOT NULL,
                content_hash TEXT,
                source_watermark TEXT,
                observed_at_unix INTEGER NOT NULL
            );
            CREATE TABLE graph_tombstones (
                row_key TEXT PRIMARY KEY,
                row_kind TEXT NOT NULL,
                deleted_at_unix INTEGER NOT NULL
            );
            INSERT INTO graph_nodes
                (id, kind, label, properties_json, provenance_json)
            VALUES
                ('topic:rooms', 'topic', 'Rooms', '{"domain":"livekit"}', '[]'),
                ('topic:egress', 'topic', 'Egress', '{"domain":"recording"}', '[]'),
                ('concept:graph', 'semantic_concept', 'Graph navigation', '{"embedding":"1.0,0.0","embedding_model":"fixture-v1"}', '[]');
            INSERT INTO graph_edges
                (from_id, to_id, kind, properties_json, provenance_json)
            VALUES
                ('topic:rooms', 'topic:egress', 'mentions', '{"confidence":"0.91"}', '[]');
            "#,
        )
        .unwrap();

        // Wrapping the legacy connection runs the migration.
        let store = SqliteGraphStore::from_connection(conn).unwrap();
        let version: i64 = store
            .conn
            .pragma_query_value(None, "user_version", |row| row.get(0))
            .unwrap();
        assert_eq!(version, SQLITE_GRAPH_SCHEMA_VERSION);
        // Both `domain` node properties were backfilled from properties_json.
        let property_rows: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain'",
                [],
                |row| row_usize(row, 0),
            )
            .unwrap();
        assert_eq!(property_rows, 2);
        // The single edge's `confidence` property was backfilled too.
        let edge_property_rows: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
                [],
                |row| row_usize(row, 0),
            )
            .unwrap();
        assert_eq!(edge_property_rows, 1);
        // The embedded concept gets a vector row tagged with its declared model.
        let semantic_vector_rows: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_node_semantic_vectors WHERE model = 'fixture-v1'",
                [],
                |row| row_usize(row, 0),
            )
            .unwrap();
        assert_eq!(semantic_vector_rows, 1);
        // The legacy edge row is addressable by its stable id after migration.
        let edge = store
            .edge(&GraphEdge::stable_id(
                "topic:rooms",
                "topic:egress",
                "mentions",
            ))
            .unwrap()
            .unwrap();
        assert_eq!(edge.properties.get("confidence"), Some(&"0.91".to_string()));

        // Property-filtered paging over migrated rows reports using the node
        // property index in its diagnostics.
        let page = store
            .paged_nodes_by_kind(
                "topic",
                GraphQueryOptions {
                    property_filters: vec![GraphPropertyFilter {
                        key: "domain".to_string(),
                        value: "livekit".to_string(),
                    }],
                    ..GraphQueryOptions::default()
                },
            )
            .unwrap();
        assert_eq!(page.nodes[0].id, "topic:rooms");
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic.contains("idx_graph_node_properties_key_value_node")),
            "{:?}",
            page.page.diagnostics
        );
    }
5014
5015 #[test]
5016 fn sqlite_store_batches_reachable_nodes_by_kinds() {
5017 let store = SqliteGraphStore::in_memory().unwrap();
5018 for node in [
5019 GraphNode::new("start", "backlog", "start"),
5020 GraphNode::new("ctx", "worker_context", "context"),
5021 GraphNode::new("src", "source_handle", "source"),
5022 GraphNode::new("sem", "semantic_concept", "concept"),
5023 ] {
5024 store.upsert_node(&node).unwrap();
5025 }
5026 store
5027 .upsert_edge(&GraphEdge::new("start", "ctx", "has_context"))
5028 .unwrap();
5029 store
5030 .upsert_edge(&GraphEdge::new("ctx", "src", "uses_source"))
5031 .unwrap();
5032 store
5033 .upsert_edge(&GraphEdge::new("start", "sem", "mentions_concept"))
5034 .unwrap();
5035
5036 let rows = store
5037 .reachable_nodes_by_kinds(
5038 "start",
5039 &["worker_context", "source_handle", "semantic_concept"],
5040 2,
5041 8,
5042 )
5043 .unwrap();
5044 assert_eq!(rows["worker_context"][0].0.id, "ctx");
5045 assert_eq!(
5046 rows["source_handle"][0].1.nodes,
5047 vec!["start", "ctx", "src"]
5048 );
5049 assert_eq!(rows["semantic_concept"][0].1.hops, 1);
5050 }
5051
    // Loads a 128-node / 127-edge chain, then refreshes with two nodes (and the
    // edges touching them) removed. Checks deletion/unchanged counters, staging
    // phase details, temp staging tables, and the stored watermark.
    #[test]
    fn sqlite_projection_refresh_handles_bulk_row_replacement() {
        let mut store = SqliteGraphStore::in_memory().unwrap();
        let source = GraphProvenance::new("fixture", "bulk");
        let mut projection = GraphProjection::default();
        // 128 nodes alternating `symbol`/`file`. Each row carries one `ordinal`
        // property, the shared provenance, and a content-hash freshness.
        for idx in 0..128 {
            projection.nodes.push(
                GraphNode::new(
                    format!("node:{idx:03}"),
                    if idx % 2 == 0 { "symbol" } else { "file" },
                    format!("bulk node {idx:03}"),
                )
                .with_property("ordinal", idx.to_string())
                .with_provenance(source.clone())
                .with_freshness(GraphFreshness::content_hash(format!("node-hash-{idx:03}"))),
            );
        }
        // 127 `next` edges chaining node:N -> node:N+1, decorated the same way.
        for idx in 0..127 {
            projection.edges.push(
                GraphEdge::new(
                    format!("node:{idx:03}"),
                    format!("node:{:03}", idx + 1),
                    "next",
                )
                .with_property("ordinal", idx.to_string())
                .with_provenance(source.clone())
                .with_freshness(GraphFreshness::content_hash(format!("edge-hash-{idx:03}"))),
            );
        }

        // bulk-v1: initial load.
        store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("bulk-v1"),
                Some("commit-a".to_string()),
            )
            .unwrap();

        // bulk-v2: drop node:000 and node:064 plus the three chain edges touching
        // them (000->001, 063->064, 064->065). Every other row is kept as-is.
        projection
            .nodes
            .retain(|node| !node.id.ends_with("000") && !node.id.ends_with("064"));
        projection.edges.retain(|edge| {
            !edge.from_id.ends_with("000")
                && !edge.to_id.ends_with("000")
                && !edge.from_id.ends_with("064")
                && !edge.to_id.ends_with("064")
        });
        let refresh = store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("bulk-v2"),
                Some("commit-b".to_string()),
            )
            .unwrap();

        // 126 surviving nodes and 124 surviving edges, each with one unchanged
        // `ordinal` property (126 + 124 = 250). Nothing is upserted.
        assert_eq!(store.all_nodes().unwrap().len(), 126);
        assert_eq!(store.all_edges().unwrap().len(), 124);
        assert_eq!(
            refresh.tombstoned_nodes,
            vec!["node:000".to_string(), "node:064".to_string()]
        );
        assert_eq!(refresh.tombstoned_edges.len(), 3);
        assert_eq!(refresh.deleted_nodes, 2);
        assert_eq!(refresh.deleted_edges, 3);
        assert_eq!(refresh.unchanged_nodes, 126);
        assert_eq!(refresh.unchanged_edges, 124);
        assert_eq!(refresh.upserted_nodes, 0);
        assert_eq!(refresh.upserted_edges, 0);
        assert_eq!(refresh.unchanged_properties, 250);
        assert_eq!(refresh.upserted_properties, 0);
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_node_staging"
                    && phase.detail.contains("126 unchanged skipped")
                    && phase.detail.contains("multi-row chunks up to 500 rows")),
            "{:?}",
            refresh.phase_timings
        );
        // NOTE(review): this matches the `sqlite_node_staging` phase again, but
        // "124 unchanged skipped" is the unchanged *edge* count. It may have been
        // meant to target the edge staging phase — confirm which phase name the
        // refresh emits for edge staging and what its detail contains.
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_node_staging"
                    && phase.detail.contains("124 unchanged skipped")),
            "{:?}",
            refresh.phase_timings
        );
        // The temp property staging tables are expected to hold zero rows after
        // this refresh.
        let staged_node_properties: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM temp.next_graph_node_properties",
                [],
                |row| row_usize(row, 0),
            )
            .unwrap();
        let staged_edge_properties: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM temp.next_graph_edge_properties",
                [],
                |row| row_usize(row, 0),
            )
            .unwrap();
        assert_eq!(staged_node_properties, 0);
        assert_eq!(staged_edge_properties, 0);
        // Property staging phases describe themselves as covering only
        // new/changed rows.
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_property_row_staging"
                    && phase.detail.contains("new/changed node rows")),
            "{:?}",
            refresh.phase_timings
        );
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_edge_property_row_staging"
                    && phase.detail.contains("new/changed edge rows")),
            "{:?}",
            refresh.phase_timings
        );
        // The stored watermark advances to the bulk-v2 source watermark.
        assert_eq!(
            store
                .projection_version("root")
                .unwrap()
                .unwrap()
                .source_watermark
                .as_deref(),
            Some("commit-b")
        );
    }
5189
5190 #[test]
5191 fn sqlite_reentrant_temp_table_guard_panics() {
5192 let store = SqliteGraphStore::in_memory().unwrap();
5193 store.temp_table_active.set(true);
5194 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
5195 store.assert_not_in_temp_table_section();
5196 }));
5197 assert!(result.is_err());
5198 }
5199
5200 #[test]
5201 fn sqlite_temp_table_guard_clears_after_method() {
5202 let mut store = SqliteGraphStore::in_memory().unwrap();
5203 let projection = GraphProjection {
5204 nodes: vec![],
5205 edges: vec![],
5206 };
5207 store.replace_projection(&projection).unwrap();
5208 assert!(!store.temp_table_active.get());
5209 }
5210
5211 #[test]
5212 fn derive_ontology_summarizes_types_and_relations() {
5213 let mut store = SqliteGraphStore::in_memory().unwrap();
5214 let seed = GraphProjection {
5215 nodes: vec![
5216 GraphNode::new("fn:a", "function", "a"),
5217 GraphNode::new("fn:b", "function", "b"),
5218 GraphNode::new("mod:m", "module", "m"),
5219 ],
5220 edges: vec![
5221 GraphEdge::new("fn:a", "fn:b", "calls"),
5222 GraphEdge::new("mod:m", "fn:a", "contains"),
5223 ],
5224 };
5225 store.upsert_projection(&seed).unwrap();
5226
5227 let onto = store.derive_ontology().unwrap();
5228 let type_kinds: std::collections::BTreeSet<_> =
5229 onto.nodes.iter().map(|n| n.label.clone()).collect();
5230 assert!(type_kinds.contains("function"));
5231 assert!(type_kinds.contains("module"));
5232 assert!(onto.nodes.iter().all(|n| n.kind == "ontology_type"));
5233
5234 let rel: std::collections::BTreeSet<_> = onto
5235 .edges
5236 .iter()
5237 .map(|e| (e.from_id.clone(), e.kind.clone(), e.to_id.clone()))
5238 .collect();
5239 assert!(rel.contains(&(
5240 "ontology_type:function".into(),
5241 "ontology_relation:calls".into(),
5242 "ontology_type:function".into()
5243 )));
5244 assert!(rel.contains(&(
5245 "ontology_type:module".into(),
5246 "ontology_relation:contains".into(),
5247 "ontology_type:function".into()
5248 )));
5249
5250 let function_node = onto.nodes.iter().find(|n| n.label == "function").unwrap();
5251 assert_eq!(function_node.properties.get("instance_count").unwrap(), "2");
5252
5253 store.upsert_projection(&onto).unwrap();
5255 let onto2 = store.derive_ontology().unwrap();
5256 assert!(onto2.nodes.iter().all(|n| n.kind == "ontology_type"));
5257 assert_eq!(onto2.nodes.len(), onto.nodes.len());
5258 assert_eq!(onto2.edges.len(), onto.edges.len());
5259 }
5260
5261 #[test]
5265 fn link_nodes_by_shared_property_stars_duplicates_idempotently() {
5266 let mut store = SqliteGraphStore::in_memory().unwrap();
5267 let seed = GraphProjection {
5268 nodes: vec![
5269 GraphNode::new("kgent-z", "semantic_entity", "Dup")
5271 .with_property("entity_id", "kgent-canon"),
5272 GraphNode::new("kgent-a", "semantic_entity", "Dup")
5273 .with_property("entity_id", "kgent-canon"),
5274 GraphNode::new("kgent-m", "semantic_entity", "Dup")
5275 .with_property("entity_id", "kgent-canon"),
5276 GraphNode::new("kgent-solo", "semantic_entity", "Solo")
5278 .with_property("entity_id", "kgent-other"),
5279 GraphNode::new("kgent-local", "semantic_entity", "Local")
5281 .with_property("entity_id", "e0"),
5282 ],
5283 edges: vec![],
5284 };
5285 store.upsert_projection(&seed).unwrap();
5286
5287 let written = store
5288 .link_nodes_by_shared_property(
5289 "semantic_entity",
5290 "entity_id",
5291 "kgent-",
5292 "same_as",
5293 &[("provider", "tsift-kg")],
5294 )
5295 .unwrap();
5296 assert_eq!(written, 2);
5298
5299 let same_as: Vec<(String, String)> = {
5301 let mut stmt = store
5302 .conn
5303 .prepare("SELECT from_id, to_id FROM graph_edges WHERE kind = 'same_as' ORDER BY from_id")
5304 .unwrap();
5305 let rows = stmt
5306 .query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)))
5307 .unwrap();
5308 rows.map(|r| r.unwrap()).collect()
5309 };
5310 assert_eq!(
5311 same_as,
5312 vec![
5313 ("kgent-m".to_string(), "kgent-a".to_string()),
5314 ("kgent-z".to_string(), "kgent-a".to_string()),
5315 ]
5316 );
5317
5318 let written2 = store
5320 .link_nodes_by_shared_property(
5321 "semantic_entity",
5322 "entity_id",
5323 "kgent-",
5324 "same_as",
5325 &[("provider", "tsift-kg")],
5326 )
5327 .unwrap();
5328 assert_eq!(written2, 2);
5329 let edge_count: i64 = store
5330 .conn
5331 .query_row(
5332 "SELECT COUNT(*) FROM graph_edges WHERE kind = 'same_as'",
5333 [],
5334 |r| r.get(0),
5335 )
5336 .unwrap();
5337 assert_eq!(edge_count, 2, "re-run must not duplicate same_as edges");
5338 }
5339
5340 #[test]
5344 fn delete_source_projection_is_scoped_by_provider_and_cascades() {
5345 let mut store = SqliteGraphStore::in_memory().unwrap();
5346 let seed = GraphProjection {
5347 nodes: vec![
5348 GraphNode::new("kg:a:1", "semantic_entity", "Alpha")
5350 .with_property("provider", "tsift-kg")
5351 .with_property("source_ref", "src/a.rs"),
5352 GraphNode::new("kg:a:2", "semantic_entity", "Beta")
5353 .with_property("provider", "tsift-kg")
5354 .with_property("source_ref", "src/a.rs"),
5355 GraphNode::new("kg:b:1", "semantic_entity", "Gamma")
5357 .with_property("provider", "tsift-kg")
5358 .with_property("source_ref", "src/b.rs"),
5359 GraphNode::new("ast:a:1", "function", "fn_in_a")
5361 .with_property("provider", "tsift-ast")
5362 .with_property("source_ref", "src/a.rs"),
5363 ],
5364 edges: vec![
5365 GraphEdge::new("kg:a:1", "kg:a:2", "semantic_relation")
5366 .with_property("provider", "tsift-kg"),
5367 GraphEdge::new("ast:a:1", "kg:a:1", "references"),
5368 ],
5369 };
5370 store.upsert_projection(&seed).unwrap();
5371
5372 let deleted = store
5373 .delete_source_projection("src/a.rs", "tsift-kg")
5374 .unwrap();
5375 assert_eq!(deleted, 2, "only the two KG nodes for src/a.rs are deleted");
5376
5377 assert!(store.node("kg:a:1").unwrap().is_none());
5379 assert!(store.node("kg:a:2").unwrap().is_none());
5380 assert!(store.node("kg:b:1").unwrap().is_some());
5382 assert!(store.node("ast:a:1").unwrap().is_some());
5384
5385 let edge_count: i64 = store
5388 .conn
5389 .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| row.get(0))
5390 .unwrap();
5391 assert_eq!(edge_count, 0);
5392
5393 let orphan_props: i64 = store
5395 .conn
5396 .query_row(
5397 "SELECT COUNT(*) FROM graph_node_properties WHERE node_id IN ('kg:a:1','kg:a:2')",
5398 [],
5399 |row| row.get(0),
5400 )
5401 .unwrap();
5402 assert_eq!(orphan_props, 0);
5403 }
5404}