Skip to main content

tsift_sqlite/
lib.rs

1use anyhow::{Context, Result, bail};
2use rusqlite::{
3    Connection, OpenFlags, OptionalExtension, Row, params_from_iter,
4    types::{Type, Value},
5};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use std::cell::Cell;
8use std::collections::{BTreeMap, BTreeSet};
9use std::ffi::OsString;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14pub use tsift_core::{
15    ConvexEdgeRow, ConvexGraphClient, ConvexGraphStore, ConvexNodeRow, ConvexProjectionRows,
16    ConvexRowsGraphClient, GRAPH_SEMANTIC_VECTOR_DEFAULT_MODEL,
17    GRAPH_SEMANTIC_VECTOR_MODEL_PROPERTY_KEY, GRAPH_SEMANTIC_VECTOR_PROPERTY_KEY, GraphEdge,
18    GraphFreshness, GraphNode, GraphPagedSubgraph, GraphPath, GraphProjection, GraphPropertyFilter,
19    GraphProvenance, GraphQueryOptions, GraphQueryPage, GraphSemanticCandidate, GraphStore,
20    GraphSubgraph, PropertyMode, RankedNeighborhoodOptions, RankedNeighborhoodResult,
21    SQLITE_GRAPH_SCHEMA_VERSION, SemanticSeededNeighborhoodExpansion,
22    SemanticSeededNeighborhoodOptions, TerseGraphEdge, TerseGraphNode, apply_graph_edge_query_page,
23    apply_graph_query_page, graph_edge_id, graph_semantic_cosine,
24    graph_semantic_top_candidates_by_property_scan, parse_graph_semantic_vector_property,
25    shortest_path_using_outgoing, stable_graph_edge_id,
26};
27
/// Describes which snapshot fallback was selected after a read-only open hit
/// a locked database.
///
/// Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReadOnlyRecovery {
    /// Selected by `read_only_snapshot_recovery` when only a `-journal`
    /// sidecar exists next to the database file.
    SnapshotFallback,
    /// Selected by `read_only_snapshot_recovery` when a `-wal` or `-shm`
    /// sidecar exists next to the database file.
    SnapshotFallbackWal,
}
34
35pub fn read_only_snapshot_recovery(
36    db_path: &Path,
37    err: &anyhow::Error,
38) -> Option<ReadOnlyRecovery> {
39    if !error_mentions_locked_db(err) {
40        return None;
41    }
42    if wal_sidecar_path(db_path).exists() || shared_memory_sidecar_path(db_path).exists() {
43        Some(ReadOnlyRecovery::SnapshotFallbackWal)
44    } else if rollback_journal_path(db_path).exists() {
45        Some(ReadOnlyRecovery::SnapshotFallback)
46    } else {
47        None
48    }
49}
50
/// Returns the rollback-journal sidecar path for `db_path`: the same path
/// with `-journal` appended to its final component.
pub fn rollback_journal_path(db_path: &Path) -> PathBuf {
    let mut with_suffix = db_path.to_path_buf().into_os_string();
    with_suffix.push("-journal");
    with_suffix.into()
}
56
/// Returns the write-ahead-log sidecar path for `db_path`: the same path
/// with `-wal` appended to its final component.
pub fn wal_sidecar_path(db_path: &Path) -> PathBuf {
    let mut with_suffix = db_path.to_path_buf().into_os_string();
    with_suffix.push("-wal");
    with_suffix.into()
}
62
/// Returns the shared-memory sidecar path for `db_path`: the same path with
/// `-shm` appended to its final component.
pub fn shared_memory_sidecar_path(db_path: &Path) -> PathBuf {
    let mut with_suffix = db_path.to_path_buf().into_os_string();
    with_suffix.push("-shm");
    with_suffix.into()
}
68
69pub fn copy_read_only_snapshot(
70    db_path: &Path,
71    default_stem: &str,
72) -> Result<(PathBuf, Vec<PathBuf>)> {
73    let snapshot_path = snapshot_copy_path(db_path, default_stem);
74    std::fs::copy(db_path, &snapshot_path).with_context(|| {
75        format!(
76            "copying locked db {} to snapshot {}",
77            db_path.display(),
78            snapshot_path.display()
79        )
80    })?;
81    let mut cleanup_paths = vec![snapshot_path.clone()];
82    copy_optional_snapshot_sidecar(
83        &wal_sidecar_path(db_path),
84        &wal_sidecar_path(&snapshot_path),
85        &mut cleanup_paths,
86    )?;
87    copy_optional_snapshot_sidecar(
88        &shared_memory_sidecar_path(db_path),
89        &shared_memory_sidecar_path(&snapshot_path),
90        &mut cleanup_paths,
91    )?;
92    Ok((snapshot_path, cleanup_paths))
93}
94
95pub fn error_mentions_locked_db(err: &anyhow::Error) -> bool {
96    err.chain()
97        .any(|cause| cause.to_string().contains("database is locked"))
98}
99
100fn copy_optional_snapshot_sidecar(
101    source_path: &Path,
102    snapshot_path: &Path,
103    cleanup_paths: &mut Vec<PathBuf>,
104) -> Result<()> {
105    match std::fs::copy(source_path, snapshot_path) {
106        Ok(_) => {
107            cleanup_paths.push(snapshot_path.to_path_buf());
108            Ok(())
109        }
110        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
111        Err(err) => Err(err).with_context(|| {
112            format!(
113                "copying SQLite sidecar {} to snapshot {}",
114                source_path.display(),
115                snapshot_path.display()
116            )
117        }),
118    }
119}
120
/// Builds the temp-directory path used for a snapshot copy of `db_path`.
///
/// The file name is `tsift-<stem>-<pid>-<nanos>.db`, where `<stem>` is the
/// UTF-8 file stem of `db_path` (or `default_stem` when unavailable),
/// `<pid>` is the current process id and `<nanos>` is the nanoseconds since
/// the Unix epoch (0 if the clock reads before the epoch).
fn snapshot_copy_path(db_path: &Path, default_stem: &str) -> PathBuf {
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    };
    let stem = match db_path.file_stem().and_then(|raw| raw.to_str()) {
        Some(utf8_stem) => utf8_stem,
        None => default_stem,
    };
    let pid = std::process::id();
    let mut snapshot = std::env::temp_dir();
    snapshot.push(format!("tsift-{stem}-{pid}-{nanos}.db"));
    snapshot
}
/// Value written to `PRAGMA wal_autocheckpoint` (and verified afterwards)
/// when a writable graph connection is configured.
const SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES: i64 = 4096;
/// Maximum number of rows bound into a single multi-row staging `INSERT`.
const SQLITE_GRAPH_STAGING_CHUNK_ROWS: usize = 500;
136
/// SQLite-backed graph store.
///
/// # Temp-table invariant
///
/// Several methods — `replace_projection_with_version`, `upsert_projection`,
/// `edges_between_nodes`, and `breadth_first_search` — create connection-scoped
/// `TEMP TABLE` staging areas inside the underlying SQLite connection. Two
/// concurrent calls on the same `SqliteGraphStore` (or sharing the same
/// connection through `SqliteReadOnlyConnection`) would collide on those temp
/// table names and produce incorrect results or errors.
///
/// **Only one temp-table-using method may be active at a time per connection.**
pub struct SqliteGraphStore {
    /// Underlying SQLite connection. Declared first so it is dropped before
    /// the snapshot guard below removes any snapshot files.
    conn: Connection,
    /// Guard that deletes temporary snapshot files on drop; `None` when the
    /// store was not opened from a snapshot copy.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Recovery path taken while opening, as reported by `read_only_recovery`.
    read_only_recovery: Option<ReadOnlyRecovery>,
    /// Flag consulted by `assert_not_in_temp_table_section`, which panics
    /// when it is already set.
    temp_table_active: Cell<bool>,
}
155
/// Read-only handle to a SQLite graph database connection.
///
/// # Temp-table invariant
///
/// When this connection is unwrapped into a `SqliteGraphStore` via
/// `SqliteGraphStore::from_read_only_connection`, the same temp-table
/// concurrency rules apply: only one temp-table-using method
/// (`replace_projection_with_version`, `upsert_projection`,
/// `edges_between_nodes`, `breadth_first_search`) may be active at a time
/// per connection.
pub struct SqliteReadOnlyConnection {
    /// Underlying SQLite connection. Declared first so it is dropped before
    /// the snapshot guard below removes any snapshot files.
    conn: Connection,
    /// Guard that deletes temporary snapshot files on drop; `None` when the
    /// original database file was opened directly.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Snapshot recovery that was applied while opening, if any.
    recovery: Option<ReadOnlyRecovery>,
}
171
// Process-wide counter starting at zero.
// NOTE(review): presumably used to give each breadth-first-search call a
// distinct identifier (e.g. for temp-table names) — the use site is not in
// this part of the file; confirm.
static BFS_CALL_ID: AtomicU64 = AtomicU64::new(0);
173
impl SqliteReadOnlyConnection {
    /// Borrows the underlying SQLite connection.
    pub fn conn(&self) -> &Connection {
        &self.conn
    }

    /// Returns the snapshot recovery applied while opening this connection,
    /// or `None` when the database file was opened directly.
    pub fn recovery(&self) -> Option<ReadOnlyRecovery> {
        self.recovery
    }
}
183
/// Owns a set of temporary snapshot files and deletes them when dropped.
struct SnapshotCopyGuard {
    /// Files to remove on drop.
    paths: Vec<PathBuf>,
}

impl Drop for SnapshotCopyGuard {
    /// Removes every tracked file. Failures (for example a file that is
    /// already gone) are deliberately ignored: cleanup is best effort.
    fn drop(&mut self) {
        self.paths.iter().for_each(|tracked| {
            let _ = std::fs::remove_file(tracked);
        });
    }
}
195
196fn configure_writable_graph_connection(conn: &Connection, db_path: &Path) -> Result<()> {
197    conn.busy_timeout(Duration::from_secs(5))?;
198    conn.pragma_update(None, "journal_mode", "WAL")?;
199    let mode: String = conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
200    if mode.to_lowercase() != "wal" {
201        bail!(
202            "graph substrate db {} requires WAL mode for concurrent reads, got {}",
203            db_path.display(),
204            mode
205        );
206    }
207    conn.pragma_update(
208        None,
209        "wal_autocheckpoint",
210        SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
211    )?;
212    let checkpoint_pages: i64 =
213        conn.query_row("PRAGMA wal_autocheckpoint", [], |row| row.get(0))?;
214    if checkpoint_pages != SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES {
215        bail!(
216            "graph substrate db {} requires wal_autocheckpoint={}, got {}",
217            db_path.display(),
218            SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
219            checkpoint_pages
220        );
221    }
222    Ok(())
223}
224
225fn sqlite_column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
226    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
227    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
228    for row in rows {
229        if row? == column {
230            return Ok(true);
231        }
232    }
233    Ok(false)
234}
235
236fn sqlite_table_exists(conn: &Connection, table: &str) -> Result<bool> {
237    conn.query_row(
238        r#"
239        SELECT EXISTS(
240            SELECT 1
241            FROM sqlite_master
242            WHERE type = 'table' AND name = ?1
243        )
244        "#,
245        [table],
246        |row| row.get::<_, bool>(0),
247    )
248    .map_err(Into::into)
249}
250
251fn add_column_if_missing(
252    conn: &Connection,
253    table: &str,
254    column: &str,
255    definition: &str,
256) -> Result<()> {
257    if !sqlite_column_exists(conn, table, column)? {
258        conn.execute(
259            &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
260            [],
261        )?;
262    }
263    Ok(())
264}
265
266fn backfill_graph_edge_keys(conn: &Connection) -> Result<()> {
267    if !sqlite_column_exists(conn, "graph_edges", "edge_key")? {
268        return Ok(());
269    }
270    let rows = {
271        let mut stmt = conn.prepare(
272            r#"
273            SELECT from_id, to_id, kind
274            FROM graph_edges
275            WHERE edge_key IS NULL OR edge_key = ''
276            ORDER BY from_id, kind, to_id
277            "#,
278        )?;
279        collect_rows(stmt.query_map([], |row| {
280            Ok((
281                row.get::<_, String>(0)?,
282                row.get::<_, String>(1)?,
283                row.get::<_, String>(2)?,
284            ))
285        })?)?
286    };
287    let mut update = conn.prepare(
288        r#"
289        UPDATE graph_edges
290        SET edge_key = ?4
291        WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3
292        "#,
293    )?;
294    for (from_id, to_id, kind) in rows {
295        update.execute((
296            &from_id,
297            &to_id,
298            &kind,
299            stable_graph_edge_id(&from_id, &to_id, &kind),
300        ))?;
301    }
302    Ok(())
303}
304
/// Applies the schema migration steps that a database at `old_version` has
/// not yet received.
///
/// Each step is gated on `old_version` being below the version that
/// introduced it, and the steps run in ascending version order, so a database
/// several versions behind receives all of them in sequence. This function
/// does not itself record the new schema version.
fn migrate_sqlite_graph_schema(conn: &Connection, old_version: i64) -> Result<()> {
    // v2: per-row hash and source watermark columns on nodes and edges.
    if old_version < 2 {
        add_column_if_missing(conn, "graph_nodes", "row_hash", "TEXT")?;
        add_column_if_missing(conn, "graph_nodes", "source_watermark", "TEXT")?;
        add_column_if_missing(conn, "graph_edges", "row_hash", "TEXT")?;
        add_column_if_missing(conn, "graph_edges", "source_watermark", "TEXT")?;
    }
    // v3: repopulate the node property table from the JSON column.
    if old_version < 3 {
        rebuild_graph_node_properties(conn)?;
    }
    // v4: operator stats table and the (kind, label, id) node index.
    if old_version < 4 {
        ensure_sqlite_graph_operator_stats_schema(conn)?;
    }
    // v5: edge keys. The column must exist and be backfilled before the
    // unique index and the edge property table that reference it are created.
    if old_version < 5 {
        add_column_if_missing(conn, "graph_edges", "edge_key", "TEXT")?;
        backfill_graph_edge_keys(conn)?;
        ensure_sqlite_graph_edge_properties_schema(conn)?;
        rebuild_graph_edge_properties(conn)?;
    }
    // v6: semantic vector table, then populate it from existing nodes.
    if old_version < 6 {
        ensure_sqlite_graph_semantic_vectors_schema(conn)?;
        rebuild_graph_node_semantic_vectors(conn)?;
    }
    Ok(())
}
330
/// Creates, if absent, the `idx_graph_nodes_kind_label` index on
/// `graph_nodes(kind, label, id)` and the `graph_operator_stats` table.
///
/// Every statement uses `IF NOT EXISTS`, so calling this repeatedly is safe.
fn ensure_sqlite_graph_operator_stats_schema(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
        CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
            ON graph_nodes(kind, label, id);
        CREATE TABLE IF NOT EXISTS graph_operator_stats (
            scope TEXT PRIMARY KEY,
            nodes INTEGER NOT NULL,
            edges INTEGER NOT NULL,
            tombstone_nodes INTEGER NOT NULL,
            tombstone_edges INTEGER NOT NULL,
            file_size_bytes INTEGER,
            freelist_bytes INTEGER,
            observed_at_unix INTEGER NOT NULL
        );
        "#,
    )?;
    Ok(())
}
350
/// Creates, if absent, the unique index on `graph_edges(edge_key)`, the
/// `graph_edge_properties` table and its `(key, value, edge_key)` index.
///
/// `graph_edge_properties.edge_key` references `graph_edges(edge_key)` with
/// `ON DELETE CASCADE`. Every statement uses `IF NOT EXISTS`, so calling this
/// repeatedly is safe.
fn ensure_sqlite_graph_edge_properties_schema(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
        CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_edge_key
            ON graph_edges(edge_key);
        CREATE TABLE IF NOT EXISTS graph_edge_properties (
            edge_key TEXT NOT NULL,
            key TEXT NOT NULL,
            value TEXT NOT NULL,
            PRIMARY KEY (edge_key, key),
            FOREIGN KEY (edge_key) REFERENCES graph_edges(edge_key) ON DELETE CASCADE
        );
        CREATE INDEX IF NOT EXISTS idx_graph_edge_properties_key_value_edge
            ON graph_edge_properties(key, value, edge_key);
        "#,
    )?;
    Ok(())
}
369
/// Creates, if absent, the `graph_node_semantic_vectors` table and its
/// `(kind, dimensions, node_id)` index.
///
/// `node_id` references `graph_nodes(id)` with `ON DELETE CASCADE`. Every
/// statement uses `IF NOT EXISTS`, so calling this repeatedly is safe.
fn ensure_sqlite_graph_semantic_vectors_schema(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
        CREATE TABLE IF NOT EXISTS graph_node_semantic_vectors (
            node_id TEXT PRIMARY KEY,
            kind TEXT NOT NULL,
            model TEXT NOT NULL,
            dimensions INTEGER NOT NULL,
            vector_blob BLOB NOT NULL,
            FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
        );
        CREATE INDEX IF NOT EXISTS idx_graph_node_semantic_vectors_kind_dims
            ON graph_node_semantic_vectors(kind, dimensions, node_id);
        "#,
    )?;
    Ok(())
}
387
388fn replace_node_properties(
389    conn: &Connection,
390    node_id: &str,
391    properties: &BTreeMap<String, String>,
392) -> Result<()> {
393    conn.execute(
394        "DELETE FROM graph_node_properties WHERE node_id = ?1",
395        [node_id],
396    )?;
397    let mut insert = conn.prepare(
398        r#"
399        INSERT INTO graph_node_properties (node_id, key, value)
400        VALUES (?1, ?2, ?3)
401        ON CONFLICT(node_id, key) DO UPDATE SET
402            value = excluded.value
403        "#,
404    )?;
405    for (key, value) in properties {
406        insert.execute((node_id, key, value))?;
407    }
408    Ok(())
409}
410
/// Values for one `graph_node_semantic_vectors` row derived from a node's
/// properties (the node id and kind are supplied separately by the caller).
struct GraphSemanticVectorRow {
    /// Embedding model name.
    model: String,
    /// Number of components in the vector.
    dimensions: usize,
    /// Vector components as produced by `semantic_vector_to_blob`.
    vector_blob: Vec<u8>,
}
416
417fn graph_semantic_vector_row(
418    properties: &BTreeMap<String, String>,
419) -> Option<GraphSemanticVectorRow> {
420    let vector = properties
421        .get(GRAPH_SEMANTIC_VECTOR_PROPERTY_KEY)
422        .and_then(|value| parse_graph_semantic_vector_property(value))?;
423    Some(GraphSemanticVectorRow {
424        model: properties
425            .get(GRAPH_SEMANTIC_VECTOR_MODEL_PROPERTY_KEY)
426            .cloned()
427            .unwrap_or_else(|| GRAPH_SEMANTIC_VECTOR_DEFAULT_MODEL.to_string()),
428        dimensions: vector.len(),
429        vector_blob: semantic_vector_to_blob(&vector),
430    })
431}
432
/// Encodes a vector as a byte blob: each component is written as its 8-byte
/// little-endian `f64` representation, in order.
fn semantic_vector_to_blob(vector: &[f64]) -> Vec<u8> {
    const COMPONENT_BYTES: usize = std::mem::size_of::<f64>();
    let mut blob = vec![0u8; std::mem::size_of_val(vector)];
    for (slot, component) in blob.chunks_exact_mut(COMPONENT_BYTES).zip(vector) {
        slot.copy_from_slice(&component.to_le_bytes());
    }
    blob
}
440
/// Decodes a blob written by `semantic_vector_to_blob` back into a vector.
///
/// Returns `None` when `dimensions` is zero, when the blob length is not
/// exactly `dimensions` 8-byte components, or when any decoded component is
/// not finite (NaN or infinity).
///
/// The expected byte length is computed with overflow checking, so an absurd
/// `dimensions` value (for example from a corrupted row) is rejected instead
/// of panicking or wrapping around to a length that happens to match.
fn semantic_vector_from_blob(blob: &[u8], dimensions: usize) -> Option<Vec<f64>> {
    const COMPONENT_BYTES: usize = std::mem::size_of::<f64>();
    let expected_len = dimensions.checked_mul(COMPONENT_BYTES)?;
    if dimensions == 0 || blob.len() != expected_len {
        return None;
    }
    let mut vector = Vec::with_capacity(dimensions);
    for chunk in blob.chunks_exact(COMPONENT_BYTES) {
        // `chunks_exact` guarantees every chunk is COMPONENT_BYTES long.
        let mut bytes = [0u8; COMPONENT_BYTES];
        bytes.copy_from_slice(chunk);
        let value = f64::from_le_bytes(bytes);
        if !value.is_finite() {
            return None;
        }
        vector.push(value);
    }
    Some(vector)
}
455
456fn replace_node_semantic_vector(
457    conn: &Connection,
458    node_id: &str,
459    kind: &str,
460    properties: &BTreeMap<String, String>,
461) -> Result<()> {
462    conn.execute(
463        "DELETE FROM graph_node_semantic_vectors WHERE node_id = ?1",
464        [node_id],
465    )?;
466    let Some(row) = graph_semantic_vector_row(properties) else {
467        return Ok(());
468    };
469    conn.execute(
470        r#"
471        INSERT INTO graph_node_semantic_vectors
472            (node_id, kind, model, dimensions, vector_blob)
473        VALUES (?1, ?2, ?3, ?4, ?5)
474        "#,
475        (
476            node_id,
477            kind,
478            row.model,
479            row.dimensions as i64,
480            row.vector_blob,
481        ),
482    )?;
483    Ok(())
484}
485
486fn replace_edge_properties(
487    conn: &Connection,
488    edge_key: &str,
489    properties: &BTreeMap<String, String>,
490) -> Result<()> {
491    conn.execute(
492        "DELETE FROM graph_edge_properties WHERE edge_key = ?1",
493        [edge_key],
494    )?;
495    let mut insert = conn.prepare(
496        r#"
497        INSERT INTO graph_edge_properties (edge_key, key, value)
498        VALUES (?1, ?2, ?3)
499        ON CONFLICT(edge_key, key) DO UPDATE SET
500            value = excluded.value
501        "#,
502    )?;
503    for (key, value) in properties {
504        insert.execute((edge_key, key, value))?;
505    }
506    Ok(())
507}
508
/// Rebuilds `graph_node_properties` from the `properties_json` column of
/// `graph_nodes`.
///
/// Does nothing when `graph_nodes` has no `properties_json` column.
/// Otherwise the property table is emptied and refilled with one row per
/// JSON entry (expanded with `json_each`), with values cast to text and
/// entries whose key or value is NULL skipped.
fn rebuild_graph_node_properties(conn: &Connection) -> Result<()> {
    if !sqlite_column_exists(conn, "graph_nodes", "properties_json")? {
        return Ok(());
    }
    conn.execute_batch(
        r#"
        DELETE FROM graph_node_properties;
        INSERT INTO graph_node_properties (node_id, key, value)
        SELECT graph_nodes.id, json_each.key, CAST(json_each.value AS TEXT)
        FROM graph_nodes, json_each(graph_nodes.properties_json)
        WHERE json_each.key IS NOT NULL
          AND json_each.value IS NOT NULL
        "#,
    )?;
    Ok(())
}
525
526fn rebuild_graph_node_semantic_vectors(conn: &Connection) -> Result<()> {
527    if !sqlite_column_exists(conn, "graph_nodes", "properties_json")?
528        || !sqlite_table_exists(conn, "graph_node_semantic_vectors")?
529    {
530        return Ok(());
531    }
532    conn.execute("DELETE FROM graph_node_semantic_vectors", [])?;
533    let rows = {
534        let mut stmt = conn.prepare(
535            r#"
536            SELECT id, kind, properties_json
537            FROM graph_nodes
538            WHERE json_extract(properties_json, '$.embedding') IS NOT NULL
539            ORDER BY id
540            "#,
541        )?;
542        collect_rows(stmt.query_map([], |row| {
543            Ok((
544                row.get::<_, String>(0)?,
545                row.get::<_, String>(1)?,
546                row.get::<_, String>(2)?,
547            ))
548        })?)?
549    };
550    for (node_id, kind, properties_json) in rows {
551        let properties: BTreeMap<String, String> = serde_json::from_str(&properties_json)
552            .with_context(|| format!("parsing semantic properties for graph node {node_id}"))?;
553        replace_node_semantic_vector(conn, &node_id, &kind, &properties)?;
554    }
555    Ok(())
556}
557
/// Rebuilds `graph_edge_properties` from the `properties_json` column of
/// `graph_edges`.
///
/// Does nothing when `graph_edges` lacks either the `properties_json` or the
/// `edge_key` column. Otherwise the property table is emptied and refilled
/// with one row per JSON entry (expanded with `json_each`), with values cast
/// to text. Edges with a NULL or empty `edge_key`, and entries whose key or
/// value is NULL, are skipped.
fn rebuild_graph_edge_properties(conn: &Connection) -> Result<()> {
    if !sqlite_column_exists(conn, "graph_edges", "properties_json")?
        || !sqlite_column_exists(conn, "graph_edges", "edge_key")?
    {
        return Ok(());
    }
    conn.execute_batch(
        r#"
        DELETE FROM graph_edge_properties;
        INSERT INTO graph_edge_properties (edge_key, key, value)
        SELECT graph_edges.edge_key, json_each.key, CAST(json_each.value AS TEXT)
        FROM graph_edges, json_each(graph_edges.properties_json)
        WHERE graph_edges.edge_key IS NOT NULL
          AND graph_edges.edge_key <> ''
          AND json_each.key IS NOT NULL
          AND json_each.value IS NOT NULL
        "#,
    )?;
    Ok(())
}
578
579pub fn open_graph_read_only_connection(db_path: &Path) -> Result<SqliteReadOnlyConnection> {
580    let conn = Connection::open_with_flags(
581        db_path,
582        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
583    )
584    .with_context(|| format!("opening graph.db read-only: {}", db_path.display()))?;
585    conn.busy_timeout(Duration::from_secs(5))?;
586    Ok(SqliteReadOnlyConnection {
587        conn,
588        _snapshot_copy: None,
589        recovery: None,
590    })
591}
592
593pub fn open_graph_read_only_connection_resilient(
594    db_path: &Path,
595) -> Result<SqliteReadOnlyConnection> {
596    match open_graph_read_only_connection(db_path).and_then(|connection| {
597        connection
598            .conn
599            .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
600        Ok(connection)
601    }) {
602        Ok(connection) => Ok(connection),
603        Err(err) => match read_only_snapshot_recovery(db_path, &err) {
604            Some(recovery) => open_graph_read_only_snapshot(db_path, recovery),
605            None => Err(err),
606        },
607    }
608}
609
610fn open_graph_read_only_snapshot(
611    db_path: &Path,
612    recovery: ReadOnlyRecovery,
613) -> Result<SqliteReadOnlyConnection> {
614    let (snapshot_path, cleanup_paths) = copy_read_only_snapshot(db_path, "graph")?;
615    let conn = Connection::open_with_flags(
616        &snapshot_path,
617        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
618    )
619    .with_context(|| format!("opening graph.db snapshot {}", snapshot_path.display()))?;
620    conn.busy_timeout(Duration::from_secs(5))?;
621    Ok(SqliteReadOnlyConnection {
622        conn,
623        _snapshot_copy: Some(SnapshotCopyGuard {
624            paths: cleanup_paths,
625        }),
626        recovery: Some(recovery),
627    })
628}
629
/// Timing record for one named phase of a projection refresh, as built by
/// `sqlite_refresh_phase_timing`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefreshPhase {
    /// Phase name.
    pub name: String,
    /// Elapsed time of the phase in microseconds.
    pub duration_micros: u128,
    /// Free-form detail text supplied by the caller.
    pub detail: String,
}
636
/// Summary of a projection refresh against the SQLite graph store.
///
/// NOTE(review): the code that fills this struct is not in this part of the
/// file; the field meanings are presumably what their names say (counts of
/// rows upserted / unchanged / deleted, the keys that were tombstoned, file
/// size before and after) — confirm against the producer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefresh {
    pub scope: String,
    pub projection_version: String,
    pub source_watermark: Option<String>,
    pub tombstoned_nodes: Vec<String>,
    pub tombstoned_edges: Vec<String>,
    pub upserted_nodes: usize,
    pub upserted_edges: usize,
    pub unchanged_nodes: usize,
    pub unchanged_edges: usize,
    pub upserted_properties: usize,
    pub unchanged_properties: usize,
    pub deleted_properties: usize,
    pub deleted_nodes: usize,
    pub deleted_edges: usize,
    pub pruned_tombstones: usize,
    pub file_size_bytes_before: Option<u64>,
    pub file_size_bytes_after: Option<u64>,
    /// Per-phase timing records.
    pub phase_timings: Vec<SqliteProjectionRefreshPhase>,
}
658
/// Version information for a projection.
///
/// The field names match the `projection_version`, `content_hash` and
/// `source_watermark` columns of the `graph_projection_versions` table.
/// NOTE(review): presumably one row of that table — the reader is not in this
/// part of the file; confirm.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionVersion {
    pub projection_version: String,
    pub content_hash: Option<String>,
    pub source_watermark: Option<String>,
}
665
666fn sqlite_refresh_phase_timing(
667    name: &str,
668    started: Instant,
669    detail: &str,
670) -> SqliteProjectionRefreshPhase {
671    SqliteProjectionRefreshPhase {
672        name: name.to_string(),
673        duration_micros: started.elapsed().as_micros(),
674        detail: detail.to_string(),
675    }
676}
677
/// Builds the `VALUES` placeholder list for a multi-row insert.
///
/// Produces `row_count` groups separated by `", "`, each group being
/// `column_count` `?` placeholders in parentheses — for example
/// `(?, ?), (?, ?)` for two columns and two rows. Zero rows yields an empty
/// string; zero columns yields empty groups `()`.
fn sqlite_graph_staging_placeholders(column_count: usize, row_count: usize) -> String {
    let row_group = format!("({})", vec!["?"; column_count].join(", "));
    vec![row_group.as_str(); row_count].join(", ")
}
691
692fn sqlite_stage_all_ids(
693    tx: &rusqlite::Transaction<'_>,
694    nodes: &[GraphNode],
695    edges: &[GraphEdge],
696) -> Result<()> {
697    for chunk in nodes
698        .iter()
699        .map(|n| &n.id)
700        .collect::<Vec<_>>()
701        .chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS)
702    {
703        let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
704        let sql = format!(
705            "INSERT OR IGNORE INTO next_graph_all_node_ids (id) VALUES {}",
706            placeholders.join(", ")
707        );
708        let values: Vec<Value> = chunk.iter().map(|id| Value::Text((*id).clone())).collect();
709        tx.execute(&sql, params_from_iter(values.iter()))?;
710    }
711    for chunk in edges
712        .iter()
713        .map(graph_edge_id)
714        .collect::<Vec<_>>()
715        .chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS)
716    {
717        let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
718        let sql = format!(
719            "INSERT OR IGNORE INTO next_graph_all_edge_keys (edge_key) VALUES {}",
720            placeholders.join(", ")
721        );
722        let values: Vec<Value> = chunk.iter().map(|ek| Value::Text(ek.to_string())).collect();
723        tx.execute(&sql, params_from_iter(values.iter()))?;
724    }
725    Ok(())
726}
727
/// Stages `nodes` into `next_graph_nodes`, and their semantic vectors (when a
/// node has one) into `next_graph_node_semantic_vectors`.
///
/// Nodes are written in multi-row inserts of at most
/// `SQLITE_GRAPH_STAGING_CHUNK_ROWS` rows. `source_watermark` is stored on
/// every staged row, as NULL when `None`.
///
/// # Errors
///
/// Fails when a node cannot be serialized or hashed, or on any SQLite error.
fn sqlite_stage_projection_nodes(
    tx: &rusqlite::Transaction<'_>,
    nodes: &[&GraphNode],
    source_watermark: Option<&str>,
) -> Result<()> {
    // Prepared once and reused for every node that carries a vector.
    let mut insert_semantic_vector = tx.prepare(
        r#"
        INSERT INTO next_graph_node_semantic_vectors
            (node_id, kind, model, dimensions, vector_blob)
        VALUES (?1, ?2, ?3, ?4, ?5)
        "#,
    )?;
    for chunk in nodes.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
        let sql = format!(
            r#"
            INSERT INTO next_graph_nodes
                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
            VALUES {}
            "#,
            sqlite_graph_staging_placeholders(8, chunk.len())
        );
        // Values are bound positionally: exactly 8 per node, pushed in the
        // same order as the column list above.
        let mut values = Vec::with_capacity(chunk.len() * 8);
        for node in chunk {
            values.push(Value::Text(node.id.clone()));
            values.push(Value::Text(node.kind.clone()));
            values.push(Value::Text(node.label.clone()));
            values.push(Value::Text(to_json(&node.properties)?));
            values.push(Value::Text(to_json(&node.provenance)?));
            values.push(
                optional_to_json(&node.freshness)?
                    .map(Value::Text)
                    .unwrap_or(Value::Null),
            );
            values.push(Value::Text(row_hash(node)?));
            values.push(
                source_watermark
                    .map(|watermark| Value::Text(watermark.to_string()))
                    .unwrap_or(Value::Null),
            );
        }
        tx.execute(&sql, params_from_iter(values))?;
        // Second pass over the chunk: stage a vector row for each node whose
        // properties yield one; nodes without a usable vector are skipped.
        for node in chunk {
            let Some(row) = graph_semantic_vector_row(&node.properties) else {
                continue;
            };
            insert_semantic_vector.execute((
                &node.id,
                &node.kind,
                row.model,
                row.dimensions as i64,
                row.vector_blob,
            ))?;
        }
    }
    Ok(())
}
784
/// Stages `edges` into `next_graph_edges`.
///
/// Edges are written in multi-row inserts of at most
/// `SQLITE_GRAPH_STAGING_CHUNK_ROWS` rows. The edge key of each row comes
/// from `graph_edge_id`. `source_watermark` is stored on every staged row, as
/// NULL when `None`.
///
/// # Errors
///
/// Fails when an edge cannot be serialized or hashed, or on any SQLite error.
fn sqlite_stage_projection_edges(
    tx: &rusqlite::Transaction<'_>,
    edges: &[&GraphEdge],
    source_watermark: Option<&str>,
) -> Result<()> {
    for chunk in edges.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
        let sql = format!(
            r#"
            INSERT INTO next_graph_edges
                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
            VALUES {}
            "#,
            sqlite_graph_staging_placeholders(9, chunk.len())
        );
        // Values are bound positionally: exactly 9 per edge, pushed in the
        // same order as the column list above.
        let mut values = Vec::with_capacity(chunk.len() * 9);
        for edge in chunk {
            values.push(Value::Text(graph_edge_id(edge)));
            values.push(Value::Text(edge.from_id.clone()));
            values.push(Value::Text(edge.to_id.clone()));
            values.push(Value::Text(edge.kind.clone()));
            values.push(Value::Text(to_json(&edge.properties)?));
            values.push(Value::Text(to_json(&edge.provenance)?));
            values.push(
                optional_to_json(&edge.freshness)?
                    .map(Value::Text)
                    .unwrap_or(Value::Null),
            );
            values.push(Value::Text(row_hash(edge)?));
            values.push(
                source_watermark
                    .map(|watermark| Value::Text(watermark.to_string()))
                    .unwrap_or(Value::Null),
            );
        }
        tx.execute(&sql, params_from_iter(values))?;
    }
    Ok(())
}
823
824impl SqliteGraphStore {
825    fn assert_not_in_temp_table_section(&self) {
826        if self.temp_table_active.get() {
827            panic!(
828                "SqliteGraphStore: re-entrant temp-table call detected — only one temp-table-using method may be active at a time per connection"
829            );
830        }
831    }
832
833    pub fn open(db_path: &Path) -> Result<Self> {
834        if let Some(parent) = db_path.parent() {
835            std::fs::create_dir_all(parent)
836                .with_context(|| format!("creating graph substrate dir: {}", parent.display()))?;
837        }
838        let conn = Connection::open(db_path)
839            .with_context(|| format!("opening graph substrate db: {}", db_path.display()))?;
840        configure_writable_graph_connection(&conn, db_path)?;
841        Self::from_connection(conn)
842    }
843
    /// Creates a store backed by a fresh in-memory SQLite database.
    ///
    /// Only the 5 second busy timeout is applied here; the WAL configuration
    /// used by `open` is not.
    pub fn in_memory() -> Result<Self> {
        let conn = Connection::open_in_memory()?;
        conn.busy_timeout(Duration::from_secs(5))?;
        Self::from_connection(conn)
    }
849
    /// Opens the database at `db_path` read-only and wraps the connection in
    /// a store. No snapshot fallback is attempted; see
    /// `open_read_only_resilient` for that.
    pub fn open_read_only(db_path: &Path) -> Result<Self> {
        let connection = open_graph_read_only_connection(db_path)?;
        Self::from_read_only_connection(connection)
    }
854
    /// Opens the database at `db_path` read-only via
    /// `open_graph_read_only_connection_resilient`, which may fall back to a
    /// temporary snapshot copy when the database is locked, and wraps the
    /// connection in a store.
    pub fn open_read_only_resilient(db_path: &Path) -> Result<Self> {
        let connection = open_graph_read_only_connection_resilient(db_path)?;
        Self::from_read_only_connection(connection)
    }
859
    /// Returns the value of the store's `read_only_recovery` field: the
    /// snapshot recovery recorded for this store, or `None`.
    pub fn read_only_recovery(&self) -> Option<ReadOnlyRecovery> {
        self.read_only_recovery
    }
863
864    pub fn has_user_triggers(&self) -> Result<bool> {
865        self.conn
866            .query_row(
867                r#"
868                SELECT EXISTS(
869                    SELECT 1
870                    FROM sqlite_master
871                    WHERE type = 'trigger'
872                      AND name NOT LIKE 'sqlite_%'
873                )
874                "#,
875                [],
876                |row| row.get::<_, bool>(0),
877            )
878            .map_err(Into::into)
879    }
880
881    fn from_connection(conn: Connection) -> Result<Self> {
882        conn.pragma_update(None, "foreign_keys", "ON")?;
883        let user_version: i64 =
884            conn.pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
885        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
886            bail!(
887                "graph.db schema version {} is newer than supported version {}",
888                user_version,
889                SQLITE_GRAPH_SCHEMA_VERSION
890            );
891        }
892        conn.execute_batch(
893            r#"
894            CREATE TABLE IF NOT EXISTS graph_nodes (
895                id TEXT PRIMARY KEY,
896                kind TEXT NOT NULL,
897                label TEXT NOT NULL,
898                properties_json TEXT NOT NULL DEFAULT '{}',
899                provenance_json TEXT NOT NULL DEFAULT '[]',
900                freshness_json TEXT,
901                row_hash TEXT,
902                source_watermark TEXT
903            );
904            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind
905                ON graph_nodes(kind);
906            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
907                ON graph_nodes(kind, label, id);
908
909            CREATE TABLE IF NOT EXISTS graph_edges (
910                edge_key TEXT NOT NULL UNIQUE,
911                from_id TEXT NOT NULL,
912                to_id TEXT NOT NULL,
913                kind TEXT NOT NULL,
914                properties_json TEXT NOT NULL DEFAULT '{}',
915                provenance_json TEXT NOT NULL DEFAULT '[]',
916                freshness_json TEXT,
917                row_hash TEXT,
918                source_watermark TEXT,
919                PRIMARY KEY (from_id, to_id, kind),
920                FOREIGN KEY (from_id) REFERENCES graph_nodes(id) ON DELETE CASCADE,
921                FOREIGN KEY (to_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
922            );
923            CREATE INDEX IF NOT EXISTS idx_graph_edges_from_kind
924                ON graph_edges(from_id, kind);
925            CREATE INDEX IF NOT EXISTS idx_graph_edges_to_kind
926                ON graph_edges(to_id, kind);
927
928            CREATE TABLE IF NOT EXISTS graph_projection_versions (
929                scope TEXT PRIMARY KEY,
930                projection_version TEXT NOT NULL,
931                content_hash TEXT,
932                source_watermark TEXT,
933                observed_at_unix INTEGER NOT NULL
934            );
935
936            CREATE TABLE IF NOT EXISTS graph_tombstones (
937                row_key TEXT PRIMARY KEY,
938                row_kind TEXT NOT NULL,
939                deleted_at_unix INTEGER NOT NULL
940            );
941
942            CREATE TABLE IF NOT EXISTS graph_node_properties (
943                node_id TEXT NOT NULL,
944                key TEXT NOT NULL,
945                value TEXT NOT NULL,
946                PRIMARY KEY (node_id, key),
947                FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
948            );
949            CREATE INDEX IF NOT EXISTS idx_graph_node_properties_key_value_node
950                ON graph_node_properties(key, value, node_id);
951
952            CREATE TABLE IF NOT EXISTS graph_node_semantic_vectors (
953                node_id TEXT PRIMARY KEY,
954                kind TEXT NOT NULL,
955                model TEXT NOT NULL,
956                dimensions INTEGER NOT NULL,
957                vector_blob BLOB NOT NULL,
958                FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
959            );
960            CREATE INDEX IF NOT EXISTS idx_graph_node_semantic_vectors_kind_dims
961                ON graph_node_semantic_vectors(kind, dimensions, node_id);
962
963            CREATE TABLE IF NOT EXISTS graph_operator_stats (
964                scope TEXT PRIMARY KEY,
965                nodes INTEGER NOT NULL,
966                edges INTEGER NOT NULL,
967                tombstone_nodes INTEGER NOT NULL,
968                tombstone_edges INTEGER NOT NULL,
969                file_size_bytes INTEGER,
970                freelist_bytes INTEGER,
971                observed_at_unix INTEGER NOT NULL
972            );
973            "#,
974        )?;
975        if user_version < SQLITE_GRAPH_SCHEMA_VERSION {
976            migrate_sqlite_graph_schema(&conn, user_version)?;
977            conn.pragma_update(None, "user_version", SQLITE_GRAPH_SCHEMA_VERSION)?;
978        }
979        Ok(Self {
980            conn,
981            _snapshot_copy: None,
982            read_only_recovery: None,
983            temp_table_active: Cell::new(false),
984        })
985    }
986
987    fn from_read_only_connection(connection: SqliteReadOnlyConnection) -> Result<Self> {
988        connection.conn.pragma_update(None, "foreign_keys", "ON")?;
989        let user_version: i64 =
990            connection
991                .conn
992                .pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
993        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
994            bail!(
995                "graph.db schema version {} is newer than supported version {}",
996                user_version,
997                SQLITE_GRAPH_SCHEMA_VERSION
998            );
999        }
1000        connection
1001            .conn
1002            .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
1003        Ok(Self {
1004            conn: connection.conn,
1005            _snapshot_copy: connection._snapshot_copy,
1006            read_only_recovery: connection.recovery,
1007            temp_table_active: Cell::new(false),
1008        })
1009    }
1010
1011    pub fn replace_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1012        self.replace_projection_with_version("root", projection, None, None)
1013            .map(|_| ())
1014    }
1015
1016    pub fn replace_projection_with_version(
1017        &mut self,
1018        scope: impl Into<String>,
1019        projection: &GraphProjection,
1020        projection_version: Option<&str>,
1021        source_watermark: Option<String>,
1022    ) -> Result<SqliteProjectionRefresh> {
1023        self.assert_not_in_temp_table_section();
1024        self.temp_table_active.set(true);
1025        let scope = scope.into();
1026        let result = self.replace_projection_with_version_fallible(
1027            scope,
1028            projection,
1029            projection_version,
1030            source_watermark,
1031        );
1032        self.temp_table_active.set(false);
1033        if let Ok(ref refresh) = result {
1034            let total_rows = refresh.upserted_nodes + refresh.upserted_edges;
1035            let autocheckpoint = if total_rows > 10000 {
1036                8192
1037            } else if total_rows > 1000 {
1038                4096
1039            } else {
1040                2048
1041            };
1042            let _ = self
1043                .conn
1044                .pragma_update(None, "wal_autocheckpoint", autocheckpoint);
1045            let _ = self.conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)");
1046        }
1047        result
1048    }
1049
1050    fn replace_projection_with_version_fallible(
1051        &mut self,
1052        scope: String,
1053        projection: &GraphProjection,
1054        projection_version: Option<&str>,
1055        source_watermark: Option<String>,
1056    ) -> Result<SqliteProjectionRefresh> {
1057        let projection_version = projection_version
1058            .map(str::to_string)
1059            .or_else(|| projection_version_from_nodes(&projection.nodes))
1060            .unwrap_or_else(|| "unversioned".to_string());
1061        let projection_hash = projection_hash_from_nodes(&projection.nodes);
1062        let observed_at_unix = unix_now();
1063        let file_size_bytes_before = sqlite_database_size_bytes(&self.conn).ok();
1064        let force_refresh_writes = self.has_user_triggers().unwrap_or(true);
1065        let mut phase_timings = Vec::new();
1066
1067        let tx = self.conn.transaction()?;
1068        let started = Instant::now();
1069        tx.execute_batch(
1070            r#"
1071            CREATE TEMP TABLE IF NOT EXISTS next_graph_nodes (
1072                id TEXT PRIMARY KEY,
1073                kind TEXT NOT NULL,
1074                label TEXT NOT NULL,
1075                properties_json TEXT NOT NULL,
1076                provenance_json TEXT NOT NULL,
1077                freshness_json TEXT,
1078                row_hash TEXT NOT NULL,
1079                source_watermark TEXT
1080            );
1081            CREATE TEMP TABLE IF NOT EXISTS next_graph_edges (
1082                edge_key TEXT PRIMARY KEY,
1083                from_id TEXT NOT NULL,
1084                to_id TEXT NOT NULL,
1085                kind TEXT NOT NULL,
1086                properties_json TEXT NOT NULL,
1087                provenance_json TEXT NOT NULL,
1088                freshness_json TEXT,
1089                row_hash TEXT NOT NULL,
1090                source_watermark TEXT
1091            );
1092            CREATE INDEX IF NOT EXISTS temp.idx_next_graph_edges_from_to_kind
1093                ON next_graph_edges(from_id, to_id, kind);
1094            CREATE TEMP TABLE IF NOT EXISTS next_graph_node_properties (
1095                node_id TEXT NOT NULL,
1096                key TEXT NOT NULL,
1097                value TEXT NOT NULL,
1098                PRIMARY KEY (node_id, key)
1099            );
1100            CREATE TEMP TABLE IF NOT EXISTS next_graph_node_semantic_vectors (
1101                node_id TEXT PRIMARY KEY,
1102                kind TEXT NOT NULL,
1103                model TEXT NOT NULL,
1104                dimensions INTEGER NOT NULL,
1105                vector_blob BLOB NOT NULL
1106            );
1107            CREATE TEMP TABLE IF NOT EXISTS next_graph_edge_properties (
1108                edge_key TEXT NOT NULL,
1109                key TEXT NOT NULL,
1110                value TEXT NOT NULL,
1111                PRIMARY KEY (edge_key, key)
1112            );
1113            CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_nodes (
1114                id TEXT PRIMARY KEY
1115            );
1116            CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_edges (
1117                edge_key TEXT PRIMARY KEY
1118            );
1119            CREATE TEMP TABLE IF NOT EXISTS next_graph_all_node_ids (
1120                id TEXT PRIMARY KEY
1121            );
1122            CREATE TEMP TABLE IF NOT EXISTS next_graph_all_edge_keys (
1123                edge_key TEXT PRIMARY KEY
1124            );
1125            DELETE FROM next_graph_nodes;
1126            DELETE FROM next_graph_edges;
1127            DELETE FROM next_graph_node_properties;
1128            DELETE FROM next_graph_node_semantic_vectors;
1129            DELETE FROM next_graph_edge_properties;
1130            DELETE FROM next_graph_changed_nodes;
1131            DELETE FROM next_graph_changed_edges;
1132            DELETE FROM next_graph_all_node_ids;
1133            DELETE FROM next_graph_all_edge_keys;
1134            "#,
1135        )?;
1136        phase_timings.push(sqlite_refresh_phase_timing(
1137            "sqlite_temp_table_prepare",
1138            started,
1139            "create and clear refresh staging tables before row loading",
1140        ));
1141        let (changed_nodes, changed_edges, skipped_nodes, skipped_edges) = if force_refresh_writes {
1142            (
1143                projection.nodes.iter().collect(),
1144                projection.edges.iter().collect(),
1145                0usize,
1146                0usize,
1147            )
1148        } else {
1149            let existing_node_hashes: BTreeMap<String, String> = {
1150                let mut stmt =
1151                    tx.prepare("SELECT id, row_hash FROM graph_nodes WHERE row_hash IS NOT NULL")?;
1152                let rows = stmt.query_map([], |row| {
1153                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1154                })?;
1155                collect_rows(rows)?.into_iter().collect()
1156            };
1157            let existing_edge_hashes: BTreeMap<String, String> = {
1158                let mut stmt = tx.prepare(
1159                    "SELECT edge_key, row_hash FROM graph_edges WHERE row_hash IS NOT NULL",
1160                )?;
1161                let rows = stmt.query_map([], |row| {
1162                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1163                })?;
1164                collect_rows(rows)?.into_iter().collect()
1165            };
1166            let cn: Vec<&GraphNode> = projection
1167                .nodes
1168                .iter()
1169                .filter(|n| {
1170                    let hash = row_hash(*n).ok();
1171                    hash.as_ref()
1172                        .is_none_or(|h| existing_node_hashes.get(&n.id) != Some(h))
1173                })
1174                .collect();
1175            let ce: Vec<&GraphEdge> = projection
1176                .edges
1177                .iter()
1178                .filter(|e| {
1179                    let hash = row_hash(*e).ok();
1180                    let ek = graph_edge_id(e);
1181                    hash.as_ref()
1182                        .is_none_or(|h| existing_edge_hashes.get(&ek) != Some(h))
1183                })
1184                .collect();
1185            let sn = projection.nodes.len() - cn.len();
1186            let se = projection.edges.len() - ce.len();
1187            (cn, ce, sn, se)
1188        };
1189        {
1190            let started = Instant::now();
1191            sqlite_stage_all_ids(&tx, &projection.nodes, &projection.edges)?;
1192            sqlite_stage_projection_nodes(&tx, &changed_nodes, source_watermark.as_deref())?;
1193            sqlite_stage_projection_edges(&tx, &changed_edges, source_watermark.as_deref())?;
1194            phase_timings.push(sqlite_refresh_phase_timing(
1195                "sqlite_node_staging",
1196                started,
1197                &format!(
1198                    "pre-filtered staging: {} nodes ({} unchanged skipped), {} edges ({} unchanged skipped) into temp tables using multi-row chunks up to {} rows",
1199                    changed_nodes.len(),
1200                    skipped_nodes,
1201                    changed_edges.len(),
1202                    skipped_edges,
1203                    SQLITE_GRAPH_STAGING_CHUNK_ROWS
1204                ),
1205            ));
1206        }
1207        {
1208            let started = Instant::now();
1209            let changed_nodes_sql = if force_refresh_writes {
1210                r#"
1211                INSERT INTO next_graph_changed_nodes (id)
1212                SELECT id
1213                FROM next_graph_nodes
1214                "#
1215            } else {
1216                r#"
1217                INSERT INTO next_graph_changed_nodes (id)
1218                SELECT n.id
1219                FROM next_graph_nodes n
1220                LEFT JOIN graph_nodes g ON g.id = n.id
1221                WHERE g.id IS NULL OR g.row_hash IS NOT n.row_hash
1222                "#
1223            };
1224            tx.execute(changed_nodes_sql, [])?;
1225            tx.execute_batch(
1226                r#"
1227                INSERT INTO next_graph_node_properties (node_id, key, value)
1228                SELECT n.id, json_each.key, CAST(json_each.value AS TEXT)
1229                FROM next_graph_nodes n
1230                JOIN next_graph_changed_nodes c ON c.id = n.id,
1231                     json_each(n.properties_json)
1232                WHERE json_each.key IS NOT NULL
1233                  AND json_each.value IS NOT NULL
1234                "#,
1235            )?;
1236            phase_timings.push(sqlite_refresh_phase_timing(
1237                "sqlite_property_row_staging",
1238                started,
1239                "derive materialized node property rows only for new/changed node rows; unchanged row-hash owners reuse existing property rows",
1240            ));
1241        }
1242        {
1243            let started = Instant::now();
1244            let changed_edges_sql = if force_refresh_writes {
1245                r#"
1246                INSERT INTO next_graph_changed_edges (edge_key)
1247                SELECT edge_key
1248                FROM next_graph_edges
1249                "#
1250            } else {
1251                r#"
1252                INSERT INTO next_graph_changed_edges (edge_key)
1253                SELECT n.edge_key
1254                FROM next_graph_edges n
1255                LEFT JOIN graph_edges g ON g.edge_key = n.edge_key
1256                WHERE g.edge_key IS NULL OR g.row_hash IS NOT n.row_hash
1257                "#
1258            };
1259            tx.execute(changed_edges_sql, [])?;
1260            tx.execute_batch(
1261                r#"
1262                INSERT INTO next_graph_edge_properties (edge_key, key, value)
1263                SELECT e.edge_key, json_each.key, CAST(json_each.value AS TEXT)
1264                FROM next_graph_edges e
1265                JOIN next_graph_changed_edges c ON c.edge_key = e.edge_key,
1266                     json_each(e.properties_json)
1267                WHERE json_each.key IS NOT NULL
1268                  AND json_each.value IS NOT NULL
1269                "#,
1270            )?;
1271            phase_timings.push(sqlite_refresh_phase_timing(
1272                "sqlite_edge_property_row_staging",
1273                started,
1274                "derive materialized edge property rows only for new/changed edge rows; unchanged row-hash owners reuse existing property rows",
1275            ));
1276        }
1277
1278        let delta_started = Instant::now();
1279        let tombstoned_nodes = {
1280            let sql = if force_refresh_writes {
1281                r#"
1282                SELECT g.id
1283                FROM graph_nodes g
1284                LEFT JOIN next_graph_nodes n ON n.id = g.id
1285                WHERE n.id IS NULL
1286                ORDER BY g.id
1287                "#
1288            } else {
1289                r#"
1290                SELECT g.id
1291                FROM graph_nodes g
1292                LEFT JOIN next_graph_all_node_ids n ON n.id = g.id
1293                WHERE n.id IS NULL
1294                ORDER BY g.id
1295                "#
1296            };
1297            let mut stmt = tx.prepare(sql)?;
1298            collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1299        };
1300        let tombstoned_edges = {
1301            let sql = if force_refresh_writes {
1302                r#"
1303                SELECT g.edge_key
1304                FROM graph_edges g
1305                LEFT JOIN next_graph_edges n
1306                    ON n.edge_key = g.edge_key
1307                WHERE n.edge_key IS NULL
1308                ORDER BY g.edge_key
1309                "#
1310            } else {
1311                r#"
1312                SELECT g.edge_key
1313                FROM graph_edges g
1314                LEFT JOIN next_graph_all_edge_keys n
1315                    ON n.edge_key = g.edge_key
1316                WHERE n.edge_key IS NULL
1317                ORDER BY g.edge_key
1318                "#
1319            };
1320            let mut stmt = tx.prepare(sql)?;
1321            collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1322        };
1323        let unchanged_nodes: usize = if force_refresh_writes {
1324            tx.query_row(
1325                r#"
1326                SELECT COUNT(*)
1327                FROM next_graph_nodes n
1328                JOIN graph_nodes g ON g.id = n.id
1329                WHERE g.row_hash = n.row_hash
1330                "#,
1331                [],
1332                |row| row.get(0),
1333            )?
1334        } else {
1335            skipped_nodes
1336        };
1337        let unchanged_edges: usize = if force_refresh_writes {
1338            tx.query_row(
1339                r#"
1340                SELECT COUNT(*)
1341                FROM next_graph_edges n
1342                JOIN graph_edges g
1343                    ON g.edge_key = n.edge_key
1344                WHERE g.row_hash = n.row_hash
1345                "#,
1346                [],
1347                |row| row.get(0),
1348            )?
1349        } else {
1350            skipped_edges
1351        };
1352        let reused_owner_node_properties: usize = if force_refresh_writes {
1353            tx.query_row(
1354                r#"
1355                SELECT COUNT(*)
1356                FROM graph_node_properties g
1357                JOIN next_graph_nodes n ON n.id = g.node_id
1358                LEFT JOIN next_graph_changed_nodes c ON c.id = n.id
1359                WHERE c.id IS NULL
1360                "#,
1361                [],
1362                |row| row.get(0),
1363            )?
1364        } else {
1365            tx.query_row(
1366                r#"
1367                SELECT COUNT(*)
1368                FROM graph_node_properties g
1369                JOIN next_graph_all_node_ids a ON a.id = g.node_id
1370                LEFT JOIN next_graph_changed_nodes c ON c.id = a.id
1371                WHERE c.id IS NULL
1372                "#,
1373                [],
1374                |row| row.get(0),
1375            )?
1376        };
1377        let reused_owner_edge_properties: usize = if force_refresh_writes {
1378            tx.query_row(
1379                r#"
1380                SELECT COUNT(*)
1381                FROM graph_edge_properties g
1382                JOIN next_graph_edges n ON n.edge_key = g.edge_key
1383                LEFT JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1384                WHERE c.edge_key IS NULL
1385                "#,
1386                [],
1387                |row| row.get(0),
1388            )?
1389        } else {
1390            tx.query_row(
1391                r#"
1392                SELECT COUNT(*)
1393                FROM graph_edge_properties g
1394                JOIN next_graph_all_edge_keys a ON a.edge_key = g.edge_key
1395                LEFT JOIN next_graph_changed_edges c ON c.edge_key = a.edge_key
1396                WHERE c.edge_key IS NULL
1397                "#,
1398                [],
1399                |row| row.get(0),
1400            )?
1401        };
1402        let unchanged_changed_node_properties: usize = tx.query_row(
1403            r#"
1404            SELECT COUNT(*)
1405            FROM next_graph_node_properties n
1406            JOIN graph_node_properties g
1407                ON g.node_id = n.node_id AND g.key = n.key
1408            WHERE g.value = n.value
1409            "#,
1410            [],
1411            |row| row.get(0),
1412        )?;
1413        let unchanged_changed_edge_properties: usize = tx.query_row(
1414            r#"
1415            SELECT COUNT(*)
1416            FROM next_graph_edge_properties n
1417            JOIN graph_edge_properties g
1418                ON g.edge_key = n.edge_key AND g.key = n.key
1419            WHERE g.value = n.value
1420            "#,
1421            [],
1422            |row| row.get(0),
1423        )?;
1424        let unchanged_properties = reused_owner_node_properties
1425            + reused_owner_edge_properties
1426            + unchanged_changed_node_properties
1427            + unchanged_changed_edge_properties;
1428
1429        let deleted_edges = if force_refresh_writes {
1430            tx.execute(
1431                r#"
1432                DELETE FROM graph_edges
1433                WHERE NOT EXISTS (
1434                    SELECT 1
1435                    FROM next_graph_edges n
1436                    WHERE n.edge_key = graph_edges.edge_key
1437                )
1438                "#,
1439                [],
1440            )?
1441        } else {
1442            tx.execute(
1443                r#"
1444                DELETE FROM graph_edges
1445                WHERE NOT EXISTS (
1446                    SELECT 1
1447                    FROM next_graph_all_edge_keys n
1448                    WHERE n.edge_key = graph_edges.edge_key
1449                )
1450                "#,
1451                [],
1452            )?
1453        };
1454        let deleted_nodes = if force_refresh_writes {
1455            tx.execute(
1456                r#"
1457                DELETE FROM graph_nodes
1458                WHERE NOT EXISTS (
1459                    SELECT 1
1460                    FROM next_graph_nodes n
1461                    WHERE n.id = graph_nodes.id
1462                )
1463                "#,
1464                [],
1465            )?
1466        } else {
1467            tx.execute(
1468                r#"
1469                DELETE FROM graph_nodes
1470                WHERE NOT EXISTS (
1471                    SELECT 1
1472                    FROM next_graph_all_node_ids n
1473                    WHERE n.id = graph_nodes.id
1474                )
1475                "#,
1476                [],
1477            )?
1478        };
1479
1480        let upsert_nodes_sql = r#"
1481            INSERT INTO graph_nodes
1482                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1483            SELECT
1484                n.id,
1485                n.kind,
1486                n.label,
1487                n.properties_json,
1488                n.provenance_json,
1489                n.freshness_json,
1490                n.row_hash,
1491                n.source_watermark
1492            FROM next_graph_nodes n
1493            JOIN next_graph_changed_nodes c ON c.id = n.id
1494            WHERE true
1495            ON CONFLICT(id) DO UPDATE SET
1496                kind = excluded.kind,
1497                label = excluded.label,
1498                properties_json = excluded.properties_json,
1499                provenance_json = excluded.provenance_json,
1500                freshness_json = excluded.freshness_json,
1501                row_hash = excluded.row_hash,
1502                source_watermark = excluded.source_watermark
1503            WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1504            "#;
1505        tx.execute(upsert_nodes_sql, [])?;
1506        tx.execute(
1507            r#"
1508                DELETE FROM graph_node_semantic_vectors
1509                WHERE EXISTS (
1510                    SELECT 1
1511                    FROM next_graph_changed_nodes c
1512                    WHERE c.id = graph_node_semantic_vectors.node_id
1513                )
1514                AND NOT EXISTS (
1515                    SELECT 1
1516                    FROM next_graph_node_semantic_vectors n
1517                    WHERE n.node_id = graph_node_semantic_vectors.node_id
1518                )
1519                "#,
1520            [],
1521        )?;
1522        tx.execute(
1523            r#"
1524                INSERT INTO graph_node_semantic_vectors
1525                    (node_id, kind, model, dimensions, vector_blob)
1526                SELECT n.node_id, n.kind, n.model, n.dimensions, n.vector_blob
1527                FROM next_graph_node_semantic_vectors n
1528                WHERE true
1529                ON CONFLICT(node_id) DO UPDATE SET
1530                    kind = excluded.kind,
1531                    model = excluded.model,
1532                    dimensions = excluded.dimensions,
1533                    vector_blob = excluded.vector_blob
1534                "#,
1535            [],
1536        )?;
1537        let upsert_edges_sql = r#"
1538            INSERT INTO graph_edges
1539            (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1540            SELECT
1541                n.edge_key,
1542                n.from_id,
1543                n.to_id,
1544                n.kind,
1545                n.properties_json,
1546                n.provenance_json,
1547                n.freshness_json,
1548                n.row_hash,
1549                n.source_watermark
1550            FROM next_graph_edges n
1551            JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1552            WHERE true
1553            ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1554                edge_key = excluded.edge_key,
1555                properties_json = excluded.properties_json,
1556                provenance_json = excluded.provenance_json,
1557                freshness_json = excluded.freshness_json,
1558                row_hash = excluded.row_hash,
1559                source_watermark = excluded.source_watermark
1560            WHERE graph_edges.row_hash IS NOT excluded.row_hash
1561            "#;
1562        tx.execute(upsert_edges_sql, [])?;
1563        let deleted_node_properties = tx.execute(
1564            r#"
1565            DELETE FROM graph_node_properties
1566            WHERE EXISTS (
1567                SELECT 1
1568                FROM next_graph_changed_nodes c
1569                WHERE c.id = graph_node_properties.node_id
1570            )
1571              AND NOT EXISTS (
1572                SELECT 1
1573                FROM next_graph_node_properties n
1574                WHERE n.node_id = graph_node_properties.node_id
1575                  AND n.key = graph_node_properties.key
1576            )
1577            "#,
1578            [],
1579        )?;
1580        let deleted_edge_properties = tx.execute(
1581            r#"
1582            DELETE FROM graph_edge_properties
1583            WHERE EXISTS (
1584                SELECT 1
1585                FROM next_graph_changed_edges c
1586                WHERE c.edge_key = graph_edge_properties.edge_key
1587            )
1588              AND NOT EXISTS (
1589                SELECT 1
1590                FROM next_graph_edge_properties n
1591                WHERE n.edge_key = graph_edge_properties.edge_key
1592                  AND n.key = graph_edge_properties.key
1593            )
1594            "#,
1595            [],
1596        )?;
1597        let deleted_properties = deleted_node_properties + deleted_edge_properties;
1598        let upsert_properties_sql = r#"
1599            INSERT INTO graph_node_properties (node_id, key, value)
1600            SELECT n.node_id, n.key, n.value
1601            FROM next_graph_node_properties n
1602            LEFT JOIN graph_node_properties g
1603                ON g.node_id = n.node_id AND g.key = n.key
1604            WHERE g.node_id IS NULL OR g.value IS NOT n.value
1605            ON CONFLICT(node_id, key) DO UPDATE SET
1606                value = excluded.value
1607            WHERE graph_node_properties.value IS NOT excluded.value
1608            "#;
1609        let upserted_node_properties = tx.execute(upsert_properties_sql, [])?;
1610        let upsert_edge_properties_sql = r#"
1611            INSERT INTO graph_edge_properties (edge_key, key, value)
1612            SELECT n.edge_key, n.key, n.value
1613            FROM next_graph_edge_properties n
1614            LEFT JOIN graph_edge_properties g
1615                ON g.edge_key = n.edge_key AND g.key = n.key
1616            WHERE g.edge_key IS NULL OR g.value IS NOT n.value
1617            ON CONFLICT(edge_key, key) DO UPDATE SET
1618                value = excluded.value
1619            WHERE graph_edge_properties.value IS NOT excluded.value
1620            "#;
1621        let upserted_edge_properties = tx.execute(upsert_edge_properties_sql, [])?;
1622        let upserted_properties = upserted_node_properties + upserted_edge_properties;
1623        tx.execute(
1624            r#"
1625            INSERT INTO graph_projection_versions
1626                (scope, projection_version, content_hash, source_watermark, observed_at_unix)
1627            VALUES (?1, ?2, ?3, ?4, ?5)
1628            ON CONFLICT(scope) DO UPDATE SET
1629                projection_version = excluded.projection_version,
1630                content_hash = excluded.content_hash,
1631                source_watermark = excluded.source_watermark,
1632                observed_at_unix = excluded.observed_at_unix
1633            "#,
1634            (
1635                &scope,
1636                &projection_version,
1637                &projection_hash,
1638                &source_watermark,
1639                observed_at_unix,
1640            ),
1641        )?;
1642        let pruned_node_tombstones = tx.execute(
1643            r#"
1644            DELETE FROM graph_tombstones
1645            WHERE row_kind = 'node'
1646              AND EXISTS (
1647                SELECT 1
1648                FROM next_graph_nodes n
1649                WHERE n.id = substr(graph_tombstones.row_key, 6)
1650              )
1651            "#,
1652            [],
1653        )?;
1654        let pruned_edge_tombstones = tx.execute(
1655            r#"
1656            DELETE FROM graph_tombstones
1657            WHERE row_kind = 'edge'
1658              AND EXISTS (
1659                SELECT 1
1660                FROM next_graph_edges n
1661                WHERE n.edge_key = substr(graph_tombstones.row_key, 6)
1662              )
1663            "#,
1664            [],
1665        )?;
1666        {
1667            let mut insert_node_tombstone = tx.prepare(
1668                r#"
1669                INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1670                VALUES (?1, 'node', ?2)
1671                ON CONFLICT(row_key) DO UPDATE SET
1672                    row_kind = excluded.row_kind,
1673                    deleted_at_unix = excluded.deleted_at_unix
1674                "#,
1675            )?;
1676            for id in &tombstoned_nodes {
1677                insert_node_tombstone.execute((format!("node:{id}"), observed_at_unix))?;
1678            }
1679        }
1680        {
1681            let mut insert_edge_tombstone = tx.prepare(
1682                r#"
1683                INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1684                VALUES (?1, 'edge', ?2)
1685                ON CONFLICT(row_key) DO UPDATE SET
1686                    row_kind = excluded.row_kind,
1687                    deleted_at_unix = excluded.deleted_at_unix
1688                "#,
1689            )?;
1690            for key in &tombstoned_edges {
1691                insert_edge_tombstone.execute((format!("edge:{key}"), observed_at_unix))?;
1692            }
1693        }
1694        let tombstone_node_count: usize = tx.query_row(
1695            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1696            [],
1697            |row| row.get(0),
1698        )?;
1699        let tombstone_edge_count: usize = tx.query_row(
1700            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1701            [],
1702            |row| row.get(0),
1703        )?;
1704        tx.execute(
1705            r#"
1706            INSERT INTO graph_operator_stats
1707                (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1708            VALUES (?1, ?2, ?3, ?4, ?5, NULL, NULL, ?6)
1709            ON CONFLICT(scope) DO UPDATE SET
1710                nodes = excluded.nodes,
1711                edges = excluded.edges,
1712                tombstone_nodes = excluded.tombstone_nodes,
1713                tombstone_edges = excluded.tombstone_edges,
1714                file_size_bytes = excluded.file_size_bytes,
1715                freelist_bytes = excluded.freelist_bytes,
1716                observed_at_unix = excluded.observed_at_unix
1717            "#,
1718            (
1719                &scope,
1720                projection.nodes.len() as i64,
1721                projection.edges.len() as i64,
1722                tombstone_node_count as i64,
1723                tombstone_edge_count as i64,
1724                observed_at_unix,
1725            ),
1726        )?;
1727        phase_timings.push(sqlite_refresh_phase_timing(
1728            "sqlite_delta_write",
1729            delta_started,
1730            "apply row/property deltas, projection metadata, tombstones, and cached operator counts",
1731        ));
1732        let commit_started = Instant::now();
1733        tx.commit()?;
1734        phase_timings.push(sqlite_refresh_phase_timing(
1735            "sqlite_commit",
1736            commit_started,
1737            "commit refresh transaction and publish old-or-new graph visibility",
1738        ));
1739        let file_size_bytes_after = sqlite_database_size_bytes(&self.conn).ok();
1740        let freelist_bytes_after = sqlite_database_freelist_bytes(&self.conn).ok();
1741        let stats_started = Instant::now();
1742        self.conn.execute(
1743            r#"
1744            UPDATE graph_operator_stats
1745            SET file_size_bytes = ?2,
1746                freelist_bytes = ?3,
1747                observed_at_unix = ?4
1748            WHERE scope = ?1
1749            "#,
1750            (
1751                &scope,
1752                file_size_bytes_after.map(|value| value as i64),
1753                freelist_bytes_after.map(|value| value as i64),
1754                unix_now(),
1755            ),
1756        )?;
1757        phase_timings.push(sqlite_refresh_phase_timing(
1758            "sqlite_stats_cache_update",
1759            stats_started,
1760            "persist post-commit file and freelist proof for status/doctor",
1761        ));
1762        Ok(SqliteProjectionRefresh {
1763            scope,
1764            projection_version,
1765            source_watermark,
1766            upserted_nodes: projection.nodes.len().saturating_sub(unchanged_nodes),
1767            upserted_edges: projection.edges.len().saturating_sub(unchanged_edges),
1768            unchanged_nodes,
1769            unchanged_edges,
1770            upserted_properties,
1771            unchanged_properties,
1772            deleted_properties,
1773            deleted_nodes,
1774            deleted_edges,
1775            pruned_tombstones: pruned_node_tombstones + pruned_edge_tombstones,
1776            file_size_bytes_before,
1777            file_size_bytes_after,
1778            tombstoned_nodes,
1779            tombstoned_edges,
1780            phase_timings,
1781        })
1782    }
1783
1784    /// Derive a Semantic Ontology Graph (`#memgraphrag-ont`, MemGraphRAG arxiv
1785    /// 2606.00610 third layer) from the instance graph: one `ontology_type` node
1786    /// per distinct node kind, and one `ontology_relation:<edge_kind>` edge per
1787    /// observed `(from_kind, edge_kind, to_kind)` triple, each carrying an
1788    /// `instance_count`. This data-driven schema lets retrieval start from
1789    /// abstract types and prune by permitted inter-type relations. Existing
1790    /// ontology rows are excluded so the derivation is idempotent and never folds
1791    /// the ontology layer into itself.
1792    pub fn derive_ontology(&self) -> Result<GraphProjection> {
1793        let mut projection = GraphProjection::default();
1794
1795        let mut node_stmt = self.conn.prepare(
1796            "SELECT kind, COUNT(*) FROM graph_nodes \
1797             WHERE kind != 'ontology_type' \
1798             GROUP BY kind ORDER BY kind",
1799        )?;
1800        let node_rows = node_stmt.query_map([], |row| {
1801            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
1802        })?;
1803        for row in node_rows {
1804            let (kind, count) = row?;
1805            projection.nodes.push(
1806                GraphNode::new(format!("ontology_type:{kind}"), "ontology_type", &kind)
1807                    .with_property("type_kind", &kind)
1808                    .with_property("instance_count", count.to_string())
1809                    .with_provenance(GraphProvenance::new("tsift-ontology", &kind)),
1810            );
1811        }
1812
1813        let mut rel_stmt = self.conn.prepare(
1814            "SELECT n1.kind, e.kind, n2.kind, COUNT(*) \
1815             FROM graph_edges e \
1816             JOIN graph_nodes n1 ON e.from_id = n1.id \
1817             JOIN graph_nodes n2 ON e.to_id = n2.id \
1818             WHERE e.kind NOT LIKE 'ontology_relation:%' \
1819               AND n1.kind != 'ontology_type' AND n2.kind != 'ontology_type' \
1820             GROUP BY n1.kind, e.kind, n2.kind \
1821             ORDER BY n1.kind, e.kind, n2.kind",
1822        )?;
1823        let rel_rows = rel_stmt.query_map([], |row| {
1824            Ok((
1825                row.get::<_, String>(0)?,
1826                row.get::<_, String>(1)?,
1827                row.get::<_, String>(2)?,
1828                row.get::<_, i64>(3)?,
1829            ))
1830        })?;
1831        for row in rel_rows {
1832            let (from_kind, edge_kind, to_kind, count) = row?;
1833            projection.edges.push(
1834                GraphEdge::new(
1835                    format!("ontology_type:{from_kind}"),
1836                    format!("ontology_type:{to_kind}"),
1837                    format!("ontology_relation:{edge_kind}"),
1838                )
1839                .with_property("edge_kind", &edge_kind)
1840                .with_property("instance_count", count.to_string())
1841                .with_provenance(GraphProvenance::new("tsift-ontology", &edge_kind)),
1842            );
1843        }
1844
1845        Ok(projection)
1846    }
1847
1848    pub fn upsert_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1849        let tx = self.conn.transaction()?;
1850        {
1851            let mut insert_node = tx.prepare(
1852                r#"
1853                INSERT INTO graph_nodes
1854                    (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1855                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1856                ON CONFLICT(id) DO UPDATE SET
1857                    kind = excluded.kind,
1858                    label = excluded.label,
1859                    properties_json = excluded.properties_json,
1860                provenance_json = excluded.provenance_json,
1861                freshness_json = excluded.freshness_json,
1862                row_hash = excluded.row_hash,
1863                source_watermark = excluded.source_watermark
1864            WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1865               OR graph_nodes.source_watermark IS NOT excluded.source_watermark
1866            "#,
1867            )?;
1868            let mut delete_properties =
1869                tx.prepare("DELETE FROM graph_node_properties WHERE node_id = ?1")?;
1870            let mut insert_property = tx.prepare(
1871                r#"
1872                INSERT INTO graph_node_properties (node_id, key, value)
1873                VALUES (?1, ?2, ?3)
1874                "#,
1875            )?;
1876            for node in &projection.nodes {
1877                let changed = insert_node.execute((
1878                    &node.id,
1879                    &node.kind,
1880                    &node.label,
1881                    to_json(&node.properties)?,
1882                    to_json(&node.provenance)?,
1883                    optional_to_json(&node.freshness)?,
1884                    row_hash(node)?,
1885                ))?;
1886                if changed > 0 {
1887                    delete_properties.execute([&node.id])?;
1888                    for (key, value) in &node.properties {
1889                        insert_property.execute((&node.id, key, value))?;
1890                    }
1891                    replace_node_semantic_vector(&tx, &node.id, &node.kind, &node.properties)?;
1892                }
1893            }
1894        }
1895        {
1896            let mut insert_edge = tx.prepare(
1897                r#"
1898                INSERT INTO graph_edges
1899                    (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1900                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1901                ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1902                    edge_key = excluded.edge_key,
1903                    properties_json = excluded.properties_json,
1904                    provenance_json = excluded.provenance_json,
1905                freshness_json = excluded.freshness_json,
1906                row_hash = excluded.row_hash,
1907                source_watermark = excluded.source_watermark
1908            WHERE graph_edges.row_hash IS NOT excluded.row_hash
1909               OR graph_edges.source_watermark IS NOT excluded.source_watermark
1910            "#,
1911            )?;
1912            let mut delete_properties =
1913                tx.prepare("DELETE FROM graph_edge_properties WHERE edge_key = ?1")?;
1914            let mut insert_property = tx.prepare(
1915                r#"
1916                INSERT INTO graph_edge_properties (edge_key, key, value)
1917                VALUES (?1, ?2, ?3)
1918                "#,
1919            )?;
1920            for edge in &projection.edges {
1921                let edge_key = graph_edge_id(edge);
1922                let changed = insert_edge.execute((
1923                    &edge_key,
1924                    &edge.from_id,
1925                    &edge.to_id,
1926                    &edge.kind,
1927                    to_json(&edge.properties)?,
1928                    to_json(&edge.provenance)?,
1929                    optional_to_json(&edge.freshness)?,
1930                    row_hash(edge)?,
1931                ))?;
1932                if changed > 0 {
1933                    delete_properties.execute([&edge_key])?;
1934                    for (key, value) in &edge.properties {
1935                        insert_property.execute((&edge_key, key, value))?;
1936                    }
1937                }
1938            }
1939        }
1940        tx.commit()?;
1941        Ok(())
1942    }
1943
1944    pub fn projection_version(&self, scope: &str) -> Result<Option<SqliteProjectionVersion>> {
1945        self.conn
1946            .query_row(
1947                r#"
1948                SELECT projection_version, content_hash, source_watermark
1949                FROM graph_projection_versions
1950                WHERE scope = ?1
1951                "#,
1952                [scope],
1953                |row| {
1954                    Ok(SqliteProjectionVersion {
1955                        projection_version: row.get(0)?,
1956                        content_hash: row.get(1)?,
1957                        source_watermark: row.get(2)?,
1958                    })
1959                },
1960            )
1961            .optional()
1962            .map_err(Into::into)
1963    }
1964
1965    pub fn update_projection_source_watermark(
1966        &mut self,
1967        scope: &str,
1968        source_watermark: Option<String>,
1969    ) -> Result<()> {
1970        self.conn.execute(
1971            r#"
1972            UPDATE graph_projection_versions
1973            SET source_watermark = ?2
1974            WHERE scope = ?1
1975            "#,
1976            (scope, source_watermark),
1977        )?;
1978        Ok(())
1979    }
1980
1981    pub fn compact_storage(&mut self, scope: &str, prune_tombstones: bool) -> Result<usize> {
1982        let pruned_tombstones = if prune_tombstones {
1983            self.conn.execute("DELETE FROM graph_tombstones", [])?
1984        } else {
1985            0
1986        };
1987        self.conn.execute_batch(
1988            r#"
1989            PRAGMA wal_checkpoint(TRUNCATE);
1990            VACUUM;
1991            "#,
1992        )?;
1993        let nodes = self
1994            .conn
1995            .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
1996                row.get::<_, i64>(0)
1997            })?;
1998        let edges = self
1999            .conn
2000            .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2001                row.get::<_, i64>(0)
2002            })?;
2003        let tombstone_nodes = self.conn.query_row(
2004            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
2005            [],
2006            |row| row.get::<_, i64>(0),
2007        )?;
2008        let tombstone_edges = self.conn.query_row(
2009            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
2010            [],
2011            |row| row.get::<_, i64>(0),
2012        )?;
2013        let file_size_bytes = sqlite_database_size_bytes(&self.conn)
2014            .ok()
2015            .map(|value| value as i64);
2016        let freelist_bytes = sqlite_database_freelist_bytes(&self.conn)
2017            .ok()
2018            .map(|value| value as i64);
2019        self.conn.execute(
2020            r#"
2021            INSERT INTO graph_operator_stats
2022                (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
2023            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s', 'now'))
2024            ON CONFLICT(scope) DO UPDATE SET
2025                nodes = excluded.nodes,
2026                edges = excluded.edges,
2027                tombstone_nodes = excluded.tombstone_nodes,
2028                tombstone_edges = excluded.tombstone_edges,
2029                file_size_bytes = excluded.file_size_bytes,
2030                freelist_bytes = excluded.freelist_bytes,
2031                observed_at_unix = excluded.observed_at_unix
2032            "#,
2033            (
2034                scope,
2035                nodes,
2036                edges,
2037                tombstone_nodes,
2038                tombstone_edges,
2039                file_size_bytes,
2040                freelist_bytes,
2041            ),
2042        )?;
2043        Ok(pruned_tombstones)
2044    }
2045
2046    fn edges_between_nodes_inline(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
2047        let placeholders: Vec<&str> = node_ids.iter().map(|_| "?").collect();
2048        let in_clause = placeholders.join(", ");
2049        let sql = format!(
2050            "SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json \
2051             FROM graph_edges e \
2052             WHERE e.from_id IN ({in_clause}) \
2053               AND e.to_id IN ({in_clause}) \
2054             ORDER BY e.from_id, e.kind, e.to_id"
2055        );
2056        let values: Vec<Value> = node_ids
2057            .iter()
2058            .chain(node_ids.iter())
2059            .map(|id| Value::Text(id.clone()))
2060            .collect();
2061        let mut stmt = self.conn.prepare(&sql)?;
2062        collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2063    }
2064}
2065
2066fn sqlite_query_plan(conn: &Connection, sql: &str, values: &[Value]) -> Result<Vec<String>> {
2067    let mut stmt = conn.prepare(&format!("EXPLAIN QUERY PLAN {sql}"))?;
2068    collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2069        row.get::<_, String>(3)
2070    })?)
2071}
2072
/// Turns a query plan into human-readable diagnostic lines.
///
/// The first line always echoes the whole plan, rows joined with `" | "`.
/// After that comes one line per entry of `expected_indexes`, in order,
/// stating whether any plan row mentions that index name (substring match).
fn sqlite_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<String> {
    let summary = format!(
        "sqlite query pushdown active; plan: {}",
        plan.join(" | ")
    );
    let per_index = expected_indexes.iter().map(|&expected_index| {
        let reported = plan.iter().any(|row| row.contains(expected_index));
        if reported {
            format!("sqlite query plan uses {expected_index}")
        } else {
            format!(
                "sqlite query plan did not report {expected_index}; inspect before changing graph property indexes"
            )
        }
    });
    std::iter::once(summary).chain(per_index).collect()
}
2089
/// Compact, machine-readable query-plan diagnostic.
///
/// Within this file the `code` values produced are `"plan_active"`,
/// `"idx_ok"` and `"idx_missing"` (see `terse_query_plan_diagnostics`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TerseDiagnostic {
    /// Short static identifier for the kind of diagnostic.
    pub code: &'static str,
    /// Index name the diagnostic refers to; left out of the serialized form
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index: Option<String>,
}
2096
2097#[allow(dead_code)]
2098fn terse_query_plan_diagnostics(
2099    plan: &[String],
2100    expected_indexes: &[&str],
2101) -> Vec<TerseDiagnostic> {
2102    let mut diagnostics = vec![TerseDiagnostic {
2103        code: "plan_active",
2104        index: None,
2105    }];
2106    for expected_index in expected_indexes {
2107        if plan.iter().any(|row| row.contains(expected_index)) {
2108            diagnostics.push(TerseDiagnostic {
2109                code: "idx_ok",
2110                index: Some(expected_index.to_string()),
2111            });
2112        } else {
2113            diagnostics.push(TerseDiagnostic {
2114                code: "idx_missing",
2115                index: Some(expected_index.to_string()),
2116            });
2117        }
2118    }
2119    diagnostics
2120}
2121
2122fn push_sqlite_property_filter_exists(
2123    sql: &mut String,
2124    values: &mut Vec<Value>,
2125    node_alias: &str,
2126    filters: &[GraphPropertyFilter],
2127) {
2128    for (index, filter) in filters.iter().enumerate() {
2129        sql.push_str(&format!(
2130            r#"
2131            AND EXISTS (
2132                SELECT 1
2133                FROM graph_node_properties p{index} INDEXED BY idx_graph_node_properties_key_value_node
2134                WHERE p{index}.node_id = {node_alias}.id
2135                  AND p{index}.key = ?
2136                  AND p{index}.value = ?
2137            )
2138            "#
2139        ));
2140        values.push(Value::Text(filter.key.clone()));
2141        values.push(Value::Text(filter.value.clone()));
2142    }
2143}
2144
2145fn push_sqlite_edge_property_filter_exists(
2146    sql: &mut String,
2147    values: &mut Vec<Value>,
2148    edge_alias: &str,
2149    filters: &[GraphPropertyFilter],
2150) {
2151    for (index, filter) in filters.iter().enumerate() {
2152        sql.push_str(&format!(
2153            r#"
2154            AND EXISTS (
2155                SELECT 1
2156                FROM graph_edge_properties ep{index} INDEXED BY idx_graph_edge_properties_key_value_edge
2157                WHERE ep{index}.edge_key = {edge_alias}.edge_key
2158                  AND ep{index}.key = ?
2159                  AND ep{index}.value = ?
2160            )
2161            "#
2162        ));
2163        values.push(Value::Text(filter.key.clone()));
2164        values.push(Value::Text(filter.value.clone()));
2165    }
2166}
2167
/// Inputs for one `SELECT` arm rendered by `push_sqlite_incident_edge_branch`.
struct SqliteIncidentEdgeBranch<'a> {
    /// Index name interpolated after `INDEXED BY`.
    index_name: &'a str,
    /// `graph_edges` column compared against `node_id`; callers visible in
    /// this file pass `from_id` or `to_id`.
    endpoint_column: &'a str,
    /// Node id bound to the endpoint comparison.
    node_id: &'a str,
    /// When set, adds an `e.kind = ?` predicate.
    kind: Option<&'a str>,
    /// Edge property filters, each rendered as an `EXISTS` sub-query.
    filters: &'a [GraphPropertyFilter],
    /// When set, only rows whose `edge_key` is strictly greater are selected.
    cursor: Option<&'a str>,
}
2176
2177fn push_sqlite_incident_edge_branch(
2178    sql: &mut String,
2179    values: &mut Vec<Value>,
2180    branch: SqliteIncidentEdgeBranch<'_>,
2181) {
2182    sql.push_str(&format!(
2183        r#"
2184        SELECT
2185            e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2186        FROM graph_edges e INDEXED BY {index_name}
2187        WHERE e.{endpoint_column} = ?
2188        "#,
2189        index_name = branch.index_name,
2190        endpoint_column = branch.endpoint_column,
2191    ));
2192    values.push(Value::Text(branch.node_id.to_string()));
2193    if let Some(kind) = branch.kind {
2194        sql.push_str(" AND e.kind = ?");
2195        values.push(Value::Text(kind.to_string()));
2196    }
2197    push_sqlite_edge_property_filter_exists(sql, values, "e", branch.filters);
2198    if let Some(cursor) = branch.cursor {
2199        sql.push_str(" AND e.edge_key > ?");
2200        values.push(Value::Text(cursor.to_string()));
2201    }
2202}
2203
2204fn sqlite_incident_edges_union_query(
2205    node_id: &str,
2206    kind: Option<&str>,
2207    filters: &[GraphPropertyFilter],
2208    cursor: Option<&str>,
2209    limit: Option<usize>,
2210) -> (String, Vec<Value>) {
2211    let mut sql = String::from(
2212        r#"
2213        SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2214        FROM (
2215        "#,
2216    );
2217    let mut values = Vec::new();
2218    push_sqlite_incident_edge_branch(
2219        &mut sql,
2220        &mut values,
2221        SqliteIncidentEdgeBranch {
2222            index_name: "idx_graph_edges_from_kind",
2223            endpoint_column: "from_id",
2224            node_id,
2225            kind,
2226            filters,
2227            cursor,
2228        },
2229    );
2230    sql.push_str(" UNION ");
2231    push_sqlite_incident_edge_branch(
2232        &mut sql,
2233        &mut values,
2234        SqliteIncidentEdgeBranch {
2235            index_name: "idx_graph_edges_to_kind",
2236            endpoint_column: "to_id",
2237            node_id,
2238            kind,
2239            filters,
2240            cursor,
2241        },
2242    );
2243    sql.push_str(
2244        r#"
2245        ) e
2246        ORDER BY e.edge_key
2247        "#,
2248    );
2249    if let Some(limit) = limit {
2250        sql.push_str(" LIMIT ?");
2251        values.push(Value::Integer(limit.saturating_add(1) as i64));
2252    }
2253    (sql, values)
2254}
2255
/// Builds the SQL expression that scores an edge for semantic-seeded
/// neighborhood expansion.
///
/// The expression is the sum of three terms:
/// 1. a base score chosen from the edge kind: exact kind names first, then
///    substring tiers (`%community%` gives 200; `%semantic%`, `%concept%` or
///    `%entity%` give 240), and 80 for anything else;
/// 2. the caller-supplied `direction_bonus`, wrapped in parentheses;
/// 3. a small per-kind tie-breaker (0 for kinds not listed).
///
/// `edge_alias` is the SQL alias of the `graph_edges` row being scored. Both
/// arguments are spliced into the SQL text verbatim, so they must be trusted
/// SQL fragments and never user input.
///
/// The base score is a *searched* CASE (`CASE WHEN <cond> ...`). It used to be
/// a simple CASE (`CASE <alias>.kind WHEN ...`) that also contained
/// `WHEN <alias>.kind LIKE ...` branches. In a simple CASE every WHEN value is
/// compared for equality with the operand, so the LIKE result (0 or 1) was
/// compared with the kind text and the substring tiers could never match.
fn sqlite_semantic_seeded_edge_score_expr(edge_alias: &str, direction_bonus: &str) -> String {
    let kind = format!("{edge_alias}.kind");
    format!(
        "(CASE \
         WHEN {kind} = 'semantic_relation' THEN 340 \
         WHEN {kind} = 'mentions_entity' THEN 280 \
         WHEN {kind} = 'mentions_concept' THEN 280 \
         WHEN {kind} = 'tagged_entity' THEN 280 \
         WHEN {kind} = 'tagged_concept' THEN 280 \
         WHEN {kind} = 'related_concept' THEN 280 \
         WHEN {kind} = 'mentions' THEN 220 \
         WHEN {kind} = 'calls' THEN 200 \
         WHEN {kind} = 'requests_context' THEN 180 \
         WHEN {kind} = 'scopes_context' THEN 180 \
         WHEN {kind} = 'scopes_source' THEN 180 \
         WHEN {kind} = 'explains_result' THEN 180 \
         WHEN {kind} = 'defines' THEN 120 \
         WHEN {kind} = 'contains' THEN 120 \
         WHEN {kind} = 'belongs_to' THEN 120 \
         WHEN {kind} LIKE '%community%' THEN 200 \
         WHEN {kind} LIKE '%semantic%' \
         OR {kind} LIKE '%concept%' \
         OR {kind} LIKE '%entity%' THEN 240 \
         ELSE 80 END) \
         + ({direction_bonus}) \
         + (CASE {kind} \
         WHEN 'mentions_concept' THEN 30 \
         WHEN 'mentions_entity' THEN 30 \
         WHEN 'tagged_concept' THEN 30 \
         WHEN 'tagged_entity' THEN 30 \
         WHEN 'related_concept' THEN 30 \
         WHEN 'semantic_relation' THEN 28 \
         WHEN 'calls' THEN 24 \
         WHEN 'mentions' THEN 22 \
         WHEN 'requests_context' THEN 18 \
         WHEN 'scopes_context' THEN 18 \
         WHEN 'scopes_source' THEN 18 \
         WHEN 'explains_result' THEN 18 \
         WHEN 'defines' THEN 12 \
         WHEN 'contains' THEN 12 \
         WHEN 'belongs_to' THEN 12 \
         ELSE 0 END)"
    )
}
2299
2300impl GraphStore for SqliteGraphStore {
2301    fn upsert_node(&self, node: &GraphNode) -> Result<()> {
2302        self.conn.execute(
2303            r#"
2304            INSERT INTO graph_nodes
2305                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
2306            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
2307            ON CONFLICT(id) DO UPDATE SET
2308                kind = excluded.kind,
2309                label = excluded.label,
2310                properties_json = excluded.properties_json,
2311                provenance_json = excluded.provenance_json,
2312                freshness_json = excluded.freshness_json,
2313                row_hash = excluded.row_hash,
2314                source_watermark = excluded.source_watermark
2315            "#,
2316            (
2317                &node.id,
2318                &node.kind,
2319                &node.label,
2320                to_json(&node.properties)?,
2321                to_json(&node.provenance)?,
2322                optional_to_json(&node.freshness)?,
2323                row_hash(node)?,
2324            ),
2325        )?;
2326        replace_node_properties(&self.conn, &node.id, &node.properties)?;
2327        replace_node_semantic_vector(&self.conn, &node.id, &node.kind, &node.properties)?;
2328        Ok(())
2329    }
2330
2331    fn upsert_edge(&self, edge: &GraphEdge) -> Result<()> {
2332        let edge_key = graph_edge_id(edge);
2333        self.conn.execute(
2334            r#"
2335            INSERT INTO graph_edges
2336                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
2337            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
2338            ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
2339                edge_key = excluded.edge_key,
2340                properties_json = excluded.properties_json,
2341                provenance_json = excluded.provenance_json,
2342                freshness_json = excluded.freshness_json,
2343                row_hash = excluded.row_hash,
2344                source_watermark = excluded.source_watermark
2345            "#,
2346            (
2347                &edge_key,
2348                &edge.from_id,
2349                &edge.to_id,
2350                &edge.kind,
2351                to_json(&edge.properties)?,
2352                to_json(&edge.provenance)?,
2353                optional_to_json(&edge.freshness)?,
2354                row_hash(edge)?,
2355            ),
2356        )?;
2357        replace_edge_properties(&self.conn, &edge_key, &edge.properties)?;
2358        Ok(())
2359    }
2360
2361    fn delete_node(&self, id: &str) -> Result<usize> {
2362        self.conn
2363            .execute("DELETE FROM graph_nodes WHERE id = ?1", [id])
2364            .map_err(Into::into)
2365    }
2366
2367    fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize> {
2368        self.conn
2369            .execute(
2370                "DELETE FROM graph_edges WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3",
2371                (from_id, to_id, kind),
2372            )
2373            .map_err(Into::into)
2374    }
2375
2376    fn node(&self, id: &str) -> Result<Option<GraphNode>> {
2377        self.conn
2378            .query_row(
2379                r#"
2380SELECT id, kind, label, properties_json, provenance_json, freshness_json
2381                FROM graph_nodes
2382                WHERE id = ?1
2383                "#,
2384                [id],
2385                node_from_row,
2386            )
2387            .optional()
2388            .map_err(Into::into)
2389    }
2390
2391    fn nodes_by_ids(&self, ids: &[String]) -> Result<Vec<GraphNode>> {
2392        let unique_ids = ids.iter().cloned().collect::<BTreeSet<_>>();
2393        if unique_ids.is_empty() {
2394            return Ok(Vec::new());
2395        }
2396
2397        let mut nodes = Vec::new();
2398        let id_refs = unique_ids.iter().collect::<Vec<_>>();
2399        for chunk in id_refs.chunks(450) {
2400            let placeholders = chunk.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
2401            let sql = format!(
2402                "SELECT id, kind, label, properties_json, provenance_json, freshness_json \
2403FROM graph_nodes \
2404WHERE id IN ({placeholders}) \
2405ORDER BY id"
2406            );
2407            let values = chunk
2408                .iter()
2409                .map(|id| Value::Text((*id).clone()))
2410                .collect::<Vec<_>>();
2411            let mut stmt = self.conn.prepare(&sql)?;
2412            nodes.extend(collect_rows(
2413                stmt.query_map(params_from_iter(values.iter()), node_from_row)?,
2414            )?);
2415        }
2416        nodes.sort_by(|left, right| left.id.cmp(&right.id));
2417        Ok(nodes)
2418    }
2419
2420    fn all_nodes(&self) -> Result<Vec<GraphNode>> {
2421        let mut stmt = self.conn.prepare(
2422            r#"
2423SELECT id, kind, label, properties_json, provenance_json, freshness_json
2424            FROM graph_nodes
2425            ORDER BY id
2426            "#,
2427        )?;
2428        collect_rows(stmt.query_map([], node_from_row)?)
2429    }
2430
2431    fn all_edges(&self) -> Result<Vec<GraphEdge>> {
2432        let mut stmt = self.conn.prepare(
2433            r#"
2434            SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2435            FROM graph_edges
2436            ORDER BY from_id, kind, to_id
2437            "#,
2438        )?;
2439        collect_rows(stmt.query_map([], edge_from_row)?)
2440    }
2441
2442    fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
2443        self.conn
2444            .query_row(
2445                r#"
2446                SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2447                FROM graph_edges INDEXED BY idx_graph_edges_edge_key
2448                WHERE edge_key = ?1
2449                "#,
2450                [edge_id],
2451                edge_from_row,
2452            )
2453            .optional()
2454            .map_err(Into::into)
2455    }
2456
2457    fn graph_counts(&self) -> Result<(usize, usize)> {
2458        let nodes = self
2459            .conn
2460            .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
2461                row.get::<_, usize>(0)
2462            })?;
2463        let edges = self
2464            .conn
2465            .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2466                row.get::<_, usize>(0)
2467            })?;
2468        Ok((nodes, edges))
2469    }
2470
2471    fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
2472        match kind {
2473            Some(kind) => self
2474                .conn
2475                .query_row(
2476                    r#"
2477                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2478                    FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2479                    WHERE from_id <> to_id AND kind = ?1
2480                    ORDER BY from_id, kind, to_id
2481                    LIMIT 1
2482                    "#,
2483                    [kind],
2484                    edge_from_row,
2485                )
2486                .optional()
2487                .map_err(Into::into),
2488            None => self
2489                .conn
2490                .query_row(
2491                    r#"
2492                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2493                    FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2494                    WHERE from_id <> to_id
2495                    ORDER BY from_id, kind, to_id
2496                    LIMIT 1
2497                    "#,
2498                    [],
2499                    edge_from_row,
2500                )
2501                .optional()
2502                .map_err(Into::into),
2503        }
2504    }
2505
2506    fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
2507        self.conn
2508            .query_row(
2509                r#"
2510                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2511                       ep.key, ep.value
2512                FROM graph_edge_properties ep INDEXED BY idx_graph_edge_properties_key_value_edge
2513                JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2514                  ON e.edge_key = ep.edge_key
2515                WHERE e.from_id <> e.to_id
2516                ORDER BY ep.key, ep.value, ep.edge_key
2517                LIMIT 1
2518                "#,
2519                [],
2520                |row| {
2521                    Ok((
2522                        edge_from_row(row)?,
2523                        GraphPropertyFilter {
2524                            key: row.get(7)?,
2525                            value: row.get(8)?,
2526                        },
2527                    ))
2528                },
2529            )
2530            .optional()
2531            .map_err(Into::into)
2532    }
2533
2534    fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>> {
2535        let mut stmt = self.conn.prepare(
2536            r#"
2537            SELECT id, kind, label, properties_json, provenance_json, freshness_json
2538            FROM graph_nodes
2539            WHERE kind = ?1
2540            ORDER BY id
2541            "#,
2542        )?;
2543        collect_rows(stmt.query_map([kind], node_from_row)?)
2544    }
2545
2546    fn semantic_top_candidates(
2547        &self,
2548        query_vector: &[f64],
2549        kinds: &[&str],
2550        limit: usize,
2551    ) -> Result<Vec<GraphSemanticCandidate>> {
2552        if query_vector.is_empty() || kinds.is_empty() {
2553            return Ok(Vec::new());
2554        }
2555        if !sqlite_table_exists(&self.conn, "graph_node_semantic_vectors")? {
2556            return graph_semantic_top_candidates_by_property_scan(
2557                self,
2558                query_vector,
2559                kinds,
2560                limit,
2561            );
2562        }
2563
2564        let unique_kinds = kinds.iter().copied().collect::<BTreeSet<_>>();
2565        if unique_kinds.is_empty() {
2566            return Ok(Vec::new());
2567        }
2568        let kind_placeholders = unique_kinds
2569            .iter()
2570            .map(|_| "?")
2571            .collect::<Vec<_>>()
2572            .join(", ");
2573        let sql = format!(
2574            r#"
2575            SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
2576                   graph_node_semantic_vectors.vector_blob,
2577                   graph_node_semantic_vectors.dimensions
2578            FROM graph_node_semantic_vectors INDEXED BY idx_graph_node_semantic_vectors_kind_dims
2579            JOIN graph_nodes n ON n.id = graph_node_semantic_vectors.node_id
2580            WHERE graph_node_semantic_vectors.dimensions = ?
2581              AND graph_node_semantic_vectors.kind IN ({kind_placeholders})
2582            ORDER BY graph_node_semantic_vectors.kind, n.label, n.id
2583            "#
2584        );
2585        let mut values = vec![Value::Integer(query_vector.len() as i64)];
2586        values.extend(
2587            unique_kinds
2588                .into_iter()
2589                .map(|kind| Value::Text(kind.to_string())),
2590        );
2591        let rows = {
2592            let mut stmt = self.conn.prepare(&sql)?;
2593            collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2594                Ok((
2595                    node_from_row_at(row, 0)?,
2596                    row.get::<_, Vec<u8>>(6)?,
2597                    row.get::<_, i64>(7)?,
2598                ))
2599            })?)?
2600        };
2601
2602        let mut candidates = rows
2603            .into_iter()
2604            .filter_map(|(node, blob, dimensions)| {
2605                let dimensions = usize::try_from(dimensions).ok()?;
2606                let vector = semantic_vector_from_blob(&blob, dimensions)?;
2607                Some(GraphSemanticCandidate {
2608                    score: graph_semantic_cosine(query_vector, &vector),
2609                    node,
2610                })
2611            })
2612            .collect::<Vec<_>>();
2613        candidates.sort_by(|left, right| {
2614            right
2615                .score
2616                .partial_cmp(&left.score)
2617                .unwrap_or(std::cmp::Ordering::Equal)
2618                .then_with(|| left.node.kind.cmp(&right.node.kind))
2619                .then_with(|| left.node.label.cmp(&right.node.label))
2620                .then_with(|| left.node.id.cmp(&right.node.id))
2621        });
2622        if limit > 0 && candidates.len() > limit {
2623            candidates.truncate(limit);
2624        }
2625        Ok(candidates)
2626    }
2627
2628    fn paged_nodes_by_kind(
2629        &self,
2630        kind: &str,
2631        options: GraphQueryOptions,
2632    ) -> Result<GraphPagedSubgraph> {
2633        let mut sql = String::from(
2634            r#"
2635            SELECT id, kind, label, properties_json, provenance_json, freshness_json
2636            FROM graph_nodes
2637            WHERE kind = ?
2638            "#,
2639        );
2640        let mut values = vec![Value::Text(kind.to_string())];
2641        push_sqlite_property_filter_exists(
2642            &mut sql,
2643            &mut values,
2644            "graph_nodes",
2645            &options.property_filters,
2646        );
2647        if let Some(cursor) = &options.cursor {
2648            sql.push_str(" AND id > ?");
2649            values.push(Value::Text(cursor.clone()));
2650        }
2651        sql.push_str(" ORDER BY id");
2652        if let Some(limit) = options.limit {
2653            sql.push_str(" LIMIT ?");
2654            values.push(Value::Integer(limit.saturating_add(1) as i64));
2655        }
2656
2657        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2658        let mut stmt = self.conn.prepare(&sql)?;
2659        let mut nodes =
2660            collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)?;
2661        let before_limit = nodes.len();
2662        let mut next_cursor = None;
2663        if let Some(limit) = options.limit
2664            && nodes.len() > limit
2665        {
2666            next_cursor = nodes
2667                .get(limit.saturating_sub(1))
2668                .map(|node| node.id.clone());
2669            nodes.truncate(limit);
2670        }
2671        let expected_indexes = if options.property_filters.is_empty() {
2672            vec!["idx_graph_nodes_kind"]
2673        } else {
2674            vec![
2675                "idx_graph_nodes_kind",
2676                "idx_graph_node_properties_key_value_node",
2677            ]
2678        };
2679        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2680        if !options.property_filters.is_empty() {
2681            diagnostics.push(
2682                "property filters were evaluated by SQLite materialized property rows before paging"
2683                    .to_string(),
2684            );
2685        }
2686        if options.cursor.is_some() {
2687            diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
2688        }
2689        if next_cursor.is_some() {
2690            diagnostics.push(
2691                "result was truncated; pass page.next_cursor as --cursor for the next page"
2692                    .to_string(),
2693            );
2694        }
2695        Ok(GraphPagedSubgraph {
2696            page: GraphQueryPage {
2697                cursor: options.cursor,
2698                limit: options.limit,
2699                next_cursor,
2700                returned_nodes: nodes.len(),
2701                returned_edges: 0,
2702                truncated: options.limit.is_some_and(|limit| before_limit > limit),
2703                diagnostics,
2704            },
2705            nodes,
2706            edges: Vec::new(),
2707        })
2708    }
2709
2710    fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2711        match kind {
2712            Some(kind) => {
2713                let mut stmt = self.conn.prepare(
2714                    r#"
2715                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2716                    FROM graph_edges
2717                    WHERE from_id = ?1 AND kind = ?2
2718                    ORDER BY to_id, kind
2719                    "#,
2720                )?;
2721                collect_rows(stmt.query_map((from_id, kind), edge_from_row)?)
2722            }
2723            None => {
2724                let mut stmt = self.conn.prepare(
2725                    r#"
2726                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2727                    FROM graph_edges
2728                    WHERE from_id = ?1
2729                    ORDER BY to_id, kind
2730                    "#,
2731                )?;
2732                collect_rows(stmt.query_map([from_id], edge_from_row)?)
2733            }
2734        }
2735    }
2736
2737    fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2738        let (sql, values) = sqlite_incident_edges_union_query(node_id, kind, &[], None, None);
2739        let mut stmt = self.conn.prepare(&sql)?;
2740        collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2741    }
2742
2743    fn semantic_seeded_expansion_edges(
2744        &self,
2745        current_id: &str,
2746        options: &SemanticSeededNeighborhoodOptions,
2747    ) -> Result<SemanticSeededNeighborhoodExpansion> {
2748        let from_score = sqlite_semantic_seeded_edge_score_expr("e", "8");
2749        let to_score = sqlite_semantic_seeded_edge_score_expr(
2750            "e",
2751            "CASE WHEN e.from_id = e.to_id THEN 8 ELSE 4 END",
2752        );
2753        let limit_clause = if options.edge_scan_cap > 0 {
2754            "LIMIT ?"
2755        } else {
2756            ""
2757        };
2758        let sql = format!(
2759            r#"
2760WITH candidate_edges AS (
2761    SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2762           {from_score} AS score
2763    FROM graph_edges e INDEXED BY idx_graph_edges_from_kind
2764    WHERE e.from_id = ?
2765    UNION ALL
2766    SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2767           {to_score} AS score
2768    FROM graph_edges e INDEXED BY idx_graph_edges_to_kind
2769    WHERE e.to_id = ?
2770),
2771ranked_edges AS (
2772    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json,
2773           MAX(score) AS score
2774    FROM candidate_edges
2775    GROUP BY edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2776),
2777limited_edges AS (
2778    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json,
2779           score, COUNT(*) OVER () AS total_edges
2780    FROM ranked_edges
2781    ORDER BY score DESC, edge_key ASC
2782    {limit_clause}
2783)
2784SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, total_edges
2785FROM limited_edges
2786ORDER BY score DESC, edge_key ASC
2787"#
2788        );
2789        let mut values = vec![
2790            Value::Text(current_id.to_string()),
2791            Value::Text(current_id.to_string()),
2792        ];
2793        if options.edge_scan_cap > 0 {
2794            values.push(Value::Integer(
2795                options
2796                    .edge_scan_cap
2797                    .saturating_add(1)
2798                    .min(i64::MAX as usize) as i64,
2799            ));
2800        }
2801        let mut stmt = self.conn.prepare(&sql)?;
2802        let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2803            Ok((edge_from_row_at(row, 0)?, row.get::<_, i64>(7)? as usize))
2804        })?)?;
2805        let total_candidates = rows.first().map(|(_, total)| *total).unwrap_or(0);
2806        let mut edges = rows.into_iter().map(|(edge, _)| edge).collect::<Vec<_>>();
2807        let mut skipped_by_edge_cap = 0usize;
2808        if options.edge_scan_cap > 0 && total_candidates > options.edge_scan_cap {
2809            skipped_by_edge_cap = total_candidates - options.edge_scan_cap;
2810            edges.truncate(options.edge_scan_cap);
2811        }
2812        Ok(SemanticSeededNeighborhoodExpansion {
2813            edges,
2814            skipped_by_edge_cap,
2815        })
2816    }
2817
2818    fn paged_edges(
2819        &self,
2820        kind: Option<&str>,
2821        options: GraphQueryOptions,
2822    ) -> Result<GraphPagedSubgraph> {
2823        let primary_property_filter = options.property_filters.first();
2824        let mut values = Vec::new();
2825        let mut sql = if let Some(filter) = primary_property_filter {
2826            values.push(Value::Text(filter.key.clone()));
2827            values.push(Value::Text(filter.value.clone()));
2828            String::from(
2829                r#"
2830                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2831                FROM graph_edge_properties ep0 INDEXED BY idx_graph_edge_properties_key_value_edge
2832                JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2833                  ON e.edge_key = ep0.edge_key
2834                WHERE ep0.key = ?
2835                  AND ep0.value = ?
2836                "#,
2837            )
2838        } else {
2839            String::from(
2840                r#"
2841                SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2842                FROM graph_edges e
2843                WHERE 1 = 1
2844                "#,
2845            )
2846        };
2847        if let Some(kind) = kind {
2848            sql.push_str(" AND e.kind = ?");
2849            values.push(Value::Text(kind.to_string()));
2850        }
2851        push_sqlite_edge_property_filter_exists(
2852            &mut sql,
2853            &mut values,
2854            "e",
2855            if primary_property_filter.is_some() {
2856                &options.property_filters[1..]
2857            } else {
2858                &options.property_filters
2859            },
2860        );
2861        if let Some(cursor) = &options.cursor {
2862            if primary_property_filter.is_some() {
2863                sql.push_str(" AND ep0.edge_key > ?");
2864            } else {
2865                sql.push_str(" AND e.edge_key > ?");
2866            }
2867            values.push(Value::Text(cursor.clone()));
2868        }
2869        if primary_property_filter.is_some() {
2870            sql.push_str(" ORDER BY ep0.edge_key");
2871        } else {
2872            sql.push_str(" ORDER BY e.edge_key");
2873        }
2874        if let Some(limit) = options.limit {
2875            sql.push_str(" LIMIT ?");
2876            values.push(Value::Integer(limit.saturating_add(1) as i64));
2877        }
2878
2879        let primary_property_row_count = if let Some(filter) = primary_property_filter {
2880            Some(self.conn.query_row(
2881                r#"
2882                SELECT COUNT(*)
2883                FROM graph_edge_properties INDEXED BY idx_graph_edge_properties_key_value_edge
2884                WHERE key = ?1 AND value = ?2
2885                "#,
2886                (&filter.key, &filter.value),
2887                |row| row.get::<_, usize>(0),
2888            )?)
2889        } else {
2890            None
2891        };
2892        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2893        let mut stmt = self.conn.prepare(&sql)?;
2894        let mut edges =
2895            collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2896        let before_limit = edges.len();
2897        let mut next_cursor = None;
2898        if let Some(limit) = options.limit
2899            && edges.len() > limit
2900        {
2901            next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2902            edges.truncate(limit);
2903        }
2904        let expected_indexes = if options.property_filters.is_empty() {
2905            vec!["idx_graph_edges_edge_key"]
2906        } else {
2907            vec![
2908                "idx_graph_edge_properties_key_value_edge",
2909                "idx_graph_edges_edge_key",
2910            ]
2911        };
2912        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2913        if !options.property_filters.is_empty() {
2914            if let Some(row_count) = primary_property_row_count {
2915                diagnostics.push(format!(
2916                    "edge property primary filter matched {row_count} materialized row(s) before edge-kind/cursor paging"
2917                ));
2918            }
2919            diagnostics.push(
2920                "edge property scan drives from SQLite materialized property rows before joining graph_edges"
2921                    .to_string(),
2922            );
2923        }
2924        if options.cursor.is_some() {
2925            diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2926        }
2927        if next_cursor.is_some() {
2928            diagnostics.push(
2929                "result was truncated; pass page.next_cursor as --cursor for the next page"
2930                    .to_string(),
2931            );
2932        }
2933        Ok(GraphPagedSubgraph {
2934            page: GraphQueryPage {
2935                cursor: options.cursor,
2936                limit: options.limit,
2937                next_cursor,
2938                returned_nodes: 0,
2939                returned_edges: edges.len(),
2940                truncated: options.limit.is_some_and(|limit| before_limit > limit),
2941                diagnostics,
2942            },
2943            nodes: Vec::new(),
2944            edges,
2945        })
2946    }
2947
2948    fn paged_incident_edges(
2949        &self,
2950        node_id: &str,
2951        kind: Option<&str>,
2952        options: GraphQueryOptions,
2953    ) -> Result<GraphPagedSubgraph> {
2954        let (sql, values) = sqlite_incident_edges_union_query(
2955            node_id,
2956            kind,
2957            &options.property_filters,
2958            options.cursor.as_deref(),
2959            options.limit,
2960        );
2961        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2962        let mut stmt = self.conn.prepare(&sql)?;
2963        let mut edges =
2964            collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2965        let before_limit = edges.len();
2966        let mut next_cursor = None;
2967        if let Some(limit) = options.limit
2968            && edges.len() > limit
2969        {
2970            next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2971            edges.truncate(limit);
2972        }
2973        let expected_indexes = if options.property_filters.is_empty() {
2974            vec!["idx_graph_edges_from_kind", "idx_graph_edges_to_kind"]
2975        } else {
2976            vec![
2977                "idx_graph_edges_from_kind",
2978                "idx_graph_edges_to_kind",
2979                "idx_graph_edge_properties_key_value_edge",
2980            ]
2981        };
2982        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2983        diagnostics.push(
2984            "incident edge scan uses UNION over from_id/to_id index probes instead of an OR predicate"
2985                .to_string(),
2986        );
2987        if !options.property_filters.is_empty() {
2988            diagnostics.push(
2989                "edge property filters were evaluated by SQLite materialized property rows before paging"
2990                    .to_string(),
2991            );
2992        }
2993        if options.cursor.is_some() {
2994            diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2995        }
2996        if next_cursor.is_some() {
2997            diagnostics.push(
2998                "result was truncated; pass page.next_cursor as --cursor for the next page"
2999                    .to_string(),
3000            );
3001        }
3002        Ok(GraphPagedSubgraph {
3003            page: GraphQueryPage {
3004                cursor: options.cursor,
3005                limit: options.limit,
3006                next_cursor,
3007                returned_nodes: 0,
3008                returned_edges: edges.len(),
3009                truncated: options.limit.is_some_and(|limit| before_limit > limit),
3010                diagnostics,
3011            },
3012            nodes: Vec::new(),
3013            edges,
3014        })
3015    }
3016
3017    fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
3018        if node_ids.is_empty() {
3019            return Ok(Vec::new());
3020        }
3021        if node_ids.len() <= 20 {
3022            return self.edges_between_nodes_inline(node_ids);
3023        }
3024        self.assert_not_in_temp_table_section();
3025        self.temp_table_active.set(true);
3026        let result = (|| -> Result<Vec<GraphEdge>> {
3027            let tx = self.conn.unchecked_transaction()?;
3028            tx.execute_batch(
3029                r#"
3030                CREATE TEMP TABLE IF NOT EXISTS _edges_between_ids (id TEXT PRIMARY KEY);
3031                DELETE FROM _edges_between_ids;
3032                "#,
3033            )?;
3034            for chunk in node_ids.iter().collect::<Vec<_>>().chunks(450) {
3035                let row_placeholders: Vec<String> =
3036                    chunk.iter().map(|_| "(?)".to_string()).collect();
3037                let placeholders = row_placeholders.join(", ");
3038                let sql =
3039                    format!("INSERT OR IGNORE INTO _edges_between_ids (id) VALUES {placeholders}");
3040                let values: Vec<Value> =
3041                    chunk.iter().map(|id| Value::Text((*id).clone())).collect();
3042                tx.execute(&sql, params_from_iter(values.iter()))?;
3043            }
3044            let edges = {
3045                let mut stmt = tx.prepare(
3046                    r#"
3047                    SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
3048                    FROM graph_edges e
3049                    WHERE EXISTS (SELECT 1 FROM _edges_between_ids f WHERE f.id = e.from_id)
3050                      AND EXISTS (SELECT 1 FROM _edges_between_ids t WHERE t.id = e.to_id)
3051                    ORDER BY e.from_id, e.kind, e.to_id
3052                    "#,
3053                )?;
3054                collect_rows(stmt.query_map([], edge_from_row)?)?
3055            };
3056            tx.finish()?;
3057            Ok(edges)
3058        })();
3059        self.temp_table_active.set(false);
3060        result
3061    }
3062
3063    fn ranked_neighborhood(
3064        &self,
3065        center_id: &str,
3066        options: &RankedNeighborhoodOptions,
3067    ) -> Result<Option<RankedNeighborhoodResult>> {
3068        if self.node(center_id)?.is_none() {
3069            return Ok(None);
3070        }
3071        let center = self.node(center_id)?.unwrap();
3072
3073        let base_score_expr = match options.scoring {
3074            tsift_core::NeighborhoodScoring::BreadthFirst => {
3075                "MAX(0, 120 - (walk.depth * 18))".to_string()
3076            }
3077            tsift_core::NeighborhoodScoring::EdgeKindWeighted => {
3078                "MAX(0, 120 - (walk.depth * 18)) + CASE walk.edge_kind \
3079                 WHEN 'semantic_relation' THEN 34 \
3080                 WHEN 'mentions_entity' THEN 28 \
3081                 WHEN 'mentions_concept' THEN 28 \
3082                 WHEN 'tagged_entity' THEN 28 \
3083                 WHEN 'tagged_concept' THEN 28 \
3084                 WHEN 'related_concept' THEN 28 \
3085                 WHEN 'mentions' THEN 22 \
3086                 WHEN 'calls' THEN 20 \
3087                 WHEN 'requests_context' THEN 18 \
3088                 WHEN 'scopes_context' THEN 18 \
3089                 WHEN 'scopes_source' THEN 18 \
3090                 WHEN 'explains_result' THEN 18 \
3091                 WHEN 'defines' THEN 12 \
3092                 WHEN 'contains' THEN 12 \
3093                 WHEN 'belongs_to' THEN 12 \
3094                 ELSE 8 END".to_string()
3095            }
3096            tsift_core::NeighborhoodScoring::DegreeWeighted => {
3097                "MAX(0, 120 - (walk.depth * 18)) + CASE \
3098                 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 3 THEN 20 \
3099                 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 10 THEN 10 \
3100                 ELSE 0 END"
3101                    .to_string()
3102            }
3103        };
3104        let now_unix = options.observed_at_now_unix.unwrap_or_else(unix_now);
3105        let observed_at_half_life_secs = options.observed_at_half_life_secs.max(1);
3106        let observed_at_weight = options.observed_at_weight.max(0);
3107        let memory_node_boost = options.memory_node_boost.max(0);
3108        let observed_at_value = "COALESCE(\
3109            CAST(json_extract(n_score.freshness_json, '$.observed_at_unix') AS INTEGER), \
3110            CAST(json_extract(n_score.properties_json, '$.observed_at_unix') AS INTEGER), \
3111            CAST(json_extract(n_score.properties_json, '$.max_observed_at_unix') AS INTEGER)\
3112        )";
3113        let observed_at_expr = if observed_at_weight == 0 {
3114            "0".to_string()
3115        } else {
3116            format!(
3117                "CASE \
3118                 WHEN {observed_at_value} IS NULL THEN 0 \
3119                 WHEN ({now_unix} - {observed_at_value}) < {observed_at_half_life_secs} THEN {observed_at_weight} \
3120                 WHEN ({now_unix} - {observed_at_value}) < ({observed_at_half_life_secs} * 2) THEN {observed_at_weight} / 2 \
3121                 WHEN ({now_unix} - {observed_at_value}) < ({observed_at_half_life_secs} * 4) THEN {observed_at_weight} / 4 \
3122                 ELSE 0 END"
3123            )
3124        };
3125        let confidence_value =
3126            "CAST(json_extract(n_score.properties_json, '$.confidence') AS REAL)";
3127        let memory_signal_expr = if memory_node_boost == 0 {
3128            "0".to_string()
3129        } else {
3130            format!(
3131                "(CASE \
3132                  WHEN n_score.kind = 'memory_projection' THEN 0 \
3133                  WHEN n_score.kind IN ('finding', 'decision', 'memory_event') THEN {memory_node_boost} \
3134                  WHEN n_score.kind IN ('note', 'memory_session') THEN {memory_node_boost} / 2 \
3135                  WHEN n_score.kind IN ('source_handle', 'semantic_concept', 'semantic_vector_handle') \
3136                    AND json_extract(n_score.properties_json, '$.provider') = 'tsift-memory' THEN {memory_node_boost} / 2 \
3137                  WHEN n_score.kind LIKE 'memory_%' THEN {memory_node_boost} \
3138                  WHEN json_extract(n_score.properties_json, '$.provider') = 'tsift-memory' THEN {memory_node_boost} / 2 \
3139                  ELSE 0 END) \
3140                 + (CASE \
3141                  WHEN json_extract(n_score.properties_json, '$.confidence') IS NULL THEN 0 \
3142                  WHEN {confidence_value} <= 0.0 THEN 0 \
3143                  WHEN {confidence_value} >= 1.0 THEN {memory_node_boost} \
3144                  ELSE CAST(ROUND({memory_node_boost} * {confidence_value}) AS INTEGER) END)"
3145            )
3146        };
3147        let score_expr =
3148            format!("({base_score_expr}) + ({observed_at_expr}) + ({memory_signal_expr})");
3149
3150        let use_degree_cache = matches!(
3151            options.scoring,
3152            tsift_core::NeighborhoodScoring::DegreeWeighted
3153        );
3154        let degree_cte = if use_degree_cache {
3155            "degree_cache AS ( \
3156            SELECT id, (SELECT COUNT(*) FROM graph_edges e WHERE e.from_id = n.id OR e.to_id = n.id) AS degree \
3157            FROM graph_nodes n), "
3158        } else {
3159            ""
3160        };
3161        let mut sql = format!(
3162            r#"
3163WITH RECURSIVE {degree_cte}walk(id, depth, edge_kind, score) AS (
3164SELECT ?, 0, '', ?
3165UNION
3166SELECT e.to_id, walk.depth + 1, e.kind,
3167"#,
3168        );
3169        sql.push_str(&format!("    {}\n", score_expr));
3170        sql.push_str(
3171            r#"
3172FROM walk
3173JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3174ON e.from_id = walk.id
3175JOIN graph_nodes n_score ON n_score.id = e.to_id
3176WHERE walk.depth < ?
3177"#,
3178        );
3179        let mut values = vec![
3180            Value::Text(center_id.to_string()),
3181            Value::Integer(i64::MAX),
3182            Value::Integer(options.depth as i64),
3183        ];
3184        if let Some(kind) = &options.edge_kind {
3185            sql.push_str(" AND e.kind = ?");
3186            values.push(Value::Text(kind.clone()));
3187        }
3188        sql.push_str(
3189            r#"
3190            ),
3191scored_nodes AS (
3192SELECT walk.id, MAX(walk.score) AS score,
3193n.kind AS node_kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3194FROM walk
3195JOIN graph_nodes n ON n.id = walk.id
3196GROUP BY walk.id
3197            ),
3198            ranked AS (
3199                SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
3200                FROM scored_nodes
3201                ORDER BY score DESC, id ASC
3202            ),
3203            kept AS (
3204                SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
3205                FROM ranked
3206                LIMIT ?
3207            ),
3208            total AS (
3209                SELECT COUNT(*) AS cnt FROM scored_nodes
3210            )
3211            SELECT
3212                'meta' AS row_type,
3213                (SELECT cnt FROM total) AS total_discovered,
3214                0 AS node_id, '' AS node_kind, '' AS node_label,
3215                '' AS node_props, '' AS node_prov, '' AS node_fresh,
3216                '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
3217                '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
3218            UNION ALL
3219            SELECT
3220                'node' AS row_type,
3221                0 AS total_discovered,
3222                k.id, k.node_kind, k.label, k.properties_json, k.provenance_json, k.freshness_json,
3223                '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
3224                '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
3225            FROM kept k
3226            UNION ALL
3227            SELECT
3228                'edge' AS row_type,
3229                0 AS total_discovered,
3230                '' AS node_id, '' AS node_kind, '' AS node_label,
3231                '' AS node_props, '' AS node_prov, '' AS node_fresh,
3232                e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
3233            FROM graph_edges e
3234            WHERE EXISTS (SELECT 1 FROM kept k WHERE k.id = e.from_id)
3235              AND EXISTS (SELECT 1 FROM kept k2 WHERE k2.id = e.to_id)
3236"#,
3237        );
3238        values.push(Value::Integer(options.max_nodes.saturating_add(1) as i64));
3239
3240        let mut stmt = self.conn.prepare(&sql)?;
3241        let mut nodes = vec![center.clone()];
3242        let mut edges = Vec::new();
3243        let mut total_discovered = 0usize;
3244
3245        let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
3246            let row_type: String = row.get(0)?;
3247            match row_type.as_str() {
3248                "meta" => Ok(QueryResult::Meta {
3249                    total: row.get::<_, i64>(1)? as usize,
3250                }),
3251                "node" => Ok(QueryResult::Node(node_from_row_at(row, 2)?)),
3252                "edge" => Ok(QueryResult::Edge(edge_from_row_at(row, 8)?)),
3253                _ => Err(rusqlite::Error::InvalidQuery),
3254            }
3255        })?;
3256        for row_result in rows {
3257            match row_result? {
3258                QueryResult::Meta { total } => {
3259                    total_discovered = total;
3260                }
3261                QueryResult::Node(node) => {
3262                    if node.id != center_id {
3263                        nodes.push(node);
3264                    }
3265                }
3266                QueryResult::Edge(edge) => {
3267                    edges.push(edge);
3268                }
3269            }
3270        }
3271
3272        let total_discovered = total_discovered.max(nodes.len());
3273        let pruned_count = total_discovered.saturating_sub(nodes.len());
3274
3275        match options.property_mode {
3276            PropertyMode::Full => {}
3277            PropertyMode::Omit => {
3278                for n in &mut nodes {
3279                    n.properties.clear();
3280                }
3281                for e in &mut edges {
3282                    e.properties.clear();
3283                }
3284            }
3285            PropertyMode::Sample => {
3286                let mut seen_kinds = std::collections::BTreeSet::new();
3287                for n in &mut nodes {
3288                    if !seen_kinds.contains(&n.kind) {
3289                        seen_kinds.insert(n.kind.clone());
3290                    } else {
3291                        n.properties.clear();
3292                    }
3293                }
3294                for e in &mut edges {
3295                    e.properties.clear();
3296                }
3297            }
3298        }
3299
3300        Ok(Some(RankedNeighborhoodResult {
3301            nodes,
3302            edges,
3303            pruned_count,
3304            total_discovered,
3305        }))
3306    }
3307
3308    fn neighborhood(
3309        &self,
3310        center_id: &str,
3311        depth: usize,
3312        kind: Option<&str>,
3313    ) -> Result<Option<GraphSubgraph>> {
3314        self.paged_neighborhood(center_id, depth, kind, GraphQueryOptions::default())
3315            .map(|result| {
3316                result.map(|result| {
3317                    GraphSubgraph {
3318                        nodes: result.nodes,
3319                        edges: result.edges,
3320                    }
3321                    .sorted()
3322                })
3323            })
3324    }
3325
3326    fn paged_neighborhood(
3327        &self,
3328        center_id: &str,
3329        depth: usize,
3330        kind: Option<&str>,
3331        options: GraphQueryOptions,
3332    ) -> Result<Option<GraphPagedSubgraph>> {
3333        if self.node(center_id)?.is_none() {
3334            return Ok(None);
3335        }
3336        let mut sql = String::from(
3337            r#"
3338            WITH RECURSIVE walk(id, depth) AS (
3339                SELECT ?, 0
3340                UNION
3341                SELECT e.to_id, walk.depth + 1
3342                FROM walk
3343                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3344                    ON e.from_id = walk.id
3345                WHERE walk.depth < ?
3346            "#,
3347        );
3348        let mut values = vec![
3349            Value::Text(center_id.to_string()),
3350            Value::Integer(depth as i64),
3351        ];
3352        if let Some(kind) = kind {
3353            sql.push_str(" AND e.kind = ?");
3354            values.push(Value::Text(kind.to_string()));
3355        }
3356        sql.push_str(
3357            r#"
3358            ),
3359            filtered_nodes AS (
3360            SELECT DISTINCT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3361            FROM walk
3362            JOIN graph_nodes n ON n.id = walk.id
3363            WHERE 1 = 1
3364            "#,
3365        );
3366        push_sqlite_property_filter_exists(&mut sql, &mut values, "n", &options.property_filters);
3367        if let Some(cursor) = &options.cursor {
3368            sql.push_str(" AND n.id > ?");
3369            values.push(Value::Text(cursor.clone()));
3370        }
3371        sql.push_str(
3372            r#"
3373            ),
3374            page_nodes AS (
3375                SELECT id, kind, label, properties_json, provenance_json, freshness_json
3376                FROM filtered_nodes
3377                ORDER BY id
3378            "#,
3379        );
3380        if let Some(limit) = options.limit {
3381            sql.push_str(" LIMIT ?");
3382            values.push(Value::Integer(limit.saturating_add(1) as i64));
3383        }
3384        sql.push_str(
3385            r#"
3386            ),
3387            walk_edges AS (
3388                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
3389                FROM walk
3390                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3391                    ON e.from_id = walk.id
3392                WHERE walk.depth < ?
3393            "#,
3394        );
3395        values.push(Value::Integer(depth as i64));
3396        if let Some(kind) = kind {
3397            sql.push_str(" AND e.kind = ?");
3398            values.push(Value::Text(kind.to_string()));
3399        }
3400        sql.push_str(
3401            r#"
3402            )
3403            SELECT
3404                'node' AS row_type,
3405                p.id, p.kind, p.label, p.properties_json, p.provenance_json, p.freshness_json,
3406                NULL AS edge_key, NULL AS from_id, NULL AS to_id, NULL AS edge_kind,
3407                NULL AS edge_properties_json, NULL AS edge_provenance_json, NULL AS edge_freshness_json
3408            FROM page_nodes p
3409            UNION ALL
3410            SELECT DISTINCT
3411                'edge' AS row_type,
3412                NULL AS id, NULL AS kind, NULL AS label, NULL AS properties_json,
3413                NULL AS provenance_json, NULL AS freshness_json,
3414                e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
3415            FROM walk_edges e
3416            WHERE e.from_id IN (SELECT id FROM page_nodes)
3417              AND e.to_id IN (SELECT id FROM page_nodes)
3418            "#,
3419        );
3420
3421        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
3422        let mut stmt = self.conn.prepare(&sql)?;
3423        let mut nodes = Vec::new();
3424        let mut edges = Vec::new();
3425        let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
3426            let row_type: String = row.get(0)?;
3427            match row_type.as_str() {
3428                "node" => Ok((Some(node_from_row_at(row, 1)?), None)),
3429                "edge" => Ok((None, Some(edge_from_row_at(row, 7)?))),
3430                _ => Err(rusqlite::Error::InvalidQuery),
3431            }
3432        })?;
3433        for row in rows {
3434            let (node, edge) = row?;
3435            if let Some(node) = node {
3436                nodes.push(node);
3437            }
3438            if let Some(edge) = edge {
3439                edges.push(edge);
3440            }
3441        }
3442        nodes.sort_by(|left, right| left.id.cmp(&right.id));
3443        let before_limit = nodes.len();
3444        let mut next_cursor = None;
3445        if let Some(limit) = options.limit
3446            && nodes.len() > limit
3447        {
3448            next_cursor = nodes
3449                .get(limit.saturating_sub(1))
3450                .map(|node| node.id.clone());
3451            nodes.truncate(limit);
3452        }
3453        let node_ids = nodes
3454            .iter()
3455            .map(|node| node.id.as_str())
3456            .collect::<BTreeSet<_>>();
3457        edges.retain(|edge| {
3458            node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
3459        });
3460        edges.sort_by(|left, right| {
3461            left.from_id
3462                .cmp(&right.from_id)
3463                .then(left.kind.cmp(&right.kind))
3464                .then(left.to_id.cmp(&right.to_id))
3465        });
3466        let expected_indexes = if options.property_filters.is_empty() {
3467            vec!["idx_graph_edges_from_kind"]
3468        } else {
3469            vec![
3470                "idx_graph_edges_from_kind",
3471                "idx_graph_node_properties_key_value_node",
3472            ]
3473        };
3474        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
3475        diagnostics.push(
3476            "neighborhood nodes and page edges share one recursive reachable-set CTE".to_string(),
3477        );
3478        if !options.property_filters.is_empty() {
3479            diagnostics.push(
3480                "property filters were evaluated by SQLite materialized property rows before paging"
3481                    .to_string(),
3482            );
3483        }
3484        if options.cursor.is_some() {
3485            diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
3486        }
3487        if next_cursor.is_some() {
3488            diagnostics.push(
3489                "result was truncated; pass page.next_cursor as --cursor for the next page"
3490                    .to_string(),
3491            );
3492        }
3493        Ok(Some(GraphPagedSubgraph {
3494            page: GraphQueryPage {
3495                cursor: options.cursor,
3496                limit: options.limit,
3497                next_cursor,
3498                returned_nodes: nodes.len(),
3499                returned_edges: edges.len(),
3500                truncated: options.limit.is_some_and(|limit| before_limit > limit),
3501                diagnostics,
3502            },
3503            nodes,
3504            edges,
3505        }))
3506    }
3507
3508    fn shortest_path(
3509        &self,
3510        from_id: &str,
3511        to_id: &str,
3512        kind: Option<&str>,
3513    ) -> Result<Option<GraphPath>> {
3514        self.shortest_path_with_max_hops(from_id, to_id, kind, None)
3515    }
3516
3517    fn shortest_path_with_max_hops(
3518        &self,
3519        from_id: &str,
3520        to_id: &str,
3521        kind: Option<&str>,
3522        max_hops: Option<usize>,
3523    ) -> Result<Option<GraphPath>> {
3524        if from_id == to_id {
3525            return Ok(Some(GraphPath {
3526                nodes: vec![from_id.to_string()],
3527                hops: 0,
3528            }));
3529        }
3530        let hop_limit = max_hops.unwrap_or(usize::MAX);
3531        if hop_limit == 0 {
3532            return Ok(None);
3533        }
3534
3535        self.assert_not_in_temp_table_section();
3536        self.temp_table_active.set(true);
3537        let result = (|| -> Result<Option<GraphPath>> {
3538            let call_id = BFS_CALL_ID.fetch_add(1, Ordering::Relaxed);
3539            let tbl = format!("_tsift_frontier_{call_id}");
3540
3541            let mut visited = BTreeSet::from([from_id.to_string()]);
3542            let mut parent =
3543                BTreeMap::<String, String>::from([(from_id.to_string(), String::new())]);
3544            let mut frontier = vec![from_id.to_string()];
3545            self.conn.execute_batch(&format!(
3546                r#"CREATE TEMP TABLE IF NOT EXISTS "{tbl}" (id TEXT PRIMARY KEY);
3547               DELETE FROM "{tbl}";"#,
3548            ))?;
3549            let select_sql = if kind.is_some() {
3550                format!(
3551                    r#"SELECT e.from_id, e.to_id
3552                   FROM "{tbl}" f
3553                   JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3554                       ON e.from_id = f.id
3555                   WHERE e.kind = ?
3556                   ORDER BY e.from_id, e.to_id, e.kind"#,
3557                )
3558            } else {
3559                format!(
3560                    r#"SELECT e.from_id, e.to_id
3561                   FROM "{tbl}" f
3562                   JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3563                       ON e.from_id = f.id
3564                   ORDER BY e.from_id, e.to_id, e.kind"#,
3565                )
3566            };
3567            let insert_sql = format!(r#"INSERT OR IGNORE INTO "{tbl}" (id) VALUES (?)"#);
3568            let delete_sql = format!(r#"DELETE FROM "{tbl}""#);
3569            let drop_sql = format!(r#"DROP TABLE IF EXISTS "{tbl}""#);
3570            let mut frontier_select_stmt = self.conn.prepare(&select_sql)?;
3571            let mut frontier_insert_stmt = self.conn.prepare(&insert_sql)?;
3572            let mut found_path: Option<GraphPath> = None;
3573            for _depth in 0..hop_limit {
3574                if frontier.is_empty() {
3575                    break;
3576                }
3577                self.conn.execute(&delete_sql, [])?;
3578                for id in &frontier {
3579                    frontier_insert_stmt.execute([id.as_str()])?;
3580                }
3581                let mut next_frontier = BTreeSet::new();
3582                let rows = if let Some(kind) = kind {
3583                    collect_rows(frontier_select_stmt.query_map([kind], |row| {
3584                        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3585                    })?)?
3586                } else {
3587                    collect_rows(frontier_select_stmt.query_map([], |row| {
3588                        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3589                    })?)?
3590                };
3591                for (from, next) in rows {
3592                    if !visited.insert(next.clone()) {
3593                        continue;
3594                    }
3595                    parent.insert(next.clone(), from);
3596                    if next == to_id {
3597                        let mut nodes = vec![to_id.to_string()];
3598                        let mut cursor = to_id;
3599                        while let Some(previous) = parent.get(cursor) {
3600                            if previous.is_empty() {
3601                                break;
3602                            }
3603                            nodes.push(previous.clone());
3604                            cursor = previous;
3605                        }
3606                        nodes.reverse();
3607                        found_path = Some(GraphPath {
3608                            hops: nodes.len().saturating_sub(1),
3609                            nodes,
3610                        });
3611                        break;
3612                    }
3613                    next_frontier.insert(next);
3614                }
3615                if found_path.is_some() {
3616                    break;
3617                }
3618                frontier = next_frontier.into_iter().collect();
3619            }
3620            let _ = self.conn.execute_batch(&drop_sql);
3621            Ok(found_path)
3622        })();
3623        self.temp_table_active.set(false);
3624        result
3625    }
3626
3627    fn reachable_nodes_by_kind(
3628        &self,
3629        from_id: &str,
3630        kind: &str,
3631        depth: usize,
3632        limit: usize,
3633    ) -> Result<Vec<(GraphNode, GraphPath)>> {
3634        Ok(self
3635            .reachable_nodes_by_kinds(from_id, &[kind], depth, limit)?
3636            .remove(kind)
3637            .unwrap_or_default())
3638    }
3639
3640    fn reachable_nodes_by_kinds(
3641        &self,
3642        from_id: &str,
3643        kinds: &[&str],
3644        depth: usize,
3645        limit: usize,
3646    ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
3647        let mut requested = kinds
3648            .iter()
3649            .map(|kind| (*kind).to_string())
3650            .collect::<BTreeSet<_>>()
3651            .into_iter()
3652            .collect::<Vec<_>>();
3653        let mut results = requested
3654            .iter()
3655            .map(|kind| (kind.clone(), Vec::new()))
3656            .collect::<BTreeMap<_, _>>();
3657        if requested.is_empty() {
3658            return Ok(results);
3659        }
3660        requested.sort();
3661        let placeholders = std::iter::repeat_n("?", requested.len())
3662            .collect::<Vec<_>>()
3663            .join(", ");
3664        let mut sql = format!(
3665            r#"
3666            WITH RECURSIVE walk(id, depth, path) AS (
3667                SELECT ?, 0, char(31) || ? || char(31)
3668                UNION ALL
3669                SELECT e.to_id, walk.depth + 1, walk.path || e.to_id || char(31)
3670                FROM walk
3671                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3672                    ON e.from_id = walk.id
3673                WHERE walk.depth < ?
3674                  AND instr(walk.path, char(31) || e.to_id || char(31)) = 0
3675            ),
3676            ranked AS (
3677                SELECT
3678                    n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
3679                    walk.path, walk.depth,
3680                    ROW_NUMBER() OVER (PARTITION BY n.kind, n.id ORDER BY walk.depth, n.label, n.id) AS rn
3681                FROM walk
3682                JOIN graph_nodes n ON n.id = walk.id
3683                WHERE n.kind IN ({placeholders}) AND n.id <> ?
3684            ),
3685            kind_ranked AS (
3686                SELECT *,
3687                    ROW_NUMBER() OVER (PARTITION BY kind ORDER BY depth, label, id) AS kind_rank
3688                FROM ranked
3689                WHERE rn = 1
3690            )
3691            SELECT id, kind, label, properties_json, provenance_json, freshness_json, path, depth
3692            FROM kind_ranked
3693            "#,
3694        );
3695        let mut values = vec![
3696            Value::Text(from_id.to_string()),
3697            Value::Text(from_id.to_string()),
3698            Value::Integer(depth as i64),
3699        ];
3700        values.extend(requested.iter().cloned().map(Value::Text));
3701        values.push(Value::Text(from_id.to_string()));
3702        if limit > 0 && limit != usize::MAX {
3703            sql.push_str(" WHERE kind_rank <= ?");
3704            values.push(Value::Integer(limit as i64));
3705        }
3706        sql.push_str(" ORDER BY kind, depth, label, id");
3707        let mut stmt = self.conn.prepare(&sql)?;
3708        let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
3709            let node = node_from_row(row)?;
3710            let path: String = row.get(6)?;
3711            let hops: usize = row.get(7)?;
3712            Ok((
3713                node,
3714                GraphPath {
3715                    nodes: path
3716                        .split('\u{1f}')
3717                        .filter(|part| !part.is_empty())
3718                        .map(str::to_string)
3719                        .collect(),
3720                    hops,
3721                },
3722            ))
3723        })?)?;
3724        for (node, path) in rows {
3725            results
3726                .entry(node.kind.clone())
3727                .or_default()
3728                .push((node, path));
3729        }
3730        Ok(results)
3731    }
3732
3733    fn evidence_target_candidates(
3734        &self,
3735        target: &str,
3736        kinds: &[&str],
3737        preferred_path: Option<&str>,
3738    ) -> Result<Vec<GraphNode>> {
3739        if kinds.is_empty() {
3740            return Ok(Vec::new());
3741        }
3742
3743        let normalized = target.trim().trim_start_matches('#');
3744        let kind_placeholders = std::iter::repeat_n("?", kinds.len())
3745            .collect::<Vec<_>>()
3746            .join(", ");
3747        let kind_rank = kinds
3748            .iter()
3749            .enumerate()
3750            .map(|(rank, _)| format!("WHEN ? THEN {rank}"))
3751            .collect::<Vec<_>>()
3752            .join(" ");
3753        let path_filter = if preferred_path.is_some() {
3754            r#"
3755AND EXISTS (
3756    SELECT 1
3757    FROM graph_node_properties p_path INDEXED BY idx_graph_node_properties_key_value_node
3758    WHERE p_path.node_id = n.id
3759      AND p_path.key = 'path'
3760      AND p_path.value = ?
3761)
3762"#
3763        } else {
3764            ""
3765        };
3766        let sql = format!(
3767            r#"
3768SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3769FROM graph_nodes n
3770WHERE n.kind IN ({kind_placeholders})
3771              AND (
3772                EXISTS (
3773                    SELECT 1
3774                    FROM graph_node_properties p_handle INDEXED BY idx_graph_node_properties_key_value_node
3775                    WHERE p_handle.node_id = n.id
3776                      AND p_handle.key = 'handle'
3777                      AND p_handle.value = ?
3778                )
3779                OR EXISTS (
3780                    SELECT 1
3781                    FROM graph_node_properties p_ref INDEXED BY idx_graph_node_properties_key_value_node
3782                    WHERE p_ref.node_id = n.id
3783                      AND p_ref.key = 'ref_id'
3784                      AND p_ref.value = ?
3785)
3786OR n.label = ?
3787OR n.label = ?
3788)
3789{path_filter}
3790ORDER BY CASE n.kind {kind_rank} ELSE 999 END, n.id
3791"#
3792        );
3793        let mut values = kinds
3794            .iter()
3795            .map(|kind| Value::Text((*kind).to_string()))
3796            .collect::<Vec<_>>();
3797        values.push(Value::Text(target.to_string()));
3798        values.push(Value::Text(normalized.to_string()));
3799        values.push(Value::Text(target.to_string()));
3800        values.push(Value::Text(format!("#{normalized}")));
3801        if let Some(path) = preferred_path {
3802            values.push(Value::Text(path.to_string()));
3803        }
3804        values.extend(kinds.iter().map(|kind| Value::Text((*kind).to_string())));
3805        let mut stmt = self.conn.prepare(&sql)?;
3806        collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)
3807    }
3808
3809    fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
3810        if let Some(node) = self.node(target)? {
3811            return Ok(Some(node));
3812        }
3813        Ok(self
3814            .evidence_target_candidates(target, kinds, None)?
3815            .into_iter()
3816            .next())
3817    }
3818}
3819
3820fn to_json<T: Serialize>(value: &T) -> Result<String> {
3821    serde_json::to_string(value).map_err(Into::into)
3822}
3823
3824fn row_hash<T: Serialize>(value: &T) -> Result<String> {
3825    let payload = serde_json::to_vec(value)?;
3826    Ok(blake3::hash(&payload).to_hex().to_string())
3827}
3828
3829fn optional_to_json<T: Serialize>(value: &Option<T>) -> Result<Option<String>> {
3830    value.as_ref().map(to_json).transpose()
3831}
3832
3833fn collect_rows<T>(
3834    rows: impl Iterator<Item = std::result::Result<T, rusqlite::Error>>,
3835) -> Result<Vec<T>> {
3836    rows.collect::<std::result::Result<Vec<_>, _>>()
3837        .map_err(Into::into)
3838}
3839
3840enum QueryResult {
3841    Meta { total: usize },
3842    Node(GraphNode),
3843    Edge(GraphEdge),
3844}
3845
3846fn node_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphNode> {
3847    let properties_col = offset + 3;
3848    let provenance_col = offset + 4;
3849    let freshness_col = offset + 5;
3850    let properties_json: String = row.get(properties_col)?;
3851    let provenance_json: String = row.get(provenance_col)?;
3852    let freshness_json: Option<String> = row.get(freshness_col)?;
3853    Ok(GraphNode {
3854        id: row.get(offset)?,
3855        kind: row.get(offset + 1)?,
3856        label: row.get(offset + 2)?,
3857        properties: from_json(properties_col, &properties_json)?,
3858        provenance: from_json(provenance_col, &provenance_json)?,
3859        freshness: optional_from_json(freshness_col, freshness_json)?,
3860    })
3861}
3862
3863fn node_from_row(row: &Row<'_>) -> rusqlite::Result<GraphNode> {
3864    node_from_row_at(row, 0)
3865}
3866
3867fn edge_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphEdge> {
3868    let properties_col = offset + 4;
3869    let provenance_col = offset + 5;
3870    let freshness_col = offset + 6;
3871    let properties_json: String = row.get(properties_col)?;
3872    let provenance_json: String = row.get(provenance_col)?;
3873    let freshness_json: Option<String> = row.get(freshness_col)?;
3874    Ok(GraphEdge {
3875        id: row.get(offset)?,
3876        from_id: row.get(offset + 1)?,
3877        to_id: row.get(offset + 2)?,
3878        kind: row.get(offset + 3)?,
3879        properties: from_json(properties_col, &properties_json)?,
3880        provenance: from_json(provenance_col, &provenance_json)?,
3881        freshness: optional_from_json(freshness_col, freshness_json)?,
3882    })
3883}
3884
3885fn edge_from_row(row: &Row<'_>) -> rusqlite::Result<GraphEdge> {
3886    edge_from_row_at(row, 0)
3887}
3888
3889fn from_json<T: DeserializeOwned>(column: usize, raw: &str) -> rusqlite::Result<T> {
3890    serde_json::from_str(raw)
3891        .map_err(|err| rusqlite::Error::FromSqlConversionFailure(column, Type::Text, Box::new(err)))
3892}
3893
3894fn optional_from_json<T: DeserializeOwned>(
3895    column: usize,
3896    raw: Option<String>,
3897) -> rusqlite::Result<Option<T>> {
3898    raw.map(|value| from_json(column, &value)).transpose()
3899}
3900
3901fn projection_version_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3902    nodes
3903        .iter()
3904        .find(|node| node.kind == "projection_meta")
3905        .and_then(|node| node.properties.get("projection_version").cloned())
3906}
3907
3908fn projection_hash_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3909    nodes
3910        .iter()
3911        .find(|node| node.kind == "projection_meta")
3912        .and_then(|node| node.properties.get("content_hash").cloned())
3913}
3914
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before the epoch.
fn unix_now() -> i64 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
3921
3922fn sqlite_database_size_bytes(conn: &Connection) -> Result<u64> {
3923    let page_count: u64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
3924    let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3925    Ok(page_count.saturating_mul(page_size))
3926}
3927
3928fn sqlite_database_freelist_bytes(conn: &Connection) -> Result<u64> {
3929    let freelist_count: u64 = conn.query_row("PRAGMA freelist_count", [], |row| row.get(0))?;
3930    let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3931    Ok(freelist_count.saturating_mul(page_size))
3932}
3933
3934#[cfg(test)]
3935mod tests {
3936    use super::*;
3937
3938    fn sample_provenance() -> GraphProvenance {
3939        GraphProvenance::new("fixture", "src/lib.rs:1").with_content_hash("hash-1")
3940    }
3941
3942    fn sample_projection() -> GraphProjection {
3943        let source = sample_provenance();
3944        GraphProjection {
3945            nodes: vec![
3946                GraphNode::new("doc:livekit", "document", "LiveKit guide")
3947                    .with_property("domain", "livekit")
3948                    .with_provenance(source.clone())
3949                    .with_freshness(GraphFreshness::content_hash("node-hash")),
3950                GraphNode::new("topic:rooms", "topic", "Rooms"),
3951                GraphNode::new("topic:egress", "topic", "Egress"),
3952            ],
3953            edges: vec![
3954                GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
3955                    .with_property("confidence", "0.91")
3956                    .with_provenance(source.clone())
3957                    .with_freshness(GraphFreshness::content_hash("edge-hash")),
3958                GraphEdge::new("topic:rooms", "topic:egress", "related_to").with_provenance(source),
3959            ],
3960        }
3961    }
3962
3963    fn assert_projection_store_contract(store: &impl GraphStore) {
3964        let projection = sample_projection();
3965        projection.upsert_into(store).unwrap();
3966
3967        assert_eq!(
3968            store.node("doc:livekit").unwrap(),
3969            projection
3970                .nodes
3971                .iter()
3972                .find(|node| node.id == "doc:livekit")
3973                .cloned()
3974        );
3975        assert_eq!(
3976            store.nodes_by_kind("topic").unwrap(),
3977            vec![
3978                GraphNode::new("topic:egress", "topic", "Egress"),
3979                GraphNode::new("topic:rooms", "topic", "Rooms"),
3980            ]
3981        );
3982
3983        let mentions = store
3984            .outgoing_edges("doc:livekit", Some("mentions"))
3985            .unwrap();
3986        assert_eq!(mentions.len(), 1);
3987        assert_eq!(mentions[0].to_id, "topic:rooms");
3988        assert_eq!(
3989            mentions[0].properties.get("confidence"),
3990            Some(&"0.91".into())
3991        );
3992
3993        let path = store
3994            .shortest_path("doc:livekit", "topic:egress", None)
3995            .unwrap()
3996            .unwrap();
3997        assert_eq!(
3998            path.nodes,
3999            vec!["doc:livekit", "topic:rooms", "topic:egress"]
4000        );
4001    }
4002
/// Writes two nodes and one edge through the single-row upsert API and checks that each
/// read path hands them back unchanged.
#[test]
fn sqlite_store_round_trips_generic_nodes_edges() {
    let store = SqliteGraphStore::in_memory().unwrap();
    let provenance = sample_provenance();

    let document = GraphNode::new("doc:livekit", "document", "LiveKit guide")
        .with_property("domain", "livekit")
        .with_provenance(provenance.clone())
        .with_freshness(GraphFreshness::content_hash("node-hash"));
    let rooms = GraphNode::new("topic:rooms", "topic", "Rooms");
    let mention = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
        .with_property("confidence", "0.91")
        .with_provenance(provenance)
        .with_freshness(GraphFreshness::content_hash("edge-hash"));

    store.upsert_node(&document).unwrap();
    store.upsert_node(&rooms).unwrap();
    store.upsert_edge(&mention).unwrap();

    let stored_document = store.node("doc:livekit").unwrap();
    assert_eq!(stored_document, Some(document));

    let stored_topics = store.nodes_by_kind("topic").unwrap();
    assert_eq!(stored_topics, vec![rooms]);

    assert_eq!(store.all_nodes().unwrap().len(), 2);
    assert_eq!(store.all_edges().unwrap().len(), 1);

    let outgoing = store
        .outgoing_edges("doc:livekit", Some("mentions"))
        .unwrap();
    assert_eq!(outgoing, vec![mention]);
}
4032
/// Verifies that edge properties are materialized into `graph_edge_properties` and that
/// a property-filtered edge page reports the expected index and filter diagnostics.
#[test]
fn sqlite_materializes_edge_properties_and_scans_first_class_edges() {
    let store = SqliteGraphStore::in_memory().unwrap();

    let fixture_nodes = [
        GraphNode::new("doc:livekit", "document", "LiveKit guide"),
        GraphNode::new("topic:rooms", "topic", "Rooms"),
        GraphNode::new("topic:egress", "topic", "Egress"),
    ];
    for fixture_node in fixture_nodes {
        store.upsert_node(&fixture_node).unwrap();
    }

    let mentions = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
        .with_property("confidence", "0.91");
    let related = GraphEdge::new("topic:egress", "topic:rooms", "related_to")
        .with_property("confidence", "0.42");
    let mentions_id = mentions.id.clone();
    store.upsert_edge(&mentions).unwrap();
    store.upsert_edge(&related).unwrap();

    // Lookup by edge id returns the edge that was written.
    assert_eq!(store.edge(&mentions_id).unwrap(), Some(mentions));

    // Both edges touch "topic:rooms"; the result is compared against the sorted ids.
    let mut expected_incident_ids = vec![
        GraphEdge::stable_id("doc:livekit", "topic:rooms", "mentions"),
        GraphEdge::stable_id("topic:egress", "topic:rooms", "related_to"),
    ];
    expected_incident_ids.sort();
    let incident_ids: Vec<_> = store
        .incident_edges("topic:rooms", None)
        .unwrap()
        .into_iter()
        .map(|incident| incident.id)
        .collect();
    assert_eq!(incident_ids, expected_incident_ids);

    let confidence_filter = GraphPropertyFilter {
        key: "confidence".to_string(),
        value: "0.91".to_string(),
    };
    let page = store
        .paged_edges(
            Some("mentions"),
            GraphQueryOptions {
                property_filters: vec![confidence_filter],
                ..GraphQueryOptions::default()
            },
        )
        .unwrap();
    assert_eq!(page.edges.len(), 1);
    assert_eq!(page.edges[0].id, mentions_id);

    // Every expected fragment must appear in at least one diagnostic line; on failure the
    // full diagnostic list is printed.
    let expected_fragments = [
        "idx_graph_edge_properties_key_value_edge",
        "idx_graph_edges_edge_key",
        "edge property primary filter matched 1 materialized row",
        "drives from SQLite materialized property rows",
    ];
    for fragment in expected_fragments {
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic.contains(fragment)),
            "{:?}",
            page.page.diagnostics
        );
    }

    // One materialized `confidence` row per edge.
    let property_rows: usize = store
        .conn
        .query_row(
            "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(property_rows, 2);
}
4126
/// Runs the shared projection store contract against an in-memory SQLite store.
#[test]
fn graph_projection_round_trips_through_backend_agnostic_store_contract() {
    assert_projection_store_contract(&SqliteGraphStore::in_memory().unwrap());
}
4132
/// Checks neighborhood contents and ordering, then edge and node deletion, through the
/// generic [`GraphStore`] interface backed by an in-memory SQLite store.
#[test]
fn graph_store_contract_covers_crud_neighborhood_and_ordering() {
    /// Loads the sample projection into `store` and asserts the neighborhood and delete
    /// behavior expected of any `GraphStore` implementation.
    fn assert_crud_contract(store: &impl GraphStore) {
        let fixture = sample_projection();
        fixture.upsert_into(store).unwrap();

        let neighborhood = store.neighborhood("doc:livekit", 2, None).unwrap().unwrap();

        let node_ids: Vec<&str> = neighborhood
            .nodes
            .iter()
            .map(|node| node.id.as_str())
            .collect();
        assert_eq!(
            node_ids,
            vec!["doc:livekit", "topic:egress", "topic:rooms"]
        );

        let edge_triples: Vec<(&str, &str, &str)> = neighborhood
            .edges
            .iter()
            .map(|edge| {
                (
                    edge.from_id.as_str(),
                    edge.kind.as_str(),
                    edge.to_id.as_str(),
                )
            })
            .collect();
        assert_eq!(
            edge_triples,
            vec![
                ("doc:livekit", "mentions", "topic:rooms"),
                ("topic:rooms", "related_to", "topic:egress"),
            ]
        );

        // Removing the only edge into "topic:egress" leaves it unreachable.
        let removed_edges = store
            .delete_edge("topic:rooms", "topic:egress", "related_to")
            .unwrap();
        assert_eq!(removed_edges, 1);
        let path_after_edge_delete = store
            .shortest_path("doc:livekit", "topic:egress", None)
            .unwrap();
        assert!(path_after_edge_delete.is_none());

        // After deleting "topic:rooms" the document has no outgoing edges left.
        let removed_nodes = store.delete_node("topic:rooms").unwrap();
        assert_eq!(removed_nodes, 1);
        assert!(store.node("topic:rooms").unwrap().is_none());
        let remaining_edges = store.outgoing_edges("doc:livekit", None).unwrap();
        assert!(remaining_edges.is_empty());
    }

    assert_crud_contract(&SqliteGraphStore::in_memory().unwrap());
}
4188
/// Connects a center node to one code node and two memory events (observed at 1000 and
/// 1995), then checks that a ranked neighborhood evaluated at `now = 2000` with a
/// 100-second half-life keeps the center and the fresh memory event only.
#[test]
fn sqlite_ranked_neighborhood_prefers_recent_memory_nodes_when_pruning() {
    let store = SqliteGraphStore::in_memory().unwrap();

    let fixture_nodes = [
        GraphNode::new("center", "file", "center"),
        GraphNode::new("aaa-code", "symbol", "code candidate"),
        GraphNode::new("mmm-stale", "memory_event", "stale memory")
            .with_property("provider", "tsift-memory")
            .with_property("observed_at_unix", "1000"),
        GraphNode::new("zzz-fresh", "memory_event", "fresh memory")
            .with_property("provider", "tsift-memory")
            .with_property("observed_at_unix", "1995"),
    ];
    for fixture_node in fixture_nodes {
        store.upsert_node(&fixture_node).unwrap();
    }

    // Every candidate hangs off the center through the same edge kind.
    for neighbor in ["aaa-code", "mmm-stale", "zzz-fresh"] {
        store
            .upsert_edge(&GraphEdge::new("center", neighbor, "mentions"))
            .unwrap();
    }

    let options = RankedNeighborhoodOptions::new(1, 1)
        .with_observed_at_now_unix(2000)
        .with_observed_at_half_life_secs(100);
    let ranked = store
        .ranked_neighborhood("center", &options)
        .unwrap()
        .unwrap();

    let ids: Vec<_> = ranked.nodes.iter().map(|node| node.id.as_str()).collect();
    assert!(ids.contains(&"center"));
    assert!(
        ids.contains(&"zzz-fresh"),
        "fresh memory node should survive pruning: {ids:?}"
    );
    assert!(!ids.contains(&"aaa-code"));
    assert!(!ids.contains(&"mmm-stale"));
}
4238
/// Links one `symbol` node and 24 `note` nodes to a seed concept, then checks that a
/// semantic-seeded neighborhood with an edge scan cap of 16 returns the seed first and
/// the `zzz_high` symbol second, while reporting the truncation.
#[test]
fn sqlite_semantic_seeded_neighborhood_scores_before_sql_edge_cap() {
    let store = SqliteGraphStore::in_memory().unwrap();

    let seed = GraphNode::new("seed", "semantic_concept", "graph budget");
    store.upsert_node(&seed).unwrap();
    let high_signal = GraphNode::new("zzz_high", "symbol", "high_signal");
    store.upsert_node(&high_signal).unwrap();
    let high_link = GraphEdge::new("zzz_high", "seed", "mentions_concept");
    store.upsert_edge(&high_link).unwrap();

    // 24 "weak_link" neighbors whose ids sort ahead of "zzz_high".
    for idx in 0..24 {
        let id = format!("aaa_low_{idx:02}");
        let note = GraphNode::new(id.clone(), "note", format!("low {idx}"));
        let link = GraphEdge::new(id, "seed", "weak_link");
        store.upsert_node(&note).unwrap();
        store.upsert_edge(&link).unwrap();
    }

    let options = SemanticSeededNeighborhoodOptions::new(1, 3)
        .with_edge_scan_cap(16)
        .with_node_discovery_cap(9);
    let seeds = ["seed".to_string()];
    let expansion = store
        .semantic_seeded_neighborhood(&seeds, &options)
        .unwrap();

    let ids: Vec<&str> = expansion
        .nodes
        .iter()
        .map(|node| node.id.as_str())
        .collect();

    assert_eq!(ids.len(), 3);
    assert_eq!(ids[0], "seed");
    assert_eq!(ids[1], "zzz_high");
    assert_eq!(expansion.skipped_by_edge_cap, 9);
    assert!(expansion.truncated);
}
4279
/// Upserts a projection, changes one node property, and upserts again. The stale
/// materialized property row must be gone and the new value must be queryable. A third
/// upsert of the same projection must not change `total_changes()` at all.
#[test]
fn sqlite_upsert_projection_batches_rows_and_properties() {
    // Query options that keep only rows whose `domain` property equals `value`.
    let domain_equals = |value: &str| GraphQueryOptions {
        property_filters: vec![GraphPropertyFilter {
            key: "domain".to_string(),
            value: value.to_string(),
        }],
        ..GraphQueryOptions::default()
    };

    let mut store = SqliteGraphStore::in_memory().unwrap();
    let mut projection = sample_projection();
    store.upsert_projection(&projection).unwrap();

    let initial_page = store
        .paged_nodes_by_kind("document", domain_equals("livekit"))
        .unwrap();
    assert_eq!(initial_page.nodes[0].id, "doc:livekit");

    // Swap the document's `domain` from "livekit" to "recording" and write it again.
    projection.nodes[0] = GraphNode::new("doc:livekit", "document", "LiveKit guide")
        .with_property("domain", "recording");
    store.upsert_projection(&projection).unwrap();

    // Last used before the next mutable upsert, so the shared borrow has ended by then.
    let count_rows = |sql: &str| -> usize {
        store
            .conn
            .query_row(sql, [], |row| row.get(0))
            .unwrap()
    };

    let stale_property_rows = count_rows(
        "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain' AND value = 'livekit'",
    );
    let updated_page = store
        .paged_nodes_by_kind("document", domain_equals("recording"))
        .unwrap();
    assert_eq!(stale_property_rows, 0);
    assert_eq!(updated_page.nodes[0].id, "doc:livekit");

    let edge_property_rows =
        count_rows("SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'");
    assert_eq!(edge_property_rows, 1);

    let changes_before = store.conn.total_changes();
    store.upsert_projection(&projection).unwrap();
    assert_eq!(
        store.conn.total_changes(),
        changes_before,
        "unchanged projection rows should not rewrite graph rows or materialized properties"
    );
}
4344
/// Checks that nodes carrying an `embedding` property get a row in
/// `graph_node_semantic_vectors`, that top-candidate search returns the exact match, and
/// that re-upserting a node without its embedding removes its vector row.
#[test]
fn sqlite_semantic_top_candidates_use_materialized_vector_table() {
    let store = SqliteGraphStore::in_memory().unwrap();
    let count_rows = |sql: &str| -> usize {
        store
            .conn
            .query_row(sql, [], |row| row.get(0))
            .unwrap()
    };

    // Two concepts carry an embedding; the entity carries none.
    let fixture_nodes = [
        GraphNode::new("concept:graph", "semantic_concept", "graph navigation")
            .with_property("embedding_model", "fixture-v1")
            .with_property("embedding", "1.0,0.0"),
        GraphNode::new("concept:sqlite", "semantic_concept", "sqlite search")
            .with_property("embedding", "0.0,1.0"),
        GraphNode::new("entity:skip", "semantic_entity", "skipped"),
    ];
    for fixture_node in fixture_nodes {
        store.upsert_node(&fixture_node).unwrap();
    }

    let vector_rows = count_rows("SELECT COUNT(*) FROM graph_node_semantic_vectors");
    assert_eq!(vector_rows, 2);

    let candidates = store
        .semantic_top_candidates(&[1.0, 0.0], &["semantic_concept"], 1)
        .unwrap();
    assert_eq!(candidates.len(), 1);
    let best = &candidates[0];
    assert_eq!(best.node.id, "concept:graph");
    assert_eq!(best.score, 1.0);

    // Same id and kind, but no embedding properties this time.
    let without_embedding =
        GraphNode::new("concept:graph", "semantic_concept", "graph navigation");
    store.upsert_node(&without_embedding).unwrap();

    let vector_rows_after_update = count_rows(
        "SELECT COUNT(*) FROM graph_node_semantic_vectors WHERE node_id = 'concept:graph'",
    );
    assert_eq!(vector_rows_after_update, 0);
}
4399
/// Builds a three-node graph with `calls` edges a->b->c plus a `documents` edge a->c and
/// checks kind-filtered edge listing, counts, edge sampling and directed shortest paths.
#[test]
fn sqlite_store_filters_edges_by_kind_and_paths() {
    let store = SqliteGraphStore::in_memory().unwrap();

    for id in ["a", "b", "c"] {
        let symbol = GraphNode::new(id, "symbol", id);
        store.upsert_node(&symbol).unwrap();
    }
    for (from, to, kind) in [
        ("a", "b", "calls"),
        ("a", "c", "documents"),
        ("b", "c", "calls"),
    ] {
        store.upsert_edge(&GraphEdge::new(from, to, kind)).unwrap();
    }

    let calls_from_a = store.outgoing_edges("a", Some("calls")).unwrap();
    assert_eq!(calls_from_a.len(), 1);
    assert_eq!(calls_from_a[0].to_id, "b");
    assert_eq!(store.graph_counts().unwrap(), (3, 3));

    let sampled_call = store.sample_edge(Some("calls")).unwrap().unwrap();
    assert_eq!(sampled_call.to_id, "b");

    // Restricted to "calls", the direct a->c "documents" edge is not usable.
    let forward = store
        .shortest_path("a", "c", Some("calls"))
        .unwrap()
        .unwrap();
    assert_eq!(forward.nodes, vec!["a", "b", "c"]);
    assert_eq!(forward.hops, 2);

    // No path exists against the edge direction.
    let backward = store.shortest_path("c", "a", Some("calls")).unwrap();
    assert!(backward.is_none());
}
4441
/// Checks that `edges_between_nodes` returns only edges whose two endpoints both belong
/// to the requested node set.
#[test]
fn sqlite_store_batches_edges_between_node_sets() {
    let store = SqliteGraphStore::in_memory().unwrap();

    for id in ["a", "b", "c", "outside"] {
        let symbol = GraphNode::new(id, "symbol", id);
        store.upsert_node(&symbol).unwrap();
    }
    for (from, to) in [("a", "b"), ("b", "c"), ("a", "outside"), ("outside", "c")] {
        store
            .upsert_edge(&GraphEdge::new(from, to, "calls"))
            .unwrap();
    }

    // "outside" is deliberately left out of the scope.
    let scoped: BTreeSet<String> = ["a", "b", "c"].into_iter().map(String::from).collect();
    let edge_keys: Vec<_> = store
        .edges_between_nodes(&scoped)
        .unwrap()
        .into_iter()
        .map(|edge| (edge.from_id, edge.kind, edge.to_id))
        .collect();

    let calls = |from: &str, to: &str| (from.to_string(), "calls".to_string(), to.to_string());
    assert_eq!(edge_keys, vec![calls("a", "b"), calls("b", "c")]);
}
4477
/// Replaces the `root` projection four times (v1..v4), removing and re-adding
/// `topic:egress` along the way, and checks the refresh report, the stored projection
/// version, the cached operator stats, tombstone pruning and storage compaction.
#[test]
fn sqlite_projection_refresh_tracks_versions_watermarks_and_tombstones() {
    // Cached per-scope counters for `root`; the same query is issued twice below.
    const ROOT_COUNTS_SQL: &str = r#"
                SELECT nodes, edges, tombstone_nodes, tombstone_edges
                FROM graph_operator_stats
                WHERE scope = 'root'
                "#;

    let mut store = SqliteGraphStore::in_memory().unwrap();
    let mut projection = sample_projection();
    let projection_meta = GraphNode::new(
        "projection:fixture",
        "projection_meta",
        "fixture projection",
    )
    .with_property("projection_version", "fixture-v1")
    .with_property("content_hash", "hash-a");
    projection.nodes.push(projection_meta);

    // v1: initial load.
    store
        .replace_projection_with_version(
            "root",
            &projection,
            Some("fixture-v1"),
            Some("commit-a".to_string()),
        )
        .unwrap();

    // v2: drop "topic:egress" together with the edge that points at it.
    projection.nodes.retain(|node| node.id != "topic:egress");
    projection.edges.retain(|edge| edge.to_id != "topic:egress");
    let refresh = store
        .replace_projection_with_version(
            "root",
            &projection,
            Some("fixture-v2"),
            Some("commit-b".to_string()),
        )
        .unwrap();

    assert_eq!(refresh.projection_version, "fixture-v2");
    assert_eq!(refresh.source_watermark.as_deref(), Some("commit-b"));
    assert_eq!(refresh.tombstoned_nodes, vec!["topic:egress".to_string()]);
    assert_eq!(refresh.tombstoned_edges.len(), 1);
    assert_eq!(refresh.deleted_nodes, 1);
    assert_eq!(refresh.deleted_edges, 1);
    assert_eq!(refresh.unchanged_nodes, 3);
    assert_eq!(refresh.upserted_nodes, 0);
    assert_eq!(refresh.unchanged_properties, 4);
    assert_eq!(refresh.upserted_properties, 0);
    assert_eq!(refresh.deleted_properties, 0);
    for phase_name in [
        "sqlite_property_row_staging",
        "sqlite_edge_property_row_staging",
    ] {
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == phase_name),
            "{:?}",
            refresh.phase_timings
        );
    }

    let stored_version = store.projection_version("root").unwrap().unwrap();
    assert_eq!(stored_version.projection_version, "fixture-v2");
    assert_eq!(stored_version.source_watermark.as_deref(), Some("commit-b"));

    let counts_after_v2: (usize, usize, usize, usize) = store
        .conn
        .query_row(ROOT_COUNTS_SQL, [], |row| {
            Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
        })
        .unwrap();
    assert_eq!(counts_after_v2, (3, 1, 1, 1));

    // v3: bring "topic:egress" back; one tombstone is expected to be pruned.
    projection
        .nodes
        .push(GraphNode::new("topic:egress", "topic", "Egress"));
    let refresh = store
        .replace_projection_with_version(
            "root",
            &projection,
            Some("fixture-v3"),
            Some("commit-c".to_string()),
        )
        .unwrap();
    assert_eq!(refresh.pruned_tombstones, 1);
    assert_eq!(refresh.tombstoned_nodes, Vec::<String>::new());

    // v4: remove it again, then compact storage for the scope.
    projection.nodes.retain(|node| node.id != "topic:egress");
    store
        .replace_projection_with_version(
            "root",
            &projection,
            Some("fixture-v4"),
            Some("commit-d".to_string()),
        )
        .unwrap();
    assert_eq!(store.compact_storage("root", true).unwrap(), 2);

    let counts_after_compaction: (usize, usize, usize, usize) = store
        .conn
        .query_row(ROOT_COUNTS_SQL, [], |row| {
            Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
        })
        .unwrap();
    assert_eq!(counts_after_compaction, (3, 1, 0, 0));
}
4593
/// Builds an 80-node `calls` chain plus a direct `mentions` shortcut from the first node
/// to the last, and checks that `shortest_path_with_max_hops` honors the hop limit.
#[test]
fn sqlite_shortest_path_uses_bounded_frontier() {
    let store = SqliteGraphStore::in_memory().unwrap();

    for idx in 0..80 {
        let chain_node = GraphNode::new(
            format!("node:{idx:02}"),
            "symbol",
            format!("node {idx:02}"),
        );
        store.upsert_node(&chain_node).unwrap();
    }
    for idx in 0..79 {
        let chain_edge = GraphEdge::new(
            format!("node:{idx:02}"),
            format!("node:{:02}", idx + 1),
            "calls",
        );
        store.upsert_edge(&chain_edge).unwrap();
    }
    let shortcut = GraphEdge::new("node:00", "node:79", "mentions");
    store.upsert_edge(&shortcut).unwrap();

    // 64 hops cannot cover the 79-edge chain.
    let too_short = store
        .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(64))
        .unwrap();
    assert!(too_short.is_none());

    // 79 hops is exactly enough.
    let full_chain = store
        .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(79))
        .unwrap()
        .unwrap();
    assert_eq!(full_chain.hops, 79);
    assert_eq!(full_chain.nodes.first().map(String::as_str), Some("node:00"));
    assert_eq!(full_chain.nodes.last().map(String::as_str), Some("node:79"));

    // The "mentions" shortcut connects both ends in a single hop.
    let direct = store
        .shortest_path_with_max_hops("node:00", "node:79", Some("mentions"), Some(1))
        .unwrap()
        .unwrap();
    assert_eq!(direct.nodes, vec!["node:00", "node:79"]);
}
4639
/// Stores four nodes sharing `ref_id = "refresh"` and checks that evidence targets can be
/// resolved by `#ref`, by handle, and narrowed to a single candidate by path.
#[test]
fn sqlite_resolves_evidence_targets_with_indexed_properties() {
    let store = SqliteGraphStore::in_memory().unwrap();

    let fixture_nodes = [
        GraphNode::new("gbak-refresh", "backlog", "#refresh")
            .with_property("ref_id", "refresh")
            .with_property("path", "tasks/current.md")
            .with_property("handle", "backlog-handle"),
        GraphNode::new("gbak-zrefresh", "backlog", "#refresh")
            .with_property("ref_id", "refresh")
            .with_property("path", "tasks/other.md"),
        GraphNode::new("gjob-refresh", "job_packet", "do #refresh")
            .with_property("ref_id", "refresh"),
        GraphNode::new("gwres-refresh", "worker_result", "completed #refresh")
            .with_property("ref_id", "refresh"),
    ];
    for fixture_node in fixture_nodes {
        store.upsert_node(&fixture_node).unwrap();
    }

    // Resolution by "#ref" across all three kinds picks "gbak-refresh".
    let resolved_by_ref = store
        .resolve_evidence_target("#refresh", &["backlog", "job_packet", "worker_result"])
        .unwrap()
        .unwrap();
    assert_eq!(resolved_by_ref.id, "gbak-refresh");

    // Resolution by the handle value finds the same backlog node.
    let resolved_by_handle = store
        .resolve_evidence_target("backlog-handle", &["backlog"])
        .unwrap()
        .unwrap();
    assert_eq!(resolved_by_handle.id, "gbak-refresh");

    // A path narrows the backlog candidates down to the node stored with that path.
    let candidates_for_path = store
        .evidence_target_candidates("#refresh", &["backlog"], Some("tasks/other.md"))
        .unwrap();
    assert_eq!(candidates_for_path.len(), 1);
    assert_eq!(candidates_for_path[0].id, "gbak-zrefresh");
}
4675
/// Opens a hand-built database at `user_version = 2` through
/// `SqliteGraphStore::from_connection` and checks that the schema version is bumped and
/// that node properties, edge properties and semantic vectors are backfilled into their
/// materialized tables.
#[test]
fn sqlite_schema_migration_backfills_materialized_node_properties() {
    // Version-2 schema with three nodes and one edge; only JSON property columns exist.
    let legacy_schema = r#"
            PRAGMA user_version = 2;
            CREATE TABLE graph_nodes (
                id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                label TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT
            );
            CREATE INDEX idx_graph_nodes_kind ON graph_nodes(kind);
            CREATE TABLE graph_edges (
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT,
                PRIMARY KEY (from_id, to_id, kind)
            );
            CREATE INDEX idx_graph_edges_from_kind ON graph_edges(from_id, kind);
            CREATE INDEX idx_graph_edges_to_kind ON graph_edges(to_id, kind);
            CREATE TABLE graph_projection_versions (
                scope TEXT PRIMARY KEY,
                projection_version TEXT NOT NULL,
                content_hash TEXT,
                source_watermark TEXT,
                observed_at_unix INTEGER NOT NULL
            );
            CREATE TABLE graph_tombstones (
                row_key TEXT PRIMARY KEY,
                row_kind TEXT NOT NULL,
                deleted_at_unix INTEGER NOT NULL
            );
            INSERT INTO graph_nodes
            (id, kind, label, properties_json, provenance_json)
            VALUES
            ('topic:rooms', 'topic', 'Rooms', '{"domain":"livekit"}', '[]'),
            ('topic:egress', 'topic', 'Egress', '{"domain":"recording"}', '[]'),
            ('concept:graph', 'semantic_concept', 'Graph navigation', '{"embedding":"1.0,0.0","embedding_model":"fixture-v1"}', '[]');
            INSERT INTO graph_edges
                (from_id, to_id, kind, properties_json, provenance_json)
            VALUES
                ('topic:rooms', 'topic:egress', 'mentions', '{"confidence":"0.91"}', '[]');
            "#;

    let conn = Connection::open_in_memory().unwrap();
    conn.execute_batch(legacy_schema).unwrap();

    let store = SqliteGraphStore::from_connection(conn).unwrap();
    let count_rows = |sql: &str| -> usize {
        store
            .conn
            .query_row(sql, [], |row| row.get(0))
            .unwrap()
    };

    let version: i64 = store
        .conn
        .pragma_query_value(None, "user_version", |row| row.get(0))
        .unwrap();
    assert_eq!(version, SQLITE_GRAPH_SCHEMA_VERSION);

    // Two nodes carry `domain`, one edge carries `confidence`, one node has an embedding.
    let property_rows =
        count_rows("SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain'");
    assert_eq!(property_rows, 2);
    let edge_property_rows =
        count_rows("SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'");
    assert_eq!(edge_property_rows, 1);
    let semantic_vector_rows =
        count_rows("SELECT COUNT(*) FROM graph_node_semantic_vectors WHERE model = 'fixture-v1'");
    assert_eq!(semantic_vector_rows, 1);

    // The legacy edge is reachable by its stable id and keeps its property.
    let migrated_edge_id = GraphEdge::stable_id("topic:rooms", "topic:egress", "mentions");
    let migrated_edge = store.edge(&migrated_edge_id).unwrap().unwrap();
    assert_eq!(
        migrated_edge.properties.get("confidence"),
        Some(&"0.91".to_string())
    );

    // Property-filtered paging works on the backfilled rows and reports the index name.
    let livekit_filter = GraphPropertyFilter {
        key: "domain".to_string(),
        value: "livekit".to_string(),
    };
    let page = store
        .paged_nodes_by_kind(
            "topic",
            GraphQueryOptions {
                property_filters: vec![livekit_filter],
                ..GraphQueryOptions::default()
            },
        )
        .unwrap();
    assert_eq!(page.nodes[0].id, "topic:rooms");
    assert!(
        page.page
            .diagnostics
            .iter()
            .any(|diagnostic| diagnostic.contains("idx_graph_node_properties_key_value_node")),
        "{:?}",
        page.page.diagnostics
    );
}
4797
4798    #[test]
4799    fn sqlite_store_batches_reachable_nodes_by_kinds() {
4800        let store = SqliteGraphStore::in_memory().unwrap();
4801        for node in [
4802            GraphNode::new("start", "backlog", "start"),
4803            GraphNode::new("ctx", "worker_context", "context"),
4804            GraphNode::new("src", "source_handle", "source"),
4805            GraphNode::new("sem", "semantic_concept", "concept"),
4806        ] {
4807            store.upsert_node(&node).unwrap();
4808        }
4809        store
4810            .upsert_edge(&GraphEdge::new("start", "ctx", "has_context"))
4811            .unwrap();
4812        store
4813            .upsert_edge(&GraphEdge::new("ctx", "src", "uses_source"))
4814            .unwrap();
4815        store
4816            .upsert_edge(&GraphEdge::new("start", "sem", "mentions_concept"))
4817            .unwrap();
4818
4819        let rows = store
4820            .reachable_nodes_by_kinds(
4821                "start",
4822                &["worker_context", "source_handle", "semantic_concept"],
4823                2,
4824                8,
4825            )
4826            .unwrap();
4827        assert_eq!(rows["worker_context"][0].0.id, "ctx");
4828        assert_eq!(
4829            rows["source_handle"][0].1.nodes,
4830            vec!["start", "ctx", "src"]
4831        );
4832        assert_eq!(rows["semantic_concept"][0].1.hops, 1);
4833    }
4834
4835    #[test]
4836    fn sqlite_projection_refresh_handles_bulk_row_replacement() {
4837        let mut store = SqliteGraphStore::in_memory().unwrap();
4838        let source = GraphProvenance::new("fixture", "bulk");
4839        let mut projection = GraphProjection::default();
4840        for idx in 0..128 {
4841            projection.nodes.push(
4842                GraphNode::new(
4843                    format!("node:{idx:03}"),
4844                    if idx % 2 == 0 { "symbol" } else { "file" },
4845                    format!("bulk node {idx:03}"),
4846                )
4847                .with_property("ordinal", idx.to_string())
4848                .with_provenance(source.clone())
4849                .with_freshness(GraphFreshness::content_hash(format!("node-hash-{idx:03}"))),
4850            );
4851        }
4852        for idx in 0..127 {
4853            projection.edges.push(
4854                GraphEdge::new(
4855                    format!("node:{idx:03}"),
4856                    format!("node:{:03}", idx + 1),
4857                    "next",
4858                )
4859                .with_property("ordinal", idx.to_string())
4860                .with_provenance(source.clone())
4861                .with_freshness(GraphFreshness::content_hash(format!("edge-hash-{idx:03}"))),
4862            );
4863        }
4864
4865        store
4866            .replace_projection_with_version(
4867                "root",
4868                &projection,
4869                Some("bulk-v1"),
4870                Some("commit-a".to_string()),
4871            )
4872            .unwrap();
4873
4874        projection
4875            .nodes
4876            .retain(|node| !node.id.ends_with("000") && !node.id.ends_with("064"));
4877        projection.edges.retain(|edge| {
4878            !edge.from_id.ends_with("000")
4879                && !edge.to_id.ends_with("000")
4880                && !edge.from_id.ends_with("064")
4881                && !edge.to_id.ends_with("064")
4882        });
4883        let refresh = store
4884            .replace_projection_with_version(
4885                "root",
4886                &projection,
4887                Some("bulk-v2"),
4888                Some("commit-b".to_string()),
4889            )
4890            .unwrap();
4891
4892        assert_eq!(store.all_nodes().unwrap().len(), 126);
4893        assert_eq!(store.all_edges().unwrap().len(), 124);
4894        assert_eq!(
4895            refresh.tombstoned_nodes,
4896            vec!["node:000".to_string(), "node:064".to_string()]
4897        );
4898        assert_eq!(refresh.tombstoned_edges.len(), 3);
4899        assert_eq!(refresh.deleted_nodes, 2);
4900        assert_eq!(refresh.deleted_edges, 3);
4901        assert_eq!(refresh.unchanged_nodes, 126);
4902        assert_eq!(refresh.unchanged_edges, 124);
4903        assert_eq!(refresh.upserted_nodes, 0);
4904        assert_eq!(refresh.upserted_edges, 0);
4905        assert_eq!(refresh.unchanged_properties, 250);
4906        assert_eq!(refresh.upserted_properties, 0);
4907        assert!(
4908            refresh
4909                .phase_timings
4910                .iter()
4911                .any(|phase| phase.name == "sqlite_node_staging"
4912                    && phase.detail.contains("126 unchanged skipped")
4913                    && phase.detail.contains("multi-row chunks up to 500 rows")),
4914            "{:?}",
4915            refresh.phase_timings
4916        );
4917        assert!(
4918            refresh
4919                .phase_timings
4920                .iter()
4921                .any(|phase| phase.name == "sqlite_node_staging"
4922                    && phase.detail.contains("124 unchanged skipped")),
4923            "{:?}",
4924            refresh.phase_timings
4925        );
4926        let staged_node_properties: usize = store
4927            .conn
4928            .query_row(
4929                "SELECT COUNT(*) FROM temp.next_graph_node_properties",
4930                [],
4931                |row| row.get(0),
4932            )
4933            .unwrap();
4934        let staged_edge_properties: usize = store
4935            .conn
4936            .query_row(
4937                "SELECT COUNT(*) FROM temp.next_graph_edge_properties",
4938                [],
4939                |row| row.get(0),
4940            )
4941            .unwrap();
4942        assert_eq!(staged_node_properties, 0);
4943        assert_eq!(staged_edge_properties, 0);
4944        assert!(
4945            refresh
4946                .phase_timings
4947                .iter()
4948                .any(|phase| phase.name == "sqlite_property_row_staging"
4949                    && phase.detail.contains("new/changed node rows")),
4950            "{:?}",
4951            refresh.phase_timings
4952        );
4953        assert!(
4954            refresh
4955                .phase_timings
4956                .iter()
4957                .any(|phase| phase.name == "sqlite_edge_property_row_staging"
4958                    && phase.detail.contains("new/changed edge rows")),
4959            "{:?}",
4960            refresh.phase_timings
4961        );
4962        assert_eq!(
4963            store
4964                .projection_version("root")
4965                .unwrap()
4966                .unwrap()
4967                .source_watermark
4968                .as_deref(),
4969            Some("commit-b")
4970        );
4971    }
4972
4973    #[test]
4974    fn sqlite_reentrant_temp_table_guard_panics() {
4975        let store = SqliteGraphStore::in_memory().unwrap();
4976        store.temp_table_active.set(true);
4977        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
4978            store.assert_not_in_temp_table_section();
4979        }));
4980        assert!(result.is_err());
4981    }
4982
4983    #[test]
4984    fn sqlite_temp_table_guard_clears_after_method() {
4985        let mut store = SqliteGraphStore::in_memory().unwrap();
4986        let projection = GraphProjection {
4987            nodes: vec![],
4988            edges: vec![],
4989        };
4990        store.replace_projection(&projection).unwrap();
4991        assert!(!store.temp_table_active.get());
4992    }
4993
4994    #[test]
4995    fn derive_ontology_summarizes_types_and_relations() {
4996        let mut store = SqliteGraphStore::in_memory().unwrap();
4997        let seed = GraphProjection {
4998            nodes: vec![
4999                GraphNode::new("fn:a", "function", "a"),
5000                GraphNode::new("fn:b", "function", "b"),
5001                GraphNode::new("mod:m", "module", "m"),
5002            ],
5003            edges: vec![
5004                GraphEdge::new("fn:a", "fn:b", "calls"),
5005                GraphEdge::new("mod:m", "fn:a", "contains"),
5006            ],
5007        };
5008        store.upsert_projection(&seed).unwrap();
5009
5010        let onto = store.derive_ontology().unwrap();
5011        let type_kinds: std::collections::BTreeSet<_> =
5012            onto.nodes.iter().map(|n| n.label.clone()).collect();
5013        assert!(type_kinds.contains("function"));
5014        assert!(type_kinds.contains("module"));
5015        assert!(onto.nodes.iter().all(|n| n.kind == "ontology_type"));
5016
5017        let rel: std::collections::BTreeSet<_> = onto
5018            .edges
5019            .iter()
5020            .map(|e| (e.from_id.clone(), e.kind.clone(), e.to_id.clone()))
5021            .collect();
5022        assert!(rel.contains(&(
5023            "ontology_type:function".into(),
5024            "ontology_relation:calls".into(),
5025            "ontology_type:function".into()
5026        )));
5027        assert!(rel.contains(&(
5028            "ontology_type:module".into(),
5029            "ontology_relation:contains".into(),
5030            "ontology_type:function".into()
5031        )));
5032
5033        let function_node = onto.nodes.iter().find(|n| n.label == "function").unwrap();
5034        assert_eq!(function_node.properties.get("instance_count").unwrap(), "2");
5035
5036        // Idempotent: upserting the ontology then re-deriving excludes ontology rows.
5037        store.upsert_projection(&onto).unwrap();
5038        let onto2 = store.derive_ontology().unwrap();
5039        assert!(onto2.nodes.iter().all(|n| n.kind == "ontology_type"));
5040        assert_eq!(onto2.nodes.len(), onto.nodes.len());
5041        assert_eq!(onto2.edges.len(), onto.edges.len());
5042    }
5043}