Skip to main content

tsift_sqlite/
lib.rs

1use anyhow::{Context, Result, bail};
2use rusqlite::{
3    Connection, OpenFlags, OptionalExtension, Row, params_from_iter,
4    types::{Type, Value},
5};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use std::cell::Cell;
8use std::collections::{BTreeMap, BTreeSet};
9use std::ffi::OsString;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14pub use tsift_core::{
15    ConvexEdgeRow, ConvexGraphClient, ConvexGraphStore, ConvexNodeRow, ConvexProjectionRows,
16    ConvexRowsGraphClient, GRAPH_SEMANTIC_VECTOR_DEFAULT_MODEL,
17    GRAPH_SEMANTIC_VECTOR_MODEL_PROPERTY_KEY, GRAPH_SEMANTIC_VECTOR_PROPERTY_KEY, GraphEdge,
18    GraphFreshness, GraphNode, GraphPagedSubgraph, GraphPath, GraphProjection, GraphPropertyFilter,
19    GraphProvenance, GraphQueryOptions, GraphQueryPage, GraphSemanticCandidate, GraphStore,
20    GraphSubgraph, PropertyMode, RankedNeighborhoodOptions, RankedNeighborhoodResult,
21    SQLITE_GRAPH_SCHEMA_VERSION, SemanticSeededNeighborhoodExpansion,
22    SemanticSeededNeighborhoodOptions, TerseGraphEdge, TerseGraphNode, apply_graph_edge_query_page,
23    apply_graph_query_page, graph_edge_id, graph_semantic_cosine,
24    graph_semantic_top_candidates_by_property_scan, parse_graph_semantic_vector_property,
25    shortest_path_using_outgoing, stable_graph_edge_id,
26};
27
28fn row_usize(row: &Row<'_>, idx: usize) -> rusqlite::Result<usize> {
29    let value: i64 = row.get(idx)?;
30    usize::try_from(value).map_err(|_| rusqlite::Error::IntegralValueOutOfRange(idx, value))
31}
32
33fn row_u64(row: &Row<'_>, idx: usize) -> rusqlite::Result<u64> {
34    let value: i64 = row.get(idx)?;
35    u64::try_from(value).map_err(|_| rusqlite::Error::IntegralValueOutOfRange(idx, value))
36}
37
/// How a read-only open recovered after hitting a locked database.
///
/// Chosen by [`read_only_snapshot_recovery`]; when present, the handle reads a
/// temporary copy of the database instead of the live file. Serialized in
/// `snake_case` (e.g. `"snapshot_fallback_wal"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReadOnlyRecovery {
    /// Selected when a rollback journal (`-journal`) exists next to the
    /// database and no WAL/SHM sidecar does.
    SnapshotFallback,
    /// Selected when a WAL (`-wal`) or shared-memory (`-shm`) sidecar exists
    /// next to the database.
    SnapshotFallbackWal,
}
44
45pub fn read_only_snapshot_recovery(
46    db_path: &Path,
47    err: &anyhow::Error,
48) -> Option<ReadOnlyRecovery> {
49    if !error_mentions_locked_db(err) {
50        return None;
51    }
52    if wal_sidecar_path(db_path).exists() || shared_memory_sidecar_path(db_path).exists() {
53        Some(ReadOnlyRecovery::SnapshotFallbackWal)
54    } else if rollback_journal_path(db_path).exists() {
55        Some(ReadOnlyRecovery::SnapshotFallback)
56    } else {
57        None
58    }
59}
60
/// Appends `suffix` to the raw bytes of `db_path`.
///
/// This does no extension handling, which matches how SQLite names its
/// sidecar files (`<db>-journal`, `<db>-wal`, `<db>-shm`).
fn path_with_sidecar_suffix(db_path: &Path, suffix: &str) -> PathBuf {
    let mut raw = db_path.as_os_str().to_owned();
    raw.push(suffix);
    PathBuf::from(raw)
}

/// Path of the rollback journal SQLite keeps next to `db_path`.
pub fn rollback_journal_path(db_path: &Path) -> PathBuf {
    path_with_sidecar_suffix(db_path, "-journal")
}

/// Path of the write-ahead log SQLite keeps next to `db_path`.
pub fn wal_sidecar_path(db_path: &Path) -> PathBuf {
    path_with_sidecar_suffix(db_path, "-wal")
}

/// Path of the WAL shared-memory index SQLite keeps next to `db_path`.
pub fn shared_memory_sidecar_path(db_path: &Path) -> PathBuf {
    path_with_sidecar_suffix(db_path, "-shm")
}
78
79pub fn copy_read_only_snapshot(
80    db_path: &Path,
81    default_stem: &str,
82) -> Result<(PathBuf, Vec<PathBuf>)> {
83    let snapshot_path = snapshot_copy_path(db_path, default_stem);
84    std::fs::copy(db_path, &snapshot_path).with_context(|| {
85        format!(
86            "copying locked db {} to snapshot {}",
87            db_path.display(),
88            snapshot_path.display()
89        )
90    })?;
91    let mut cleanup_paths = vec![snapshot_path.clone()];
92    copy_optional_snapshot_sidecar(
93        &wal_sidecar_path(db_path),
94        &wal_sidecar_path(&snapshot_path),
95        &mut cleanup_paths,
96    )?;
97    copy_optional_snapshot_sidecar(
98        &shared_memory_sidecar_path(db_path),
99        &shared_memory_sidecar_path(&snapshot_path),
100        &mut cleanup_paths,
101    )?;
102    Ok((snapshot_path, cleanup_paths))
103}
104
105pub fn error_mentions_locked_db(err: &anyhow::Error) -> bool {
106    err.chain()
107        .any(|cause| cause.to_string().contains("database is locked"))
108}
109
110fn copy_optional_snapshot_sidecar(
111    source_path: &Path,
112    snapshot_path: &Path,
113    cleanup_paths: &mut Vec<PathBuf>,
114) -> Result<()> {
115    match std::fs::copy(source_path, snapshot_path) {
116        Ok(_) => {
117            cleanup_paths.push(snapshot_path.to_path_buf());
118            Ok(())
119        }
120        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
121        Err(err) => Err(err).with_context(|| {
122            format!(
123                "copying SQLite sidecar {} to snapshot {}",
124                source_path.display(),
125                snapshot_path.display()
126            )
127        }),
128    }
129}
130
/// Builds a unique path in the system temp directory for a snapshot copy of
/// `db_path`.
///
/// The name is `tsift-<stem>-<pid>-<unix-nanos>-<seq>.db`:
/// - `<stem>` is `db_path`'s file stem, or `default_stem` when it has none or
///   it is not UTF-8.
/// - `<seq>` is a process-wide counter.
///
/// The counter keeps two calls in the same process distinct even when the
/// clock is coarse, repeats a reading, or is before the epoch (which makes
/// nanos 0). Without it, a second copy could overwrite the first snapshot, and
/// one cleanup guard would delete the other's files.
fn snapshot_copy_path(db_path: &Path, default_stem: &str) -> PathBuf {
    static SNAPSHOT_SEQUENCE: AtomicU64 = AtomicU64::new(0);
    let sequence = SNAPSHOT_SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO)
        .as_nanos();
    let stem = db_path
        .file_stem()
        .and_then(|stem| stem.to_str())
        .unwrap_or(default_stem);
    let mut file_name = OsString::from(format!(
        "tsift-{stem}-{}-{nanos}-{sequence}",
        std::process::id()
    ));
    file_name.push(".db");
    std::env::temp_dir().join(file_name)
}
/// `PRAGMA wal_autocheckpoint` value (in pages) applied to writable graph connections.
const SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES: i64 = 4_096;
/// Maximum number of rows bound into one multi-row `INSERT` while staging projections.
const SQLITE_GRAPH_STAGING_CHUNK_ROWS: usize = 500;
146
/// SQLite-backed graph store.
///
/// # Temp-table invariant
///
/// Several methods — `replace_projection_with_version`, `upsert_projection`,
/// `edges_between_nodes`, and `breadth_first_search` — create connection-scoped
/// `TEMP TABLE` staging areas inside the underlying SQLite connection. Two
/// concurrent calls on the same `SqliteGraphStore` (or sharing the same
/// connection through `SqliteReadOnlyConnection`) would collide on those temp
/// table names and produce incorrect results or errors.
///
/// **Only one temp-table-using method may be active at a time per connection.**
/// The `Cell<bool>` field makes this type `!Sync`, so the practical hazard is a
/// nested (re-entrant) call on the same thread. The `temp_table_active` flag
/// backs the rule at runtime: `assert_not_in_temp_table_section` panics when it
/// is already set.
pub struct SqliteGraphStore {
    // Must stay the first field. Rust drops fields in declaration order, so the
    // connection is closed before `_snapshot_copy` deletes the snapshot files
    // it may be reading.
    conn: Connection,
    // Deletes the temporary snapshot files on drop when the store reads a
    // snapshot copy. Presumably filled from `SqliteReadOnlyConnection` by
    // `from_read_only_connection` (not visible here) — confirm.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    // Why a snapshot fallback was used, if any. Exposed via `read_only_recovery()`.
    read_only_recovery: Option<ReadOnlyRecovery>,
    // Interior-mutable so it can be read (and, presumably, toggled) through
    // `&self` by the temp-table-using methods.
    temp_table_active: Cell<bool>,
}
165
/// Read-only handle to a SQLite graph database connection.
///
/// # Temp-table invariant
///
/// When this connection is unwrapped into a `SqliteGraphStore` via
/// `SqliteGraphStore::from_read_only_connection`, the same temp-table
/// concurrency rules apply: only one temp-table-using method
/// (`replace_projection_with_version`, `upsert_projection`,
/// `edges_between_nodes`, `breadth_first_search`) may be active at a time
/// per connection.
pub struct SqliteReadOnlyConnection {
    // Must stay the first field. Fields drop in declaration order, so the
    // connection closes before `_snapshot_copy` deletes the files it reads.
    conn: Connection,
    // In this file, `Some` only for handles opened on a temporary snapshot
    // copy. Dropping it deletes the snapshot and any copied sidecars.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    // Why the snapshot fallback was used; `None` when reading the live file.
    recovery: Option<ReadOnlyRecovery>,
}
181
// Process-wide monotonically increasing counter. Presumably used to give each
// `breadth_first_search` call distinct temp-table names.
// NOTE(review): its users are outside this view — confirm.
static BFS_CALL_ID: AtomicU64 = AtomicU64::new(0);
183
184impl SqliteReadOnlyConnection {
185    pub fn conn(&self) -> &Connection {
186        &self.conn
187    }
188
189    pub fn recovery(&self) -> Option<ReadOnlyRecovery> {
190        self.recovery
191    }
192}
193
/// Owns the temporary snapshot files created for a read-only fallback open and
/// deletes them when dropped.
struct SnapshotCopyGuard {
    paths: Vec<PathBuf>,
}

impl Drop for SnapshotCopyGuard {
    /// Removes each path on a best-effort basis.
    ///
    /// Files that are already gone, or cannot be deleted, are skipped silently,
    /// since `drop` has no way to report failure.
    fn drop(&mut self) {
        self.paths.iter().for_each(|path| {
            let _ = std::fs::remove_file(path);
        });
    }
}
205
206fn configure_writable_graph_connection(conn: &Connection, db_path: &Path) -> Result<()> {
207    conn.busy_timeout(Duration::from_secs(5))?;
208    conn.pragma_update(None, "journal_mode", "WAL")?;
209    let mode: String = conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
210    if mode.to_lowercase() != "wal" {
211        bail!(
212            "graph substrate db {} requires WAL mode for concurrent reads, got {}",
213            db_path.display(),
214            mode
215        );
216    }
217    conn.pragma_update(
218        None,
219        "wal_autocheckpoint",
220        SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
221    )?;
222    let checkpoint_pages: i64 =
223        conn.query_row("PRAGMA wal_autocheckpoint", [], |row| row.get(0))?;
224    if checkpoint_pages != SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES {
225        bail!(
226            "graph substrate db {} requires wal_autocheckpoint={}, got {}",
227            db_path.display(),
228            SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
229            checkpoint_pages
230        );
231    }
232    Ok(())
233}
234
235fn sqlite_column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
236    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
237    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
238    for row in rows {
239        if row? == column {
240            return Ok(true);
241        }
242    }
243    Ok(false)
244}
245
246fn sqlite_table_exists(conn: &Connection, table: &str) -> Result<bool> {
247    conn.query_row(
248        r#"
249        SELECT EXISTS(
250            SELECT 1
251            FROM sqlite_master
252            WHERE type = 'table' AND name = ?1
253        )
254        "#,
255        [table],
256        |row| row.get::<_, bool>(0),
257    )
258    .map_err(Into::into)
259}
260
261fn add_column_if_missing(
262    conn: &Connection,
263    table: &str,
264    column: &str,
265    definition: &str,
266) -> Result<()> {
267    if !sqlite_column_exists(conn, table, column)? {
268        conn.execute(
269            &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
270            [],
271        )?;
272    }
273    Ok(())
274}
275
276fn backfill_graph_edge_keys(conn: &Connection) -> Result<()> {
277    if !sqlite_column_exists(conn, "graph_edges", "edge_key")? {
278        return Ok(());
279    }
280    let rows = {
281        let mut stmt = conn.prepare(
282            r#"
283            SELECT from_id, to_id, kind
284            FROM graph_edges
285            WHERE edge_key IS NULL OR edge_key = ''
286            ORDER BY from_id, kind, to_id
287            "#,
288        )?;
289        collect_rows(stmt.query_map([], |row| {
290            Ok((
291                row.get::<_, String>(0)?,
292                row.get::<_, String>(1)?,
293                row.get::<_, String>(2)?,
294            ))
295        })?)?
296    };
297    let mut update = conn.prepare(
298        r#"
299        UPDATE graph_edges
300        SET edge_key = ?4
301        WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3
302        "#,
303    )?;
304    for (from_id, to_id, kind) in rows {
305        update.execute((
306            &from_id,
307            &to_id,
308            &kind,
309            stable_graph_edge_id(&from_id, &to_id, &kind),
310        ))?;
311    }
312    Ok(())
313}
314
/// Upgrades an existing graph schema from `old_version` to the current layout.
///
/// The steps are cumulative and run in ascending order: a database at version
/// `N` applies every step guarded by `old_version < K` with `K > N`. Order
/// matters. For example, the v5 step adds and backfills `graph_edges.edge_key`
/// before it creates the unique index on it and the `graph_edge_properties`
/// table that references it.
///
/// NOTE(review): whether this runs inside a transaction, and where
/// `user_version` is bumped afterwards, is decided by the caller (not visible
/// here) — confirm.
fn migrate_sqlite_graph_schema(conn: &Connection, old_version: i64) -> Result<()> {
    // v2: `row_hash` / `source_watermark` columns on nodes and edges.
    if old_version < 2 {
        add_column_if_missing(conn, "graph_nodes", "row_hash", "TEXT")?;
        add_column_if_missing(conn, "graph_nodes", "source_watermark", "TEXT")?;
        add_column_if_missing(conn, "graph_edges", "row_hash", "TEXT")?;
        add_column_if_missing(conn, "graph_edges", "source_watermark", "TEXT")?;
    }
    // v3: repopulate `graph_node_properties` from `graph_nodes.properties_json`.
    if old_version < 3 {
        rebuild_graph_node_properties(conn)?;
    }
    // v4: `(kind, label, id)` node index and the `graph_operator_stats` table.
    if old_version < 4 {
        ensure_sqlite_graph_operator_stats_schema(conn)?;
    }
    // v5: stable edge keys first, then the edge-property table keyed by them.
    if old_version < 5 {
        add_column_if_missing(conn, "graph_edges", "edge_key", "TEXT")?;
        backfill_graph_edge_keys(conn)?;
        ensure_sqlite_graph_edge_properties_schema(conn)?;
        rebuild_graph_edge_properties(conn)?;
    }
    // v6: semantic-vector side table, rebuilt from node properties.
    if old_version < 6 {
        ensure_sqlite_graph_semantic_vectors_schema(conn)?;
        rebuild_graph_node_semantic_vectors(conn)?;
    }
    Ok(())
}
340
341fn ensure_sqlite_graph_operator_stats_schema(conn: &Connection) -> Result<()> {
342    conn.execute_batch(
343        r#"
344        CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
345            ON graph_nodes(kind, label, id);
346        CREATE TABLE IF NOT EXISTS graph_operator_stats (
347            scope TEXT PRIMARY KEY,
348            nodes INTEGER NOT NULL,
349            edges INTEGER NOT NULL,
350            tombstone_nodes INTEGER NOT NULL,
351            tombstone_edges INTEGER NOT NULL,
352            file_size_bytes INTEGER,
353            freelist_bytes INTEGER,
354            observed_at_unix INTEGER NOT NULL
355        );
356        "#,
357    )?;
358    Ok(())
359}
360
361fn ensure_sqlite_graph_edge_properties_schema(conn: &Connection) -> Result<()> {
362    conn.execute_batch(
363        r#"
364        CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_edge_key
365            ON graph_edges(edge_key);
366        CREATE TABLE IF NOT EXISTS graph_edge_properties (
367            edge_key TEXT NOT NULL,
368            key TEXT NOT NULL,
369            value TEXT NOT NULL,
370            PRIMARY KEY (edge_key, key),
371            FOREIGN KEY (edge_key) REFERENCES graph_edges(edge_key) ON DELETE CASCADE
372        );
373        CREATE INDEX IF NOT EXISTS idx_graph_edge_properties_key_value_edge
374            ON graph_edge_properties(key, value, edge_key);
375        "#,
376    )?;
377    Ok(())
378}
379
380fn ensure_sqlite_graph_semantic_vectors_schema(conn: &Connection) -> Result<()> {
381    conn.execute_batch(
382        r#"
383        CREATE TABLE IF NOT EXISTS graph_node_semantic_vectors (
384            node_id TEXT PRIMARY KEY,
385            kind TEXT NOT NULL,
386            model TEXT NOT NULL,
387            dimensions INTEGER NOT NULL,
388            vector_blob BLOB NOT NULL,
389            FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
390        );
391        CREATE INDEX IF NOT EXISTS idx_graph_node_semantic_vectors_kind_dims
392            ON graph_node_semantic_vectors(kind, dimensions, node_id);
393        "#,
394    )?;
395    Ok(())
396}
397
398fn replace_node_properties(
399    conn: &Connection,
400    node_id: &str,
401    properties: &BTreeMap<String, String>,
402) -> Result<()> {
403    conn.execute(
404        "DELETE FROM graph_node_properties WHERE node_id = ?1",
405        [node_id],
406    )?;
407    let mut insert = conn.prepare(
408        r#"
409        INSERT INTO graph_node_properties (node_id, key, value)
410        VALUES (?1, ?2, ?3)
411        ON CONFLICT(node_id, key) DO UPDATE SET
412            value = excluded.value
413        "#,
414    )?;
415    for (key, value) in properties {
416        insert.execute((node_id, key, value))?;
417    }
418    Ok(())
419}
420
421struct GraphSemanticVectorRow {
422    model: String,
423    dimensions: usize,
424    vector_blob: Vec<u8>,
425}
426
427fn graph_semantic_vector_row(
428    properties: &BTreeMap<String, String>,
429) -> Option<GraphSemanticVectorRow> {
430    let vector = properties
431        .get(GRAPH_SEMANTIC_VECTOR_PROPERTY_KEY)
432        .and_then(|value| parse_graph_semantic_vector_property(value))?;
433    Some(GraphSemanticVectorRow {
434        model: properties
435            .get(GRAPH_SEMANTIC_VECTOR_MODEL_PROPERTY_KEY)
436            .cloned()
437            .unwrap_or_else(|| GRAPH_SEMANTIC_VECTOR_DEFAULT_MODEL.to_string()),
438        dimensions: vector.len(),
439        vector_blob: semantic_vector_to_blob(&vector),
440    })
441}
442
/// Encodes `vector` as consecutive little-endian `f64` values (8 bytes each).
fn semantic_vector_to_blob(vector: &[f64]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(vector.len() * std::mem::size_of::<f64>());
    encoded.extend(vector.iter().flat_map(|component| component.to_le_bytes()));
    encoded
}
450
/// Decodes a blob written by `semantic_vector_to_blob` back into a vector.
///
/// Returns `None` unless all of the following hold:
/// - `dimensions > 0`;
/// - `blob` holds exactly `dimensions` little-endian `f64` values;
/// - every value is finite.
///
/// The length check uses `checked_mul`, so an absurd `dimensions` (e.g. from a
/// corrupted row) is rejected rather than overflowing. Without it, debug
/// builds panic on the multiplication. In release builds the wrapped product
/// can match a short blob, and `Vec::with_capacity(dimensions)` then panics.
fn semantic_vector_from_blob(blob: &[u8], dimensions: usize) -> Option<Vec<f64>> {
    const WIDTH: usize = std::mem::size_of::<f64>();
    let expected_len = dimensions.checked_mul(WIDTH)?;
    if dimensions == 0 || blob.len() != expected_len {
        return None;
    }
    let mut vector = Vec::with_capacity(dimensions);
    for chunk in blob.chunks_exact(WIDTH) {
        // `chunks_exact` guarantees `chunk.len() == WIDTH`.
        let mut bytes = [0u8; WIDTH];
        bytes.copy_from_slice(chunk);
        let value = f64::from_le_bytes(bytes);
        if !value.is_finite() {
            return None;
        }
        vector.push(value);
    }
    Some(vector)
}
465
466fn replace_node_semantic_vector(
467    conn: &Connection,
468    node_id: &str,
469    kind: &str,
470    properties: &BTreeMap<String, String>,
471) -> Result<()> {
472    conn.execute(
473        "DELETE FROM graph_node_semantic_vectors WHERE node_id = ?1",
474        [node_id],
475    )?;
476    let Some(row) = graph_semantic_vector_row(properties) else {
477        return Ok(());
478    };
479    conn.execute(
480        r#"
481        INSERT INTO graph_node_semantic_vectors
482            (node_id, kind, model, dimensions, vector_blob)
483        VALUES (?1, ?2, ?3, ?4, ?5)
484        "#,
485        (
486            node_id,
487            kind,
488            row.model,
489            row.dimensions as i64,
490            row.vector_blob,
491        ),
492    )?;
493    Ok(())
494}
495
496fn replace_edge_properties(
497    conn: &Connection,
498    edge_key: &str,
499    properties: &BTreeMap<String, String>,
500) -> Result<()> {
501    conn.execute(
502        "DELETE FROM graph_edge_properties WHERE edge_key = ?1",
503        [edge_key],
504    )?;
505    let mut insert = conn.prepare(
506        r#"
507        INSERT INTO graph_edge_properties (edge_key, key, value)
508        VALUES (?1, ?2, ?3)
509        ON CONFLICT(edge_key, key) DO UPDATE SET
510            value = excluded.value
511        "#,
512    )?;
513    for (key, value) in properties {
514        insert.execute((edge_key, key, value))?;
515    }
516    Ok(())
517}
518
519fn rebuild_graph_node_properties(conn: &Connection) -> Result<()> {
520    if !sqlite_column_exists(conn, "graph_nodes", "properties_json")? {
521        return Ok(());
522    }
523    conn.execute_batch(
524        r#"
525        DELETE FROM graph_node_properties;
526        INSERT INTO graph_node_properties (node_id, key, value)
527        SELECT graph_nodes.id, json_each.key, CAST(json_each.value AS TEXT)
528        FROM graph_nodes, json_each(graph_nodes.properties_json)
529        WHERE json_each.key IS NOT NULL
530          AND json_each.value IS NOT NULL
531        "#,
532    )?;
533    Ok(())
534}
535
536fn rebuild_graph_node_semantic_vectors(conn: &Connection) -> Result<()> {
537    if !sqlite_column_exists(conn, "graph_nodes", "properties_json")?
538        || !sqlite_table_exists(conn, "graph_node_semantic_vectors")?
539    {
540        return Ok(());
541    }
542    conn.execute("DELETE FROM graph_node_semantic_vectors", [])?;
543    let rows = {
544        let mut stmt = conn.prepare(
545            r#"
546            SELECT id, kind, properties_json
547            FROM graph_nodes
548            WHERE json_extract(properties_json, '$.embedding') IS NOT NULL
549            ORDER BY id
550            "#,
551        )?;
552        collect_rows(stmt.query_map([], |row| {
553            Ok((
554                row.get::<_, String>(0)?,
555                row.get::<_, String>(1)?,
556                row.get::<_, String>(2)?,
557            ))
558        })?)?
559    };
560    for (node_id, kind, properties_json) in rows {
561        let properties: BTreeMap<String, String> = serde_json::from_str(&properties_json)
562            .with_context(|| format!("parsing semantic properties for graph node {node_id}"))?;
563        replace_node_semantic_vector(conn, &node_id, &kind, &properties)?;
564    }
565    Ok(())
566}
567
568fn rebuild_graph_edge_properties(conn: &Connection) -> Result<()> {
569    if !sqlite_column_exists(conn, "graph_edges", "properties_json")?
570        || !sqlite_column_exists(conn, "graph_edges", "edge_key")?
571    {
572        return Ok(());
573    }
574    conn.execute_batch(
575        r#"
576        DELETE FROM graph_edge_properties;
577        INSERT INTO graph_edge_properties (edge_key, key, value)
578        SELECT graph_edges.edge_key, json_each.key, CAST(json_each.value AS TEXT)
579        FROM graph_edges, json_each(graph_edges.properties_json)
580        WHERE graph_edges.edge_key IS NOT NULL
581          AND graph_edges.edge_key <> ''
582          AND json_each.key IS NOT NULL
583          AND json_each.value IS NOT NULL
584        "#,
585    )?;
586    Ok(())
587}
588
589pub fn open_graph_read_only_connection(db_path: &Path) -> Result<SqliteReadOnlyConnection> {
590    let conn = Connection::open_with_flags(
591        db_path,
592        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
593    )
594    .with_context(|| format!("opening graph.db read-only: {}", db_path.display()))?;
595    conn.busy_timeout(Duration::from_secs(5))?;
596    Ok(SqliteReadOnlyConnection {
597        conn,
598        _snapshot_copy: None,
599        recovery: None,
600    })
601}
602
603pub fn open_graph_read_only_connection_resilient(
604    db_path: &Path,
605) -> Result<SqliteReadOnlyConnection> {
606    match open_graph_read_only_connection(db_path).and_then(|connection| {
607        connection
608            .conn
609            .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
610        Ok(connection)
611    }) {
612        Ok(connection) => Ok(connection),
613        Err(err) => match read_only_snapshot_recovery(db_path, &err) {
614            Some(recovery) => open_graph_read_only_snapshot(db_path, recovery),
615            None => Err(err),
616        },
617    }
618}
619
620fn open_graph_read_only_snapshot(
621    db_path: &Path,
622    recovery: ReadOnlyRecovery,
623) -> Result<SqliteReadOnlyConnection> {
624    let (snapshot_path, cleanup_paths) = copy_read_only_snapshot(db_path, "graph")?;
625    let conn = Connection::open_with_flags(
626        &snapshot_path,
627        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
628    )
629    .with_context(|| format!("opening graph.db snapshot {}", snapshot_path.display()))?;
630    conn.busy_timeout(Duration::from_secs(5))?;
631    Ok(SqliteReadOnlyConnection {
632        conn,
633        _snapshot_copy: Some(SnapshotCopyGuard {
634            paths: cleanup_paths,
635        }),
636        recovery: Some(recovery),
637    })
638}
639
/// Timing record for one named phase of a SQLite projection refresh.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefreshPhase {
    /// Phase name.
    pub name: String,
    /// Elapsed time of the phase in microseconds. `sqlite_refresh_phase_timing`
    /// fills it from `Instant::elapsed`.
    pub duration_micros: u128,
    /// Free-form description attached to the phase.
    pub detail: String,
}
646
/// Summary of one SQLite projection refresh.
///
/// NOTE(review): the code that fills this struct is outside this view. The
/// field notes below are read from names and types only — confirm against the
/// refresh implementation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefresh {
    /// Projection scope the refresh applied to (presumably the
    /// `graph_projection_versions.scope` key).
    pub scope: String,
    pub projection_version: String,
    pub source_watermark: Option<String>,
    /// Ids of nodes tombstoned by this refresh (presumably).
    pub tombstoned_nodes: Vec<String>,
    /// Keys of edges tombstoned by this refresh (presumably).
    pub tombstoned_edges: Vec<String>,
    // Row counters, apparently split by outcome (upserted / unchanged / deleted).
    pub upserted_nodes: usize,
    pub upserted_edges: usize,
    pub unchanged_nodes: usize,
    pub unchanged_edges: usize,
    pub upserted_properties: usize,
    pub unchanged_properties: usize,
    pub deleted_properties: usize,
    pub deleted_nodes: usize,
    pub deleted_edges: usize,
    pub pruned_tombstones: usize,
    /// Database file size in bytes before the refresh, when it could be measured.
    pub file_size_bytes_before: Option<u64>,
    /// Database file size in bytes after the refresh, when it could be measured.
    pub file_size_bytes_after: Option<u64>,
    /// Per-phase timings recorded during the refresh.
    pub phase_timings: Vec<SqliteProjectionRefreshPhase>,
}
668
/// Version metadata recorded for a projection.
///
/// The fields appear to mirror the `projection_version`, `content_hash` and
/// `source_watermark` columns of `graph_projection_versions`.
/// NOTE(review): confirm against the code that reads it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionVersion {
    pub projection_version: String,
    pub content_hash: Option<String>,
    pub source_watermark: Option<String>,
}
675
676fn sqlite_refresh_phase_timing(
677    name: &str,
678    started: Instant,
679    detail: &str,
680) -> SqliteProjectionRefreshPhase {
681    SqliteProjectionRefreshPhase {
682        name: name.to_string(),
683        duration_micros: started.elapsed().as_micros(),
684        detail: detail.to_string(),
685    }
686}
687
/// Builds the `VALUES` placeholder list for a multi-row insert.
///
/// The result is `row_count` copies of `(?, …, ?)`, each with `column_count`
/// markers, separated by commas. For example, `(2, 3)` yields
/// `"(?, ?), (?, ?), (?, ?)"`, and a `row_count` of 0 yields an empty string.
fn sqlite_graph_staging_placeholders(column_count: usize, row_count: usize) -> String {
    let row_tuple = format!("({})", vec!["?"; column_count].join(", "));
    vec![row_tuple.as_str(); row_count].join(", ")
}
701
702fn sqlite_stage_all_ids(
703    tx: &rusqlite::Transaction<'_>,
704    nodes: &[GraphNode],
705    edges: &[GraphEdge],
706) -> Result<()> {
707    for chunk in nodes
708        .iter()
709        .map(|n| &n.id)
710        .collect::<Vec<_>>()
711        .chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS)
712    {
713        let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
714        let sql = format!(
715            "INSERT OR IGNORE INTO next_graph_all_node_ids (id) VALUES {}",
716            placeholders.join(", ")
717        );
718        let values: Vec<Value> = chunk.iter().map(|id| Value::Text((*id).clone())).collect();
719        tx.execute(&sql, params_from_iter(values.iter()))?;
720    }
721    for chunk in edges
722        .iter()
723        .map(graph_edge_id)
724        .collect::<Vec<_>>()
725        .chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS)
726    {
727        let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
728        let sql = format!(
729            "INSERT OR IGNORE INTO next_graph_all_edge_keys (edge_key) VALUES {}",
730            placeholders.join(", ")
731        );
732        let values: Vec<Value> = chunk.iter().map(|ek| Value::Text(ek.to_string())).collect();
733        tx.execute(&sql, params_from_iter(values.iter()))?;
734    }
735    Ok(())
736}
737
738fn sqlite_stage_projection_nodes(
739    tx: &rusqlite::Transaction<'_>,
740    nodes: &[&GraphNode],
741    source_watermark: Option<&str>,
742) -> Result<()> {
743    let mut insert_semantic_vector = tx.prepare(
744        r#"
745        INSERT INTO next_graph_node_semantic_vectors
746            (node_id, kind, model, dimensions, vector_blob)
747        VALUES (?1, ?2, ?3, ?4, ?5)
748        "#,
749    )?;
750    for chunk in nodes.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
751        let sql = format!(
752            r#"
753            INSERT INTO next_graph_nodes
754                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
755            VALUES {}
756            "#,
757            sqlite_graph_staging_placeholders(8, chunk.len())
758        );
759        let mut values = Vec::with_capacity(chunk.len() * 8);
760        for node in chunk {
761            values.push(Value::Text(node.id.clone()));
762            values.push(Value::Text(node.kind.clone()));
763            values.push(Value::Text(node.label.clone()));
764            values.push(Value::Text(to_json(&node.properties)?));
765            values.push(Value::Text(to_json(&node.provenance)?));
766            values.push(
767                optional_to_json(&node.freshness)?
768                    .map(Value::Text)
769                    .unwrap_or(Value::Null),
770            );
771            values.push(Value::Text(row_hash(node)?));
772            values.push(
773                source_watermark
774                    .map(|watermark| Value::Text(watermark.to_string()))
775                    .unwrap_or(Value::Null),
776            );
777        }
778        tx.execute(&sql, params_from_iter(values))?;
779        for node in chunk {
780            let Some(row) = graph_semantic_vector_row(&node.properties) else {
781                continue;
782            };
783            insert_semantic_vector.execute((
784                &node.id,
785                &node.kind,
786                row.model,
787                row.dimensions as i64,
788                row.vector_blob,
789            ))?;
790        }
791    }
792    Ok(())
793}
794
795fn sqlite_stage_projection_edges(
796    tx: &rusqlite::Transaction<'_>,
797    edges: &[&GraphEdge],
798    source_watermark: Option<&str>,
799) -> Result<()> {
800    for chunk in edges.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
801        let sql = format!(
802            r#"
803            INSERT INTO next_graph_edges
804                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
805            VALUES {}
806            "#,
807            sqlite_graph_staging_placeholders(9, chunk.len())
808        );
809        let mut values = Vec::with_capacity(chunk.len() * 9);
810        for edge in chunk {
811            values.push(Value::Text(graph_edge_id(edge)));
812            values.push(Value::Text(edge.from_id.clone()));
813            values.push(Value::Text(edge.to_id.clone()));
814            values.push(Value::Text(edge.kind.clone()));
815            values.push(Value::Text(to_json(&edge.properties)?));
816            values.push(Value::Text(to_json(&edge.provenance)?));
817            values.push(
818                optional_to_json(&edge.freshness)?
819                    .map(Value::Text)
820                    .unwrap_or(Value::Null),
821            );
822            values.push(Value::Text(row_hash(edge)?));
823            values.push(
824                source_watermark
825                    .map(|watermark| Value::Text(watermark.to_string()))
826                    .unwrap_or(Value::Null),
827            );
828        }
829        tx.execute(&sql, params_from_iter(values))?;
830    }
831    Ok(())
832}
833
834impl SqliteGraphStore {
835    fn assert_not_in_temp_table_section(&self) {
836        if self.temp_table_active.get() {
837            panic!(
838                "SqliteGraphStore: re-entrant temp-table call detected — only one temp-table-using method may be active at a time per connection"
839            );
840        }
841    }
842
843    pub fn open(db_path: &Path) -> Result<Self> {
844        if let Some(parent) = db_path.parent() {
845            std::fs::create_dir_all(parent)
846                .with_context(|| format!("creating graph substrate dir: {}", parent.display()))?;
847        }
848        let conn = Connection::open(db_path)
849            .with_context(|| format!("opening graph substrate db: {}", db_path.display()))?;
850        configure_writable_graph_connection(&conn, db_path)?;
851        Self::from_connection(conn)
852    }
853
854    pub fn in_memory() -> Result<Self> {
855        let conn = Connection::open_in_memory()?;
856        conn.busy_timeout(Duration::from_secs(5))?;
857        Self::from_connection(conn)
858    }
859
860    pub fn open_read_only(db_path: &Path) -> Result<Self> {
861        let connection = open_graph_read_only_connection(db_path)?;
862        Self::from_read_only_connection(connection)
863    }
864
865    pub fn open_read_only_resilient(db_path: &Path) -> Result<Self> {
866        let connection = open_graph_read_only_connection_resilient(db_path)?;
867        Self::from_read_only_connection(connection)
868    }
869
870    pub fn read_only_recovery(&self) -> Option<ReadOnlyRecovery> {
871        self.read_only_recovery
872    }
873
874    pub fn has_user_triggers(&self) -> Result<bool> {
875        self.conn
876            .query_row(
877                r#"
878                SELECT EXISTS(
879                    SELECT 1
880                    FROM sqlite_master
881                    WHERE type = 'trigger'
882                      AND name NOT LIKE 'sqlite_%'
883                )
884                "#,
885                [],
886                |row| row.get::<_, bool>(0),
887            )
888            .map_err(Into::into)
889    }
890
891    fn from_connection(conn: Connection) -> Result<Self> {
892        conn.pragma_update(None, "foreign_keys", "ON")?;
893        let user_version: i64 =
894            conn.pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
895        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
896            bail!(
897                "graph.db schema version {} is newer than supported version {}",
898                user_version,
899                SQLITE_GRAPH_SCHEMA_VERSION
900            );
901        }
902        conn.execute_batch(
903            r#"
904            CREATE TABLE IF NOT EXISTS graph_nodes (
905                id TEXT PRIMARY KEY,
906                kind TEXT NOT NULL,
907                label TEXT NOT NULL,
908                properties_json TEXT NOT NULL DEFAULT '{}',
909                provenance_json TEXT NOT NULL DEFAULT '[]',
910                freshness_json TEXT,
911                row_hash TEXT,
912                source_watermark TEXT
913            );
914            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind
915                ON graph_nodes(kind);
916            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
917                ON graph_nodes(kind, label, id);
918
919            CREATE TABLE IF NOT EXISTS graph_edges (
920                edge_key TEXT NOT NULL UNIQUE,
921                from_id TEXT NOT NULL,
922                to_id TEXT NOT NULL,
923                kind TEXT NOT NULL,
924                properties_json TEXT NOT NULL DEFAULT '{}',
925                provenance_json TEXT NOT NULL DEFAULT '[]',
926                freshness_json TEXT,
927                row_hash TEXT,
928                source_watermark TEXT,
929                PRIMARY KEY (from_id, to_id, kind),
930                FOREIGN KEY (from_id) REFERENCES graph_nodes(id) ON DELETE CASCADE,
931                FOREIGN KEY (to_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
932            );
933            CREATE INDEX IF NOT EXISTS idx_graph_edges_from_kind
934                ON graph_edges(from_id, kind);
935            CREATE INDEX IF NOT EXISTS idx_graph_edges_to_kind
936                ON graph_edges(to_id, kind);
937
938            CREATE TABLE IF NOT EXISTS graph_projection_versions (
939                scope TEXT PRIMARY KEY,
940                projection_version TEXT NOT NULL,
941                content_hash TEXT,
942                source_watermark TEXT,
943                observed_at_unix INTEGER NOT NULL
944            );
945
946            CREATE TABLE IF NOT EXISTS graph_tombstones (
947                row_key TEXT PRIMARY KEY,
948                row_kind TEXT NOT NULL,
949                deleted_at_unix INTEGER NOT NULL
950            );
951
952            CREATE TABLE IF NOT EXISTS graph_node_properties (
953                node_id TEXT NOT NULL,
954                key TEXT NOT NULL,
955                value TEXT NOT NULL,
956                PRIMARY KEY (node_id, key),
957                FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
958            );
959            CREATE INDEX IF NOT EXISTS idx_graph_node_properties_key_value_node
960                ON graph_node_properties(key, value, node_id);
961
962            CREATE TABLE IF NOT EXISTS graph_node_semantic_vectors (
963                node_id TEXT PRIMARY KEY,
964                kind TEXT NOT NULL,
965                model TEXT NOT NULL,
966                dimensions INTEGER NOT NULL,
967                vector_blob BLOB NOT NULL,
968                FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
969            );
970            CREATE INDEX IF NOT EXISTS idx_graph_node_semantic_vectors_kind_dims
971                ON graph_node_semantic_vectors(kind, dimensions, node_id);
972
973            CREATE TABLE IF NOT EXISTS graph_operator_stats (
974                scope TEXT PRIMARY KEY,
975                nodes INTEGER NOT NULL,
976                edges INTEGER NOT NULL,
977                tombstone_nodes INTEGER NOT NULL,
978                tombstone_edges INTEGER NOT NULL,
979                file_size_bytes INTEGER,
980                freelist_bytes INTEGER,
981                observed_at_unix INTEGER NOT NULL
982            );
983            "#,
984        )?;
985        if user_version < SQLITE_GRAPH_SCHEMA_VERSION {
986            migrate_sqlite_graph_schema(&conn, user_version)?;
987            conn.pragma_update(None, "user_version", SQLITE_GRAPH_SCHEMA_VERSION)?;
988        }
989        Ok(Self {
990            conn,
991            _snapshot_copy: None,
992            read_only_recovery: None,
993            temp_table_active: Cell::new(false),
994        })
995    }
996
997    fn from_read_only_connection(connection: SqliteReadOnlyConnection) -> Result<Self> {
998        connection.conn.pragma_update(None, "foreign_keys", "ON")?;
999        let user_version: i64 =
1000            connection
1001                .conn
1002                .pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
1003        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
1004            bail!(
1005                "graph.db schema version {} is newer than supported version {}",
1006                user_version,
1007                SQLITE_GRAPH_SCHEMA_VERSION
1008            );
1009        }
1010        connection
1011            .conn
1012            .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
1013        Ok(Self {
1014            conn: connection.conn,
1015            _snapshot_copy: connection._snapshot_copy,
1016            read_only_recovery: connection.recovery,
1017            temp_table_active: Cell::new(false),
1018        })
1019    }
1020
1021    pub fn replace_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1022        self.replace_projection_with_version("root", projection, None, None)
1023            .map(|_| ())
1024    }
1025
1026    pub fn replace_projection_with_version(
1027        &mut self,
1028        scope: impl Into<String>,
1029        projection: &GraphProjection,
1030        projection_version: Option<&str>,
1031        source_watermark: Option<String>,
1032    ) -> Result<SqliteProjectionRefresh> {
1033        self.assert_not_in_temp_table_section();
1034        self.temp_table_active.set(true);
1035        let scope = scope.into();
1036        let mut result = self.replace_projection_with_version_fallible(
1037            scope,
1038            projection,
1039            projection_version,
1040            source_watermark,
1041        );
1042        self.temp_table_active.set(false);
1043        if let Ok(ref mut refresh) = result {
1044            let total_rows = refresh.upserted_nodes + refresh.upserted_edges;
1045            let autocheckpoint = if total_rows > 10000 {
1046                8192
1047            } else if total_rows > 1000 {
1048                4096
1049            } else {
1050                2048
1051            };
1052            let _ = self
1053                .conn
1054                .pragma_update(None, "wal_autocheckpoint", autocheckpoint);
1055            // `PRAGMA wal_checkpoint(TRUNCATE)` reports its outcome in a result
1056            // row `(busy, log_frames, checkpointed_frames)` rather than erroring:
1057            // concurrent readers pin the WAL and yield `busy=1`, so the WAL is
1058            // not truncated. The old `let _ = execute_batch(...)` discarded that
1059            // row entirely, hiding unbounded `-wal` growth that later trips
1060            // snapshot-export/import sidecar checks. Record the outcome in
1061            // phase_timings instead of swallowing it (#gdbwalcheckpoint).
1062            let checkpoint_started = Instant::now();
1063            let checkpoint = self
1064                .conn
1065                .query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
1066                    Ok((
1067                        row.get::<_, i64>(0)?,
1068                        row.get::<_, i64>(1)?,
1069                        row.get::<_, i64>(2)?,
1070                    ))
1071                });
1072            let (name, detail) = match checkpoint {
1073                Ok((0, _log, _checkpointed)) => (
1074                    "wal_checkpoint:ok",
1075                    "wal_checkpoint(TRUNCATE) truncated the WAL".to_string(),
1076                ),
1077                Ok((_busy, log, checkpointed)) => (
1078                    "wal_checkpoint:busy",
1079                    format!(
1080                        "wal_checkpoint(TRUNCATE) was blocked by concurrent readers \
1081                         ({checkpointed}/{log} frames checkpointed, WAL not truncated); \
1082                         the -wal file may grow until readers release and a writer truncates it"
1083                    ),
1084                ),
1085                Err(err) => (
1086                    "wal_checkpoint:error",
1087                    format!(
1088                        "wal_checkpoint(TRUNCATE) failed: {err}; \
1089                         the -wal file may grow until a writer can checkpoint it"
1090                    ),
1091                ),
1092            };
1093            refresh.phase_timings.push(SqliteProjectionRefreshPhase {
1094                name: name.to_string(),
1095                duration_micros: checkpoint_started.elapsed().as_micros(),
1096                detail,
1097            });
1098        }
1099        result
1100    }
1101
1102    fn replace_projection_with_version_fallible(
1103        &mut self,
1104        scope: String,
1105        projection: &GraphProjection,
1106        projection_version: Option<&str>,
1107        source_watermark: Option<String>,
1108    ) -> Result<SqliteProjectionRefresh> {
1109        let projection_version = projection_version
1110            .map(str::to_string)
1111            .or_else(|| projection_version_from_nodes(&projection.nodes))
1112            .unwrap_or_else(|| "unversioned".to_string());
1113        let projection_hash = projection_hash_from_nodes(&projection.nodes);
1114        let observed_at_unix = unix_now();
1115        let file_size_bytes_before = sqlite_database_size_bytes(&self.conn).ok();
1116        let force_refresh_writes = self.has_user_triggers().unwrap_or(true);
1117        let mut phase_timings = Vec::new();
1118
1119        let tx = self.conn.transaction()?;
1120        let started = Instant::now();
1121        tx.execute_batch(
1122            r#"
1123            CREATE TEMP TABLE IF NOT EXISTS next_graph_nodes (
1124                id TEXT PRIMARY KEY,
1125                kind TEXT NOT NULL,
1126                label TEXT NOT NULL,
1127                properties_json TEXT NOT NULL,
1128                provenance_json TEXT NOT NULL,
1129                freshness_json TEXT,
1130                row_hash TEXT NOT NULL,
1131                source_watermark TEXT
1132            );
1133            CREATE TEMP TABLE IF NOT EXISTS next_graph_edges (
1134                edge_key TEXT PRIMARY KEY,
1135                from_id TEXT NOT NULL,
1136                to_id TEXT NOT NULL,
1137                kind TEXT NOT NULL,
1138                properties_json TEXT NOT NULL,
1139                provenance_json TEXT NOT NULL,
1140                freshness_json TEXT,
1141                row_hash TEXT NOT NULL,
1142                source_watermark TEXT
1143            );
1144            CREATE INDEX IF NOT EXISTS temp.idx_next_graph_edges_from_to_kind
1145                ON next_graph_edges(from_id, to_id, kind);
1146            CREATE TEMP TABLE IF NOT EXISTS next_graph_node_properties (
1147                node_id TEXT NOT NULL,
1148                key TEXT NOT NULL,
1149                value TEXT NOT NULL,
1150                PRIMARY KEY (node_id, key)
1151            );
1152            CREATE TEMP TABLE IF NOT EXISTS next_graph_node_semantic_vectors (
1153                node_id TEXT PRIMARY KEY,
1154                kind TEXT NOT NULL,
1155                model TEXT NOT NULL,
1156                dimensions INTEGER NOT NULL,
1157                vector_blob BLOB NOT NULL
1158            );
1159            CREATE TEMP TABLE IF NOT EXISTS next_graph_edge_properties (
1160                edge_key TEXT NOT NULL,
1161                key TEXT NOT NULL,
1162                value TEXT NOT NULL,
1163                PRIMARY KEY (edge_key, key)
1164            );
1165            CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_nodes (
1166                id TEXT PRIMARY KEY
1167            );
1168            CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_edges (
1169                edge_key TEXT PRIMARY KEY
1170            );
1171            CREATE TEMP TABLE IF NOT EXISTS next_graph_all_node_ids (
1172                id TEXT PRIMARY KEY
1173            );
1174            CREATE TEMP TABLE IF NOT EXISTS next_graph_all_edge_keys (
1175                edge_key TEXT PRIMARY KEY
1176            );
1177            DELETE FROM next_graph_nodes;
1178            DELETE FROM next_graph_edges;
1179            DELETE FROM next_graph_node_properties;
1180            DELETE FROM next_graph_node_semantic_vectors;
1181            DELETE FROM next_graph_edge_properties;
1182            DELETE FROM next_graph_changed_nodes;
1183            DELETE FROM next_graph_changed_edges;
1184            DELETE FROM next_graph_all_node_ids;
1185            DELETE FROM next_graph_all_edge_keys;
1186            "#,
1187        )?;
1188        phase_timings.push(sqlite_refresh_phase_timing(
1189            "sqlite_temp_table_prepare",
1190            started,
1191            "create and clear refresh staging tables before row loading",
1192        ));
1193        let (changed_nodes, changed_edges, skipped_nodes, skipped_edges) = if force_refresh_writes {
1194            (
1195                projection.nodes.iter().collect(),
1196                projection.edges.iter().collect(),
1197                0usize,
1198                0usize,
1199            )
1200        } else {
1201            let existing_node_hashes: BTreeMap<String, String> = {
1202                let mut stmt =
1203                    tx.prepare("SELECT id, row_hash FROM graph_nodes WHERE row_hash IS NOT NULL")?;
1204                let rows = stmt.query_map([], |row| {
1205                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1206                })?;
1207                collect_rows(rows)?.into_iter().collect()
1208            };
1209            let existing_edge_hashes: BTreeMap<String, String> = {
1210                let mut stmt = tx.prepare(
1211                    "SELECT edge_key, row_hash FROM graph_edges WHERE row_hash IS NOT NULL",
1212                )?;
1213                let rows = stmt.query_map([], |row| {
1214                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1215                })?;
1216                collect_rows(rows)?.into_iter().collect()
1217            };
1218            let cn: Vec<&GraphNode> = projection
1219                .nodes
1220                .iter()
1221                .filter(|n| {
1222                    let hash = row_hash(*n).ok();
1223                    hash.as_ref()
1224                        .is_none_or(|h| existing_node_hashes.get(&n.id) != Some(h))
1225                })
1226                .collect();
1227            let ce: Vec<&GraphEdge> = projection
1228                .edges
1229                .iter()
1230                .filter(|e| {
1231                    let hash = row_hash(*e).ok();
1232                    let ek = graph_edge_id(e);
1233                    hash.as_ref()
1234                        .is_none_or(|h| existing_edge_hashes.get(&ek) != Some(h))
1235                })
1236                .collect();
1237            let sn = projection.nodes.len() - cn.len();
1238            let se = projection.edges.len() - ce.len();
1239            (cn, ce, sn, se)
1240        };
1241        {
1242            let started = Instant::now();
1243            sqlite_stage_all_ids(&tx, &projection.nodes, &projection.edges)?;
1244            sqlite_stage_projection_nodes(&tx, &changed_nodes, source_watermark.as_deref())?;
1245            sqlite_stage_projection_edges(&tx, &changed_edges, source_watermark.as_deref())?;
1246            phase_timings.push(sqlite_refresh_phase_timing(
1247                "sqlite_node_staging",
1248                started,
1249                &format!(
1250                    "pre-filtered staging: {} nodes ({} unchanged skipped), {} edges ({} unchanged skipped) into temp tables using multi-row chunks up to {} rows",
1251                    changed_nodes.len(),
1252                    skipped_nodes,
1253                    changed_edges.len(),
1254                    skipped_edges,
1255                    SQLITE_GRAPH_STAGING_CHUNK_ROWS
1256                ),
1257            ));
1258        }
1259        {
1260            let started = Instant::now();
1261            let changed_nodes_sql = if force_refresh_writes {
1262                r#"
1263                INSERT INTO next_graph_changed_nodes (id)
1264                SELECT id
1265                FROM next_graph_nodes
1266                "#
1267            } else {
1268                r#"
1269                INSERT INTO next_graph_changed_nodes (id)
1270                SELECT n.id
1271                FROM next_graph_nodes n
1272                LEFT JOIN graph_nodes g ON g.id = n.id
1273                WHERE g.id IS NULL OR g.row_hash IS NOT n.row_hash
1274                "#
1275            };
1276            tx.execute(changed_nodes_sql, [])?;
1277            tx.execute_batch(
1278                r#"
1279                INSERT INTO next_graph_node_properties (node_id, key, value)
1280                SELECT n.id, json_each.key, CAST(json_each.value AS TEXT)
1281                FROM next_graph_nodes n
1282                JOIN next_graph_changed_nodes c ON c.id = n.id,
1283                     json_each(n.properties_json)
1284                WHERE json_each.key IS NOT NULL
1285                  AND json_each.value IS NOT NULL
1286                "#,
1287            )?;
1288            phase_timings.push(sqlite_refresh_phase_timing(
1289                "sqlite_property_row_staging",
1290                started,
1291                "derive materialized node property rows only for new/changed node rows; unchanged row-hash owners reuse existing property rows",
1292            ));
1293        }
1294        {
1295            let started = Instant::now();
1296            let changed_edges_sql = if force_refresh_writes {
1297                r#"
1298                INSERT INTO next_graph_changed_edges (edge_key)
1299                SELECT edge_key
1300                FROM next_graph_edges
1301                "#
1302            } else {
1303                r#"
1304                INSERT INTO next_graph_changed_edges (edge_key)
1305                SELECT n.edge_key
1306                FROM next_graph_edges n
1307                LEFT JOIN graph_edges g ON g.edge_key = n.edge_key
1308                WHERE g.edge_key IS NULL OR g.row_hash IS NOT n.row_hash
1309                "#
1310            };
1311            tx.execute(changed_edges_sql, [])?;
1312            tx.execute_batch(
1313                r#"
1314                INSERT INTO next_graph_edge_properties (edge_key, key, value)
1315                SELECT e.edge_key, json_each.key, CAST(json_each.value AS TEXT)
1316                FROM next_graph_edges e
1317                JOIN next_graph_changed_edges c ON c.edge_key = e.edge_key,
1318                     json_each(e.properties_json)
1319                WHERE json_each.key IS NOT NULL
1320                  AND json_each.value IS NOT NULL
1321                "#,
1322            )?;
1323            phase_timings.push(sqlite_refresh_phase_timing(
1324                "sqlite_edge_property_row_staging",
1325                started,
1326                "derive materialized edge property rows only for new/changed edge rows; unchanged row-hash owners reuse existing property rows",
1327            ));
1328        }
1329
1330        let delta_started = Instant::now();
1331        let tombstoned_nodes = {
1332            let sql = if force_refresh_writes {
1333                r#"
1334                SELECT g.id
1335                FROM graph_nodes g
1336                LEFT JOIN next_graph_nodes n ON n.id = g.id
1337                WHERE n.id IS NULL
1338                ORDER BY g.id
1339                "#
1340            } else {
1341                r#"
1342                SELECT g.id
1343                FROM graph_nodes g
1344                LEFT JOIN next_graph_all_node_ids n ON n.id = g.id
1345                WHERE n.id IS NULL
1346                ORDER BY g.id
1347                "#
1348            };
1349            let mut stmt = tx.prepare(sql)?;
1350            collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1351        };
1352        let tombstoned_edges = {
1353            let sql = if force_refresh_writes {
1354                r#"
1355                SELECT g.edge_key
1356                FROM graph_edges g
1357                LEFT JOIN next_graph_edges n
1358                    ON n.edge_key = g.edge_key
1359                WHERE n.edge_key IS NULL
1360                ORDER BY g.edge_key
1361                "#
1362            } else {
1363                r#"
1364                SELECT g.edge_key
1365                FROM graph_edges g
1366                LEFT JOIN next_graph_all_edge_keys n
1367                    ON n.edge_key = g.edge_key
1368                WHERE n.edge_key IS NULL
1369                ORDER BY g.edge_key
1370                "#
1371            };
1372            let mut stmt = tx.prepare(sql)?;
1373            collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1374        };
1375        let unchanged_nodes: usize = if force_refresh_writes {
1376            tx.query_row(
1377                r#"
1378                SELECT COUNT(*)
1379                FROM next_graph_nodes n
1380                JOIN graph_nodes g ON g.id = n.id
1381                WHERE g.row_hash = n.row_hash
1382                "#,
1383                [],
1384                |row| row_usize(row, 0),
1385            )?
1386        } else {
1387            skipped_nodes
1388        };
1389        let unchanged_edges: usize = if force_refresh_writes {
1390            tx.query_row(
1391                r#"
1392                SELECT COUNT(*)
1393                FROM next_graph_edges n
1394                JOIN graph_edges g
1395                    ON g.edge_key = n.edge_key
1396                WHERE g.row_hash = n.row_hash
1397                "#,
1398                [],
1399                |row| row_usize(row, 0),
1400            )?
1401        } else {
1402            skipped_edges
1403        };
1404        let reused_owner_node_properties: usize = if force_refresh_writes {
1405            tx.query_row(
1406                r#"
1407                SELECT COUNT(*)
1408                FROM graph_node_properties g
1409                JOIN next_graph_nodes n ON n.id = g.node_id
1410                LEFT JOIN next_graph_changed_nodes c ON c.id = n.id
1411                WHERE c.id IS NULL
1412                "#,
1413                [],
1414                |row| row_usize(row, 0),
1415            )?
1416        } else {
1417            tx.query_row(
1418                r#"
1419                SELECT COUNT(*)
1420                FROM graph_node_properties g
1421                JOIN next_graph_all_node_ids a ON a.id = g.node_id
1422                LEFT JOIN next_graph_changed_nodes c ON c.id = a.id
1423                WHERE c.id IS NULL
1424                "#,
1425                [],
1426                |row| row_usize(row, 0),
1427            )?
1428        };
1429        let reused_owner_edge_properties: usize = if force_refresh_writes {
1430            tx.query_row(
1431                r#"
1432                SELECT COUNT(*)
1433                FROM graph_edge_properties g
1434                JOIN next_graph_edges n ON n.edge_key = g.edge_key
1435                LEFT JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1436                WHERE c.edge_key IS NULL
1437                "#,
1438                [],
1439                |row| row_usize(row, 0),
1440            )?
1441        } else {
1442            tx.query_row(
1443                r#"
1444                SELECT COUNT(*)
1445                FROM graph_edge_properties g
1446                JOIN next_graph_all_edge_keys a ON a.edge_key = g.edge_key
1447                LEFT JOIN next_graph_changed_edges c ON c.edge_key = a.edge_key
1448                WHERE c.edge_key IS NULL
1449                "#,
1450                [],
1451                |row| row_usize(row, 0),
1452            )?
1453        };
1454        let unchanged_changed_node_properties: usize = tx.query_row(
1455            r#"
1456            SELECT COUNT(*)
1457            FROM next_graph_node_properties n
1458            JOIN graph_node_properties g
1459                ON g.node_id = n.node_id AND g.key = n.key
1460            WHERE g.value = n.value
1461            "#,
1462            [],
1463            |row| row_usize(row, 0),
1464        )?;
1465        let unchanged_changed_edge_properties: usize = tx.query_row(
1466            r#"
1467            SELECT COUNT(*)
1468            FROM next_graph_edge_properties n
1469            JOIN graph_edge_properties g
1470                ON g.edge_key = n.edge_key AND g.key = n.key
1471            WHERE g.value = n.value
1472            "#,
1473            [],
1474            |row| row_usize(row, 0),
1475        )?;
1476        let unchanged_properties = reused_owner_node_properties
1477            + reused_owner_edge_properties
1478            + unchanged_changed_node_properties
1479            + unchanged_changed_edge_properties;
1480
1481        let deleted_edges = if force_refresh_writes {
1482            tx.execute(
1483                r#"
1484                DELETE FROM graph_edges
1485                WHERE NOT EXISTS (
1486                    SELECT 1
1487                    FROM next_graph_edges n
1488                    WHERE n.edge_key = graph_edges.edge_key
1489                )
1490                "#,
1491                [],
1492            )?
1493        } else {
1494            tx.execute(
1495                r#"
1496                DELETE FROM graph_edges
1497                WHERE NOT EXISTS (
1498                    SELECT 1
1499                    FROM next_graph_all_edge_keys n
1500                    WHERE n.edge_key = graph_edges.edge_key
1501                )
1502                "#,
1503                [],
1504            )?
1505        };
1506        let deleted_nodes = if force_refresh_writes {
1507            tx.execute(
1508                r#"
1509                DELETE FROM graph_nodes
1510                WHERE NOT EXISTS (
1511                    SELECT 1
1512                    FROM next_graph_nodes n
1513                    WHERE n.id = graph_nodes.id
1514                )
1515                "#,
1516                [],
1517            )?
1518        } else {
1519            tx.execute(
1520                r#"
1521                DELETE FROM graph_nodes
1522                WHERE NOT EXISTS (
1523                    SELECT 1
1524                    FROM next_graph_all_node_ids n
1525                    WHERE n.id = graph_nodes.id
1526                )
1527                "#,
1528                [],
1529            )?
1530        };
1531
1532        let upsert_nodes_sql = r#"
1533            INSERT INTO graph_nodes
1534                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1535            SELECT
1536                n.id,
1537                n.kind,
1538                n.label,
1539                n.properties_json,
1540                n.provenance_json,
1541                n.freshness_json,
1542                n.row_hash,
1543                n.source_watermark
1544            FROM next_graph_nodes n
1545            JOIN next_graph_changed_nodes c ON c.id = n.id
1546            WHERE true
1547            ON CONFLICT(id) DO UPDATE SET
1548                kind = excluded.kind,
1549                label = excluded.label,
1550                properties_json = excluded.properties_json,
1551                provenance_json = excluded.provenance_json,
1552                freshness_json = excluded.freshness_json,
1553                row_hash = excluded.row_hash,
1554                source_watermark = excluded.source_watermark
1555            WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1556            "#;
1557        tx.execute(upsert_nodes_sql, [])?;
1558        tx.execute(
1559            r#"
1560                DELETE FROM graph_node_semantic_vectors
1561                WHERE EXISTS (
1562                    SELECT 1
1563                    FROM next_graph_changed_nodes c
1564                    WHERE c.id = graph_node_semantic_vectors.node_id
1565                )
1566                AND NOT EXISTS (
1567                    SELECT 1
1568                    FROM next_graph_node_semantic_vectors n
1569                    WHERE n.node_id = graph_node_semantic_vectors.node_id
1570                )
1571                "#,
1572            [],
1573        )?;
1574        tx.execute(
1575            r#"
1576                INSERT INTO graph_node_semantic_vectors
1577                    (node_id, kind, model, dimensions, vector_blob)
1578                SELECT n.node_id, n.kind, n.model, n.dimensions, n.vector_blob
1579                FROM next_graph_node_semantic_vectors n
1580                WHERE true
1581                ON CONFLICT(node_id) DO UPDATE SET
1582                    kind = excluded.kind,
1583                    model = excluded.model,
1584                    dimensions = excluded.dimensions,
1585                    vector_blob = excluded.vector_blob
1586                "#,
1587            [],
1588        )?;
1589        let upsert_edges_sql = r#"
1590            INSERT INTO graph_edges
1591            (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1592            SELECT
1593                n.edge_key,
1594                n.from_id,
1595                n.to_id,
1596                n.kind,
1597                n.properties_json,
1598                n.provenance_json,
1599                n.freshness_json,
1600                n.row_hash,
1601                n.source_watermark
1602            FROM next_graph_edges n
1603            JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1604            WHERE true
1605            ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1606                edge_key = excluded.edge_key,
1607                properties_json = excluded.properties_json,
1608                provenance_json = excluded.provenance_json,
1609                freshness_json = excluded.freshness_json,
1610                row_hash = excluded.row_hash,
1611                source_watermark = excluded.source_watermark
1612            WHERE graph_edges.row_hash IS NOT excluded.row_hash
1613            "#;
1614        tx.execute(upsert_edges_sql, [])?;
1615        let deleted_node_properties = tx.execute(
1616            r#"
1617            DELETE FROM graph_node_properties
1618            WHERE EXISTS (
1619                SELECT 1
1620                FROM next_graph_changed_nodes c
1621                WHERE c.id = graph_node_properties.node_id
1622            )
1623              AND NOT EXISTS (
1624                SELECT 1
1625                FROM next_graph_node_properties n
1626                WHERE n.node_id = graph_node_properties.node_id
1627                  AND n.key = graph_node_properties.key
1628            )
1629            "#,
1630            [],
1631        )?;
1632        let deleted_edge_properties = tx.execute(
1633            r#"
1634            DELETE FROM graph_edge_properties
1635            WHERE EXISTS (
1636                SELECT 1
1637                FROM next_graph_changed_edges c
1638                WHERE c.edge_key = graph_edge_properties.edge_key
1639            )
1640              AND NOT EXISTS (
1641                SELECT 1
1642                FROM next_graph_edge_properties n
1643                WHERE n.edge_key = graph_edge_properties.edge_key
1644                  AND n.key = graph_edge_properties.key
1645            )
1646            "#,
1647            [],
1648        )?;
1649        let deleted_properties = deleted_node_properties + deleted_edge_properties;
1650        let upsert_properties_sql = r#"
1651            INSERT INTO graph_node_properties (node_id, key, value)
1652            SELECT n.node_id, n.key, n.value
1653            FROM next_graph_node_properties n
1654            LEFT JOIN graph_node_properties g
1655                ON g.node_id = n.node_id AND g.key = n.key
1656            WHERE g.node_id IS NULL OR g.value IS NOT n.value
1657            ON CONFLICT(node_id, key) DO UPDATE SET
1658                value = excluded.value
1659            WHERE graph_node_properties.value IS NOT excluded.value
1660            "#;
1661        let upserted_node_properties = tx.execute(upsert_properties_sql, [])?;
1662        let upsert_edge_properties_sql = r#"
1663            INSERT INTO graph_edge_properties (edge_key, key, value)
1664            SELECT n.edge_key, n.key, n.value
1665            FROM next_graph_edge_properties n
1666            LEFT JOIN graph_edge_properties g
1667                ON g.edge_key = n.edge_key AND g.key = n.key
1668            WHERE g.edge_key IS NULL OR g.value IS NOT n.value
1669            ON CONFLICT(edge_key, key) DO UPDATE SET
1670                value = excluded.value
1671            WHERE graph_edge_properties.value IS NOT excluded.value
1672            "#;
1673        let upserted_edge_properties = tx.execute(upsert_edge_properties_sql, [])?;
1674        let upserted_properties = upserted_node_properties + upserted_edge_properties;
1675        tx.execute(
1676            r#"
1677            INSERT INTO graph_projection_versions
1678                (scope, projection_version, content_hash, source_watermark, observed_at_unix)
1679            VALUES (?1, ?2, ?3, ?4, ?5)
1680            ON CONFLICT(scope) DO UPDATE SET
1681                projection_version = excluded.projection_version,
1682                content_hash = excluded.content_hash,
1683                source_watermark = excluded.source_watermark,
1684                observed_at_unix = excluded.observed_at_unix
1685            "#,
1686            (
1687                &scope,
1688                &projection_version,
1689                &projection_hash,
1690                &source_watermark,
1691                observed_at_unix,
1692            ),
1693        )?;
1694        let pruned_node_tombstones = tx.execute(
1695            r#"
1696            DELETE FROM graph_tombstones
1697            WHERE row_kind = 'node'
1698              AND EXISTS (
1699                SELECT 1
1700                FROM next_graph_nodes n
1701                WHERE n.id = substr(graph_tombstones.row_key, 6)
1702              )
1703            "#,
1704            [],
1705        )?;
1706        let pruned_edge_tombstones = tx.execute(
1707            r#"
1708            DELETE FROM graph_tombstones
1709            WHERE row_kind = 'edge'
1710              AND EXISTS (
1711                SELECT 1
1712                FROM next_graph_edges n
1713                WHERE n.edge_key = substr(graph_tombstones.row_key, 6)
1714              )
1715            "#,
1716            [],
1717        )?;
1718        {
1719            let mut insert_node_tombstone = tx.prepare(
1720                r#"
1721                INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1722                VALUES (?1, 'node', ?2)
1723                ON CONFLICT(row_key) DO UPDATE SET
1724                    row_kind = excluded.row_kind,
1725                    deleted_at_unix = excluded.deleted_at_unix
1726                "#,
1727            )?;
1728            for id in &tombstoned_nodes {
1729                insert_node_tombstone.execute((format!("node:{id}"), observed_at_unix))?;
1730            }
1731        }
1732        {
1733            let mut insert_edge_tombstone = tx.prepare(
1734                r#"
1735                INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1736                VALUES (?1, 'edge', ?2)
1737                ON CONFLICT(row_key) DO UPDATE SET
1738                    row_kind = excluded.row_kind,
1739                    deleted_at_unix = excluded.deleted_at_unix
1740                "#,
1741            )?;
1742            for key in &tombstoned_edges {
1743                insert_edge_tombstone.execute((format!("edge:{key}"), observed_at_unix))?;
1744            }
1745        }
1746        let tombstone_node_count: usize = tx.query_row(
1747            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1748            [],
1749            |row| row_usize(row, 0),
1750        )?;
1751        let tombstone_edge_count: usize = tx.query_row(
1752            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1753            [],
1754            |row| row_usize(row, 0),
1755        )?;
1756        tx.execute(
1757            r#"
1758            INSERT INTO graph_operator_stats
1759                (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1760            VALUES (?1, ?2, ?3, ?4, ?5, NULL, NULL, ?6)
1761            ON CONFLICT(scope) DO UPDATE SET
1762                nodes = excluded.nodes,
1763                edges = excluded.edges,
1764                tombstone_nodes = excluded.tombstone_nodes,
1765                tombstone_edges = excluded.tombstone_edges,
1766                file_size_bytes = excluded.file_size_bytes,
1767                freelist_bytes = excluded.freelist_bytes,
1768                observed_at_unix = excluded.observed_at_unix
1769            "#,
1770            (
1771                &scope,
1772                projection.nodes.len() as i64,
1773                projection.edges.len() as i64,
1774                tombstone_node_count as i64,
1775                tombstone_edge_count as i64,
1776                observed_at_unix,
1777            ),
1778        )?;
1779        phase_timings.push(sqlite_refresh_phase_timing(
1780            "sqlite_delta_write",
1781            delta_started,
1782            "apply row/property deltas, projection metadata, tombstones, and cached operator counts",
1783        ));
1784        let commit_started = Instant::now();
1785        tx.commit()?;
1786        phase_timings.push(sqlite_refresh_phase_timing(
1787            "sqlite_commit",
1788            commit_started,
1789            "commit refresh transaction and publish old-or-new graph visibility",
1790        ));
1791        let file_size_bytes_after = sqlite_database_size_bytes(&self.conn).ok();
1792        let freelist_bytes_after = sqlite_database_freelist_bytes(&self.conn).ok();
1793        let stats_started = Instant::now();
1794        self.conn.execute(
1795            r#"
1796            UPDATE graph_operator_stats
1797            SET file_size_bytes = ?2,
1798                freelist_bytes = ?3,
1799                observed_at_unix = ?4
1800            WHERE scope = ?1
1801            "#,
1802            (
1803                &scope,
1804                file_size_bytes_after.map(|value| value as i64),
1805                freelist_bytes_after.map(|value| value as i64),
1806                unix_now(),
1807            ),
1808        )?;
1809        phase_timings.push(sqlite_refresh_phase_timing(
1810            "sqlite_stats_cache_update",
1811            stats_started,
1812            "persist post-commit file and freelist proof for status/doctor",
1813        ));
1814        Ok(SqliteProjectionRefresh {
1815            scope,
1816            projection_version,
1817            source_watermark,
1818            upserted_nodes: projection.nodes.len().saturating_sub(unchanged_nodes),
1819            upserted_edges: projection.edges.len().saturating_sub(unchanged_edges),
1820            unchanged_nodes,
1821            unchanged_edges,
1822            upserted_properties,
1823            unchanged_properties,
1824            deleted_properties,
1825            deleted_nodes,
1826            deleted_edges,
1827            pruned_tombstones: pruned_node_tombstones + pruned_edge_tombstones,
1828            file_size_bytes_before,
1829            file_size_bytes_after,
1830            tombstoned_nodes,
1831            tombstoned_edges,
1832            phase_timings,
1833        })
1834    }
1835
1836    /// Derive a Semantic Ontology Graph (`#memgraphrag-ont`, MemGraphRAG arxiv
1837    /// 2606.00610 third layer) from the instance graph: one `ontology_type` node
1838    /// per distinct node kind, and one `ontology_relation:<edge_kind>` edge per
1839    /// observed `(from_kind, edge_kind, to_kind)` triple, each carrying an
1840    /// `instance_count`. This data-driven schema lets retrieval start from
1841    /// abstract types and prune by permitted inter-type relations. Existing
1842    /// ontology rows are excluded so the derivation is idempotent and never folds
1843    /// the ontology layer into itself.
1844    pub fn derive_ontology(&self) -> Result<GraphProjection> {
1845        let mut projection = GraphProjection::default();
1846
1847        let mut node_stmt = self.conn.prepare(
1848            "SELECT kind, COUNT(*) FROM graph_nodes \
1849             WHERE kind != 'ontology_type' \
1850             GROUP BY kind ORDER BY kind",
1851        )?;
1852        let node_rows = node_stmt.query_map([], |row| {
1853            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
1854        })?;
1855        for row in node_rows {
1856            let (kind, count) = row?;
1857            projection.nodes.push(
1858                GraphNode::new(format!("ontology_type:{kind}"), "ontology_type", &kind)
1859                    .with_property("type_kind", &kind)
1860                    .with_property("instance_count", count.to_string())
1861                    .with_provenance(GraphProvenance::new("tsift-ontology", &kind)),
1862            );
1863        }
1864
1865        let mut rel_stmt = self.conn.prepare(
1866            "SELECT n1.kind, e.kind, n2.kind, COUNT(*) \
1867             FROM graph_edges e \
1868             JOIN graph_nodes n1 ON e.from_id = n1.id \
1869             JOIN graph_nodes n2 ON e.to_id = n2.id \
1870             WHERE e.kind NOT LIKE 'ontology_relation:%' \
1871               AND n1.kind != 'ontology_type' AND n2.kind != 'ontology_type' \
1872             GROUP BY n1.kind, e.kind, n2.kind \
1873             ORDER BY n1.kind, e.kind, n2.kind",
1874        )?;
1875        let rel_rows = rel_stmt.query_map([], |row| {
1876            Ok((
1877                row.get::<_, String>(0)?,
1878                row.get::<_, String>(1)?,
1879                row.get::<_, String>(2)?,
1880                row.get::<_, i64>(3)?,
1881            ))
1882        })?;
1883        for row in rel_rows {
1884            let (from_kind, edge_kind, to_kind, count) = row?;
1885            projection.edges.push(
1886                GraphEdge::new(
1887                    format!("ontology_type:{from_kind}"),
1888                    format!("ontology_type:{to_kind}"),
1889                    format!("ontology_relation:{edge_kind}"),
1890                )
1891                .with_property("edge_kind", &edge_kind)
1892                .with_property("instance_count", count.to_string())
1893                .with_provenance(GraphProvenance::new("tsift-ontology", &edge_kind)),
1894            );
1895        }
1896
1897        Ok(projection)
1898    }
1899
1900    pub fn upsert_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1901        let tx = self.conn.transaction()?;
1902        {
1903            let mut insert_node = tx.prepare(
1904                r#"
1905                INSERT INTO graph_nodes
1906                    (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1907                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1908                ON CONFLICT(id) DO UPDATE SET
1909                    kind = excluded.kind,
1910                    label = excluded.label,
1911                    properties_json = excluded.properties_json,
1912                provenance_json = excluded.provenance_json,
1913                freshness_json = excluded.freshness_json,
1914                row_hash = excluded.row_hash,
1915                source_watermark = excluded.source_watermark
1916            WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1917               OR graph_nodes.source_watermark IS NOT excluded.source_watermark
1918            "#,
1919            )?;
1920            let mut delete_properties =
1921                tx.prepare("DELETE FROM graph_node_properties WHERE node_id = ?1")?;
1922            let mut insert_property = tx.prepare(
1923                r#"
1924                INSERT INTO graph_node_properties (node_id, key, value)
1925                VALUES (?1, ?2, ?3)
1926                "#,
1927            )?;
1928            for node in &projection.nodes {
1929                let changed = insert_node.execute((
1930                    &node.id,
1931                    &node.kind,
1932                    &node.label,
1933                    to_json(&node.properties)?,
1934                    to_json(&node.provenance)?,
1935                    optional_to_json(&node.freshness)?,
1936                    row_hash(node)?,
1937                ))?;
1938                if changed > 0 {
1939                    delete_properties.execute([&node.id])?;
1940                    for (key, value) in &node.properties {
1941                        insert_property.execute((&node.id, key, value))?;
1942                    }
1943                    replace_node_semantic_vector(&tx, &node.id, &node.kind, &node.properties)?;
1944                }
1945            }
1946        }
1947        {
1948            let mut insert_edge = tx.prepare(
1949                r#"
1950                INSERT INTO graph_edges
1951                    (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1952                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1953                ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1954                    edge_key = excluded.edge_key,
1955                    properties_json = excluded.properties_json,
1956                    provenance_json = excluded.provenance_json,
1957                freshness_json = excluded.freshness_json,
1958                row_hash = excluded.row_hash,
1959                source_watermark = excluded.source_watermark
1960            WHERE graph_edges.row_hash IS NOT excluded.row_hash
1961               OR graph_edges.source_watermark IS NOT excluded.source_watermark
1962            "#,
1963            )?;
1964            let mut delete_properties =
1965                tx.prepare("DELETE FROM graph_edge_properties WHERE edge_key = ?1")?;
1966            let mut insert_property = tx.prepare(
1967                r#"
1968                INSERT INTO graph_edge_properties (edge_key, key, value)
1969                VALUES (?1, ?2, ?3)
1970                "#,
1971            )?;
1972            for edge in &projection.edges {
1973                let edge_key = graph_edge_id(edge);
1974                let changed = insert_edge.execute((
1975                    &edge_key,
1976                    &edge.from_id,
1977                    &edge.to_id,
1978                    &edge.kind,
1979                    to_json(&edge.properties)?,
1980                    to_json(&edge.provenance)?,
1981                    optional_to_json(&edge.freshness)?,
1982                    row_hash(edge)?,
1983                ))?;
1984                if changed > 0 {
1985                    delete_properties.execute([&edge_key])?;
1986                    for (key, value) in &edge.properties {
1987                        insert_property.execute((&edge_key, key, value))?;
1988                    }
1989                }
1990            }
1991        }
1992        tx.commit()?;
1993        Ok(())
1994    }
1995
1996    /// #kgrefreshdup: delete every node previously projected from `source_ref` by
1997    /// `provider`, so a re-extraction REPLACES that source's subgraph instead of
1998    /// accumulating duplicate canonical entities across refresh cycles. Incident
1999    /// edges, node/edge properties, and semantic vectors are removed automatically
2000    /// via the `ON DELETE CASCADE` foreign keys. The `provider` scope keeps the
2001    /// delete from touching non-KG nodes (e.g. AST symbols) that may share a
2002    /// `source_ref` path in the same graph database. Returns the node count deleted.
2003    pub fn delete_source_projection(&mut self, source_ref: &str, provider: &str) -> Result<usize> {
2004        let deleted = self.conn.execute(
2005            r#"
2006            DELETE FROM graph_nodes
2007            WHERE id IN (
2008                SELECT node_id FROM graph_node_properties
2009                WHERE key = 'source_ref' AND value = ?1
2010            )
2011            AND id IN (
2012                SELECT node_id FROM graph_node_properties
2013                WHERE key = 'provider' AND value = ?2
2014            )
2015            "#,
2016            (source_ref, provider),
2017        )?;
2018        Ok(deleted)
2019    }
2020
2021    /// #kgsameas: link nodes of `kind` that share the same `id_key` property value
2022    /// (restricted to values starting with `id_prefix`) using star `edge_kind`
2023    /// edges — each group's members point at its lexicographically-smallest node
2024    /// id. This collapses duplicate logical entities for ALL graph consumers (not
2025    /// just the context pack), without deleting any node, so per-source provenance
2026    /// is preserved. Idempotent: edges key on (from,to,kind), so re-running upserts
2027    /// the same set. Returns the number of link edges written.
2028    pub fn link_nodes_by_shared_property(
2029        &mut self,
2030        kind: &str,
2031        id_key: &str,
2032        id_prefix: &str,
2033        edge_kind: &str,
2034        edge_properties: &[(&str, &str)],
2035    ) -> Result<usize> {
2036        // Read members grouped by shared id value (ordered so the representative
2037        // is deterministic: the smallest node id within each group).
2038        let mut groups: BTreeMap<String, Vec<String>> = BTreeMap::new();
2039        {
2040            let mut stmt = self.conn.prepare(
2041                "SELECT p.value, n.id \
2042                 FROM graph_nodes n \
2043                 JOIN graph_node_properties p ON p.node_id = n.id \
2044                 WHERE n.kind = ?1 AND p.key = ?2 AND p.value LIKE ?3 || '%' \
2045                 ORDER BY p.value, n.id",
2046            )?;
2047            let rows = stmt.query_map((kind, id_key, id_prefix), |row| {
2048                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2049            })?;
2050            for row in rows {
2051                let (shared, node_id) = row?;
2052                groups.entry(shared).or_default().push(node_id);
2053            }
2054        }
2055
2056        let mut edges: Vec<GraphEdge> = Vec::new();
2057        for members in groups.values() {
2058            let Some((rep, rest)) = members.split_first() else {
2059                continue;
2060            };
2061            for member in rest {
2062                let mut edge = GraphEdge::new(member.clone(), rep.clone(), edge_kind);
2063                for (key, value) in edge_properties {
2064                    edge = edge.with_property(*key, *value);
2065                }
2066                edges.push(edge);
2067            }
2068        }
2069
2070        let count = edges.len();
2071        if count > 0 {
2072            self.upsert_projection(&GraphProjection {
2073                nodes: Vec::new(),
2074                edges,
2075            })?;
2076        }
2077        Ok(count)
2078    }
2079
2080    pub fn projection_version(&self, scope: &str) -> Result<Option<SqliteProjectionVersion>> {
2081        self.conn
2082            .query_row(
2083                r#"
2084                SELECT projection_version, content_hash, source_watermark
2085                FROM graph_projection_versions
2086                WHERE scope = ?1
2087                "#,
2088                [scope],
2089                |row| {
2090                    Ok(SqliteProjectionVersion {
2091                        projection_version: row.get(0)?,
2092                        content_hash: row.get(1)?,
2093                        source_watermark: row.get(2)?,
2094                    })
2095                },
2096            )
2097            .optional()
2098            .map_err(Into::into)
2099    }
2100
2101    pub fn update_projection_source_watermark(
2102        &mut self,
2103        scope: &str,
2104        source_watermark: Option<String>,
2105    ) -> Result<()> {
2106        self.conn.execute(
2107            r#"
2108            UPDATE graph_projection_versions
2109            SET source_watermark = ?2
2110            WHERE scope = ?1
2111            "#,
2112            (scope, source_watermark),
2113        )?;
2114        Ok(())
2115    }
2116
2117    pub fn compact_storage(&mut self, scope: &str, prune_tombstones: bool) -> Result<usize> {
2118        let pruned_tombstones = if prune_tombstones {
2119            self.conn.execute("DELETE FROM graph_tombstones", [])?
2120        } else {
2121            0
2122        };
2123        self.conn.execute_batch(
2124            r#"
2125            PRAGMA wal_checkpoint(TRUNCATE);
2126            VACUUM;
2127            "#,
2128        )?;
2129        let nodes = self
2130            .conn
2131            .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
2132                row.get::<_, i64>(0)
2133            })?;
2134        let edges = self
2135            .conn
2136            .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2137                row.get::<_, i64>(0)
2138            })?;
2139        let tombstone_nodes = self.conn.query_row(
2140            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
2141            [],
2142            |row| row.get::<_, i64>(0),
2143        )?;
2144        let tombstone_edges = self.conn.query_row(
2145            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
2146            [],
2147            |row| row.get::<_, i64>(0),
2148        )?;
2149        let file_size_bytes = sqlite_database_size_bytes(&self.conn)
2150            .ok()
2151            .map(|value| value as i64);
2152        let freelist_bytes = sqlite_database_freelist_bytes(&self.conn)
2153            .ok()
2154            .map(|value| value as i64);
2155        self.conn.execute(
2156            r#"
2157            INSERT INTO graph_operator_stats
2158                (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
2159            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s', 'now'))
2160            ON CONFLICT(scope) DO UPDATE SET
2161                nodes = excluded.nodes,
2162                edges = excluded.edges,
2163                tombstone_nodes = excluded.tombstone_nodes,
2164                tombstone_edges = excluded.tombstone_edges,
2165                file_size_bytes = excluded.file_size_bytes,
2166                freelist_bytes = excluded.freelist_bytes,
2167                observed_at_unix = excluded.observed_at_unix
2168            "#,
2169            (
2170                scope,
2171                nodes,
2172                edges,
2173                tombstone_nodes,
2174                tombstone_edges,
2175                file_size_bytes,
2176                freelist_bytes,
2177            ),
2178        )?;
2179        Ok(pruned_tombstones)
2180    }
2181
2182    fn edges_between_nodes_inline(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
2183        let placeholders: Vec<&str> = node_ids.iter().map(|_| "?").collect();
2184        let in_clause = placeholders.join(", ");
2185        let sql = format!(
2186            "SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json \
2187             FROM graph_edges e \
2188             WHERE e.from_id IN ({in_clause}) \
2189               AND e.to_id IN ({in_clause}) \
2190             ORDER BY e.from_id, e.kind, e.to_id"
2191        );
2192        let values: Vec<Value> = node_ids
2193            .iter()
2194            .chain(node_ids.iter())
2195            .map(|id| Value::Text(id.clone()))
2196            .collect();
2197        let mut stmt = self.conn.prepare(&sql)?;
2198        collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2199    }
2200}
2201
2202fn sqlite_query_plan(conn: &Connection, sql: &str, values: &[Value]) -> Result<Vec<String>> {
2203    let mut stmt = conn.prepare(&format!("EXPLAIN QUERY PLAN {sql}"))?;
2204    collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2205        row.get::<_, String>(3)
2206    })?)
2207}
2208
/// Render human-readable diagnostics for a query plan.
///
/// The first entry echoes the whole plan, with rows joined by `" | "`. Each
/// expected index then adds one entry:
/// - "uses" when any plan row contains the index name as a substring;
/// - a warning otherwise.
fn sqlite_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<String> {
    let summary = format!(
        "sqlite query pushdown active; plan: {}",
        plan.join(" | ")
    );
    let per_index = expected_indexes.iter().map(|index| {
        let reported = plan.iter().any(|step| step.contains(*index));
        if reported {
            format!("sqlite query plan uses {index}")
        } else {
            format!(
                "sqlite query plan did not report {index}; inspect before changing graph property indexes"
            )
        }
    });
    std::iter::once(summary).chain(per_index).collect()
}
2225
/// Compact, machine-oriented counterpart of the prose query-plan diagnostics.
///
/// `code` is a short status tag. The values produced in this file are
/// `"plan_active"`, `"idx_ok"`, and `"idx_missing"` (see
/// `terse_query_plan_diagnostics`). `index` names the expected index the
/// entry refers to and is omitted from serialized output when `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TerseDiagnostic {
    /// Status tag for this diagnostic entry.
    pub code: &'static str,
    /// Expected index this entry reports on; skipped when serializing `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index: Option<String>,
}
2232
2233#[allow(dead_code)]
2234fn terse_query_plan_diagnostics(
2235    plan: &[String],
2236    expected_indexes: &[&str],
2237) -> Vec<TerseDiagnostic> {
2238    let mut diagnostics = vec![TerseDiagnostic {
2239        code: "plan_active",
2240        index: None,
2241    }];
2242    for expected_index in expected_indexes {
2243        if plan.iter().any(|row| row.contains(expected_index)) {
2244            diagnostics.push(TerseDiagnostic {
2245                code: "idx_ok",
2246                index: Some(expected_index.to_string()),
2247            });
2248        } else {
2249            diagnostics.push(TerseDiagnostic {
2250                code: "idx_missing",
2251                index: Some(expected_index.to_string()),
2252            });
2253        }
2254    }
2255    diagnostics
2256}
2257
2258fn push_sqlite_property_filter_exists(
2259    sql: &mut String,
2260    values: &mut Vec<Value>,
2261    node_alias: &str,
2262    filters: &[GraphPropertyFilter],
2263) {
2264    for (index, filter) in filters.iter().enumerate() {
2265        sql.push_str(&format!(
2266            r#"
2267            AND EXISTS (
2268                SELECT 1
2269                FROM graph_node_properties p{index} INDEXED BY idx_graph_node_properties_key_value_node
2270                WHERE p{index}.node_id = {node_alias}.id
2271                  AND p{index}.key = ?
2272                  AND p{index}.value = ?
2273            )
2274            "#
2275        ));
2276        values.push(Value::Text(filter.key.clone()));
2277        values.push(Value::Text(filter.value.clone()));
2278    }
2279}
2280
2281fn push_sqlite_edge_property_filter_exists(
2282    sql: &mut String,
2283    values: &mut Vec<Value>,
2284    edge_alias: &str,
2285    filters: &[GraphPropertyFilter],
2286) {
2287    for (index, filter) in filters.iter().enumerate() {
2288        sql.push_str(&format!(
2289            r#"
2290            AND EXISTS (
2291                SELECT 1
2292                FROM graph_edge_properties ep{index} INDEXED BY idx_graph_edge_properties_key_value_edge
2293                WHERE ep{index}.edge_key = {edge_alias}.edge_key
2294                  AND ep{index}.key = ?
2295                  AND ep{index}.value = ?
2296            )
2297            "#
2298        ));
2299        values.push(Value::Text(filter.key.clone()));
2300        values.push(Value::Text(filter.value.clone()));
2301    }
2302}
2303
/// Parameters for one half of the incident-edge `UNION`: the edges of
/// `node_id` reached through a single endpoint column. Consumed by
/// `push_sqlite_incident_edge_branch`.
struct SqliteIncidentEdgeBranch<'a> {
    /// Index forced with `INDEXED BY`; interpolated verbatim into the SQL.
    index_name: &'a str,
    /// `graph_edges` column compared to `node_id` (e.g. `from_id` or `to_id`);
    /// interpolated verbatim into the SQL.
    endpoint_column: &'a str,
    /// Node whose incident edges are selected (bound as a parameter).
    node_id: &'a str,
    /// Optional exact edge-kind filter.
    kind: Option<&'a str>,
    /// Edge property filters, each rendered as an `EXISTS` subquery.
    filters: &'a [GraphPropertyFilter],
    /// Keyset-pagination cursor: only edges with `edge_key` greater than it.
    cursor: Option<&'a str>,
}
2312
2313fn push_sqlite_incident_edge_branch(
2314    sql: &mut String,
2315    values: &mut Vec<Value>,
2316    branch: SqliteIncidentEdgeBranch<'_>,
2317) {
2318    sql.push_str(&format!(
2319        r#"
2320        SELECT
2321            e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2322        FROM graph_edges e INDEXED BY {index_name}
2323        WHERE e.{endpoint_column} = ?
2324        "#,
2325        index_name = branch.index_name,
2326        endpoint_column = branch.endpoint_column,
2327    ));
2328    values.push(Value::Text(branch.node_id.to_string()));
2329    if let Some(kind) = branch.kind {
2330        sql.push_str(" AND e.kind = ?");
2331        values.push(Value::Text(kind.to_string()));
2332    }
2333    push_sqlite_edge_property_filter_exists(sql, values, "e", branch.filters);
2334    if let Some(cursor) = branch.cursor {
2335        sql.push_str(" AND e.edge_key > ?");
2336        values.push(Value::Text(cursor.to_string()));
2337    }
2338}
2339
2340fn sqlite_incident_edges_union_query(
2341    node_id: &str,
2342    kind: Option<&str>,
2343    filters: &[GraphPropertyFilter],
2344    cursor: Option<&str>,
2345    limit: Option<usize>,
2346) -> (String, Vec<Value>) {
2347    let mut sql = String::from(
2348        r#"
2349        SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2350        FROM (
2351        "#,
2352    );
2353    let mut values = Vec::new();
2354    push_sqlite_incident_edge_branch(
2355        &mut sql,
2356        &mut values,
2357        SqliteIncidentEdgeBranch {
2358            index_name: "idx_graph_edges_from_kind",
2359            endpoint_column: "from_id",
2360            node_id,
2361            kind,
2362            filters,
2363            cursor,
2364        },
2365    );
2366    sql.push_str(" UNION ");
2367    push_sqlite_incident_edge_branch(
2368        &mut sql,
2369        &mut values,
2370        SqliteIncidentEdgeBranch {
2371            index_name: "idx_graph_edges_to_kind",
2372            endpoint_column: "to_id",
2373            node_id,
2374            kind,
2375            filters,
2376            cursor,
2377        },
2378    );
2379    sql.push_str(
2380        r#"
2381        ) e
2382        ORDER BY e.edge_key
2383        "#,
2384    );
2385    if let Some(limit) = limit {
2386        sql.push_str(" LIMIT ?");
2387        values.push(Value::Integer(limit.saturating_add(1) as i64));
2388    }
2389    (sql, values)
2390}
2391
/// Build the SQL score expression used to rank edges during semantic-seeded
/// neighborhood expansion.
///
/// The result is `(base) + (direction_bonus) + (kind_bonus)`:
/// - `base` weights `{edge_alias}.kind`.
///   - Exact kinds come first: `semantic_relation` 340; the
///     mention/tag/related entity-or-concept kinds 280; `mentions` 220;
///     `calls` 200; the context/source/explain kinds 180;
///     `defines`/`contains`/`belongs_to` 120.
///   - Substring fallbacks come next: `community` 200; `semantic`, `concept`,
///     or `entity` 240. These use `LIKE`, so matching is ASCII
///     case-insensitive.
///   - Anything else, including NULL, scores 80.
/// - `direction_bonus` is a caller-supplied SQL fragment, wrapped in parentheses.
/// - `kind_bonus` is a small per-kind tie-breaker (0 for unlisted kinds).
///
/// `base` must be a *searched* CASE (`CASE WHEN <cond> THEN ...`). In the simple
/// form (`CASE kind WHEN <value> ...`), each `WHEN kind LIKE '...'` arm would
/// compare `kind` against the 0/1 result of the `LIKE`. The substring
/// fallbacks could then never fire, and those kinds would fall through to 80.
///
/// `edge_alias` and `direction_bonus` are spliced into the SQL verbatim. Pass
/// only trusted, code-controlled fragments.
fn sqlite_semantic_seeded_edge_score_expr(edge_alias: &str, direction_bonus: &str) -> String {
    let kind = format!("{edge_alias}.kind");
    format!(
        "(CASE \
WHEN {kind} = 'semantic_relation' THEN 340 \
WHEN {kind} IN ('mentions_entity', 'mentions_concept', 'tagged_entity', 'tagged_concept', 'related_concept') THEN 280 \
WHEN {kind} = 'mentions' THEN 220 \
WHEN {kind} = 'calls' THEN 200 \
WHEN {kind} IN ('requests_context', 'scopes_context', 'scopes_source', 'explains_result') THEN 180 \
WHEN {kind} IN ('defines', 'contains', 'belongs_to') THEN 120 \
WHEN {kind} LIKE '%community%' THEN 200 \
WHEN {kind} LIKE '%semantic%' \
OR {kind} LIKE '%concept%' \
OR {kind} LIKE '%entity%' THEN 240 \
ELSE 80 END) \
+ ({direction_bonus}) \
+ (CASE {kind} \
WHEN 'mentions_concept' THEN 30 \
WHEN 'mentions_entity' THEN 30 \
WHEN 'tagged_concept' THEN 30 \
WHEN 'tagged_entity' THEN 30 \
WHEN 'related_concept' THEN 30 \
WHEN 'semantic_relation' THEN 28 \
WHEN 'calls' THEN 24 \
WHEN 'mentions' THEN 22 \
WHEN 'requests_context' THEN 18 \
WHEN 'scopes_context' THEN 18 \
WHEN 'scopes_source' THEN 18 \
WHEN 'explains_result' THEN 18 \
WHEN 'defines' THEN 12 \
WHEN 'contains' THEN 12 \
WHEN 'belongs_to' THEN 12 \
ELSE 0 END)"
    )
}
2435
2436impl GraphStore for SqliteGraphStore {
2437    fn upsert_node(&self, node: &GraphNode) -> Result<()> {
2438        self.conn.execute(
2439            r#"
2440            INSERT INTO graph_nodes
2441                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
2442            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
2443            ON CONFLICT(id) DO UPDATE SET
2444                kind = excluded.kind,
2445                label = excluded.label,
2446                properties_json = excluded.properties_json,
2447                provenance_json = excluded.provenance_json,
2448                freshness_json = excluded.freshness_json,
2449                row_hash = excluded.row_hash,
2450                source_watermark = excluded.source_watermark
2451            "#,
2452            (
2453                &node.id,
2454                &node.kind,
2455                &node.label,
2456                to_json(&node.properties)?,
2457                to_json(&node.provenance)?,
2458                optional_to_json(&node.freshness)?,
2459                row_hash(node)?,
2460            ),
2461        )?;
2462        replace_node_properties(&self.conn, &node.id, &node.properties)?;
2463        replace_node_semantic_vector(&self.conn, &node.id, &node.kind, &node.properties)?;
2464        Ok(())
2465    }
2466
2467    fn upsert_edge(&self, edge: &GraphEdge) -> Result<()> {
2468        let edge_key = graph_edge_id(edge);
2469        self.conn.execute(
2470            r#"
2471            INSERT INTO graph_edges
2472                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
2473            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
2474            ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
2475                edge_key = excluded.edge_key,
2476                properties_json = excluded.properties_json,
2477                provenance_json = excluded.provenance_json,
2478                freshness_json = excluded.freshness_json,
2479                row_hash = excluded.row_hash,
2480                source_watermark = excluded.source_watermark
2481            "#,
2482            (
2483                &edge_key,
2484                &edge.from_id,
2485                &edge.to_id,
2486                &edge.kind,
2487                to_json(&edge.properties)?,
2488                to_json(&edge.provenance)?,
2489                optional_to_json(&edge.freshness)?,
2490                row_hash(edge)?,
2491            ),
2492        )?;
2493        replace_edge_properties(&self.conn, &edge_key, &edge.properties)?;
2494        Ok(())
2495    }
2496
2497    fn delete_node(&self, id: &str) -> Result<usize> {
2498        self.conn
2499            .execute("DELETE FROM graph_nodes WHERE id = ?1", [id])
2500            .map_err(Into::into)
2501    }
2502
2503    fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize> {
2504        self.conn
2505            .execute(
2506                "DELETE FROM graph_edges WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3",
2507                (from_id, to_id, kind),
2508            )
2509            .map_err(Into::into)
2510    }
2511
2512    fn node(&self, id: &str) -> Result<Option<GraphNode>> {
2513        self.conn
2514            .query_row(
2515                r#"
2516SELECT id, kind, label, properties_json, provenance_json, freshness_json
2517                FROM graph_nodes
2518                WHERE id = ?1
2519                "#,
2520                [id],
2521                node_from_row,
2522            )
2523            .optional()
2524            .map_err(Into::into)
2525    }
2526
2527    fn nodes_by_ids(&self, ids: &[String]) -> Result<Vec<GraphNode>> {
2528        let unique_ids = ids.iter().cloned().collect::<BTreeSet<_>>();
2529        if unique_ids.is_empty() {
2530            return Ok(Vec::new());
2531        }
2532
2533        let mut nodes = Vec::new();
2534        let id_refs = unique_ids.iter().collect::<Vec<_>>();
2535        for chunk in id_refs.chunks(450) {
2536            let placeholders = chunk.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
2537            let sql = format!(
2538                "SELECT id, kind, label, properties_json, provenance_json, freshness_json \
2539FROM graph_nodes \
2540WHERE id IN ({placeholders}) \
2541ORDER BY id"
2542            );
2543            let values = chunk
2544                .iter()
2545                .map(|id| Value::Text((*id).clone()))
2546                .collect::<Vec<_>>();
2547            let mut stmt = self.conn.prepare(&sql)?;
2548            nodes.extend(collect_rows(
2549                stmt.query_map(params_from_iter(values.iter()), node_from_row)?,
2550            )?);
2551        }
2552        nodes.sort_by(|left, right| left.id.cmp(&right.id));
2553        Ok(nodes)
2554    }
2555
2556    fn all_nodes(&self) -> Result<Vec<GraphNode>> {
2557        let mut stmt = self.conn.prepare(
2558            r#"
2559SELECT id, kind, label, properties_json, provenance_json, freshness_json
2560            FROM graph_nodes
2561            ORDER BY id
2562            "#,
2563        )?;
2564        collect_rows(stmt.query_map([], node_from_row)?)
2565    }
2566
2567    fn all_edges(&self) -> Result<Vec<GraphEdge>> {
2568        let mut stmt = self.conn.prepare(
2569            r#"
2570            SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2571            FROM graph_edges
2572            ORDER BY from_id, kind, to_id
2573            "#,
2574        )?;
2575        collect_rows(stmt.query_map([], edge_from_row)?)
2576    }
2577
2578    fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
2579        self.conn
2580            .query_row(
2581                r#"
2582                SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2583                FROM graph_edges INDEXED BY idx_graph_edges_edge_key
2584                WHERE edge_key = ?1
2585                "#,
2586                [edge_id],
2587                edge_from_row,
2588            )
2589            .optional()
2590            .map_err(Into::into)
2591    }
2592
2593    fn graph_counts(&self) -> Result<(usize, usize)> {
2594        let nodes = self
2595            .conn
2596            .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
2597                row_usize(row, 0)
2598            })?;
2599        let edges = self
2600            .conn
2601            .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2602                row_usize(row, 0)
2603            })?;
2604        Ok((nodes, edges))
2605    }
2606
2607    fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
2608        match kind {
2609            Some(kind) => self
2610                .conn
2611                .query_row(
2612                    r#"
2613                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2614                    FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2615                    WHERE from_id <> to_id AND kind = ?1
2616                    ORDER BY from_id, kind, to_id
2617                    LIMIT 1
2618                    "#,
2619                    [kind],
2620                    edge_from_row,
2621                )
2622                .optional()
2623                .map_err(Into::into),
2624            None => self
2625                .conn
2626                .query_row(
2627                    r#"
2628                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2629                    FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2630                    WHERE from_id <> to_id
2631                    ORDER BY from_id, kind, to_id
2632                    LIMIT 1
2633                    "#,
2634                    [],
2635                    edge_from_row,
2636                )
2637                .optional()
2638                .map_err(Into::into),
2639        }
2640    }
2641
2642    fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
2643        self.conn
2644            .query_row(
2645                r#"
2646                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2647                       ep.key, ep.value
2648                FROM graph_edge_properties ep INDEXED BY idx_graph_edge_properties_key_value_edge
2649                JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2650                  ON e.edge_key = ep.edge_key
2651                WHERE e.from_id <> e.to_id
2652                ORDER BY ep.key, ep.value, ep.edge_key
2653                LIMIT 1
2654                "#,
2655                [],
2656                |row| {
2657                    Ok((
2658                        edge_from_row(row)?,
2659                        GraphPropertyFilter {
2660                            key: row.get(7)?,
2661                            value: row.get(8)?,
2662                        },
2663                    ))
2664                },
2665            )
2666            .optional()
2667            .map_err(Into::into)
2668    }
2669
2670    fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>> {
2671        let mut stmt = self.conn.prepare(
2672            r#"
2673            SELECT id, kind, label, properties_json, provenance_json, freshness_json
2674            FROM graph_nodes
2675            WHERE kind = ?1
2676            ORDER BY id
2677            "#,
2678        )?;
2679        collect_rows(stmt.query_map([kind], node_from_row)?)
2680    }
2681
2682    fn semantic_top_candidates(
2683        &self,
2684        query_vector: &[f64],
2685        kinds: &[&str],
2686        limit: usize,
2687    ) -> Result<Vec<GraphSemanticCandidate>> {
2688        if query_vector.is_empty() || kinds.is_empty() {
2689            return Ok(Vec::new());
2690        }
2691        if !sqlite_table_exists(&self.conn, "graph_node_semantic_vectors")? {
2692            return graph_semantic_top_candidates_by_property_scan(
2693                self,
2694                query_vector,
2695                kinds,
2696                limit,
2697            );
2698        }
2699
2700        let unique_kinds = kinds.iter().copied().collect::<BTreeSet<_>>();
2701        if unique_kinds.is_empty() {
2702            return Ok(Vec::new());
2703        }
2704        let kind_placeholders = unique_kinds
2705            .iter()
2706            .map(|_| "?")
2707            .collect::<Vec<_>>()
2708            .join(", ");
2709        let sql = format!(
2710            r#"
2711            SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
2712                   graph_node_semantic_vectors.vector_blob,
2713                   graph_node_semantic_vectors.dimensions
2714            FROM graph_node_semantic_vectors INDEXED BY idx_graph_node_semantic_vectors_kind_dims
2715            JOIN graph_nodes n ON n.id = graph_node_semantic_vectors.node_id
2716            WHERE graph_node_semantic_vectors.dimensions = ?
2717              AND graph_node_semantic_vectors.kind IN ({kind_placeholders})
2718            ORDER BY graph_node_semantic_vectors.kind, n.label, n.id
2719            "#
2720        );
2721        let mut values = vec![Value::Integer(query_vector.len() as i64)];
2722        values.extend(
2723            unique_kinds
2724                .into_iter()
2725                .map(|kind| Value::Text(kind.to_string())),
2726        );
2727        let rows = {
2728            let mut stmt = self.conn.prepare(&sql)?;
2729            collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2730                Ok((
2731                    node_from_row_at(row, 0)?,
2732                    row.get::<_, Vec<u8>>(6)?,
2733                    row.get::<_, i64>(7)?,
2734                ))
2735            })?)?
2736        };
2737
2738        let mut candidates = rows
2739            .into_iter()
2740            .filter_map(|(node, blob, dimensions)| {
2741                let dimensions = usize::try_from(dimensions).ok()?;
2742                let vector = semantic_vector_from_blob(&blob, dimensions)?;
2743                Some(GraphSemanticCandidate {
2744                    score: graph_semantic_cosine(query_vector, &vector),
2745                    node,
2746                })
2747            })
2748            .collect::<Vec<_>>();
2749        candidates.sort_by(|left, right| {
2750            right
2751                .score
2752                .partial_cmp(&left.score)
2753                .unwrap_or(std::cmp::Ordering::Equal)
2754                .then_with(|| left.node.kind.cmp(&right.node.kind))
2755                .then_with(|| left.node.label.cmp(&right.node.label))
2756                .then_with(|| left.node.id.cmp(&right.node.id))
2757        });
2758        if limit > 0 && candidates.len() > limit {
2759            candidates.truncate(limit);
2760        }
2761        Ok(candidates)
2762    }
2763
2764    fn paged_nodes_by_kind(
2765        &self,
2766        kind: &str,
2767        options: GraphQueryOptions,
2768    ) -> Result<GraphPagedSubgraph> {
2769        let mut sql = String::from(
2770            r#"
2771            SELECT id, kind, label, properties_json, provenance_json, freshness_json
2772            FROM graph_nodes
2773            WHERE kind = ?
2774            "#,
2775        );
2776        let mut values = vec![Value::Text(kind.to_string())];
2777        push_sqlite_property_filter_exists(
2778            &mut sql,
2779            &mut values,
2780            "graph_nodes",
2781            &options.property_filters,
2782        );
2783        if let Some(cursor) = &options.cursor {
2784            sql.push_str(" AND id > ?");
2785            values.push(Value::Text(cursor.clone()));
2786        }
2787        sql.push_str(" ORDER BY id");
2788        if let Some(limit) = options.limit {
2789            sql.push_str(" LIMIT ?");
2790            values.push(Value::Integer(limit.saturating_add(1) as i64));
2791        }
2792
2793        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2794        let mut stmt = self.conn.prepare(&sql)?;
2795        let mut nodes =
2796            collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)?;
2797        let before_limit = nodes.len();
2798        let mut next_cursor = None;
2799        if let Some(limit) = options.limit
2800            && nodes.len() > limit
2801        {
2802            next_cursor = nodes
2803                .get(limit.saturating_sub(1))
2804                .map(|node| node.id.clone());
2805            nodes.truncate(limit);
2806        }
2807        let expected_indexes = if options.property_filters.is_empty() {
2808            vec!["idx_graph_nodes_kind"]
2809        } else {
2810            vec![
2811                "idx_graph_nodes_kind",
2812                "idx_graph_node_properties_key_value_node",
2813            ]
2814        };
2815        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2816        if !options.property_filters.is_empty() {
2817            diagnostics.push(
2818                "property filters were evaluated by SQLite materialized property rows before paging"
2819                    .to_string(),
2820            );
2821        }
2822        if options.cursor.is_some() {
2823            diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
2824        }
2825        if next_cursor.is_some() {
2826            diagnostics.push(
2827                "result was truncated; pass page.next_cursor as --cursor for the next page"
2828                    .to_string(),
2829            );
2830        }
2831        Ok(GraphPagedSubgraph {
2832            page: GraphQueryPage {
2833                cursor: options.cursor,
2834                limit: options.limit,
2835                next_cursor,
2836                returned_nodes: nodes.len(),
2837                returned_edges: 0,
2838                truncated: options.limit.is_some_and(|limit| before_limit > limit),
2839                diagnostics,
2840            },
2841            nodes,
2842            edges: Vec::new(),
2843        })
2844    }
2845
2846    fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2847        match kind {
2848            Some(kind) => {
2849                let mut stmt = self.conn.prepare(
2850                    r#"
2851                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2852                    FROM graph_edges
2853                    WHERE from_id = ?1 AND kind = ?2
2854                    ORDER BY to_id, kind
2855                    "#,
2856                )?;
2857                collect_rows(stmt.query_map((from_id, kind), edge_from_row)?)
2858            }
2859            None => {
2860                let mut stmt = self.conn.prepare(
2861                    r#"
2862                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2863                    FROM graph_edges
2864                    WHERE from_id = ?1
2865                    ORDER BY to_id, kind
2866                    "#,
2867                )?;
2868                collect_rows(stmt.query_map([from_id], edge_from_row)?)
2869            }
2870        }
2871    }
2872
2873    fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2874        let (sql, values) = sqlite_incident_edges_union_query(node_id, kind, &[], None, None);
2875        let mut stmt = self.conn.prepare(&sql)?;
2876        collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2877    }
2878
2879    fn semantic_seeded_expansion_edges(
2880        &self,
2881        current_id: &str,
2882        options: &SemanticSeededNeighborhoodOptions,
2883    ) -> Result<SemanticSeededNeighborhoodExpansion> {
2884        let from_score = sqlite_semantic_seeded_edge_score_expr("e", "8");
2885        let to_score = sqlite_semantic_seeded_edge_score_expr(
2886            "e",
2887            "CASE WHEN e.from_id = e.to_id THEN 8 ELSE 4 END",
2888        );
2889        let limit_clause = if options.edge_scan_cap > 0 {
2890            "LIMIT ?"
2891        } else {
2892            ""
2893        };
2894        let sql = format!(
2895            r#"
2896WITH candidate_edges AS (
2897    SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2898           {from_score} AS score
2899    FROM graph_edges e INDEXED BY idx_graph_edges_from_kind
2900    WHERE e.from_id = ?
2901    UNION ALL
2902    SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2903           {to_score} AS score
2904    FROM graph_edges e INDEXED BY idx_graph_edges_to_kind
2905    WHERE e.to_id = ?
2906),
2907ranked_edges AS (
2908    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json,
2909           MAX(score) AS score
2910    FROM candidate_edges
2911    GROUP BY edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2912),
2913limited_edges AS (
2914    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json,
2915           score, COUNT(*) OVER () AS total_edges
2916    FROM ranked_edges
2917    ORDER BY score DESC, edge_key ASC
2918    {limit_clause}
2919)
2920SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, total_edges
2921FROM limited_edges
2922ORDER BY score DESC, edge_key ASC
2923"#
2924        );
2925        let mut values = vec![
2926            Value::Text(current_id.to_string()),
2927            Value::Text(current_id.to_string()),
2928        ];
2929        if options.edge_scan_cap > 0 {
2930            values.push(Value::Integer(
2931                options
2932                    .edge_scan_cap
2933                    .saturating_add(1)
2934                    .min(i64::MAX as usize) as i64,
2935            ));
2936        }
2937        let mut stmt = self.conn.prepare(&sql)?;
2938        let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2939            Ok((edge_from_row_at(row, 0)?, row.get::<_, i64>(7)? as usize))
2940        })?)?;
2941        let total_candidates = rows.first().map(|(_, total)| *total).unwrap_or(0);
2942        let mut edges = rows.into_iter().map(|(edge, _)| edge).collect::<Vec<_>>();
2943        let mut skipped_by_edge_cap = 0usize;
2944        if options.edge_scan_cap > 0 && total_candidates > options.edge_scan_cap {
2945            skipped_by_edge_cap = total_candidates - options.edge_scan_cap;
2946            edges.truncate(options.edge_scan_cap);
2947        }
2948        Ok(SemanticSeededNeighborhoodExpansion {
2949            edges,
2950            skipped_by_edge_cap,
2951        })
2952    }
2953
    /// Pages through stored edges, optionally restricted to one `kind` and to
    /// edges matching every entry of `options.property_filters`.
    ///
    /// With at least one property filter, the first filter drives the query
    /// from `graph_edge_properties` (aliased `ep0`) joined to `graph_edges`, and
    /// the remaining filters are appended by
    /// `push_sqlite_edge_property_filter_exists`. Without filters the query
    /// reads `graph_edges` directly. Paging is keyset-based on the edge key:
    /// `options.cursor` is exclusive and results are ordered by edge key.
    ///
    /// The returned page includes query-plan diagnostics (from
    /// `sqlite_query_plan_diagnostics`) plus notes describing how the query was
    /// evaluated. No nodes are returned.
    fn paged_edges(
        &self,
        kind: Option<&str>,
        options: GraphQueryOptions,
    ) -> Result<GraphPagedSubgraph> {
        // The first property filter (if any) becomes the driving lookup. `values`
        // must receive bind values in the same order their `?` appear in `sql`.
        let primary_property_filter = options.property_filters.first();
        let mut values = Vec::new();
        let mut sql = if let Some(filter) = primary_property_filter {
            values.push(Value::Text(filter.key.clone()));
            values.push(Value::Text(filter.value.clone()));
            String::from(
                r#"
                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
                FROM graph_edge_properties ep0 INDEXED BY idx_graph_edge_properties_key_value_edge
                JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
                  ON e.edge_key = ep0.edge_key
                WHERE ep0.key = ?
                  AND ep0.value = ?
                "#,
            )
        } else {
            // `WHERE 1 = 1` lets every later predicate be appended uniformly as `AND ...`.
            String::from(
                r#"
                SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
                FROM graph_edges e
                WHERE 1 = 1
                "#,
            )
        };
        if let Some(kind) = kind {
            sql.push_str(" AND e.kind = ?");
            values.push(Value::Text(kind.to_string()));
        }
        // Filters not consumed as the primary filter (all of them when there is none).
        push_sqlite_edge_property_filter_exists(
            &mut sql,
            &mut values,
            "e",
            if primary_property_filter.is_some() {
                &options.property_filters[1..]
            } else {
                &options.property_filters
            },
        );
        // `ep0.edge_key` equals `e.edge_key` through the join; the cursor and
        // ORDER BY use whichever alias drives the query.
        if let Some(cursor) = &options.cursor {
            if primary_property_filter.is_some() {
                sql.push_str(" AND ep0.edge_key > ?");
            } else {
                sql.push_str(" AND e.edge_key > ?");
            }
            values.push(Value::Text(cursor.clone()));
        }
        if primary_property_filter.is_some() {
            sql.push_str(" ORDER BY ep0.edge_key");
        } else {
            sql.push_str(" ORDER BY e.edge_key");
        }
        // Over-fetch by one row so truncation can be detected after the query.
        if let Some(limit) = options.limit {
            sql.push_str(" LIMIT ?");
            values.push(Value::Integer(limit.saturating_add(1) as i64));
        }

        // Count rows matching the primary filter alone; used only for a diagnostic note.
        let primary_property_row_count = if let Some(filter) = primary_property_filter {
            Some(self.conn.query_row(
                r#"
                SELECT COUNT(*)
                FROM graph_edge_properties INDEXED BY idx_graph_edge_properties_key_value_edge
                WHERE key = ?1 AND value = ?2
                "#,
                (&filter.key, &filter.value),
                |row| row_usize(row, 0),
            )?)
        } else {
            None
        };
        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
        let mut stmt = self.conn.prepare(&sql)?;
        let mut edges =
            collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
        let before_limit = edges.len();
        // The next cursor is the key of the last edge kept on this page.
        // `graph_edge_id` is also what `upsert_edge` stores as `edge_key`.
        // NOTE(review): with `limit == Some(0)` this picks the first fetched edge
        // even though none are returned, so a follow-up page would skip it;
        // confirm that is intended.
        let mut next_cursor = None;
        if let Some(limit) = options.limit
            && edges.len() > limit
        {
            next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
            edges.truncate(limit);
        }
        // Index names passed to `sqlite_query_plan_diagnostics` for comparison with the plan.
        let expected_indexes = if options.property_filters.is_empty() {
            vec!["idx_graph_edges_edge_key"]
        } else {
            vec![
                "idx_graph_edge_properties_key_value_edge",
                "idx_graph_edges_edge_key",
            ]
        };
        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
        if !options.property_filters.is_empty() {
            if let Some(row_count) = primary_property_row_count {
                diagnostics.push(format!(
                    "edge property primary filter matched {row_count} materialized row(s) before edge-kind/cursor paging"
                ));
            }
            diagnostics.push(
                "edge property scan drives from SQLite materialized property rows before joining graph_edges"
                    .to_string(),
            );
        }
        if options.cursor.is_some() {
            diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
        }
        if next_cursor.is_some() {
            diagnostics.push(
                "result was truncated; pass page.next_cursor as --cursor for the next page"
                    .to_string(),
            );
        }
        Ok(GraphPagedSubgraph {
            page: GraphQueryPage {
                cursor: options.cursor,
                limit: options.limit,
                next_cursor,
                returned_nodes: 0,
                returned_edges: edges.len(),
                truncated: options.limit.is_some_and(|limit| before_limit > limit),
                diagnostics,
            },
            nodes: Vec::new(),
            edges,
        })
    }
3083
3084    fn paged_incident_edges(
3085        &self,
3086        node_id: &str,
3087        kind: Option<&str>,
3088        options: GraphQueryOptions,
3089    ) -> Result<GraphPagedSubgraph> {
3090        let (sql, values) = sqlite_incident_edges_union_query(
3091            node_id,
3092            kind,
3093            &options.property_filters,
3094            options.cursor.as_deref(),
3095            options.limit,
3096        );
3097        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
3098        let mut stmt = self.conn.prepare(&sql)?;
3099        let mut edges =
3100            collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
3101        let before_limit = edges.len();
3102        let mut next_cursor = None;
3103        if let Some(limit) = options.limit
3104            && edges.len() > limit
3105        {
3106            next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
3107            edges.truncate(limit);
3108        }
3109        let expected_indexes = if options.property_filters.is_empty() {
3110            vec!["idx_graph_edges_from_kind", "idx_graph_edges_to_kind"]
3111        } else {
3112            vec![
3113                "idx_graph_edges_from_kind",
3114                "idx_graph_edges_to_kind",
3115                "idx_graph_edge_properties_key_value_edge",
3116            ]
3117        };
3118        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
3119        diagnostics.push(
3120            "incident edge scan uses UNION over from_id/to_id index probes instead of an OR predicate"
3121                .to_string(),
3122        );
3123        if !options.property_filters.is_empty() {
3124            diagnostics.push(
3125                "edge property filters were evaluated by SQLite materialized property rows before paging"
3126                    .to_string(),
3127            );
3128        }
3129        if options.cursor.is_some() {
3130            diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
3131        }
3132        if next_cursor.is_some() {
3133            diagnostics.push(
3134                "result was truncated; pass page.next_cursor as --cursor for the next page"
3135                    .to_string(),
3136            );
3137        }
3138        Ok(GraphPagedSubgraph {
3139            page: GraphQueryPage {
3140                cursor: options.cursor,
3141                limit: options.limit,
3142                next_cursor,
3143                returned_nodes: 0,
3144                returned_edges: edges.len(),
3145                truncated: options.limit.is_some_and(|limit| before_limit > limit),
3146                diagnostics,
3147            },
3148            nodes: Vec::new(),
3149            edges,
3150        })
3151    }
3152
    /// Returns every stored edge whose `from_id` and `to_id` are both in
    /// `node_ids`, ordered by `(from_id, kind, to_id)`.
    ///
    /// Sets of at most 20 ids are delegated to `edges_between_nodes_inline`.
    /// Larger sets are staged in the temp table `_edges_between_ids` inside a
    /// transaction, and the edge query filters against it with two `EXISTS`
    /// probes. `temp_table_active` is set while that section runs.
    fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
        if node_ids.is_empty() {
            return Ok(Vec::new());
        }
        if node_ids.len() <= 20 {
            return self.edges_between_nodes_inline(node_ids);
        }
        // Checked via `assert_not_in_temp_table_section` before marking the
        // shared temp-table section as active.
        self.assert_not_in_temp_table_section();
        self.temp_table_active.set(true);
        // The fallible work runs in a closure so the flag below is cleared on
        // every `Ok`/`Err` path. NOTE(review): it is not cleared if the closure panics.
        let result = (|| -> Result<Vec<GraphEdge>> {
            let tx = self.conn.unchecked_transaction()?;
            // Create the staging table on first use and clear any leftover rows.
            tx.execute_batch(
                r#"
                CREATE TEMP TABLE IF NOT EXISTS _edges_between_ids (id TEXT PRIMARY KEY);
                DELETE FROM _edges_between_ids;
                "#,
            )?;
            // Insert ids in batches of 450 single-value rows (presumably to stay
            // under SQLite's bound-parameter limit).
            for chunk in node_ids.iter().collect::<Vec<_>>().chunks(450) {
                let row_placeholders: Vec<String> =
                    chunk.iter().map(|_| "(?)".to_string()).collect();
                let placeholders = row_placeholders.join(", ");
                let sql =
                    format!("INSERT OR IGNORE INTO _edges_between_ids (id) VALUES {placeholders}");
                let values: Vec<Value> =
                    chunk.iter().map(|id| Value::Text((*id).clone())).collect();
                tx.execute(&sql, params_from_iter(values.iter()))?;
            }
            let edges = {
                let mut stmt = tx.prepare(
                    r#"
                    SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
                    FROM graph_edges e
                    WHERE EXISTS (SELECT 1 FROM _edges_between_ids f WHERE f.id = e.from_id)
                      AND EXISTS (SELECT 1 FROM _edges_between_ids t WHERE t.id = e.to_id)
                    ORDER BY e.from_id, e.kind, e.to_id
                    "#,
                )?;
                collect_rows(stmt.query_map([], edge_from_row)?)?
            };
            // `finish` applies the transaction's drop behavior. With rusqlite's
            // default (rollback) this discards the staged ids; the edges were
            // already collected above.
            tx.finish()?;
            Ok(edges)
        })();
        self.temp_table_active.set(false);
        result
    }
3198
3199    fn ranked_neighborhood(
3200        &self,
3201        center_id: &str,
3202        options: &RankedNeighborhoodOptions,
3203    ) -> Result<Option<RankedNeighborhoodResult>> {
3204        if self.node(center_id)?.is_none() {
3205            return Ok(None);
3206        }
3207        let center = self.node(center_id)?.unwrap();
3208
3209        let base_score_expr = match options.scoring {
3210            tsift_core::NeighborhoodScoring::BreadthFirst => {
3211                "MAX(0, 120 - (walk.depth * 18))".to_string()
3212            }
3213            tsift_core::NeighborhoodScoring::EdgeKindWeighted => {
3214                "MAX(0, 120 - (walk.depth * 18)) + CASE walk.edge_kind \
3215                 WHEN 'semantic_relation' THEN 34 \
3216                 WHEN 'mentions_entity' THEN 28 \
3217                 WHEN 'mentions_concept' THEN 28 \
3218                 WHEN 'tagged_entity' THEN 28 \
3219                 WHEN 'tagged_concept' THEN 28 \
3220                 WHEN 'related_concept' THEN 28 \
3221                 WHEN 'mentions' THEN 22 \
3222                 WHEN 'calls' THEN 20 \
3223                 WHEN 'requests_context' THEN 18 \
3224                 WHEN 'scopes_context' THEN 18 \
3225                 WHEN 'scopes_source' THEN 18 \
3226                 WHEN 'explains_result' THEN 18 \
3227                 WHEN 'defines' THEN 12 \
3228                 WHEN 'contains' THEN 12 \
3229                 WHEN 'belongs_to' THEN 12 \
3230                 ELSE 8 END".to_string()
3231            }
3232            tsift_core::NeighborhoodScoring::DegreeWeighted => {
3233                "MAX(0, 120 - (walk.depth * 18)) + CASE \
3234                 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 3 THEN 20 \
3235                 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 10 THEN 10 \
3236                 ELSE 0 END"
3237                    .to_string()
3238            }
3239        };
3240        let now_unix = options.observed_at_now_unix.unwrap_or_else(unix_now);
3241        let observed_at_half_life_secs = options.observed_at_half_life_secs.max(1);
3242        let observed_at_weight = options.observed_at_weight.max(0);
3243        let memory_node_boost = options.memory_node_boost.max(0);
3244        let observed_at_value = "COALESCE(\
3245            CAST(json_extract(n_score.freshness_json, '$.observed_at_unix') AS INTEGER), \
3246            CAST(json_extract(n_score.properties_json, '$.observed_at_unix') AS INTEGER), \
3247            CAST(json_extract(n_score.properties_json, '$.max_observed_at_unix') AS INTEGER)\
3248        )";
3249        let observed_at_expr = if observed_at_weight == 0 {
3250            "0".to_string()
3251        } else {
3252            format!(
3253                "CASE \
3254                 WHEN {observed_at_value} IS NULL THEN 0 \
3255                 WHEN ({now_unix} - {observed_at_value}) < {observed_at_half_life_secs} THEN {observed_at_weight} \
3256                 WHEN ({now_unix} - {observed_at_value}) < ({observed_at_half_life_secs} * 2) THEN {observed_at_weight} / 2 \
3257                 WHEN ({now_unix} - {observed_at_value}) < ({observed_at_half_life_secs} * 4) THEN {observed_at_weight} / 4 \
3258                 ELSE 0 END"
3259            )
3260        };
3261        let confidence_value =
3262            "CAST(json_extract(n_score.properties_json, '$.confidence') AS REAL)";
3263        let memory_signal_expr = if memory_node_boost == 0 {
3264            "0".to_string()
3265        } else {
3266            format!(
3267                "(CASE \
3268                  WHEN n_score.kind = 'memory_projection' THEN 0 \
3269                  WHEN n_score.kind IN ('finding', 'decision', 'memory_event') THEN {memory_node_boost} \
3270                  WHEN n_score.kind IN ('note', 'memory_session') THEN {memory_node_boost} / 2 \
3271                  WHEN n_score.kind IN ('source_handle', 'semantic_concept', 'semantic_vector_handle') \
3272                    AND json_extract(n_score.properties_json, '$.provider') = 'tsift-memory' THEN {memory_node_boost} / 2 \
3273                  WHEN n_score.kind LIKE 'memory_%' THEN {memory_node_boost} \
3274                  WHEN json_extract(n_score.properties_json, '$.provider') = 'tsift-memory' THEN {memory_node_boost} / 2 \
3275                  ELSE 0 END) \
3276                 + (CASE \
3277                  WHEN json_extract(n_score.properties_json, '$.confidence') IS NULL THEN 0 \
3278                  WHEN {confidence_value} <= 0.0 THEN 0 \
3279                  WHEN {confidence_value} >= 1.0 THEN {memory_node_boost} \
3280                  ELSE CAST(ROUND({memory_node_boost} * {confidence_value}) AS INTEGER) END)"
3281            )
3282        };
3283        let score_expr =
3284            format!("({base_score_expr}) + ({observed_at_expr}) + ({memory_signal_expr})");
3285
3286        let use_degree_cache = matches!(
3287            options.scoring,
3288            tsift_core::NeighborhoodScoring::DegreeWeighted
3289        );
3290        let degree_cte = if use_degree_cache {
3291            "degree_cache AS ( \
3292            SELECT id, (SELECT COUNT(*) FROM graph_edges e WHERE e.from_id = n.id OR e.to_id = n.id) AS degree \
3293            FROM graph_nodes n), "
3294        } else {
3295            ""
3296        };
3297        let mut sql = format!(
3298            r#"
3299WITH RECURSIVE {degree_cte}walk(id, depth, edge_kind, score) AS (
3300SELECT ?, 0, '', ?
3301UNION
3302SELECT e.to_id, walk.depth + 1, e.kind,
3303"#,
3304        );
3305        sql.push_str(&format!("    {}\n", score_expr));
3306        sql.push_str(
3307            r#"
3308FROM walk
3309JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3310ON e.from_id = walk.id
3311JOIN graph_nodes n_score ON n_score.id = e.to_id
3312WHERE walk.depth < ?
3313"#,
3314        );
3315        let mut values = vec![
3316            Value::Text(center_id.to_string()),
3317            Value::Integer(i64::MAX),
3318            Value::Integer(options.depth as i64),
3319        ];
3320        if let Some(kind) = &options.edge_kind {
3321            sql.push_str(" AND e.kind = ?");
3322            values.push(Value::Text(kind.clone()));
3323        }
3324        sql.push_str(
3325            r#"
3326            ),
3327scored_nodes AS (
3328SELECT walk.id, MAX(walk.score) AS score,
3329n.kind AS node_kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3330FROM walk
3331JOIN graph_nodes n ON n.id = walk.id
3332GROUP BY walk.id
3333            ),
3334            ranked AS (
3335                SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
3336                FROM scored_nodes
3337                ORDER BY score DESC, id ASC
3338            ),
3339            kept AS (
3340                SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
3341                FROM ranked
3342                LIMIT ?
3343            ),
3344            total AS (
3345                SELECT COUNT(*) AS cnt FROM scored_nodes
3346            )
3347            SELECT
3348                'meta' AS row_type,
3349                (SELECT cnt FROM total) AS total_discovered,
3350                0 AS node_id, '' AS node_kind, '' AS node_label,
3351                '' AS node_props, '' AS node_prov, '' AS node_fresh,
3352                '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
3353                '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
3354            UNION ALL
3355            SELECT
3356                'node' AS row_type,
3357                0 AS total_discovered,
3358                k.id, k.node_kind, k.label, k.properties_json, k.provenance_json, k.freshness_json,
3359                '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
3360                '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
3361            FROM kept k
3362            UNION ALL
3363            SELECT
3364                'edge' AS row_type,
3365                0 AS total_discovered,
3366                '' AS node_id, '' AS node_kind, '' AS node_label,
3367                '' AS node_props, '' AS node_prov, '' AS node_fresh,
3368                e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
3369            FROM graph_edges e
3370            WHERE EXISTS (SELECT 1 FROM kept k WHERE k.id = e.from_id)
3371              AND EXISTS (SELECT 1 FROM kept k2 WHERE k2.id = e.to_id)
3372"#,
3373        );
3374        values.push(Value::Integer(options.max_nodes.saturating_add(1) as i64));
3375
3376        let mut stmt = self.conn.prepare(&sql)?;
3377        let mut nodes = vec![center.clone()];
3378        let mut edges = Vec::new();
3379        let mut total_discovered = 0usize;
3380
3381        let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
3382            let row_type: String = row.get(0)?;
3383            match row_type.as_str() {
3384                "meta" => Ok(QueryResult::Meta {
3385                    total: row.get::<_, i64>(1)? as usize,
3386                }),
3387                "node" => Ok(QueryResult::Node(node_from_row_at(row, 2)?)),
3388                "edge" => Ok(QueryResult::Edge(edge_from_row_at(row, 8)?)),
3389                _ => Err(rusqlite::Error::InvalidQuery),
3390            }
3391        })?;
3392        for row_result in rows {
3393            match row_result? {
3394                QueryResult::Meta { total } => {
3395                    total_discovered = total;
3396                }
3397                QueryResult::Node(node) => {
3398                    if node.id != center_id {
3399                        nodes.push(node);
3400                    }
3401                }
3402                QueryResult::Edge(edge) => {
3403                    edges.push(edge);
3404                }
3405            }
3406        }
3407
3408        let total_discovered = total_discovered.max(nodes.len());
3409        let pruned_count = total_discovered.saturating_sub(nodes.len());
3410
3411        match options.property_mode {
3412            PropertyMode::Full => {}
3413            PropertyMode::Omit => {
3414                for n in &mut nodes {
3415                    n.properties.clear();
3416                }
3417                for e in &mut edges {
3418                    e.properties.clear();
3419                }
3420            }
3421            PropertyMode::Sample => {
3422                let mut seen_kinds = std::collections::BTreeSet::new();
3423                for n in &mut nodes {
3424                    if !seen_kinds.contains(&n.kind) {
3425                        seen_kinds.insert(n.kind.clone());
3426                    } else {
3427                        n.properties.clear();
3428                    }
3429                }
3430                for e in &mut edges {
3431                    e.properties.clear();
3432                }
3433            }
3434        }
3435
3436        Ok(Some(RankedNeighborhoodResult {
3437            nodes,
3438            edges,
3439            pruned_count,
3440            total_discovered,
3441        }))
3442    }
3443
3444    fn neighborhood(
3445        &self,
3446        center_id: &str,
3447        depth: usize,
3448        kind: Option<&str>,
3449    ) -> Result<Option<GraphSubgraph>> {
3450        self.paged_neighborhood(center_id, depth, kind, GraphQueryOptions::default())
3451            .map(|result| {
3452                result.map(|result| {
3453                    GraphSubgraph {
3454                        nodes: result.nodes,
3455                        edges: result.edges,
3456                    }
3457                    .sorted()
3458                })
3459            })
3460    }
3461
    /// Returns one id-ordered page of the nodes reachable from `center_id`, plus
    /// the walked edges whose endpoints are both on that page.
    ///
    /// Nodes are reached by following up to `depth` outgoing edges, restricted
    /// to edges of `kind` when it is given.
    ///
    /// Returns `Ok(None)` when `center_id` is not a stored node.
    ///
    /// Paging is keyset-based:
    /// - `options.cursor` is an exclusive lower bound on node id;
    /// - `options.limit` caps the page size;
    /// - one extra row is fetched so truncation can be detected. When the page
    ///   is truncated, `page.next_cursor` holds the id of the last returned node.
    ///
    /// `options.property_filters` are applied to nodes before paging.
    /// `page.diagnostics` reports the query plan's use of the expected indexes,
    /// plus notes about filtering and paging.
    ///
    /// NOTE(review): `depth as i64` wraps for depths above `i64::MAX`. For
    /// example, `usize::MAX` becomes -1, so only the center is walked. A
    /// saturating conversion would instead recurse without bound on cyclic
    /// graphs, because `walk` dedupes on (id, depth).
    ///
    /// NOTE(review): with `limit == Some(0)`, `next_cursor` is the first node,
    /// which was never returned. Confirm whether zero-sized pages are meant to
    /// be supported.
    fn paged_neighborhood(
        &self,
        center_id: &str,
        depth: usize,
        kind: Option<&str>,
        options: GraphQueryOptions,
    ) -> Result<Option<GraphPagedSubgraph>> {
        if self.node(center_id)?.is_none() {
            return Ok(None);
        }
        // `walk` is the reachable set of (node id, depth) pairs. The SQL is
        // assembled piecewise, so `values` must be pushed in exactly the order
        // in which the `?` placeholders appear in the final text.
        let mut sql = String::from(
            r#"
            WITH RECURSIVE walk(id, depth) AS (
                SELECT ?, 0
                UNION
                SELECT e.to_id, walk.depth + 1
                FROM walk
                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
                    ON e.from_id = walk.id
                WHERE walk.depth < ?
            "#,
        );
        let mut values = vec![
            Value::Text(center_id.to_string()),
            Value::Integer(depth as i64),
        ];
        if let Some(kind) = kind {
            sql.push_str(" AND e.kind = ?");
            values.push(Value::Text(kind.to_string()));
        }
        sql.push_str(
            r#"
            ),
            filtered_nodes AS (
            SELECT DISTINCT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
            FROM walk
            JOIN graph_nodes n ON n.id = walk.id
            WHERE 1 = 1
            "#,
        );
        // Presumably appends EXISTS predicates on alias `n` and their bind
        // values (helper defined elsewhere in this file; confirm).
        push_sqlite_property_filter_exists(&mut sql, &mut values, "n", &options.property_filters);
        // Keyset paging: the cursor is exclusive.
        if let Some(cursor) = &options.cursor {
            sql.push_str(" AND n.id > ?");
            values.push(Value::Text(cursor.clone()));
        }
        sql.push_str(
            r#"
            ),
            page_nodes AS (
                SELECT id, kind, label, properties_json, provenance_json, freshness_json
                FROM filtered_nodes
                ORDER BY id
            "#,
        );
        // Fetch one look-ahead row so truncation can be detected below.
        if let Some(limit) = options.limit {
            sql.push_str(" LIMIT ?");
            values.push(Value::Integer(limit.saturating_add(1) as i64));
        }
        // `walk_edges` joins the reachable set back to its outgoing edges,
        // using the same depth bound and kind filter as `walk`.
        sql.push_str(
            r#"
            ),
            walk_edges AS (
                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
                FROM walk
                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
                    ON e.from_id = walk.id
                WHERE walk.depth < ?
            "#,
        );
        values.push(Value::Integer(depth as i64));
        if let Some(kind) = kind {
            sql.push_str(" AND e.kind = ?");
            values.push(Value::Text(kind.to_string()));
        }
        // Node and edge rows share one result set. Column 0 tags each row,
        // node columns start at 1, and edge columns start at 7.
        sql.push_str(
            r#"
            )
            SELECT
                'node' AS row_type,
                p.id, p.kind, p.label, p.properties_json, p.provenance_json, p.freshness_json,
                NULL AS edge_key, NULL AS from_id, NULL AS to_id, NULL AS edge_kind,
                NULL AS edge_properties_json, NULL AS edge_provenance_json, NULL AS edge_freshness_json
            FROM page_nodes p
            UNION ALL
            SELECT DISTINCT
                'edge' AS row_type,
                NULL AS id, NULL AS kind, NULL AS label, NULL AS properties_json,
                NULL AS provenance_json, NULL AS freshness_json,
                e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
            FROM walk_edges e
            WHERE e.from_id IN (SELECT id FROM page_nodes)
              AND e.to_id IN (SELECT id FROM page_nodes)
            "#,
        );

        // The query plan is captured for the index diagnostics reported below.
        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
        let mut stmt = self.conn.prepare(&sql)?;
        let mut nodes = Vec::new();
        let mut edges = Vec::new();
        let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
            let row_type: String = row.get(0)?;
            match row_type.as_str() {
                "node" => Ok((Some(node_from_row_at(row, 1)?), None)),
                "edge" => Ok((None, Some(edge_from_row_at(row, 7)?))),
                _ => Err(rusqlite::Error::InvalidQuery),
            }
        })?;
        for row in rows {
            let (node, edge) = row?;
            if let Some(node) = node {
                nodes.push(node);
            }
            if let Some(edge) = edge {
                edges.push(edge);
            }
        }
        nodes.sort_by(|left, right| left.id.cmp(&right.id));
        // Holds `limit + 1` rows when more nodes remain past this page.
        let before_limit = nodes.len();
        let mut next_cursor = None;
        if let Some(limit) = options.limit
            && nodes.len() > limit
        {
            // The cursor is exclusive, so the last returned id resumes right
            // after this page.
            next_cursor = nodes
                .get(limit.saturating_sub(1))
                .map(|node| node.id.clone());
            nodes.truncate(limit);
        }
        // The look-ahead node may have been dropped; prune edges that touch it.
        let node_ids = nodes
            .iter()
            .map(|node| node.id.as_str())
            .collect::<BTreeSet<_>>();
        edges.retain(|edge| {
            node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
        });
        // Deterministic edge order: (from_id, kind, to_id).
        edges.sort_by(|left, right| {
            left.from_id
                .cmp(&right.from_id)
                .then(left.kind.cmp(&right.kind))
                .then(left.to_id.cmp(&right.to_id))
        });
        let expected_indexes = if options.property_filters.is_empty() {
            vec!["idx_graph_edges_from_kind"]
        } else {
            vec![
                "idx_graph_edges_from_kind",
                "idx_graph_node_properties_key_value_node",
            ]
        };
        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
        diagnostics.push(
            "neighborhood nodes and page edges share one recursive reachable-set CTE".to_string(),
        );
        if !options.property_filters.is_empty() {
            diagnostics.push(
                "property filters were evaluated by SQLite materialized property rows before paging"
                    .to_string(),
            );
        }
        if options.cursor.is_some() {
            diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
        }
        if next_cursor.is_some() {
            diagnostics.push(
                "result was truncated; pass page.next_cursor as --cursor for the next page"
                    .to_string(),
            );
        }
        Ok(Some(GraphPagedSubgraph {
            page: GraphQueryPage {
                cursor: options.cursor,
                limit: options.limit,
                next_cursor,
                returned_nodes: nodes.len(),
                returned_edges: edges.len(),
                truncated: options.limit.is_some_and(|limit| before_limit > limit),
                diagnostics,
            },
            nodes,
            edges,
        }))
    }
3643
3644    fn shortest_path(
3645        &self,
3646        from_id: &str,
3647        to_id: &str,
3648        kind: Option<&str>,
3649    ) -> Result<Option<GraphPath>> {
3650        self.shortest_path_with_max_hops(from_id, to_id, kind, None)
3651    }
3652
3653    fn shortest_path_with_max_hops(
3654        &self,
3655        from_id: &str,
3656        to_id: &str,
3657        kind: Option<&str>,
3658        max_hops: Option<usize>,
3659    ) -> Result<Option<GraphPath>> {
3660        if from_id == to_id {
3661            return Ok(Some(GraphPath {
3662                nodes: vec![from_id.to_string()],
3663                hops: 0,
3664            }));
3665        }
3666        let hop_limit = max_hops.unwrap_or(usize::MAX);
3667        if hop_limit == 0 {
3668            return Ok(None);
3669        }
3670
3671        self.assert_not_in_temp_table_section();
3672        self.temp_table_active.set(true);
3673        let result = (|| -> Result<Option<GraphPath>> {
3674            let call_id = BFS_CALL_ID.fetch_add(1, Ordering::Relaxed);
3675            let tbl = format!("_tsift_frontier_{call_id}");
3676
3677            let mut visited = BTreeSet::from([from_id.to_string()]);
3678            let mut parent =
3679                BTreeMap::<String, String>::from([(from_id.to_string(), String::new())]);
3680            let mut frontier = vec![from_id.to_string()];
3681            self.conn.execute_batch(&format!(
3682                r#"CREATE TEMP TABLE IF NOT EXISTS "{tbl}" (id TEXT PRIMARY KEY);
3683               DELETE FROM "{tbl}";"#,
3684            ))?;
3685            let select_sql = if kind.is_some() {
3686                format!(
3687                    r#"SELECT e.from_id, e.to_id
3688                   FROM "{tbl}" f
3689                   JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3690                       ON e.from_id = f.id
3691                   WHERE e.kind = ?
3692                   ORDER BY e.from_id, e.to_id, e.kind"#,
3693                )
3694            } else {
3695                format!(
3696                    r#"SELECT e.from_id, e.to_id
3697                   FROM "{tbl}" f
3698                   JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3699                       ON e.from_id = f.id
3700                   ORDER BY e.from_id, e.to_id, e.kind"#,
3701                )
3702            };
3703            let insert_sql = format!(r#"INSERT OR IGNORE INTO "{tbl}" (id) VALUES (?)"#);
3704            let delete_sql = format!(r#"DELETE FROM "{tbl}""#);
3705            let drop_sql = format!(r#"DROP TABLE IF EXISTS "{tbl}""#);
3706            let mut frontier_select_stmt = self.conn.prepare(&select_sql)?;
3707            let mut frontier_insert_stmt = self.conn.prepare(&insert_sql)?;
3708            let mut found_path: Option<GraphPath> = None;
3709            for _depth in 0..hop_limit {
3710                if frontier.is_empty() {
3711                    break;
3712                }
3713                self.conn.execute(&delete_sql, [])?;
3714                for id in &frontier {
3715                    frontier_insert_stmt.execute([id.as_str()])?;
3716                }
3717                let mut next_frontier = BTreeSet::new();
3718                let rows = if let Some(kind) = kind {
3719                    collect_rows(frontier_select_stmt.query_map([kind], |row| {
3720                        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3721                    })?)?
3722                } else {
3723                    collect_rows(frontier_select_stmt.query_map([], |row| {
3724                        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3725                    })?)?
3726                };
3727                for (from, next) in rows {
3728                    if !visited.insert(next.clone()) {
3729                        continue;
3730                    }
3731                    parent.insert(next.clone(), from);
3732                    if next == to_id {
3733                        let mut nodes = vec![to_id.to_string()];
3734                        let mut cursor = to_id;
3735                        while let Some(previous) = parent.get(cursor) {
3736                            if previous.is_empty() {
3737                                break;
3738                            }
3739                            nodes.push(previous.clone());
3740                            cursor = previous;
3741                        }
3742                        nodes.reverse();
3743                        found_path = Some(GraphPath {
3744                            hops: nodes.len().saturating_sub(1),
3745                            nodes,
3746                        });
3747                        break;
3748                    }
3749                    next_frontier.insert(next);
3750                }
3751                if found_path.is_some() {
3752                    break;
3753                }
3754                frontier = next_frontier.into_iter().collect();
3755            }
3756            let _ = self.conn.execute_batch(&drop_sql);
3757            Ok(found_path)
3758        })();
3759        self.temp_table_active.set(false);
3760        result
3761    }
3762
3763    fn reachable_nodes_by_kind(
3764        &self,
3765        from_id: &str,
3766        kind: &str,
3767        depth: usize,
3768        limit: usize,
3769    ) -> Result<Vec<(GraphNode, GraphPath)>> {
3770        Ok(self
3771            .reachable_nodes_by_kinds(from_id, &[kind], depth, limit)?
3772            .remove(kind)
3773            .unwrap_or_default())
3774    }
3775
3776    fn reachable_nodes_by_kinds(
3777        &self,
3778        from_id: &str,
3779        kinds: &[&str],
3780        depth: usize,
3781        limit: usize,
3782    ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
3783        let mut requested = kinds
3784            .iter()
3785            .map(|kind| (*kind).to_string())
3786            .collect::<BTreeSet<_>>()
3787            .into_iter()
3788            .collect::<Vec<_>>();
3789        let mut results = requested
3790            .iter()
3791            .map(|kind| (kind.clone(), Vec::new()))
3792            .collect::<BTreeMap<_, _>>();
3793        if requested.is_empty() {
3794            return Ok(results);
3795        }
3796        requested.sort();
3797        let placeholders = std::iter::repeat_n("?", requested.len())
3798            .collect::<Vec<_>>()
3799            .join(", ");
3800        let mut sql = format!(
3801            r#"
3802            WITH RECURSIVE walk(id, depth, path) AS (
3803                SELECT ?, 0, char(31) || ? || char(31)
3804                UNION ALL
3805                SELECT e.to_id, walk.depth + 1, walk.path || e.to_id || char(31)
3806                FROM walk
3807                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3808                    ON e.from_id = walk.id
3809                WHERE walk.depth < ?
3810                  AND instr(walk.path, char(31) || e.to_id || char(31)) = 0
3811            ),
3812            ranked AS (
3813                SELECT
3814                    n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
3815                    walk.path, walk.depth,
3816                    ROW_NUMBER() OVER (PARTITION BY n.kind, n.id ORDER BY walk.depth, n.label, n.id) AS rn
3817                FROM walk
3818                JOIN graph_nodes n ON n.id = walk.id
3819                WHERE n.kind IN ({placeholders}) AND n.id <> ?
3820            ),
3821            kind_ranked AS (
3822                SELECT *,
3823                    ROW_NUMBER() OVER (PARTITION BY kind ORDER BY depth, label, id) AS kind_rank
3824                FROM ranked
3825                WHERE rn = 1
3826            )
3827            SELECT id, kind, label, properties_json, provenance_json, freshness_json, path, depth
3828            FROM kind_ranked
3829            "#,
3830        );
3831        let mut values = vec![
3832            Value::Text(from_id.to_string()),
3833            Value::Text(from_id.to_string()),
3834            Value::Integer(depth as i64),
3835        ];
3836        values.extend(requested.iter().cloned().map(Value::Text));
3837        values.push(Value::Text(from_id.to_string()));
3838        if limit > 0 && limit != usize::MAX {
3839            sql.push_str(" WHERE kind_rank <= ?");
3840            values.push(Value::Integer(limit as i64));
3841        }
3842        sql.push_str(" ORDER BY kind, depth, label, id");
3843        let mut stmt = self.conn.prepare(&sql)?;
3844        let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
3845            let node = node_from_row(row)?;
3846            let path: String = row.get(6)?;
3847            let hops = row_usize(row, 7)?;
3848            Ok((
3849                node,
3850                GraphPath {
3851                    nodes: path
3852                        .split('\u{1f}')
3853                        .filter(|part| !part.is_empty())
3854                        .map(str::to_string)
3855                        .collect(),
3856                    hops,
3857                },
3858            ))
3859        })?)?;
3860        for (node, path) in rows {
3861            results
3862                .entry(node.kind.clone())
3863                .or_default()
3864                .push((node, path));
3865        }
3866        Ok(results)
3867    }
3868
3869    fn evidence_target_candidates(
3870        &self,
3871        target: &str,
3872        kinds: &[&str],
3873        preferred_path: Option<&str>,
3874    ) -> Result<Vec<GraphNode>> {
3875        if kinds.is_empty() {
3876            return Ok(Vec::new());
3877        }
3878
3879        let normalized = target.trim().trim_start_matches('#');
3880        let kind_placeholders = std::iter::repeat_n("?", kinds.len())
3881            .collect::<Vec<_>>()
3882            .join(", ");
3883        let kind_rank = kinds
3884            .iter()
3885            .enumerate()
3886            .map(|(rank, _)| format!("WHEN ? THEN {rank}"))
3887            .collect::<Vec<_>>()
3888            .join(" ");
3889        let path_filter = if preferred_path.is_some() {
3890            r#"
3891AND EXISTS (
3892    SELECT 1
3893    FROM graph_node_properties p_path INDEXED BY idx_graph_node_properties_key_value_node
3894    WHERE p_path.node_id = n.id
3895      AND p_path.key = 'path'
3896      AND p_path.value = ?
3897)
3898"#
3899        } else {
3900            ""
3901        };
3902        let sql = format!(
3903            r#"
3904SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3905FROM graph_nodes n
3906WHERE n.kind IN ({kind_placeholders})
3907              AND (
3908                EXISTS (
3909                    SELECT 1
3910                    FROM graph_node_properties p_handle INDEXED BY idx_graph_node_properties_key_value_node
3911                    WHERE p_handle.node_id = n.id
3912                      AND p_handle.key = 'handle'
3913                      AND p_handle.value = ?
3914                )
3915                OR EXISTS (
3916                    SELECT 1
3917                    FROM graph_node_properties p_ref INDEXED BY idx_graph_node_properties_key_value_node
3918                    WHERE p_ref.node_id = n.id
3919                      AND p_ref.key = 'ref_id'
3920                      AND p_ref.value = ?
3921)
3922OR n.label = ?
3923OR n.label = ?
3924)
3925{path_filter}
3926ORDER BY CASE n.kind {kind_rank} ELSE 999 END, n.id
3927"#
3928        );
3929        let mut values = kinds
3930            .iter()
3931            .map(|kind| Value::Text((*kind).to_string()))
3932            .collect::<Vec<_>>();
3933        values.push(Value::Text(target.to_string()));
3934        values.push(Value::Text(normalized.to_string()));
3935        values.push(Value::Text(target.to_string()));
3936        values.push(Value::Text(format!("#{normalized}")));
3937        if let Some(path) = preferred_path {
3938            values.push(Value::Text(path.to_string()));
3939        }
3940        values.extend(kinds.iter().map(|kind| Value::Text((*kind).to_string())));
3941        let mut stmt = self.conn.prepare(&sql)?;
3942        collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)
3943    }
3944
3945    fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
3946        if let Some(node) = self.node(target)? {
3947            return Ok(Some(node));
3948        }
3949        Ok(self
3950            .evidence_target_candidates(target, kinds, None)?
3951            .into_iter()
3952            .next())
3953    }
3954}
3955
3956fn to_json<T: Serialize>(value: &T) -> Result<String> {
3957    serde_json::to_string(value).map_err(Into::into)
3958}
3959
3960fn row_hash<T: Serialize>(value: &T) -> Result<String> {
3961    let payload = serde_json::to_vec(value)?;
3962    Ok(blake3::hash(&payload).to_hex().to_string())
3963}
3964
3965fn optional_to_json<T: Serialize>(value: &Option<T>) -> Result<Option<String>> {
3966    value.as_ref().map(to_json).transpose()
3967}
3968
3969fn collect_rows<T>(
3970    rows: impl Iterator<Item = std::result::Result<T, rusqlite::Error>>,
3971) -> Result<Vec<T>> {
3972    rows.collect::<std::result::Result<Vec<_>, _>>()
3973        .map_err(Into::into)
3974}
3975
/// One item yielded by a graph query: a metadata entry, a node, or an edge.
///
/// NOTE(review): the producer/consumer of this enum is outside this view;
/// confirm how the variants are interleaved in the originating query.
enum QueryResult {
    /// Metadata entry; `total` is presumably the unpaged match count — TODO confirm.
    Meta { total: usize },
    /// A decoded graph node.
    Node(GraphNode),
    /// A decoded graph edge.
    Edge(GraphEdge),
}
3981
3982fn node_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphNode> {
3983    let properties_col = offset + 3;
3984    let provenance_col = offset + 4;
3985    let freshness_col = offset + 5;
3986    let properties_json: String = row.get(properties_col)?;
3987    let provenance_json: String = row.get(provenance_col)?;
3988    let freshness_json: Option<String> = row.get(freshness_col)?;
3989    Ok(GraphNode {
3990        id: row.get(offset)?,
3991        kind: row.get(offset + 1)?,
3992        label: row.get(offset + 2)?,
3993        properties: from_json(properties_col, &properties_json)?,
3994        provenance: from_json(provenance_col, &provenance_json)?,
3995        freshness: optional_from_json(freshness_col, freshness_json)?,
3996    })
3997}
3998
3999fn node_from_row(row: &Row<'_>) -> rusqlite::Result<GraphNode> {
4000    node_from_row_at(row, 0)
4001}
4002
4003fn edge_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphEdge> {
4004    let properties_col = offset + 4;
4005    let provenance_col = offset + 5;
4006    let freshness_col = offset + 6;
4007    let properties_json: String = row.get(properties_col)?;
4008    let provenance_json: String = row.get(provenance_col)?;
4009    let freshness_json: Option<String> = row.get(freshness_col)?;
4010    Ok(GraphEdge {
4011        id: row.get(offset)?,
4012        from_id: row.get(offset + 1)?,
4013        to_id: row.get(offset + 2)?,
4014        kind: row.get(offset + 3)?,
4015        properties: from_json(properties_col, &properties_json)?,
4016        provenance: from_json(provenance_col, &provenance_json)?,
4017        freshness: optional_from_json(freshness_col, freshness_json)?,
4018    })
4019}
4020
4021fn edge_from_row(row: &Row<'_>) -> rusqlite::Result<GraphEdge> {
4022    edge_from_row_at(row, 0)
4023}
4024
4025fn from_json<T: DeserializeOwned>(column: usize, raw: &str) -> rusqlite::Result<T> {
4026    serde_json::from_str(raw)
4027        .map_err(|err| rusqlite::Error::FromSqlConversionFailure(column, Type::Text, Box::new(err)))
4028}
4029
4030fn optional_from_json<T: DeserializeOwned>(
4031    column: usize,
4032    raw: Option<String>,
4033) -> rusqlite::Result<Option<T>> {
4034    raw.map(|value| from_json(column, &value)).transpose()
4035}
4036
4037fn projection_version_from_nodes(nodes: &[GraphNode]) -> Option<String> {
4038    nodes
4039        .iter()
4040        .find(|node| node.kind == "projection_meta")
4041        .and_then(|node| node.properties.get("projection_version").cloned())
4042}
4043
4044fn projection_hash_from_nodes(nodes: &[GraphNode]) -> Option<String> {
4045    nodes
4046        .iter()
4047        .find(|node| node.kind == "projection_meta")
4048        .and_then(|node| node.properties.get("content_hash").cloned())
4049}
4050
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Best-effort by design:
/// - It returns `0` when the system clock reports a time before the epoch.
/// - It saturates at `i64::MAX` rather than silently wrapping through an `as`
///   cast if the seconds count ever exceeds the signed range.
fn unix_now() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX))
        .unwrap_or_default()
}
4057
4058fn sqlite_database_size_bytes(conn: &Connection) -> Result<u64> {
4059    let page_count = conn.query_row("PRAGMA page_count", [], |row| row_u64(row, 0))?;
4060    let page_size = conn.query_row("PRAGMA page_size", [], |row| row_u64(row, 0))?;
4061    Ok(page_count.saturating_mul(page_size))
4062}
4063
4064fn sqlite_database_freelist_bytes(conn: &Connection) -> Result<u64> {
4065    let freelist_count = conn.query_row("PRAGMA freelist_count", [], |row| row_u64(row, 0))?;
4066    let page_size = conn.query_row("PRAGMA page_size", [], |row| row_u64(row, 0))?;
4067    Ok(freelist_count.saturating_mul(page_size))
4068}
4069
4070#[cfg(test)]
4071mod tests {
4072    use super::*;
4073
4074    fn sample_provenance() -> GraphProvenance {
4075        GraphProvenance::new("fixture", "src/lib.rs:1").with_content_hash("hash-1")
4076    }
4077
4078    fn sample_projection() -> GraphProjection {
4079        let source = sample_provenance();
4080        GraphProjection {
4081            nodes: vec![
4082                GraphNode::new("doc:livekit", "document", "LiveKit guide")
4083                    .with_property("domain", "livekit")
4084                    .with_provenance(source.clone())
4085                    .with_freshness(GraphFreshness::content_hash("node-hash")),
4086                GraphNode::new("topic:rooms", "topic", "Rooms"),
4087                GraphNode::new("topic:egress", "topic", "Egress"),
4088            ],
4089            edges: vec![
4090                GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
4091                    .with_property("confidence", "0.91")
4092                    .with_provenance(source.clone())
4093                    .with_freshness(GraphFreshness::content_hash("edge-hash")),
4094                GraphEdge::new("topic:rooms", "topic:egress", "related_to").with_provenance(source),
4095            ],
4096        }
4097    }
4098
4099    fn assert_projection_store_contract(store: &impl GraphStore) {
4100        let projection = sample_projection();
4101        projection.upsert_into(store).unwrap();
4102
4103        assert_eq!(
4104            store.node("doc:livekit").unwrap(),
4105            projection
4106                .nodes
4107                .iter()
4108                .find(|node| node.id == "doc:livekit")
4109                .cloned()
4110        );
4111        assert_eq!(
4112            store.nodes_by_kind("topic").unwrap(),
4113            vec![
4114                GraphNode::new("topic:egress", "topic", "Egress"),
4115                GraphNode::new("topic:rooms", "topic", "Rooms"),
4116            ]
4117        );
4118
4119        let mentions = store
4120            .outgoing_edges("doc:livekit", Some("mentions"))
4121            .unwrap();
4122        assert_eq!(mentions.len(), 1);
4123        assert_eq!(mentions[0].to_id, "topic:rooms");
4124        assert_eq!(
4125            mentions[0].properties.get("confidence"),
4126            Some(&"0.91".into())
4127        );
4128
4129        let path = store
4130            .shortest_path("doc:livekit", "topic:egress", None)
4131            .unwrap()
4132            .unwrap();
4133        assert_eq!(
4134            path.nodes,
4135            vec!["doc:livekit", "topic:rooms", "topic:egress"]
4136        );
4137    }
4138
4139    #[test]
4140    fn sqlite_store_round_trips_generic_nodes_edges() {
4141        let store = SqliteGraphStore::in_memory().unwrap();
4142        let source = sample_provenance();
4143        let node = GraphNode::new("doc:livekit", "document", "LiveKit guide")
4144            .with_property("domain", "livekit")
4145            .with_provenance(source.clone())
4146            .with_freshness(GraphFreshness::content_hash("node-hash"));
4147        let topic = GraphNode::new("topic:rooms", "topic", "Rooms");
4148        let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
4149            .with_property("confidence", "0.91")
4150            .with_provenance(source)
4151            .with_freshness(GraphFreshness::content_hash("edge-hash"));
4152
4153        store.upsert_node(&node).unwrap();
4154        store.upsert_node(&topic).unwrap();
4155        store.upsert_edge(&edge).unwrap();
4156
4157        assert_eq!(store.node("doc:livekit").unwrap(), Some(node));
4158        assert_eq!(store.nodes_by_kind("topic").unwrap(), vec![topic]);
4159        assert_eq!(store.all_nodes().unwrap().len(), 2);
4160        assert_eq!(store.all_edges().unwrap().len(), 1);
4161        assert_eq!(
4162            store
4163                .outgoing_edges("doc:livekit", Some("mentions"))
4164                .unwrap(),
4165            vec![edge]
4166        );
4167    }
4168
4169    #[test]
4170    fn sqlite_materializes_edge_properties_and_scans_first_class_edges() {
4171        let store = SqliteGraphStore::in_memory().unwrap();
4172        for node in [
4173            GraphNode::new("doc:livekit", "document", "LiveKit guide"),
4174            GraphNode::new("topic:rooms", "topic", "Rooms"),
4175            GraphNode::new("topic:egress", "topic", "Egress"),
4176        ] {
4177            store.upsert_node(&node).unwrap();
4178        }
4179        let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
4180            .with_property("confidence", "0.91");
4181        let edge_id = edge.id.clone();
4182        store.upsert_edge(&edge).unwrap();
4183        store
4184            .upsert_edge(
4185                &GraphEdge::new("topic:egress", "topic:rooms", "related_to")
4186                    .with_property("confidence", "0.42"),
4187            )
4188            .unwrap();
4189
4190        assert_eq!(store.edge(&edge_id).unwrap(), Some(edge));
4191        let mut expected_incident_ids = vec![
4192            GraphEdge::stable_id("doc:livekit", "topic:rooms", "mentions"),
4193            GraphEdge::stable_id("topic:egress", "topic:rooms", "related_to"),
4194        ];
4195        expected_incident_ids.sort();
4196        assert_eq!(
4197            store
4198                .incident_edges("topic:rooms", None)
4199                .unwrap()
4200                .into_iter()
4201                .map(|edge| edge.id)
4202                .collect::<Vec<_>>(),
4203            expected_incident_ids
4204        );
4205
4206        let page = store
4207            .paged_edges(
4208                Some("mentions"),
4209                GraphQueryOptions {
4210                    property_filters: vec![GraphPropertyFilter {
4211                        key: "confidence".to_string(),
4212                        value: "0.91".to_string(),
4213                    }],
4214                    ..GraphQueryOptions::default()
4215                },
4216            )
4217            .unwrap();
4218        assert_eq!(page.edges.len(), 1);
4219        assert_eq!(page.edges[0].id, edge_id);
4220        assert!(
4221            page.page
4222                .diagnostics
4223                .iter()
4224                .any(|diagnostic| diagnostic.contains("idx_graph_edge_properties_key_value_edge")),
4225            "{:?}",
4226            page.page.diagnostics
4227        );
4228        assert!(
4229            page.page
4230                .diagnostics
4231                .iter()
4232                .any(|diagnostic| diagnostic.contains("idx_graph_edges_edge_key")),
4233            "{:?}",
4234            page.page.diagnostics
4235        );
4236        assert!(
4237            page.page.diagnostics.iter().any(|diagnostic| diagnostic
4238                .contains("edge property primary filter matched 1 materialized row")),
4239            "{:?}",
4240            page.page.diagnostics
4241        );
4242        assert!(
4243            page.page
4244                .diagnostics
4245                .iter()
4246                .any(|diagnostic| diagnostic
4247                    .contains("drives from SQLite materialized property rows")),
4248            "{:?}",
4249            page.page.diagnostics
4250        );
4251
4252        let property_rows: usize = store
4253            .conn
4254            .query_row(
4255                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
4256                [],
4257                |row| row_usize(row, 0),
4258            )
4259            .unwrap();
4260        assert_eq!(property_rows, 2);
4261    }
4262
4263    #[test]
4264    fn graph_projection_round_trips_through_backend_agnostic_store_contract() {
4265        let sqlite = SqliteGraphStore::in_memory().unwrap();
4266        assert_projection_store_contract(&sqlite);
4267    }
4268
4269    #[test]
4270    fn graph_store_contract_covers_crud_neighborhood_and_ordering() {
4271        fn assert_crud_contract(store: &impl GraphStore) {
4272            let projection = sample_projection();
4273            projection.upsert_into(store).unwrap();
4274
4275            let neighborhood = store.neighborhood("doc:livekit", 2, None).unwrap().unwrap();
4276            assert_eq!(
4277                neighborhood
4278                    .nodes
4279                    .iter()
4280                    .map(|node| node.id.as_str())
4281                    .collect::<Vec<_>>(),
4282                vec!["doc:livekit", "topic:egress", "topic:rooms"]
4283            );
4284            assert_eq!(
4285                neighborhood
4286                    .edges
4287                    .iter()
4288                    .map(|edge| (
4289                        edge.from_id.as_str(),
4290                        edge.kind.as_str(),
4291                        edge.to_id.as_str()
4292                    ))
4293                    .collect::<Vec<_>>(),
4294                vec![
4295                    ("doc:livekit", "mentions", "topic:rooms"),
4296                    ("topic:rooms", "related_to", "topic:egress"),
4297                ]
4298            );
4299
4300            assert_eq!(
4301                store
4302                    .delete_edge("topic:rooms", "topic:egress", "related_to")
4303                    .unwrap(),
4304                1
4305            );
4306            assert!(
4307                store
4308                    .shortest_path("doc:livekit", "topic:egress", None)
4309                    .unwrap()
4310                    .is_none()
4311            );
4312            assert_eq!(store.delete_node("topic:rooms").unwrap(), 1);
4313            assert!(store.node("topic:rooms").unwrap().is_none());
4314            assert!(
4315                store
4316                    .outgoing_edges("doc:livekit", None)
4317                    .unwrap()
4318                    .is_empty()
4319            );
4320        }
4321
4322        assert_crud_contract(&SqliteGraphStore::in_memory().unwrap());
4323    }
4324
4325    #[test]
4326    fn sqlite_ranked_neighborhood_prefers_recent_memory_nodes_when_pruning() {
4327        let store = SqliteGraphStore::in_memory().unwrap();
4328        store
4329            .upsert_node(&GraphNode::new("center", "file", "center"))
4330            .unwrap();
4331        store
4332            .upsert_node(&GraphNode::new("aaa-code", "symbol", "code candidate"))
4333            .unwrap();
4334        store
4335            .upsert_node(
4336                &GraphNode::new("mmm-stale", "memory_event", "stale memory")
4337                    .with_property("provider", "tsift-memory")
4338                    .with_property("observed_at_unix", "1000"),
4339            )
4340            .unwrap();
4341        store
4342            .upsert_node(
4343                &GraphNode::new("zzz-fresh", "memory_event", "fresh memory")
4344                    .with_property("provider", "tsift-memory")
4345                    .with_property("observed_at_unix", "1995"),
4346            )
4347            .unwrap();
4348        store
4349            .upsert_edge(&GraphEdge::new("center", "aaa-code", "mentions"))
4350            .unwrap();
4351        store
4352            .upsert_edge(&GraphEdge::new("center", "mmm-stale", "mentions"))
4353            .unwrap();
4354        store
4355            .upsert_edge(&GraphEdge::new("center", "zzz-fresh", "mentions"))
4356            .unwrap();
4357
4358        let options = RankedNeighborhoodOptions::new(1, 1)
4359            .with_observed_at_now_unix(2000)
4360            .with_observed_at_half_life_secs(100);
4361        let result = store
4362            .ranked_neighborhood("center", &options)
4363            .unwrap()
4364            .unwrap();
4365        let ids: Vec<_> = result.nodes.iter().map(|node| node.id.as_str()).collect();
4366        assert!(ids.contains(&"center"));
4367        assert!(
4368            ids.contains(&"zzz-fresh"),
4369            "fresh memory node should survive pruning: {ids:?}"
4370        );
4371        assert!(!ids.contains(&"aaa-code"));
4372        assert!(!ids.contains(&"mmm-stale"));
4373    }
4374
4375    #[test]
4376    fn sqlite_semantic_seeded_neighborhood_scores_before_sql_edge_cap() {
4377        let store = SqliteGraphStore::in_memory().unwrap();
4378        store
4379            .upsert_node(&GraphNode::new("seed", "semantic_concept", "graph budget"))
4380            .unwrap();
4381        store
4382            .upsert_node(&GraphNode::new("zzz_high", "symbol", "high_signal"))
4383            .unwrap();
4384        store
4385            .upsert_edge(&GraphEdge::new("zzz_high", "seed", "mentions_concept"))
4386            .unwrap();
4387        for idx in 0..24 {
4388            let id = format!("aaa_low_{idx:02}");
4389            store
4390                .upsert_node(&GraphNode::new(id.clone(), "note", format!("low {idx}")))
4391                .unwrap();
4392            store
4393                .upsert_edge(&GraphEdge::new(id, "seed", "weak_link"))
4394                .unwrap();
4395        }
4396
4397        let options = SemanticSeededNeighborhoodOptions::new(1, 3)
4398            .with_edge_scan_cap(16)
4399            .with_node_discovery_cap(9);
4400        let result = store
4401            .semantic_seeded_neighborhood(&["seed".to_string()], &options)
4402            .unwrap();
4403        let ids = result
4404            .nodes
4405            .iter()
4406            .map(|node| node.id.as_str())
4407            .collect::<Vec<_>>();
4408
4409        assert_eq!(ids.len(), 3);
4410        assert_eq!(ids[0], "seed");
4411        assert_eq!(ids[1], "zzz_high");
4412        assert_eq!(result.skipped_by_edge_cap, 9);
4413        assert!(result.truncated);
4414    }
4415
4416    #[test]
4417    fn sqlite_upsert_projection_batches_rows_and_properties() {
4418        let mut store = SqliteGraphStore::in_memory().unwrap();
4419        let mut projection = sample_projection();
4420        store.upsert_projection(&projection).unwrap();
4421
4422        let page = store
4423            .paged_nodes_by_kind(
4424                "document",
4425                GraphQueryOptions {
4426                    property_filters: vec![GraphPropertyFilter {
4427                        key: "domain".to_string(),
4428                        value: "livekit".to_string(),
4429                    }],
4430                    ..GraphQueryOptions::default()
4431                },
4432            )
4433            .unwrap();
4434        assert_eq!(page.nodes[0].id, "doc:livekit");
4435
4436        projection.nodes[0] = GraphNode::new("doc:livekit", "document", "LiveKit guide")
4437            .with_property("domain", "recording");
4438        store.upsert_projection(&projection).unwrap();
4439
4440        let old_property_count: usize = store
4441            .conn
4442            .query_row(
4443                "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain' AND value = 'livekit'",
4444                [],
4445                |row| row_usize(row, 0),
4446            )
4447            .unwrap();
4448        let updated_page = store
4449            .paged_nodes_by_kind(
4450                "document",
4451                GraphQueryOptions {
4452                    property_filters: vec![GraphPropertyFilter {
4453                        key: "domain".to_string(),
4454                        value: "recording".to_string(),
4455                    }],
4456                    ..GraphQueryOptions::default()
4457                },
4458            )
4459            .unwrap();
4460        assert_eq!(old_property_count, 0);
4461        assert_eq!(updated_page.nodes[0].id, "doc:livekit");
4462        let edge_property_count: usize = store
4463            .conn
4464            .query_row(
4465                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
4466                [],
4467                |row| row_usize(row, 0),
4468            )
4469            .unwrap();
4470        assert_eq!(edge_property_count, 1);
4471
4472        let changes_before = store.conn.total_changes();
4473        store.upsert_projection(&projection).unwrap();
4474        assert_eq!(
4475            store.conn.total_changes(),
4476            changes_before,
4477            "unchanged projection rows should not rewrite graph rows or materialized properties"
4478        );
4479    }
4480
4481    #[test]
4482    fn sqlite_semantic_top_candidates_use_materialized_vector_table() {
4483        let store = SqliteGraphStore::in_memory().unwrap();
4484        store
4485            .upsert_node(
4486                &GraphNode::new("concept:graph", "semantic_concept", "graph navigation")
4487                    .with_property("embedding_model", "fixture-v1")
4488                    .with_property("embedding", "1.0,0.0"),
4489            )
4490            .unwrap();
4491        store
4492            .upsert_node(
4493                &GraphNode::new("concept:sqlite", "semantic_concept", "sqlite search")
4494                    .with_property("embedding", "0.0,1.0"),
4495            )
4496            .unwrap();
4497        store
4498            .upsert_node(&GraphNode::new("entity:skip", "semantic_entity", "skipped"))
4499            .unwrap();
4500
4501        let vector_rows: usize = store
4502            .conn
4503            .query_row(
4504                "SELECT COUNT(*) FROM graph_node_semantic_vectors",
4505                [],
4506                |row| row_usize(row, 0),
4507            )
4508            .unwrap();
4509        assert_eq!(vector_rows, 2);
4510
4511        let candidates = store
4512            .semantic_top_candidates(&[1.0, 0.0], &["semantic_concept"], 1)
4513            .unwrap();
4514        assert_eq!(candidates.len(), 1);
4515        assert_eq!(candidates[0].node.id, "concept:graph");
4516        assert_eq!(candidates[0].score, 1.0);
4517
4518        store
4519            .upsert_node(&GraphNode::new(
4520                "concept:graph",
4521                "semantic_concept",
4522                "graph navigation",
4523            ))
4524            .unwrap();
4525        let vector_rows_after_update: usize = store
4526            .conn
4527            .query_row(
4528                "SELECT COUNT(*) FROM graph_node_semantic_vectors WHERE node_id = 'concept:graph'",
4529                [],
4530                |row| row_usize(row, 0),
4531            )
4532            .unwrap();
4533        assert_eq!(vector_rows_after_update, 0);
4534    }
4535
4536    #[test]
4537    fn sqlite_store_filters_edges_by_kind_and_paths() {
4538        let store = SqliteGraphStore::in_memory().unwrap();
4539        for id in ["a", "b", "c"] {
4540            store
4541                .upsert_node(&GraphNode::new(id, "symbol", id))
4542                .unwrap();
4543        }
4544        store
4545            .upsert_edge(&GraphEdge::new("a", "b", "calls"))
4546            .unwrap();
4547        store
4548            .upsert_edge(&GraphEdge::new("a", "c", "documents"))
4549            .unwrap();
4550        store
4551            .upsert_edge(&GraphEdge::new("b", "c", "calls"))
4552            .unwrap();
4553
4554        let calls = store.outgoing_edges("a", Some("calls")).unwrap();
4555        assert_eq!(calls.len(), 1);
4556        assert_eq!(calls[0].to_id, "b");
4557        assert_eq!(store.graph_counts().unwrap(), (3, 3));
4558        assert_eq!(
4559            store.sample_edge(Some("calls")).unwrap().unwrap().to_id,
4560            "b"
4561        );
4562
4563        let path = store
4564            .shortest_path("a", "c", Some("calls"))
4565            .unwrap()
4566            .unwrap();
4567        assert_eq!(path.nodes, vec!["a", "b", "c"]);
4568        assert_eq!(path.hops, 2);
4569
4570        assert!(
4571            store
4572                .shortest_path("c", "a", Some("calls"))
4573                .unwrap()
4574                .is_none()
4575        );
4576    }
4577
4578    #[test]
4579    fn sqlite_store_batches_edges_between_node_sets() {
4580        let store = SqliteGraphStore::in_memory().unwrap();
4581        for id in ["a", "b", "c", "outside"] {
4582            store
4583                .upsert_node(&GraphNode::new(id, "symbol", id))
4584                .unwrap();
4585        }
4586        for edge in [
4587            GraphEdge::new("a", "b", "calls"),
4588            GraphEdge::new("b", "c", "calls"),
4589            GraphEdge::new("a", "outside", "calls"),
4590            GraphEdge::new("outside", "c", "calls"),
4591        ] {
4592            store.upsert_edge(&edge).unwrap();
4593        }
4594
4595        let scoped = ["a".to_string(), "b".to_string(), "c".to_string()]
4596            .into_iter()
4597            .collect::<BTreeSet<_>>();
4598        let edge_keys = store
4599            .edges_between_nodes(&scoped)
4600            .unwrap()
4601            .into_iter()
4602            .map(|edge| (edge.from_id, edge.kind, edge.to_id))
4603            .collect::<Vec<_>>();
4604
4605        assert_eq!(
4606            edge_keys,
4607            vec![
4608                ("a".to_string(), "calls".to_string(), "b".to_string()),
4609                ("b".to_string(), "calls".to_string(), "c".to_string()),
4610            ]
4611        );
4612    }
4613
4614    #[test]
4615    fn wal_checkpoint_outcome_is_recorded_not_swallowed() {
4616        let dir = tempfile::tempdir().unwrap();
4617        let db_path = dir.path().join("graph.db");
4618        let mut store = SqliteGraphStore::open(&db_path).unwrap();
4619        let refresh = store
4620            .replace_projection_with_version("root", &sample_projection(), Some("v1"), None)
4621            .unwrap();
4622        // With no concurrent reader the TRUNCATE checkpoint succeeds and the
4623        // outcome is recorded as a phase instead of being silently discarded.
4624        assert!(
4625            refresh
4626                .phase_timings
4627                .iter()
4628                .any(|phase| phase.name == "wal_checkpoint:ok"),
4629            "expected a wal_checkpoint:ok phase: {:?}",
4630            refresh
4631                .phase_timings
4632                .iter()
4633                .map(|phase| &phase.name)
4634                .collect::<Vec<_>>()
4635        );
4636    }
4637
4638    #[test]
4639    fn wal_checkpoint_records_busy_when_a_reader_blocks_truncate() {
4640        let dir = tempfile::tempdir().unwrap();
4641        let db_path = dir.path().join("graph.db");
4642        let mut store = SqliteGraphStore::open(&db_path).unwrap();
4643        store
4644            .replace_projection_with_version("root", &sample_projection(), Some("v1"), None)
4645            .unwrap();
4646
4647        // A second connection holds an open read transaction, pinning the WAL so
4648        // a TRUNCATE checkpoint cannot reset it (busy) — the path that previously
4649        // grew the -wal file silently (#gdbwalcheckpoint).
4650        let reader = rusqlite::Connection::open(&db_path).unwrap();
4651        reader.execute_batch("BEGIN").unwrap();
4652        let _pinned: i64 = reader
4653            .query_row("SELECT count(*) FROM graph_operator_stats", [], |row| {
4654                row.get(0)
4655            })
4656            .unwrap();
4657
4658        let mut projection = sample_projection();
4659        projection
4660            .nodes
4661            .push(GraphNode::new("topic:extra", "topic", "Extra"));
4662        let refresh = store
4663            .replace_projection_with_version("root", &projection, Some("v2"), None)
4664            .unwrap();
4665
4666        assert!(
4667            refresh
4668                .phase_timings
4669                .iter()
4670                .any(|phase| phase.name == "wal_checkpoint:busy"),
4671            "expected a wal_checkpoint:busy phase while a reader holds the WAL: {:?}",
4672            refresh
4673                .phase_timings
4674                .iter()
4675                .map(|phase| &phase.name)
4676                .collect::<Vec<_>>()
4677        );
4678        drop(reader);
4679    }
4680
4681    #[test]
4682    fn sqlite_projection_refresh_tracks_versions_watermarks_and_tombstones() {
4683        let mut store = SqliteGraphStore::in_memory().unwrap();
4684        let mut projection = sample_projection();
4685        projection.nodes.push(
4686            GraphNode::new(
4687                "projection:fixture",
4688                "projection_meta",
4689                "fixture projection",
4690            )
4691            .with_property("projection_version", "fixture-v1")
4692            .with_property("content_hash", "hash-a"),
4693        );
4694        store
4695            .replace_projection_with_version(
4696                "root",
4697                &projection,
4698                Some("fixture-v1"),
4699                Some("commit-a".to_string()),
4700            )
4701            .unwrap();
4702
4703        projection.nodes.retain(|node| node.id != "topic:egress");
4704        projection.edges.retain(|edge| edge.to_id != "topic:egress");
4705        let refresh = store
4706            .replace_projection_with_version(
4707                "root",
4708                &projection,
4709                Some("fixture-v2"),
4710                Some("commit-b".to_string()),
4711            )
4712            .unwrap();
4713
4714        assert_eq!(refresh.projection_version, "fixture-v2");
4715        assert_eq!(refresh.source_watermark.as_deref(), Some("commit-b"));
4716        assert_eq!(refresh.tombstoned_nodes, vec!["topic:egress".to_string()]);
4717        assert_eq!(refresh.tombstoned_edges.len(), 1);
4718        assert_eq!(refresh.deleted_nodes, 1);
4719        assert_eq!(refresh.deleted_edges, 1);
4720        assert_eq!(refresh.unchanged_nodes, 3);
4721        assert_eq!(refresh.upserted_nodes, 0);
4722        assert_eq!(refresh.unchanged_properties, 4);
4723        assert_eq!(refresh.upserted_properties, 0);
4724        assert_eq!(refresh.deleted_properties, 0);
4725        assert!(
4726            refresh
4727                .phase_timings
4728                .iter()
4729                .any(|phase| phase.name == "sqlite_property_row_staging"),
4730            "{:?}",
4731            refresh.phase_timings
4732        );
4733        assert!(
4734            refresh
4735                .phase_timings
4736                .iter()
4737                .any(|phase| phase.name == "sqlite_edge_property_row_staging"),
4738            "{:?}",
4739            refresh.phase_timings
4740        );
4741        let version = store.projection_version("root").unwrap().unwrap();
4742        assert_eq!(version.projection_version, "fixture-v2");
4743        assert_eq!(version.source_watermark.as_deref(), Some("commit-b"));
4744        let cached_counts: (usize, usize, usize, usize) = store
4745            .conn
4746            .query_row(
4747                r#"
4748                SELECT nodes, edges, tombstone_nodes, tombstone_edges
4749                FROM graph_operator_stats
4750                WHERE scope = 'root'
4751                "#,
4752                [],
4753                |row| {
4754                    Ok((
4755                        row_usize(row, 0)?,
4756                        row_usize(row, 1)?,
4757                        row_usize(row, 2)?,
4758                        row_usize(row, 3)?,
4759                    ))
4760                },
4761            )
4762            .unwrap();
4763        assert_eq!(cached_counts, (3, 1, 1, 1));
4764
4765        projection
4766            .nodes
4767            .push(GraphNode::new("topic:egress", "topic", "Egress"));
4768        let refresh = store
4769            .replace_projection_with_version(
4770                "root",
4771                &projection,
4772                Some("fixture-v3"),
4773                Some("commit-c".to_string()),
4774            )
4775            .unwrap();
4776        assert_eq!(refresh.pruned_tombstones, 1);
4777        assert_eq!(refresh.tombstoned_nodes, Vec::<String>::new());
4778
4779        projection.nodes.retain(|node| node.id != "topic:egress");
4780        store
4781            .replace_projection_with_version(
4782                "root",
4783                &projection,
4784                Some("fixture-v4"),
4785                Some("commit-d".to_string()),
4786            )
4787            .unwrap();
4788        assert_eq!(store.compact_storage("root", true).unwrap(), 2);
4789        let cached_counts: (usize, usize, usize, usize) = store
4790            .conn
4791            .query_row(
4792                r#"
4793                SELECT nodes, edges, tombstone_nodes, tombstone_edges
4794                FROM graph_operator_stats
4795                WHERE scope = 'root'
4796                "#,
4797                [],
4798                |row| {
4799                    Ok((
4800                        row_usize(row, 0)?,
4801                        row_usize(row, 1)?,
4802                        row_usize(row, 2)?,
4803                        row_usize(row, 3)?,
4804                    ))
4805                },
4806            )
4807            .unwrap();
4808        assert_eq!(cached_counts, (3, 1, 0, 0));
4809    }
4810
4811    #[test]
4812    fn sqlite_shortest_path_uses_bounded_frontier() {
4813        let store = SqliteGraphStore::in_memory().unwrap();
4814        for idx in 0..80 {
4815            store
4816                .upsert_node(&GraphNode::new(
4817                    format!("node:{idx:02}"),
4818                    "symbol",
4819                    format!("node {idx:02}"),
4820                ))
4821                .unwrap();
4822        }
4823        for idx in 0..79 {
4824            store
4825                .upsert_edge(&GraphEdge::new(
4826                    format!("node:{idx:02}"),
4827                    format!("node:{:02}", idx + 1),
4828                    "calls",
4829                ))
4830                .unwrap();
4831        }
4832        store
4833            .upsert_edge(&GraphEdge::new("node:00", "node:79", "mentions"))
4834            .unwrap();
4835
4836        assert!(
4837            store
4838                .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(64))
4839                .unwrap()
4840                .is_none()
4841        );
4842        let path = store
4843            .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(79))
4844            .unwrap()
4845            .unwrap();
4846        assert_eq!(path.hops, 79);
4847        assert_eq!(path.nodes.first().map(String::as_str), Some("node:00"));
4848        assert_eq!(path.nodes.last().map(String::as_str), Some("node:79"));
4849
4850        let direct = store
4851            .shortest_path_with_max_hops("node:00", "node:79", Some("mentions"), Some(1))
4852            .unwrap()
4853            .unwrap();
4854        assert_eq!(direct.nodes, vec!["node:00", "node:79"]);
4855    }
4856
4857    #[test]
4858    fn sqlite_resolves_evidence_targets_with_indexed_properties() {
4859        let store = SqliteGraphStore::in_memory().unwrap();
4860        for node in [
4861            GraphNode::new("gbak-refresh", "backlog", "#refresh")
4862                .with_property("ref_id", "refresh")
4863                .with_property("path", "tasks/current.md")
4864                .with_property("handle", "backlog-handle"),
4865            GraphNode::new("gbak-zrefresh", "backlog", "#refresh")
4866                .with_property("ref_id", "refresh")
4867                .with_property("path", "tasks/other.md"),
4868            GraphNode::new("gjob-refresh", "job_packet", "do #refresh")
4869                .with_property("ref_id", "refresh"),
4870            GraphNode::new("gwres-refresh", "worker_result", "completed #refresh")
4871                .with_property("ref_id", "refresh"),
4872        ] {
4873            store.upsert_node(&node).unwrap();
4874        }
4875
4876        let by_ref = store
4877            .resolve_evidence_target("#refresh", &["backlog", "job_packet", "worker_result"])
4878            .unwrap()
4879            .unwrap();
4880        assert_eq!(by_ref.id, "gbak-refresh");
4881        let by_handle = store
4882            .resolve_evidence_target("backlog-handle", &["backlog"])
4883            .unwrap()
4884            .unwrap();
4885        assert_eq!(by_handle.id, "gbak-refresh");
4886        let by_path = store
4887            .evidence_target_candidates("#refresh", &["backlog"], Some("tasks/other.md"))
4888            .unwrap();
4889        assert_eq!(by_path.len(), 1);
4890        assert_eq!(by_path[0].id, "gbak-zrefresh");
4891    }
4892
4893    #[test]
4894    fn sqlite_schema_migration_backfills_materialized_node_properties() {
4895        let conn = Connection::open_in_memory().unwrap();
4896        conn.execute_batch(
4897            r#"
4898            PRAGMA user_version = 2;
4899            CREATE TABLE graph_nodes (
4900                id TEXT PRIMARY KEY,
4901                kind TEXT NOT NULL,
4902                label TEXT NOT NULL,
4903                properties_json TEXT NOT NULL DEFAULT '{}',
4904                provenance_json TEXT NOT NULL DEFAULT '[]',
4905                freshness_json TEXT,
4906                row_hash TEXT,
4907                source_watermark TEXT
4908            );
4909            CREATE INDEX idx_graph_nodes_kind ON graph_nodes(kind);
4910            CREATE TABLE graph_edges (
4911                from_id TEXT NOT NULL,
4912                to_id TEXT NOT NULL,
4913                kind TEXT NOT NULL,
4914                properties_json TEXT NOT NULL DEFAULT '{}',
4915                provenance_json TEXT NOT NULL DEFAULT '[]',
4916                freshness_json TEXT,
4917                row_hash TEXT,
4918                source_watermark TEXT,
4919                PRIMARY KEY (from_id, to_id, kind)
4920            );
4921            CREATE INDEX idx_graph_edges_from_kind ON graph_edges(from_id, kind);
4922            CREATE INDEX idx_graph_edges_to_kind ON graph_edges(to_id, kind);
4923            CREATE TABLE graph_projection_versions (
4924                scope TEXT PRIMARY KEY,
4925                projection_version TEXT NOT NULL,
4926                content_hash TEXT,
4927                source_watermark TEXT,
4928                observed_at_unix INTEGER NOT NULL
4929            );
4930            CREATE TABLE graph_tombstones (
4931                row_key TEXT PRIMARY KEY,
4932                row_kind TEXT NOT NULL,
4933                deleted_at_unix INTEGER NOT NULL
4934            );
4935            INSERT INTO graph_nodes
4936            (id, kind, label, properties_json, provenance_json)
4937            VALUES
4938            ('topic:rooms', 'topic', 'Rooms', '{"domain":"livekit"}', '[]'),
4939            ('topic:egress', 'topic', 'Egress', '{"domain":"recording"}', '[]'),
4940            ('concept:graph', 'semantic_concept', 'Graph navigation', '{"embedding":"1.0,0.0","embedding_model":"fixture-v1"}', '[]');
4941            INSERT INTO graph_edges
4942                (from_id, to_id, kind, properties_json, provenance_json)
4943            VALUES
4944                ('topic:rooms', 'topic:egress', 'mentions', '{"confidence":"0.91"}', '[]');
4945            "#,
4946        )
4947        .unwrap();
4948
4949        let store = SqliteGraphStore::from_connection(conn).unwrap();
4950        let version: i64 = store
4951            .conn
4952            .pragma_query_value(None, "user_version", |row| row.get(0))
4953            .unwrap();
4954        assert_eq!(version, SQLITE_GRAPH_SCHEMA_VERSION);
4955        let property_rows: usize = store
4956            .conn
4957            .query_row(
4958                "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain'",
4959                [],
4960                |row| row_usize(row, 0),
4961            )
4962            .unwrap();
4963        assert_eq!(property_rows, 2);
4964        let edge_property_rows: usize = store
4965            .conn
4966            .query_row(
4967                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
4968                [],
4969                |row| row_usize(row, 0),
4970            )
4971            .unwrap();
4972        assert_eq!(edge_property_rows, 1);
4973        let semantic_vector_rows: usize = store
4974            .conn
4975            .query_row(
4976                "SELECT COUNT(*) FROM graph_node_semantic_vectors WHERE model = 'fixture-v1'",
4977                [],
4978                |row| row_usize(row, 0),
4979            )
4980            .unwrap();
4981        assert_eq!(semantic_vector_rows, 1);
4982        let edge = store
4983            .edge(&GraphEdge::stable_id(
4984                "topic:rooms",
4985                "topic:egress",
4986                "mentions",
4987            ))
4988            .unwrap()
4989            .unwrap();
4990        assert_eq!(edge.properties.get("confidence"), Some(&"0.91".to_string()));
4991
4992        let page = store
4993            .paged_nodes_by_kind(
4994                "topic",
4995                GraphQueryOptions {
4996                    property_filters: vec![GraphPropertyFilter {
4997                        key: "domain".to_string(),
4998                        value: "livekit".to_string(),
4999                    }],
5000                    ..GraphQueryOptions::default()
5001                },
5002            )
5003            .unwrap();
5004        assert_eq!(page.nodes[0].id, "topic:rooms");
5005        assert!(
5006            page.page
5007                .diagnostics
5008                .iter()
5009                .any(|diagnostic| diagnostic.contains("idx_graph_node_properties_key_value_node")),
5010            "{:?}",
5011            page.page.diagnostics
5012        );
5013    }
5014
5015    #[test]
5016    fn sqlite_store_batches_reachable_nodes_by_kinds() {
5017        let store = SqliteGraphStore::in_memory().unwrap();
5018        for node in [
5019            GraphNode::new("start", "backlog", "start"),
5020            GraphNode::new("ctx", "worker_context", "context"),
5021            GraphNode::new("src", "source_handle", "source"),
5022            GraphNode::new("sem", "semantic_concept", "concept"),
5023        ] {
5024            store.upsert_node(&node).unwrap();
5025        }
5026        store
5027            .upsert_edge(&GraphEdge::new("start", "ctx", "has_context"))
5028            .unwrap();
5029        store
5030            .upsert_edge(&GraphEdge::new("ctx", "src", "uses_source"))
5031            .unwrap();
5032        store
5033            .upsert_edge(&GraphEdge::new("start", "sem", "mentions_concept"))
5034            .unwrap();
5035
5036        let rows = store
5037            .reachable_nodes_by_kinds(
5038                "start",
5039                &["worker_context", "source_handle", "semantic_concept"],
5040                2,
5041                8,
5042            )
5043            .unwrap();
5044        assert_eq!(rows["worker_context"][0].0.id, "ctx");
5045        assert_eq!(
5046            rows["source_handle"][0].1.nodes,
5047            vec!["start", "ctx", "src"]
5048        );
5049        assert_eq!(rows["semantic_concept"][0].1.hops, 1);
5050    }
5051
5052    #[test]
5053    fn sqlite_projection_refresh_handles_bulk_row_replacement() {
5054        let mut store = SqliteGraphStore::in_memory().unwrap();
5055        let source = GraphProvenance::new("fixture", "bulk");
5056        let mut projection = GraphProjection::default();
5057        for idx in 0..128 {
5058            projection.nodes.push(
5059                GraphNode::new(
5060                    format!("node:{idx:03}"),
5061                    if idx % 2 == 0 { "symbol" } else { "file" },
5062                    format!("bulk node {idx:03}"),
5063                )
5064                .with_property("ordinal", idx.to_string())
5065                .with_provenance(source.clone())
5066                .with_freshness(GraphFreshness::content_hash(format!("node-hash-{idx:03}"))),
5067            );
5068        }
5069        for idx in 0..127 {
5070            projection.edges.push(
5071                GraphEdge::new(
5072                    format!("node:{idx:03}"),
5073                    format!("node:{:03}", idx + 1),
5074                    "next",
5075                )
5076                .with_property("ordinal", idx.to_string())
5077                .with_provenance(source.clone())
5078                .with_freshness(GraphFreshness::content_hash(format!("edge-hash-{idx:03}"))),
5079            );
5080        }
5081
5082        store
5083            .replace_projection_with_version(
5084                "root",
5085                &projection,
5086                Some("bulk-v1"),
5087                Some("commit-a".to_string()),
5088            )
5089            .unwrap();
5090
5091        projection
5092            .nodes
5093            .retain(|node| !node.id.ends_with("000") && !node.id.ends_with("064"));
5094        projection.edges.retain(|edge| {
5095            !edge.from_id.ends_with("000")
5096                && !edge.to_id.ends_with("000")
5097                && !edge.from_id.ends_with("064")
5098                && !edge.to_id.ends_with("064")
5099        });
5100        let refresh = store
5101            .replace_projection_with_version(
5102                "root",
5103                &projection,
5104                Some("bulk-v2"),
5105                Some("commit-b".to_string()),
5106            )
5107            .unwrap();
5108
5109        assert_eq!(store.all_nodes().unwrap().len(), 126);
5110        assert_eq!(store.all_edges().unwrap().len(), 124);
5111        assert_eq!(
5112            refresh.tombstoned_nodes,
5113            vec!["node:000".to_string(), "node:064".to_string()]
5114        );
5115        assert_eq!(refresh.tombstoned_edges.len(), 3);
5116        assert_eq!(refresh.deleted_nodes, 2);
5117        assert_eq!(refresh.deleted_edges, 3);
5118        assert_eq!(refresh.unchanged_nodes, 126);
5119        assert_eq!(refresh.unchanged_edges, 124);
5120        assert_eq!(refresh.upserted_nodes, 0);
5121        assert_eq!(refresh.upserted_edges, 0);
5122        assert_eq!(refresh.unchanged_properties, 250);
5123        assert_eq!(refresh.upserted_properties, 0);
5124        assert!(
5125            refresh
5126                .phase_timings
5127                .iter()
5128                .any(|phase| phase.name == "sqlite_node_staging"
5129                    && phase.detail.contains("126 unchanged skipped")
5130                    && phase.detail.contains("multi-row chunks up to 500 rows")),
5131            "{:?}",
5132            refresh.phase_timings
5133        );
5134        assert!(
5135            refresh
5136                .phase_timings
5137                .iter()
5138                .any(|phase| phase.name == "sqlite_node_staging"
5139                    && phase.detail.contains("124 unchanged skipped")),
5140            "{:?}",
5141            refresh.phase_timings
5142        );
5143        let staged_node_properties: usize = store
5144            .conn
5145            .query_row(
5146                "SELECT COUNT(*) FROM temp.next_graph_node_properties",
5147                [],
5148                |row| row_usize(row, 0),
5149            )
5150            .unwrap();
5151        let staged_edge_properties: usize = store
5152            .conn
5153            .query_row(
5154                "SELECT COUNT(*) FROM temp.next_graph_edge_properties",
5155                [],
5156                |row| row_usize(row, 0),
5157            )
5158            .unwrap();
5159        assert_eq!(staged_node_properties, 0);
5160        assert_eq!(staged_edge_properties, 0);
5161        assert!(
5162            refresh
5163                .phase_timings
5164                .iter()
5165                .any(|phase| phase.name == "sqlite_property_row_staging"
5166                    && phase.detail.contains("new/changed node rows")),
5167            "{:?}",
5168            refresh.phase_timings
5169        );
5170        assert!(
5171            refresh
5172                .phase_timings
5173                .iter()
5174                .any(|phase| phase.name == "sqlite_edge_property_row_staging"
5175                    && phase.detail.contains("new/changed edge rows")),
5176            "{:?}",
5177            refresh.phase_timings
5178        );
5179        assert_eq!(
5180            store
5181                .projection_version("root")
5182                .unwrap()
5183                .unwrap()
5184                .source_watermark
5185                .as_deref(),
5186            Some("commit-b")
5187        );
5188    }
5189
5190    #[test]
5191    fn sqlite_reentrant_temp_table_guard_panics() {
5192        let store = SqliteGraphStore::in_memory().unwrap();
5193        store.temp_table_active.set(true);
5194        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
5195            store.assert_not_in_temp_table_section();
5196        }));
5197        assert!(result.is_err());
5198    }
5199
5200    #[test]
5201    fn sqlite_temp_table_guard_clears_after_method() {
5202        let mut store = SqliteGraphStore::in_memory().unwrap();
5203        let projection = GraphProjection {
5204            nodes: vec![],
5205            edges: vec![],
5206        };
5207        store.replace_projection(&projection).unwrap();
5208        assert!(!store.temp_table_active.get());
5209    }
5210
5211    #[test]
5212    fn derive_ontology_summarizes_types_and_relations() {
5213        let mut store = SqliteGraphStore::in_memory().unwrap();
5214        let seed = GraphProjection {
5215            nodes: vec![
5216                GraphNode::new("fn:a", "function", "a"),
5217                GraphNode::new("fn:b", "function", "b"),
5218                GraphNode::new("mod:m", "module", "m"),
5219            ],
5220            edges: vec![
5221                GraphEdge::new("fn:a", "fn:b", "calls"),
5222                GraphEdge::new("mod:m", "fn:a", "contains"),
5223            ],
5224        };
5225        store.upsert_projection(&seed).unwrap();
5226
5227        let onto = store.derive_ontology().unwrap();
5228        let type_kinds: std::collections::BTreeSet<_> =
5229            onto.nodes.iter().map(|n| n.label.clone()).collect();
5230        assert!(type_kinds.contains("function"));
5231        assert!(type_kinds.contains("module"));
5232        assert!(onto.nodes.iter().all(|n| n.kind == "ontology_type"));
5233
5234        let rel: std::collections::BTreeSet<_> = onto
5235            .edges
5236            .iter()
5237            .map(|e| (e.from_id.clone(), e.kind.clone(), e.to_id.clone()))
5238            .collect();
5239        assert!(rel.contains(&(
5240            "ontology_type:function".into(),
5241            "ontology_relation:calls".into(),
5242            "ontology_type:function".into()
5243        )));
5244        assert!(rel.contains(&(
5245            "ontology_type:module".into(),
5246            "ontology_relation:contains".into(),
5247            "ontology_type:function".into()
5248        )));
5249
5250        let function_node = onto.nodes.iter().find(|n| n.label == "function").unwrap();
5251        assert_eq!(function_node.properties.get("instance_count").unwrap(), "2");
5252
5253        // Idempotent: upserting the ontology then re-deriving excludes ontology rows.
5254        store.upsert_projection(&onto).unwrap();
5255        let onto2 = store.derive_ontology().unwrap();
5256        assert!(onto2.nodes.iter().all(|n| n.kind == "ontology_type"));
5257        assert_eq!(onto2.nodes.len(), onto.nodes.len());
5258        assert_eq!(onto2.edges.len(), onto.edges.len());
5259    }
5260
5261    /// #kgsameas: link_nodes_by_shared_property stars duplicate nodes sharing a
5262    /// prefixed property value to the group's smallest node id, is idempotent, and
5263    /// ignores values without the prefix / singleton groups.
5264    #[test]
5265    fn link_nodes_by_shared_property_stars_duplicates_idempotently() {
5266        let mut store = SqliteGraphStore::in_memory().unwrap();
5267        let seed = GraphProjection {
5268            nodes: vec![
5269                // Three nodes reconciled to one canonical entity (kgent-canon).
5270                GraphNode::new("kgent-z", "semantic_entity", "Dup")
5271                    .with_property("entity_id", "kgent-canon"),
5272                GraphNode::new("kgent-a", "semantic_entity", "Dup")
5273                    .with_property("entity_id", "kgent-canon"),
5274                GraphNode::new("kgent-m", "semantic_entity", "Dup")
5275                    .with_property("entity_id", "kgent-canon"),
5276                // A singleton canonical entity — no link.
5277                GraphNode::new("kgent-solo", "semantic_entity", "Solo")
5278                    .with_property("entity_id", "kgent-other"),
5279                // A local-slug id — not a merge key (no kgent- prefix).
5280                GraphNode::new("kgent-local", "semantic_entity", "Local")
5281                    .with_property("entity_id", "e0"),
5282            ],
5283            edges: vec![],
5284        };
5285        store.upsert_projection(&seed).unwrap();
5286
5287        let written = store
5288            .link_nodes_by_shared_property(
5289                "semantic_entity",
5290                "entity_id",
5291                "kgent-",
5292                "same_as",
5293                &[("provider", "tsift-kg")],
5294            )
5295            .unwrap();
5296        // 3-member group -> 2 star edges; singleton + local-slug contribute none.
5297        assert_eq!(written, 2);
5298
5299        // Representative is the smallest node id (kgent-a); members star to it.
5300        let same_as: Vec<(String, String)> = {
5301            let mut stmt = store
5302                .conn
5303                .prepare("SELECT from_id, to_id FROM graph_edges WHERE kind = 'same_as' ORDER BY from_id")
5304                .unwrap();
5305            let rows = stmt
5306                .query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)))
5307                .unwrap();
5308            rows.map(|r| r.unwrap()).collect()
5309        };
5310        assert_eq!(
5311            same_as,
5312            vec![
5313                ("kgent-m".to_string(), "kgent-a".to_string()),
5314                ("kgent-z".to_string(), "kgent-a".to_string()),
5315            ]
5316        );
5317
5318        // Idempotent: a second run writes the same set, no duplicate edges.
5319        let written2 = store
5320            .link_nodes_by_shared_property(
5321                "semantic_entity",
5322                "entity_id",
5323                "kgent-",
5324                "same_as",
5325                &[("provider", "tsift-kg")],
5326            )
5327            .unwrap();
5328        assert_eq!(written2, 2);
5329        let edge_count: i64 = store
5330            .conn
5331            .query_row(
5332                "SELECT COUNT(*) FROM graph_edges WHERE kind = 'same_as'",
5333                [],
5334                |r| r.get(0),
5335            )
5336            .unwrap();
5337        assert_eq!(edge_count, 2, "re-run must not duplicate same_as edges");
5338    }
5339
5340    /// #kgrefreshdup: delete_source_projection removes a provider's nodes for a
5341    /// source (cascading to edges/properties) but must NOT touch nodes from a
5342    /// different provider that happen to share the same `source_ref` path.
5343    #[test]
5344    fn delete_source_projection_is_scoped_by_provider_and_cascades() {
5345        let mut store = SqliteGraphStore::in_memory().unwrap();
5346        let seed = GraphProjection {
5347            nodes: vec![
5348                // KG nodes for src/a.rs.
5349                GraphNode::new("kg:a:1", "semantic_entity", "Alpha")
5350                    .with_property("provider", "tsift-kg")
5351                    .with_property("source_ref", "src/a.rs"),
5352                GraphNode::new("kg:a:2", "semantic_entity", "Beta")
5353                    .with_property("provider", "tsift-kg")
5354                    .with_property("source_ref", "src/a.rs"),
5355                // KG node for a different source.
5356                GraphNode::new("kg:b:1", "semantic_entity", "Gamma")
5357                    .with_property("provider", "tsift-kg")
5358                    .with_property("source_ref", "src/b.rs"),
5359                // A non-KG (e.g. AST) node sharing the src/a.rs source_ref.
5360                GraphNode::new("ast:a:1", "function", "fn_in_a")
5361                    .with_property("provider", "tsift-ast")
5362                    .with_property("source_ref", "src/a.rs"),
5363            ],
5364            edges: vec![
5365                GraphEdge::new("kg:a:1", "kg:a:2", "semantic_relation")
5366                    .with_property("provider", "tsift-kg"),
5367                GraphEdge::new("ast:a:1", "kg:a:1", "references"),
5368            ],
5369        };
5370        store.upsert_projection(&seed).unwrap();
5371
5372        let deleted = store
5373            .delete_source_projection("src/a.rs", "tsift-kg")
5374            .unwrap();
5375        assert_eq!(deleted, 2, "only the two KG nodes for src/a.rs are deleted");
5376
5377        // KG nodes for src/a.rs are gone.
5378        assert!(store.node("kg:a:1").unwrap().is_none());
5379        assert!(store.node("kg:a:2").unwrap().is_none());
5380        // KG node for the other source survives.
5381        assert!(store.node("kg:b:1").unwrap().is_some());
5382        // The non-KG node sharing the source_ref is untouched.
5383        assert!(store.node("ast:a:1").unwrap().is_some());
5384
5385        // The semantic_relation edge between deleted nodes cascaded away; the
5386        // edge touching the surviving AST node + a deleted KG node also cascades.
5387        let edge_count: i64 = store
5388            .conn
5389            .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| row.get(0))
5390            .unwrap();
5391        assert_eq!(edge_count, 0);
5392
5393        // Properties for the deleted nodes cascaded too.
5394        let orphan_props: i64 = store
5395            .conn
5396            .query_row(
5397                "SELECT COUNT(*) FROM graph_node_properties WHERE node_id IN ('kg:a:1','kg:a:2')",
5398                [],
5399                |row| row.get(0),
5400            )
5401            .unwrap();
5402        assert_eq!(orphan_props, 0);
5403    }
5404}