Skip to main content

tsift_sqlite/
lib.rs

1use anyhow::{Context, Result, bail};
2use rusqlite::{
3    Connection, OpenFlags, OptionalExtension, Row, params_from_iter,
4    types::{Type, Value},
5};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use std::collections::{BTreeMap, BTreeSet};
8use std::cell::Cell;
9use std::ffi::OsString;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14pub use tsift_core::{
15    ConvexEdgeRow, ConvexGraphClient, ConvexGraphStore, ConvexNodeRow, ConvexProjectionRows,
16    ConvexRowsGraphClient, GraphEdge, GraphFreshness, GraphNode, GraphPagedSubgraph, GraphPath,
17    GraphProjection, GraphPropertyFilter, GraphProvenance, GraphQueryOptions, GraphQueryPage,
18    GraphStore, GraphSubgraph, PropertyMode, RankedNeighborhoodOptions, RankedNeighborhoodResult,
19    SQLITE_GRAPH_SCHEMA_VERSION, TerseGraphEdge, TerseGraphNode,
20    apply_graph_edge_query_page,
21    apply_graph_query_page, graph_edge_id, shortest_path_using_outgoing, stable_graph_edge_id,
22};
23
/// Describes which kind of snapshot fallback was used to open a locked
/// database read-only.
///
/// The variant is picked by `read_only_snapshot_recovery` from the SQLite
/// sidecar files found next to the database. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReadOnlyRecovery {
    /// A rollback journal (`-journal`) existed and no `-wal`/`-shm` sidecar did.
    SnapshotFallback,
    /// A `-wal` or `-shm` sidecar existed next to the database.
    SnapshotFallbackWal,
}
30
31pub fn read_only_snapshot_recovery(
32    db_path: &Path,
33    err: &anyhow::Error,
34) -> Option<ReadOnlyRecovery> {
35    if !error_mentions_locked_db(err) {
36        return None;
37    }
38    if wal_sidecar_path(db_path).exists() || shared_memory_sidecar_path(db_path).exists() {
39        Some(ReadOnlyRecovery::SnapshotFallbackWal)
40    } else if rollback_journal_path(db_path).exists() {
41        Some(ReadOnlyRecovery::SnapshotFallback)
42    } else {
43        None
44    }
45}
46
/// Returns the rollback-journal path for `db_path`: the same path with
/// `-journal` appended to its final component.
pub fn rollback_journal_path(db_path: &Path) -> PathBuf {
    let mut raw = OsString::from(db_path.as_os_str());
    raw.push("-journal");
    raw.into()
}
52
/// Returns the write-ahead-log sidecar path for `db_path`: the same path with
/// `-wal` appended to its final component.
pub fn wal_sidecar_path(db_path: &Path) -> PathBuf {
    let mut raw = OsString::from(db_path.as_os_str());
    raw.push("-wal");
    raw.into()
}
58
/// Returns the shared-memory sidecar path for `db_path`: the same path with
/// `-shm` appended to its final component.
pub fn shared_memory_sidecar_path(db_path: &Path) -> PathBuf {
    let mut raw = OsString::from(db_path.as_os_str());
    raw.push("-shm");
    raw.into()
}
64
65pub fn copy_read_only_snapshot(
66    db_path: &Path,
67    default_stem: &str,
68) -> Result<(PathBuf, Vec<PathBuf>)> {
69    let snapshot_path = snapshot_copy_path(db_path, default_stem);
70    std::fs::copy(db_path, &snapshot_path).with_context(|| {
71        format!(
72            "copying locked db {} to snapshot {}",
73            db_path.display(),
74            snapshot_path.display()
75        )
76    })?;
77    let mut cleanup_paths = vec![snapshot_path.clone()];
78    copy_optional_snapshot_sidecar(
79        &wal_sidecar_path(db_path),
80        &wal_sidecar_path(&snapshot_path),
81        &mut cleanup_paths,
82    )?;
83    copy_optional_snapshot_sidecar(
84        &shared_memory_sidecar_path(db_path),
85        &shared_memory_sidecar_path(&snapshot_path),
86        &mut cleanup_paths,
87    )?;
88    Ok((snapshot_path, cleanup_paths))
89}
90
91pub fn error_mentions_locked_db(err: &anyhow::Error) -> bool {
92    err.chain()
93        .any(|cause| cause.to_string().contains("database is locked"))
94}
95
96fn copy_optional_snapshot_sidecar(
97    source_path: &Path,
98    snapshot_path: &Path,
99    cleanup_paths: &mut Vec<PathBuf>,
100) -> Result<()> {
101    match std::fs::copy(source_path, snapshot_path) {
102        Ok(_) => {
103            cleanup_paths.push(snapshot_path.to_path_buf());
104            Ok(())
105        }
106        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
107        Err(err) => Err(err).with_context(|| {
108            format!(
109                "copying SQLite sidecar {} to snapshot {}",
110                source_path.display(),
111                snapshot_path.display()
112            )
113        }),
114    }
115}
116
/// Builds the temp-directory path used for a snapshot copy of `db_path`.
///
/// The file name is `tsift-<stem>-<pid>-<nanos>.db`, where `<stem>` is the
/// database's UTF-8 file stem (or `default_stem` when there is none),
/// `<pid>` is the current process id and `<nanos>` is the time since the
/// Unix epoch in nanoseconds (zero if the clock is before the epoch).
fn snapshot_copy_path(db_path: &Path, default_stem: &str) -> PathBuf {
    let stem = match db_path.file_stem().and_then(|raw| raw.to_str()) {
        Some(utf8_stem) => utf8_stem,
        None => default_stem,
    };
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    let nanos = since_epoch.as_nanos();
    let pid = std::process::id();
    let file_name = OsString::from(format!("tsift-{stem}-{pid}-{nanos}.db"));
    std::env::temp_dir().join(file_name)
}
/// `wal_autocheckpoint` value (in pages) that
/// `configure_writable_graph_connection` sets and then verifies.
const SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES: i64 = 4096;
/// Maximum number of rows bound into a single multi-row `INSERT` when staging
/// projection rows into the temp tables.
const SQLITE_GRAPH_STAGING_CHUNK_ROWS: usize = 500;
132
/// SQLite-backed graph store.
///
/// # Temp-table invariant
///
/// Several methods — `replace_projection_with_version`, `upsert_projection`,
/// `edges_between_nodes`, and `breadth_first_search` — create connection-scoped
/// `TEMP TABLE` staging areas inside the underlying SQLite connection. Two
/// concurrent calls on the same `SqliteGraphStore` (or sharing the same
/// connection through `SqliteReadOnlyConnection`) would collide on those temp
/// table names and produce incorrect results or errors.
///
/// **Only one temp-table-using method may be active at a time per connection.**
pub struct SqliteGraphStore {
    /// The SQLite connection every query and write goes through.
    conn: Connection,
    /// Deletes the snapshot files on drop. `None` unless the store was built
    /// from a read-only connection that carried a snapshot guard.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Recovery mode inherited from the read-only connection, if any;
    /// exposed through `read_only_recovery()`.
    read_only_recovery: Option<ReadOnlyRecovery>,
    /// Set while a temp-table-using method runs;
    /// `assert_not_in_temp_table_section` panics if it is already set.
    temp_table_active: Cell<bool>,
}
151
/// Read-only handle to a SQLite graph database connection.
///
/// # Temp-table invariant
///
/// When this connection is unwrapped into a `SqliteGraphStore` via
/// `SqliteGraphStore::from_read_only_connection`, the same temp-table
/// concurrency rules apply: only one temp-table-using method
/// (`replace_projection_with_version`, `upsert_projection`,
/// `edges_between_nodes`, `breadth_first_search`) may be active at a time
/// per connection.
pub struct SqliteReadOnlyConnection {
    /// Connection opened with `SQLITE_OPEN_READ_ONLY`.
    conn: Connection,
    /// Present only when the connection points at a temp-dir snapshot copy;
    /// removes the copied files when dropped.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// `Some` when the connection was opened through the snapshot fallback,
    /// `None` for a direct open of the original database.
    recovery: Option<ReadOnlyRecovery>,
}
167
// Process-wide counter starting at zero.
// NOTE(review): its users are not visible in this part of the file; presumably
// each `breadth_first_search` call takes a fresh id from it — confirm.
static BFS_CALL_ID: AtomicU64 = AtomicU64::new(0);
169
impl SqliteReadOnlyConnection {
    /// Borrows the underlying SQLite connection.
    pub fn conn(&self) -> &Connection {
        &self.conn
    }

    /// Returns the recovery mode recorded for this connection: `Some` when it
    /// was opened from a snapshot copy, `None` otherwise.
    pub fn recovery(&self) -> Option<ReadOnlyRecovery> {
        self.recovery
    }
}
179
/// Owns a list of file paths and deletes those files when dropped.
struct SnapshotCopyGuard {
    /// Files to remove on drop.
    paths: Vec<PathBuf>,
}

impl Drop for SnapshotCopyGuard {
    /// Removes every tracked file. Failures (for example a file that is
    /// already gone) are ignored because `drop` cannot report them.
    fn drop(&mut self) {
        self.paths.iter().for_each(|stale| {
            let _ = std::fs::remove_file(stale);
        });
    }
}
191
192fn configure_writable_graph_connection(conn: &Connection, db_path: &Path) -> Result<()> {
193    conn.busy_timeout(Duration::from_secs(5))?;
194    conn.pragma_update(None, "journal_mode", "WAL")?;
195    let mode: String = conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
196    if mode.to_lowercase() != "wal" {
197        bail!(
198            "graph substrate db {} requires WAL mode for concurrent reads, got {}",
199            db_path.display(),
200            mode
201        );
202    }
203    conn.pragma_update(
204        None,
205        "wal_autocheckpoint",
206        SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
207    )?;
208    let checkpoint_pages: i64 =
209        conn.query_row("PRAGMA wal_autocheckpoint", [], |row| row.get(0))?;
210    if checkpoint_pages != SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES {
211        bail!(
212            "graph substrate db {} requires wal_autocheckpoint={}, got {}",
213            db_path.display(),
214            SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
215            checkpoint_pages
216        );
217    }
218    Ok(())
219}
220
221fn sqlite_column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
222    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
223    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
224    for row in rows {
225        if row? == column {
226            return Ok(true);
227        }
228    }
229    Ok(false)
230}
231
232fn add_column_if_missing(
233    conn: &Connection,
234    table: &str,
235    column: &str,
236    definition: &str,
237) -> Result<()> {
238    if !sqlite_column_exists(conn, table, column)? {
239        conn.execute(
240            &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
241            [],
242        )?;
243    }
244    Ok(())
245}
246
247fn backfill_graph_edge_keys(conn: &Connection) -> Result<()> {
248    if !sqlite_column_exists(conn, "graph_edges", "edge_key")? {
249        return Ok(());
250    }
251    let rows = {
252        let mut stmt = conn.prepare(
253            r#"
254            SELECT from_id, to_id, kind
255            FROM graph_edges
256            WHERE edge_key IS NULL OR edge_key = ''
257            ORDER BY from_id, kind, to_id
258            "#,
259        )?;
260        collect_rows(stmt.query_map([], |row| {
261            Ok((
262                row.get::<_, String>(0)?,
263                row.get::<_, String>(1)?,
264                row.get::<_, String>(2)?,
265            ))
266        })?)?
267    };
268    let mut update = conn.prepare(
269        r#"
270        UPDATE graph_edges
271        SET edge_key = ?4
272        WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3
273        "#,
274    )?;
275    for (from_id, to_id, kind) in rows {
276        update.execute((
277            &from_id,
278            &to_id,
279            &kind,
280            stable_graph_edge_id(&from_id, &to_id, &kind),
281        ))?;
282    }
283    Ok(())
284}
285
286fn migrate_sqlite_graph_schema(conn: &Connection, old_version: i64) -> Result<()> {
287    if old_version < 2 {
288        add_column_if_missing(conn, "graph_nodes", "row_hash", "TEXT")?;
289        add_column_if_missing(conn, "graph_nodes", "source_watermark", "TEXT")?;
290        add_column_if_missing(conn, "graph_edges", "row_hash", "TEXT")?;
291        add_column_if_missing(conn, "graph_edges", "source_watermark", "TEXT")?;
292    }
293    if old_version < 3 {
294        rebuild_graph_node_properties(conn)?;
295    }
296    if old_version < 4 {
297        ensure_sqlite_graph_operator_stats_schema(conn)?;
298    }
299    if old_version < 5 {
300        add_column_if_missing(conn, "graph_edges", "edge_key", "TEXT")?;
301        backfill_graph_edge_keys(conn)?;
302        ensure_sqlite_graph_edge_properties_schema(conn)?;
303        rebuild_graph_edge_properties(conn)?;
304    }
305    Ok(())
306}
307
308fn ensure_sqlite_graph_operator_stats_schema(conn: &Connection) -> Result<()> {
309    conn.execute_batch(
310        r#"
311        CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
312            ON graph_nodes(kind, label, id);
313        CREATE TABLE IF NOT EXISTS graph_operator_stats (
314            scope TEXT PRIMARY KEY,
315            nodes INTEGER NOT NULL,
316            edges INTEGER NOT NULL,
317            tombstone_nodes INTEGER NOT NULL,
318            tombstone_edges INTEGER NOT NULL,
319            file_size_bytes INTEGER,
320            freelist_bytes INTEGER,
321            observed_at_unix INTEGER NOT NULL
322        );
323        "#,
324    )?;
325    Ok(())
326}
327
328fn ensure_sqlite_graph_edge_properties_schema(conn: &Connection) -> Result<()> {
329    conn.execute_batch(
330        r#"
331        CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_edge_key
332            ON graph_edges(edge_key);
333        CREATE TABLE IF NOT EXISTS graph_edge_properties (
334            edge_key TEXT NOT NULL,
335            key TEXT NOT NULL,
336            value TEXT NOT NULL,
337            PRIMARY KEY (edge_key, key),
338            FOREIGN KEY (edge_key) REFERENCES graph_edges(edge_key) ON DELETE CASCADE
339        );
340        CREATE INDEX IF NOT EXISTS idx_graph_edge_properties_key_value_edge
341            ON graph_edge_properties(key, value, edge_key);
342        "#,
343    )?;
344    Ok(())
345}
346
347fn replace_node_properties(
348    conn: &Connection,
349    node_id: &str,
350    properties: &BTreeMap<String, String>,
351) -> Result<()> {
352    conn.execute(
353        "DELETE FROM graph_node_properties WHERE node_id = ?1",
354        [node_id],
355    )?;
356    let mut insert = conn.prepare(
357        r#"
358        INSERT INTO graph_node_properties (node_id, key, value)
359        VALUES (?1, ?2, ?3)
360        ON CONFLICT(node_id, key) DO UPDATE SET
361            value = excluded.value
362        "#,
363    )?;
364    for (key, value) in properties {
365        insert.execute((node_id, key, value))?;
366    }
367    Ok(())
368}
369
370fn replace_edge_properties(
371    conn: &Connection,
372    edge_key: &str,
373    properties: &BTreeMap<String, String>,
374) -> Result<()> {
375    conn.execute(
376        "DELETE FROM graph_edge_properties WHERE edge_key = ?1",
377        [edge_key],
378    )?;
379    let mut insert = conn.prepare(
380        r#"
381        INSERT INTO graph_edge_properties (edge_key, key, value)
382        VALUES (?1, ?2, ?3)
383        ON CONFLICT(edge_key, key) DO UPDATE SET
384            value = excluded.value
385        "#,
386    )?;
387    for (key, value) in properties {
388        insert.execute((edge_key, key, value))?;
389    }
390    Ok(())
391}
392
393fn rebuild_graph_node_properties(conn: &Connection) -> Result<()> {
394    if !sqlite_column_exists(conn, "graph_nodes", "properties_json")? {
395        return Ok(());
396    }
397    conn.execute_batch(
398        r#"
399        DELETE FROM graph_node_properties;
400        INSERT INTO graph_node_properties (node_id, key, value)
401        SELECT graph_nodes.id, json_each.key, CAST(json_each.value AS TEXT)
402        FROM graph_nodes, json_each(graph_nodes.properties_json)
403        WHERE json_each.key IS NOT NULL
404          AND json_each.value IS NOT NULL
405        "#,
406    )?;
407    Ok(())
408}
409
410fn rebuild_graph_edge_properties(conn: &Connection) -> Result<()> {
411    if !sqlite_column_exists(conn, "graph_edges", "properties_json")?
412        || !sqlite_column_exists(conn, "graph_edges", "edge_key")?
413    {
414        return Ok(());
415    }
416    conn.execute_batch(
417        r#"
418        DELETE FROM graph_edge_properties;
419        INSERT INTO graph_edge_properties (edge_key, key, value)
420        SELECT graph_edges.edge_key, json_each.key, CAST(json_each.value AS TEXT)
421        FROM graph_edges, json_each(graph_edges.properties_json)
422        WHERE graph_edges.edge_key IS NOT NULL
423          AND graph_edges.edge_key <> ''
424          AND json_each.key IS NOT NULL
425          AND json_each.value IS NOT NULL
426        "#,
427    )?;
428    Ok(())
429}
430
431pub fn open_graph_read_only_connection(db_path: &Path) -> Result<SqliteReadOnlyConnection> {
432    let conn = Connection::open_with_flags(
433        db_path,
434        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
435    )
436    .with_context(|| format!("opening graph.db read-only: {}", db_path.display()))?;
437    conn.busy_timeout(Duration::from_secs(5))?;
438    Ok(SqliteReadOnlyConnection {
439        conn,
440        _snapshot_copy: None,
441        recovery: None,
442    })
443}
444
445pub fn open_graph_read_only_connection_resilient(
446    db_path: &Path,
447) -> Result<SqliteReadOnlyConnection> {
448    match open_graph_read_only_connection(db_path).and_then(|connection| {
449        connection
450            .conn
451            .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
452        Ok(connection)
453    }) {
454        Ok(connection) => Ok(connection),
455        Err(err) => match read_only_snapshot_recovery(db_path, &err) {
456            Some(recovery) => open_graph_read_only_snapshot(db_path, recovery),
457            None => Err(err),
458        },
459    }
460}
461
462fn open_graph_read_only_snapshot(
463    db_path: &Path,
464    recovery: ReadOnlyRecovery,
465) -> Result<SqliteReadOnlyConnection> {
466    let (snapshot_path, cleanup_paths) = copy_read_only_snapshot(db_path, "graph")?;
467    let conn = Connection::open_with_flags(
468        &snapshot_path,
469        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
470    )
471    .with_context(|| format!("opening graph.db snapshot {}", snapshot_path.display()))?;
472    conn.busy_timeout(Duration::from_secs(5))?;
473    Ok(SqliteReadOnlyConnection {
474        conn,
475        _snapshot_copy: Some(SnapshotCopyGuard {
476            paths: cleanup_paths,
477        }),
478        recovery: Some(recovery),
479    })
480}
481
/// Timing record for one named phase of a projection refresh, as built by
/// `sqlite_refresh_phase_timing`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefreshPhase {
    /// Name of the phase.
    pub name: String,
    /// Time elapsed since the phase's start instant, in microseconds.
    pub duration_micros: u128,
    /// Free-form detail text supplied by the caller.
    pub detail: String,
}
488
/// Report returned by `SqliteGraphStore::replace_projection_with_version`.
///
/// NOTE(review): the code that fills the counters is outside this part of the
/// file; the per-field meanings follow from the field names — confirm against
/// the refresh implementation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefresh {
    pub scope: String,
    pub projection_version: String,
    pub source_watermark: Option<String>,
    pub tombstoned_nodes: Vec<String>,
    pub tombstoned_edges: Vec<String>,
    pub upserted_nodes: usize,
    pub upserted_edges: usize,
    pub unchanged_nodes: usize,
    pub unchanged_edges: usize,
    pub upserted_properties: usize,
    pub unchanged_properties: usize,
    pub deleted_properties: usize,
    pub deleted_nodes: usize,
    pub deleted_edges: usize,
    pub pruned_tombstones: usize,
    /// Database size sampled before the refresh; `None` when the size query failed.
    pub file_size_bytes_before: Option<u64>,
    pub file_size_bytes_after: Option<u64>,
    /// Per-phase timing records collected during the refresh.
    pub phase_timings: Vec<SqliteProjectionRefreshPhase>,
}
510
/// Version metadata of a stored projection.
///
/// NOTE(review): the fields match columns of the `graph_projection_versions`
/// table; presumably this is one row of that table — confirm against the
/// reader.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionVersion {
    pub projection_version: String,
    pub content_hash: Option<String>,
    pub source_watermark: Option<String>,
}
517
518fn sqlite_refresh_phase_timing(
519    name: &str,
520    started: Instant,
521    detail: &str,
522) -> SqliteProjectionRefreshPhase {
523    SqliteProjectionRefreshPhase {
524        name: name.to_string(),
525        duration_micros: started.elapsed().as_micros(),
526        detail: detail.to_string(),
527    }
528}
529
/// Builds the `VALUES` placeholder list for a multi-row insert.
///
/// Produces `row_count` groups of `column_count` `?` markers, e.g.
/// `(?, ?), (?, ?)` for two columns and two rows. Zero rows yield an empty
/// string; zero columns yield `()` groups.
fn sqlite_graph_staging_placeholders(column_count: usize, row_count: usize) -> String {
    let one_row = format!("({})", vec!["?"; column_count].join(", "));
    vec![one_row.as_str(); row_count].join(", ")
}
543
544fn sqlite_stage_all_ids(
545    tx: &rusqlite::Transaction<'_>,
546    nodes: &[GraphNode],
547    edges: &[GraphEdge],
548) -> Result<()> {
549    for chunk in nodes.iter().map(|n| &n.id).collect::<Vec<_>>().chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
550        let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
551        let sql = format!("INSERT OR IGNORE INTO next_graph_all_node_ids (id) VALUES {}", placeholders.join(", "));
552        let values: Vec<Value> = chunk.iter().map(|id| Value::Text((*id).clone())).collect();
553        tx.execute(&sql, params_from_iter(values.iter()))?;
554    }
555    for chunk in edges.iter().map(graph_edge_id).collect::<Vec<_>>().chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
556        let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
557        let sql = format!("INSERT OR IGNORE INTO next_graph_all_edge_keys (edge_key) VALUES {}", placeholders.join(", "));
558        let values: Vec<Value> = chunk.iter().map(|ek| Value::Text(ek.to_string())).collect();
559        tx.execute(&sql, params_from_iter(values.iter()))?;
560    }
561    Ok(())
562}
563
564fn sqlite_stage_projection_nodes(
565    tx: &rusqlite::Transaction<'_>,
566    nodes: &[&GraphNode],
567    source_watermark: Option<&str>,
568) -> Result<()> {
569    for chunk in nodes.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
570        let sql = format!(
571            r#"
572            INSERT INTO next_graph_nodes
573                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
574            VALUES {}
575            "#,
576            sqlite_graph_staging_placeholders(8, chunk.len())
577        );
578        let mut values = Vec::with_capacity(chunk.len() * 8);
579        for node in chunk {
580            values.push(Value::Text(node.id.clone()));
581            values.push(Value::Text(node.kind.clone()));
582            values.push(Value::Text(node.label.clone()));
583            values.push(Value::Text(to_json(&node.properties)?));
584            values.push(Value::Text(to_json(&node.provenance)?));
585            values.push(
586                optional_to_json(&node.freshness)?
587                    .map(Value::Text)
588                    .unwrap_or(Value::Null),
589            );
590            values.push(Value::Text(row_hash(node)?));
591            values.push(
592                source_watermark
593                    .map(|watermark| Value::Text(watermark.to_string()))
594                    .unwrap_or(Value::Null),
595            );
596        }
597        tx.execute(&sql, params_from_iter(values))?;
598    }
599    Ok(())
600}
601
602fn sqlite_stage_projection_edges(
603    tx: &rusqlite::Transaction<'_>,
604    edges: &[&GraphEdge],
605    source_watermark: Option<&str>,
606) -> Result<()> {
607    for chunk in edges.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
608        let sql = format!(
609            r#"
610            INSERT INTO next_graph_edges
611                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
612            VALUES {}
613            "#,
614            sqlite_graph_staging_placeholders(9, chunk.len())
615        );
616        let mut values = Vec::with_capacity(chunk.len() * 9);
617        for edge in chunk {
618            values.push(Value::Text(graph_edge_id(edge)));
619            values.push(Value::Text(edge.from_id.clone()));
620            values.push(Value::Text(edge.to_id.clone()));
621            values.push(Value::Text(edge.kind.clone()));
622            values.push(Value::Text(to_json(&edge.properties)?));
623            values.push(Value::Text(to_json(&edge.provenance)?));
624            values.push(
625                optional_to_json(&edge.freshness)?
626                    .map(Value::Text)
627                    .unwrap_or(Value::Null),
628            );
629            values.push(Value::Text(row_hash(edge)?));
630            values.push(
631                source_watermark
632                    .map(|watermark| Value::Text(watermark.to_string()))
633                    .unwrap_or(Value::Null),
634            );
635        }
636        tx.execute(&sql, params_from_iter(values))?;
637    }
638    Ok(())
639}
640
641impl SqliteGraphStore {
642    fn assert_not_in_temp_table_section(&self) {
643        if self.temp_table_active.get() {
644            panic!("SqliteGraphStore: re-entrant temp-table call detected — only one temp-table-using method may be active at a time per connection");
645        }
646    }
647
648    pub fn open(db_path: &Path) -> Result<Self> {
649        if let Some(parent) = db_path.parent() {
650            std::fs::create_dir_all(parent)
651                .with_context(|| format!("creating graph substrate dir: {}", parent.display()))?;
652        }
653        let conn = Connection::open(db_path)
654            .with_context(|| format!("opening graph substrate db: {}", db_path.display()))?;
655        configure_writable_graph_connection(&conn, db_path)?;
656        Self::from_connection(conn)
657    }
658
659    pub fn in_memory() -> Result<Self> {
660        let conn = Connection::open_in_memory()?;
661        conn.busy_timeout(Duration::from_secs(5))?;
662        Self::from_connection(conn)
663    }
664
665    pub fn open_read_only(db_path: &Path) -> Result<Self> {
666        let connection = open_graph_read_only_connection(db_path)?;
667        Self::from_read_only_connection(connection)
668    }
669
670    pub fn open_read_only_resilient(db_path: &Path) -> Result<Self> {
671        let connection = open_graph_read_only_connection_resilient(db_path)?;
672        Self::from_read_only_connection(connection)
673    }
674
    /// Returns the recovery mode this store was opened with, if any.
    ///
    /// `None` for stores built by `from_connection`; otherwise whatever the
    /// wrapped read-only connection recorded.
    pub fn read_only_recovery(&self) -> Option<ReadOnlyRecovery> {
        self.read_only_recovery
    }
678
679    pub fn has_user_triggers(&self) -> Result<bool> {
680        self.conn
681            .query_row(
682                r#"
683                SELECT EXISTS(
684                    SELECT 1
685                    FROM sqlite_master
686                    WHERE type = 'trigger'
687                      AND name NOT LIKE 'sqlite_%'
688                )
689                "#,
690                [],
691                |row| row.get::<_, bool>(0),
692            )
693            .map_err(Into::into)
694    }
695
    /// Turns a writable connection into a store, creating or migrating the
    /// graph schema as needed.
    ///
    /// Steps, in order: enable foreign keys, reject databases whose
    /// `user_version` is newer than `SQLITE_GRAPH_SCHEMA_VERSION`, create all
    /// base tables and indexes that do not exist yet, then — for older
    /// databases — run the migrations and stamp the current version.
    ///
    /// # Errors
    ///
    /// Fails when the schema version is too new or when any pragma, DDL or
    /// migration statement fails.
    fn from_connection(conn: Connection) -> Result<Self> {
        conn.pragma_update(None, "foreign_keys", "ON")?;
        let user_version: i64 =
            conn.pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
        // Refuse to touch a database written by a newer schema.
        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
            bail!(
                "graph.db schema version {} is newer than supported version {}",
                user_version,
                SQLITE_GRAPH_SCHEMA_VERSION
            );
        }
        // Base schema. Every statement is `IF NOT EXISTS`, so existing tables
        // are left as they are; missing columns are added by the migrations.
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS graph_nodes (
                id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                label TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT
            );
            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind
                ON graph_nodes(kind);
            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
                ON graph_nodes(kind, label, id);

            CREATE TABLE IF NOT EXISTS graph_edges (
                edge_key TEXT NOT NULL UNIQUE,
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT,
                PRIMARY KEY (from_id, to_id, kind),
                FOREIGN KEY (from_id) REFERENCES graph_nodes(id) ON DELETE CASCADE,
                FOREIGN KEY (to_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_edges_from_kind
                ON graph_edges(from_id, kind);
            CREATE INDEX IF NOT EXISTS idx_graph_edges_to_kind
                ON graph_edges(to_id, kind);

            CREATE TABLE IF NOT EXISTS graph_projection_versions (
                scope TEXT PRIMARY KEY,
                projection_version TEXT NOT NULL,
                content_hash TEXT,
                source_watermark TEXT,
                observed_at_unix INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS graph_tombstones (
                row_key TEXT PRIMARY KEY,
                row_kind TEXT NOT NULL,
                deleted_at_unix INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS graph_node_properties (
                node_id TEXT NOT NULL,
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                PRIMARY KEY (node_id, key),
                FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_node_properties_key_value_node
                ON graph_node_properties(key, value, node_id);

            CREATE TABLE IF NOT EXISTS graph_operator_stats (
                scope TEXT PRIMARY KEY,
                nodes INTEGER NOT NULL,
                edges INTEGER NOT NULL,
                tombstone_nodes INTEGER NOT NULL,
                tombstone_edges INTEGER NOT NULL,
                file_size_bytes INTEGER,
                freelist_bytes INTEGER,
                observed_at_unix INTEGER NOT NULL
            );
            "#,
        )?;
        // The version is stamped only after every migration step succeeded.
        if user_version < SQLITE_GRAPH_SCHEMA_VERSION {
            migrate_sqlite_graph_schema(&conn, user_version)?;
            conn.pragma_update(None, "user_version", SQLITE_GRAPH_SCHEMA_VERSION)?;
        }
        Ok(Self {
            conn,
            _snapshot_copy: None,
            read_only_recovery: None,
            temp_table_active: Cell::new(false),
        })
    }
790
791    fn from_read_only_connection(connection: SqliteReadOnlyConnection) -> Result<Self> {
792        connection.conn.pragma_update(None, "foreign_keys", "ON")?;
793        let user_version: i64 =
794            connection
795                .conn
796                .pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
797        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
798            bail!(
799                "graph.db schema version {} is newer than supported version {}",
800                user_version,
801                SQLITE_GRAPH_SCHEMA_VERSION
802            );
803        }
804        connection
805            .conn
806            .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
807        Ok(Self {
808            conn: connection.conn,
809            _snapshot_copy: connection._snapshot_copy,
810            read_only_recovery: connection.recovery,
811            temp_table_active: Cell::new(false),
812        })
813    }
814
815    pub fn replace_projection(&mut self, projection: &GraphProjection) -> Result<()> {
816        self.replace_projection_with_version("root", projection, None, None)
817            .map(|_| ())
818    }
819
820    pub fn replace_projection_with_version(
821        &mut self,
822        scope: impl Into<String>,
823        projection: &GraphProjection,
824        projection_version: Option<&str>,
825        source_watermark: Option<String>,
826    ) -> Result<SqliteProjectionRefresh> {
827        self.assert_not_in_temp_table_section();
828        self.temp_table_active.set(true);
829        let scope = scope.into();
830        let result = self.replace_projection_with_version_fallible(scope, projection, projection_version, source_watermark);
831        self.temp_table_active.set(false);
832        if let Ok(ref refresh) = result {
833            let total_rows = refresh.upserted_nodes + refresh.upserted_edges;
834            let autocheckpoint = if total_rows > 10000 {
835                8192
836            } else if total_rows > 1000 {
837                4096
838            } else {
839                2048
840            };
841            let _ = self.conn.pragma_update(None, "wal_autocheckpoint", autocheckpoint);
842            let _ = self.conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)");
843        }
844        result
845    }
846
847    fn replace_projection_with_version_fallible(
848        &mut self,
849        scope: String,
850        projection: &GraphProjection,
851        projection_version: Option<&str>,
852        source_watermark: Option<String>,
853    ) -> Result<SqliteProjectionRefresh> {
854        let projection_version = projection_version
855            .map(str::to_string)
856            .or_else(|| projection_version_from_nodes(&projection.nodes))
857            .unwrap_or_else(|| "unversioned".to_string());
858        let projection_hash = projection_hash_from_nodes(&projection.nodes);
859        let observed_at_unix = unix_now();
860        let file_size_bytes_before = sqlite_database_size_bytes(&self.conn).ok();
861        let force_refresh_writes = self.has_user_triggers().unwrap_or(true);
862        let mut phase_timings = Vec::new();
863
864        let tx = self.conn.transaction()?;
865        let started = Instant::now();
866        tx.execute_batch(
867            r#"
868            CREATE TEMP TABLE IF NOT EXISTS next_graph_nodes (
869                id TEXT PRIMARY KEY,
870                kind TEXT NOT NULL,
871                label TEXT NOT NULL,
872                properties_json TEXT NOT NULL,
873                provenance_json TEXT NOT NULL,
874                freshness_json TEXT,
875                row_hash TEXT NOT NULL,
876                source_watermark TEXT
877            );
878            CREATE TEMP TABLE IF NOT EXISTS next_graph_edges (
879                edge_key TEXT PRIMARY KEY,
880                from_id TEXT NOT NULL,
881                to_id TEXT NOT NULL,
882                kind TEXT NOT NULL,
883                properties_json TEXT NOT NULL,
884                provenance_json TEXT NOT NULL,
885                freshness_json TEXT,
886                row_hash TEXT NOT NULL,
887                source_watermark TEXT
888            );
889            CREATE INDEX IF NOT EXISTS temp.idx_next_graph_edges_from_to_kind
890                ON next_graph_edges(from_id, to_id, kind);
891            CREATE TEMP TABLE IF NOT EXISTS next_graph_node_properties (
892                node_id TEXT NOT NULL,
893                key TEXT NOT NULL,
894                value TEXT NOT NULL,
895                PRIMARY KEY (node_id, key)
896            );
897            CREATE TEMP TABLE IF NOT EXISTS next_graph_edge_properties (
898                edge_key TEXT NOT NULL,
899                key TEXT NOT NULL,
900                value TEXT NOT NULL,
901                PRIMARY KEY (edge_key, key)
902            );
903            CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_nodes (
904                id TEXT PRIMARY KEY
905            );
906            CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_edges (
907                edge_key TEXT PRIMARY KEY
908            );
909            CREATE TEMP TABLE IF NOT EXISTS next_graph_all_node_ids (
910                id TEXT PRIMARY KEY
911            );
912            CREATE TEMP TABLE IF NOT EXISTS next_graph_all_edge_keys (
913                edge_key TEXT PRIMARY KEY
914            );
915            DELETE FROM next_graph_nodes;
916            DELETE FROM next_graph_edges;
917            DELETE FROM next_graph_node_properties;
918            DELETE FROM next_graph_edge_properties;
919            DELETE FROM next_graph_changed_nodes;
920            DELETE FROM next_graph_changed_edges;
921            DELETE FROM next_graph_all_node_ids;
922            DELETE FROM next_graph_all_edge_keys;
923            "#,
924        )?;
925        phase_timings.push(sqlite_refresh_phase_timing(
926            "sqlite_temp_table_prepare",
927            started,
928            "create and clear refresh staging tables before row loading",
929        ));
930        let (changed_nodes, changed_edges, skipped_nodes, skipped_edges) = if force_refresh_writes {
931            (projection.nodes.iter().collect(), projection.edges.iter().collect(), 0usize, 0usize)
932        } else {
933            let existing_node_hashes: BTreeMap<String, String> = {
934                let mut stmt = tx.prepare("SELECT id, row_hash FROM graph_nodes WHERE row_hash IS NOT NULL")?;
935                let rows = stmt.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)))?;
936                collect_rows(rows)?.into_iter().collect()
937            };
938            let existing_edge_hashes: BTreeMap<String, String> = {
939                let mut stmt = tx.prepare("SELECT edge_key, row_hash FROM graph_edges WHERE row_hash IS NOT NULL")?;
940                let rows = stmt.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)))?;
941                collect_rows(rows)?.into_iter().collect()
942            };
943            let cn: Vec<&GraphNode> = projection.nodes.iter().filter(|n| {
944                let hash = row_hash(*n).ok();
945                hash.as_ref().is_none_or(|h| existing_node_hashes.get(&n.id) != Some(h))
946            }).collect();
947            let ce: Vec<&GraphEdge> = projection.edges.iter().filter(|e| {
948                let hash = row_hash(*e).ok();
949                let ek = graph_edge_id(e);
950                hash.as_ref().is_none_or(|h| existing_edge_hashes.get(&ek) != Some(h))
951            }).collect();
952            let sn = projection.nodes.len() - cn.len();
953            let se = projection.edges.len() - ce.len();
954            (cn, ce, sn, se)
955        };
956        {
957            let started = Instant::now();
958            sqlite_stage_all_ids(&tx, &projection.nodes, &projection.edges)?;
959            sqlite_stage_projection_nodes(&tx, &changed_nodes, source_watermark.as_deref())?;
960            sqlite_stage_projection_edges(&tx, &changed_edges, source_watermark.as_deref())?;
961            phase_timings.push(sqlite_refresh_phase_timing(
962                "sqlite_node_staging",
963                started,
964                &format!(
965                    "pre-filtered staging: {} nodes ({} unchanged skipped), {} edges ({} unchanged skipped) into temp tables using multi-row chunks up to {} rows",
966                    changed_nodes.len(),
967                    skipped_nodes,
968                    changed_edges.len(),
969                    skipped_edges,
970                    SQLITE_GRAPH_STAGING_CHUNK_ROWS
971                ),
972            ));
973        }
974        {
975            let started = Instant::now();
976            let changed_nodes_sql = if force_refresh_writes {
977                r#"
978                INSERT INTO next_graph_changed_nodes (id)
979                SELECT id
980                FROM next_graph_nodes
981                "#
982            } else {
983                r#"
984                INSERT INTO next_graph_changed_nodes (id)
985                SELECT n.id
986                FROM next_graph_nodes n
987                LEFT JOIN graph_nodes g ON g.id = n.id
988                WHERE g.id IS NULL OR g.row_hash IS NOT n.row_hash
989                "#
990            };
991            tx.execute(changed_nodes_sql, [])?;
992            tx.execute_batch(
993                r#"
994                INSERT INTO next_graph_node_properties (node_id, key, value)
995                SELECT n.id, json_each.key, CAST(json_each.value AS TEXT)
996                FROM next_graph_nodes n
997                JOIN next_graph_changed_nodes c ON c.id = n.id,
998                     json_each(n.properties_json)
999                WHERE json_each.key IS NOT NULL
1000                  AND json_each.value IS NOT NULL
1001                "#,
1002            )?;
1003            phase_timings.push(sqlite_refresh_phase_timing(
1004                "sqlite_property_row_staging",
1005                started,
1006                "derive materialized node property rows only for new/changed node rows; unchanged row-hash owners reuse existing property rows",
1007            ));
1008        }
1009        {
1010            let started = Instant::now();
1011            let changed_edges_sql = if force_refresh_writes {
1012                r#"
1013                INSERT INTO next_graph_changed_edges (edge_key)
1014                SELECT edge_key
1015                FROM next_graph_edges
1016                "#
1017            } else {
1018                r#"
1019                INSERT INTO next_graph_changed_edges (edge_key)
1020                SELECT n.edge_key
1021                FROM next_graph_edges n
1022                LEFT JOIN graph_edges g ON g.edge_key = n.edge_key
1023                WHERE g.edge_key IS NULL OR g.row_hash IS NOT n.row_hash
1024                "#
1025            };
1026            tx.execute(changed_edges_sql, [])?;
1027            tx.execute_batch(
1028                r#"
1029                INSERT INTO next_graph_edge_properties (edge_key, key, value)
1030                SELECT e.edge_key, json_each.key, CAST(json_each.value AS TEXT)
1031                FROM next_graph_edges e
1032                JOIN next_graph_changed_edges c ON c.edge_key = e.edge_key,
1033                     json_each(e.properties_json)
1034                WHERE json_each.key IS NOT NULL
1035                  AND json_each.value IS NOT NULL
1036                "#,
1037            )?;
1038            phase_timings.push(sqlite_refresh_phase_timing(
1039                "sqlite_edge_property_row_staging",
1040                started,
1041                "derive materialized edge property rows only for new/changed edge rows; unchanged row-hash owners reuse existing property rows",
1042            ));
1043        }
1044
1045        let delta_started = Instant::now();
1046        let tombstoned_nodes = {
1047            let sql = if force_refresh_writes {
1048                r#"
1049                SELECT g.id
1050                FROM graph_nodes g
1051                LEFT JOIN next_graph_nodes n ON n.id = g.id
1052                WHERE n.id IS NULL
1053                ORDER BY g.id
1054                "#
1055            } else {
1056                r#"
1057                SELECT g.id
1058                FROM graph_nodes g
1059                LEFT JOIN next_graph_all_node_ids n ON n.id = g.id
1060                WHERE n.id IS NULL
1061                ORDER BY g.id
1062                "#
1063            };
1064            let mut stmt = tx.prepare(sql)?;
1065            collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1066        };
1067        let tombstoned_edges = {
1068            let sql = if force_refresh_writes {
1069                r#"
1070                SELECT g.edge_key
1071                FROM graph_edges g
1072                LEFT JOIN next_graph_edges n
1073                    ON n.edge_key = g.edge_key
1074                WHERE n.edge_key IS NULL
1075                ORDER BY g.edge_key
1076                "#
1077            } else {
1078                r#"
1079                SELECT g.edge_key
1080                FROM graph_edges g
1081                LEFT JOIN next_graph_all_edge_keys n
1082                    ON n.edge_key = g.edge_key
1083                WHERE n.edge_key IS NULL
1084                ORDER BY g.edge_key
1085                "#
1086            };
1087            let mut stmt = tx.prepare(sql)?;
1088            collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1089        };
1090        let unchanged_nodes: usize = if force_refresh_writes {
1091            tx.query_row(
1092                r#"
1093                SELECT COUNT(*)
1094                FROM next_graph_nodes n
1095                JOIN graph_nodes g ON g.id = n.id
1096                WHERE g.row_hash = n.row_hash
1097                "#,
1098                [],
1099                |row| row.get(0),
1100            )?
1101        } else {
1102            skipped_nodes
1103        };
1104        let unchanged_edges: usize = if force_refresh_writes {
1105            tx.query_row(
1106                r#"
1107                SELECT COUNT(*)
1108                FROM next_graph_edges n
1109                JOIN graph_edges g
1110                    ON g.edge_key = n.edge_key
1111                WHERE g.row_hash = n.row_hash
1112                "#,
1113                [],
1114                |row| row.get(0),
1115            )?
1116        } else {
1117            skipped_edges
1118        };
1119        let reused_owner_node_properties: usize = if force_refresh_writes {
1120            tx.query_row(
1121                r#"
1122                SELECT COUNT(*)
1123                FROM graph_node_properties g
1124                JOIN next_graph_nodes n ON n.id = g.node_id
1125                LEFT JOIN next_graph_changed_nodes c ON c.id = n.id
1126                WHERE c.id IS NULL
1127                "#,
1128                [],
1129                |row| row.get(0),
1130            )?
1131        } else {
1132            tx.query_row(
1133                r#"
1134                SELECT COUNT(*)
1135                FROM graph_node_properties g
1136                JOIN next_graph_all_node_ids a ON a.id = g.node_id
1137                LEFT JOIN next_graph_changed_nodes c ON c.id = a.id
1138                WHERE c.id IS NULL
1139                "#,
1140                [],
1141                |row| row.get(0),
1142            )?
1143        };
1144        let reused_owner_edge_properties: usize = if force_refresh_writes {
1145            tx.query_row(
1146                r#"
1147                SELECT COUNT(*)
1148                FROM graph_edge_properties g
1149                JOIN next_graph_edges n ON n.edge_key = g.edge_key
1150                LEFT JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1151                WHERE c.edge_key IS NULL
1152                "#,
1153                [],
1154                |row| row.get(0),
1155            )?
1156        } else {
1157            tx.query_row(
1158                r#"
1159                SELECT COUNT(*)
1160                FROM graph_edge_properties g
1161                JOIN next_graph_all_edge_keys a ON a.edge_key = g.edge_key
1162                LEFT JOIN next_graph_changed_edges c ON c.edge_key = a.edge_key
1163                WHERE c.edge_key IS NULL
1164                "#,
1165                [],
1166                |row| row.get(0),
1167            )?
1168        };
1169        let unchanged_changed_node_properties: usize = tx.query_row(
1170            r#"
1171            SELECT COUNT(*)
1172            FROM next_graph_node_properties n
1173            JOIN graph_node_properties g
1174                ON g.node_id = n.node_id AND g.key = n.key
1175            WHERE g.value = n.value
1176            "#,
1177            [],
1178            |row| row.get(0),
1179        )?;
1180        let unchanged_changed_edge_properties: usize = tx.query_row(
1181            r#"
1182            SELECT COUNT(*)
1183            FROM next_graph_edge_properties n
1184            JOIN graph_edge_properties g
1185                ON g.edge_key = n.edge_key AND g.key = n.key
1186            WHERE g.value = n.value
1187            "#,
1188            [],
1189            |row| row.get(0),
1190        )?;
1191        let unchanged_properties = reused_owner_node_properties
1192            + reused_owner_edge_properties
1193            + unchanged_changed_node_properties
1194            + unchanged_changed_edge_properties;
1195
1196        let deleted_edges = if force_refresh_writes {
1197            tx.execute(
1198                r#"
1199                DELETE FROM graph_edges
1200                WHERE NOT EXISTS (
1201                    SELECT 1
1202                    FROM next_graph_edges n
1203                    WHERE n.edge_key = graph_edges.edge_key
1204                )
1205                "#,
1206                [],
1207            )?
1208        } else {
1209            tx.execute(
1210                r#"
1211                DELETE FROM graph_edges
1212                WHERE NOT EXISTS (
1213                    SELECT 1
1214                    FROM next_graph_all_edge_keys n
1215                    WHERE n.edge_key = graph_edges.edge_key
1216                )
1217                "#,
1218                [],
1219            )?
1220        };
1221        let deleted_nodes = if force_refresh_writes {
1222            tx.execute(
1223                r#"
1224                DELETE FROM graph_nodes
1225                WHERE NOT EXISTS (
1226                    SELECT 1
1227                    FROM next_graph_nodes n
1228                    WHERE n.id = graph_nodes.id
1229                )
1230                "#,
1231                [],
1232            )?
1233        } else {
1234            tx.execute(
1235                r#"
1236                DELETE FROM graph_nodes
1237                WHERE NOT EXISTS (
1238                    SELECT 1
1239                    FROM next_graph_all_node_ids n
1240                    WHERE n.id = graph_nodes.id
1241                )
1242                "#,
1243                [],
1244            )?
1245        };
1246
1247        let upsert_nodes_sql = r#"
1248            INSERT INTO graph_nodes
1249                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1250            SELECT
1251                n.id,
1252                n.kind,
1253                n.label,
1254                n.properties_json,
1255                n.provenance_json,
1256                n.freshness_json,
1257                n.row_hash,
1258                n.source_watermark
1259            FROM next_graph_nodes n
1260            JOIN next_graph_changed_nodes c ON c.id = n.id
1261            WHERE true
1262            ON CONFLICT(id) DO UPDATE SET
1263                kind = excluded.kind,
1264                label = excluded.label,
1265                properties_json = excluded.properties_json,
1266                provenance_json = excluded.provenance_json,
1267                freshness_json = excluded.freshness_json,
1268                row_hash = excluded.row_hash,
1269                source_watermark = excluded.source_watermark
1270            WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1271            "#;
1272        tx.execute(upsert_nodes_sql, [])?;
1273        let upsert_edges_sql = r#"
1274            INSERT INTO graph_edges
1275                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1276            SELECT
1277                n.edge_key,
1278                n.from_id,
1279                n.to_id,
1280                n.kind,
1281                n.properties_json,
1282                n.provenance_json,
1283                n.freshness_json,
1284                n.row_hash,
1285                n.source_watermark
1286            FROM next_graph_edges n
1287            JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1288            WHERE true
1289            ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1290                edge_key = excluded.edge_key,
1291                properties_json = excluded.properties_json,
1292                provenance_json = excluded.provenance_json,
1293                freshness_json = excluded.freshness_json,
1294                row_hash = excluded.row_hash,
1295                source_watermark = excluded.source_watermark
1296            WHERE graph_edges.row_hash IS NOT excluded.row_hash
1297            "#;
1298        tx.execute(upsert_edges_sql, [])?;
1299        let deleted_node_properties = tx.execute(
1300            r#"
1301            DELETE FROM graph_node_properties
1302            WHERE EXISTS (
1303                SELECT 1
1304                FROM next_graph_changed_nodes c
1305                WHERE c.id = graph_node_properties.node_id
1306            )
1307              AND NOT EXISTS (
1308                SELECT 1
1309                FROM next_graph_node_properties n
1310                WHERE n.node_id = graph_node_properties.node_id
1311                  AND n.key = graph_node_properties.key
1312            )
1313            "#,
1314            [],
1315        )?;
1316        let deleted_edge_properties = tx.execute(
1317            r#"
1318            DELETE FROM graph_edge_properties
1319            WHERE EXISTS (
1320                SELECT 1
1321                FROM next_graph_changed_edges c
1322                WHERE c.edge_key = graph_edge_properties.edge_key
1323            )
1324              AND NOT EXISTS (
1325                SELECT 1
1326                FROM next_graph_edge_properties n
1327                WHERE n.edge_key = graph_edge_properties.edge_key
1328                  AND n.key = graph_edge_properties.key
1329            )
1330            "#,
1331            [],
1332        )?;
1333        let deleted_properties = deleted_node_properties + deleted_edge_properties;
1334        let upsert_properties_sql = r#"
1335            INSERT INTO graph_node_properties (node_id, key, value)
1336            SELECT n.node_id, n.key, n.value
1337            FROM next_graph_node_properties n
1338            LEFT JOIN graph_node_properties g
1339                ON g.node_id = n.node_id AND g.key = n.key
1340            WHERE g.node_id IS NULL OR g.value IS NOT n.value
1341            ON CONFLICT(node_id, key) DO UPDATE SET
1342                value = excluded.value
1343            WHERE graph_node_properties.value IS NOT excluded.value
1344            "#;
1345        let upserted_node_properties = tx.execute(upsert_properties_sql, [])?;
1346        let upsert_edge_properties_sql = r#"
1347            INSERT INTO graph_edge_properties (edge_key, key, value)
1348            SELECT n.edge_key, n.key, n.value
1349            FROM next_graph_edge_properties n
1350            LEFT JOIN graph_edge_properties g
1351                ON g.edge_key = n.edge_key AND g.key = n.key
1352            WHERE g.edge_key IS NULL OR g.value IS NOT n.value
1353            ON CONFLICT(edge_key, key) DO UPDATE SET
1354                value = excluded.value
1355            WHERE graph_edge_properties.value IS NOT excluded.value
1356            "#;
1357        let upserted_edge_properties = tx.execute(upsert_edge_properties_sql, [])?;
1358        let upserted_properties = upserted_node_properties + upserted_edge_properties;
1359        tx.execute(
1360            r#"
1361            INSERT INTO graph_projection_versions
1362                (scope, projection_version, content_hash, source_watermark, observed_at_unix)
1363            VALUES (?1, ?2, ?3, ?4, ?5)
1364            ON CONFLICT(scope) DO UPDATE SET
1365                projection_version = excluded.projection_version,
1366                content_hash = excluded.content_hash,
1367                source_watermark = excluded.source_watermark,
1368                observed_at_unix = excluded.observed_at_unix
1369            "#,
1370            (
1371                &scope,
1372                &projection_version,
1373                &projection_hash,
1374                &source_watermark,
1375                observed_at_unix,
1376            ),
1377        )?;
1378        let pruned_node_tombstones = tx.execute(
1379            r#"
1380            DELETE FROM graph_tombstones
1381            WHERE row_kind = 'node'
1382              AND EXISTS (
1383                SELECT 1
1384                FROM next_graph_nodes n
1385                WHERE n.id = substr(graph_tombstones.row_key, 6)
1386              )
1387            "#,
1388            [],
1389        )?;
1390        let pruned_edge_tombstones = tx.execute(
1391            r#"
1392            DELETE FROM graph_tombstones
1393            WHERE row_kind = 'edge'
1394              AND EXISTS (
1395                SELECT 1
1396                FROM next_graph_edges n
1397                WHERE n.edge_key = substr(graph_tombstones.row_key, 6)
1398              )
1399            "#,
1400            [],
1401        )?;
1402        {
1403            let mut insert_node_tombstone = tx.prepare(
1404                r#"
1405                INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1406                VALUES (?1, 'node', ?2)
1407                ON CONFLICT(row_key) DO UPDATE SET
1408                    row_kind = excluded.row_kind,
1409                    deleted_at_unix = excluded.deleted_at_unix
1410                "#,
1411            )?;
1412            for id in &tombstoned_nodes {
1413                insert_node_tombstone.execute((format!("node:{id}"), observed_at_unix))?;
1414            }
1415        }
1416        {
1417            let mut insert_edge_tombstone = tx.prepare(
1418                r#"
1419                INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1420                VALUES (?1, 'edge', ?2)
1421                ON CONFLICT(row_key) DO UPDATE SET
1422                    row_kind = excluded.row_kind,
1423                    deleted_at_unix = excluded.deleted_at_unix
1424                "#,
1425            )?;
1426            for key in &tombstoned_edges {
1427                insert_edge_tombstone.execute((format!("edge:{key}"), observed_at_unix))?;
1428            }
1429        }
1430        let tombstone_node_count: usize = tx.query_row(
1431            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1432            [],
1433            |row| row.get(0),
1434        )?;
1435        let tombstone_edge_count: usize = tx.query_row(
1436            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1437            [],
1438            |row| row.get(0),
1439        )?;
1440        tx.execute(
1441            r#"
1442            INSERT INTO graph_operator_stats
1443                (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1444            VALUES (?1, ?2, ?3, ?4, ?5, NULL, NULL, ?6)
1445            ON CONFLICT(scope) DO UPDATE SET
1446                nodes = excluded.nodes,
1447                edges = excluded.edges,
1448                tombstone_nodes = excluded.tombstone_nodes,
1449                tombstone_edges = excluded.tombstone_edges,
1450                file_size_bytes = excluded.file_size_bytes,
1451                freelist_bytes = excluded.freelist_bytes,
1452                observed_at_unix = excluded.observed_at_unix
1453            "#,
1454            (
1455                &scope,
1456                projection.nodes.len() as i64,
1457                projection.edges.len() as i64,
1458                tombstone_node_count as i64,
1459                tombstone_edge_count as i64,
1460                observed_at_unix,
1461            ),
1462        )?;
1463        phase_timings.push(sqlite_refresh_phase_timing(
1464            "sqlite_delta_write",
1465            delta_started,
1466            "apply row/property deltas, projection metadata, tombstones, and cached operator counts",
1467        ));
1468        let commit_started = Instant::now();
1469        tx.commit()?;
1470        phase_timings.push(sqlite_refresh_phase_timing(
1471            "sqlite_commit",
1472            commit_started,
1473            "commit refresh transaction and publish old-or-new graph visibility",
1474        ));
1475        let file_size_bytes_after = sqlite_database_size_bytes(&self.conn).ok();
1476        let freelist_bytes_after = sqlite_database_freelist_bytes(&self.conn).ok();
1477        let stats_started = Instant::now();
1478        self.conn.execute(
1479            r#"
1480            UPDATE graph_operator_stats
1481            SET file_size_bytes = ?2,
1482                freelist_bytes = ?3,
1483                observed_at_unix = ?4
1484            WHERE scope = ?1
1485            "#,
1486            (
1487                &scope,
1488                file_size_bytes_after.map(|value| value as i64),
1489                freelist_bytes_after.map(|value| value as i64),
1490                unix_now(),
1491            ),
1492        )?;
1493        phase_timings.push(sqlite_refresh_phase_timing(
1494            "sqlite_stats_cache_update",
1495            stats_started,
1496            "persist post-commit file and freelist proof for status/doctor",
1497        ));
1498        Ok(SqliteProjectionRefresh {
1499            scope,
1500            projection_version,
1501            source_watermark,
1502            upserted_nodes: projection.nodes.len().saturating_sub(unchanged_nodes),
1503            upserted_edges: projection.edges.len().saturating_sub(unchanged_edges),
1504            unchanged_nodes,
1505            unchanged_edges,
1506            upserted_properties,
1507            unchanged_properties,
1508            deleted_properties,
1509            deleted_nodes,
1510            deleted_edges,
1511            pruned_tombstones: pruned_node_tombstones + pruned_edge_tombstones,
1512            file_size_bytes_before,
1513            file_size_bytes_after,
1514            tombstoned_nodes,
1515            tombstoned_edges,
1516            phase_timings,
1517        })
1518    }
1519
1520    /// Derive a Semantic Ontology Graph (`#memgraphrag-ont`, MemGraphRAG arxiv
1521    /// 2606.00610 third layer) from the instance graph: one `ontology_type` node
1522    /// per distinct node kind, and one `ontology_relation:<edge_kind>` edge per
1523    /// observed `(from_kind, edge_kind, to_kind)` triple, each carrying an
1524    /// `instance_count`. This data-driven schema lets retrieval start from
1525    /// abstract types and prune by permitted inter-type relations. Existing
1526    /// ontology rows are excluded so the derivation is idempotent and never folds
1527    /// the ontology layer into itself.
1528    pub fn derive_ontology(&self) -> Result<GraphProjection> {
1529        let mut projection = GraphProjection::default();
1530
1531        let mut node_stmt = self.conn.prepare(
1532            "SELECT kind, COUNT(*) FROM graph_nodes \
1533             WHERE kind != 'ontology_type' \
1534             GROUP BY kind ORDER BY kind",
1535        )?;
1536        let node_rows =
1537            node_stmt.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)))?;
1538        for row in node_rows {
1539            let (kind, count) = row?;
1540            projection.nodes.push(
1541                GraphNode::new(format!("ontology_type:{kind}"), "ontology_type", &kind)
1542                    .with_property("type_kind", &kind)
1543                    .with_property("instance_count", count.to_string())
1544                    .with_provenance(GraphProvenance::new("tsift-ontology", &kind)),
1545            );
1546        }
1547
1548        let mut rel_stmt = self.conn.prepare(
1549            "SELECT n1.kind, e.kind, n2.kind, COUNT(*) \
1550             FROM graph_edges e \
1551             JOIN graph_nodes n1 ON e.from_id = n1.id \
1552             JOIN graph_nodes n2 ON e.to_id = n2.id \
1553             WHERE e.kind NOT LIKE 'ontology_relation:%' \
1554               AND n1.kind != 'ontology_type' AND n2.kind != 'ontology_type' \
1555             GROUP BY n1.kind, e.kind, n2.kind \
1556             ORDER BY n1.kind, e.kind, n2.kind",
1557        )?;
1558        let rel_rows = rel_stmt.query_map([], |row| {
1559            Ok((
1560                row.get::<_, String>(0)?,
1561                row.get::<_, String>(1)?,
1562                row.get::<_, String>(2)?,
1563                row.get::<_, i64>(3)?,
1564            ))
1565        })?;
1566        for row in rel_rows {
1567            let (from_kind, edge_kind, to_kind, count) = row?;
1568            projection.edges.push(
1569                GraphEdge::new(
1570                    format!("ontology_type:{from_kind}"),
1571                    format!("ontology_type:{to_kind}"),
1572                    format!("ontology_relation:{edge_kind}"),
1573                )
1574                .with_property("edge_kind", &edge_kind)
1575                .with_property("instance_count", count.to_string())
1576                .with_provenance(GraphProvenance::new("tsift-ontology", &edge_kind)),
1577            );
1578        }
1579
1580        Ok(projection)
1581    }
1582
1583    pub fn upsert_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1584        let tx = self.conn.transaction()?;
1585        {
1586            let mut insert_node = tx.prepare(
1587                r#"
1588                INSERT INTO graph_nodes
1589                    (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1590                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1591                ON CONFLICT(id) DO UPDATE SET
1592                    kind = excluded.kind,
1593                    label = excluded.label,
1594                    properties_json = excluded.properties_json,
1595                    provenance_json = excluded.provenance_json,
1596                    freshness_json = excluded.freshness_json,
1597                    row_hash = excluded.row_hash,
1598                    source_watermark = excluded.source_watermark
1599                "#,
1600            )?;
1601            let mut delete_properties =
1602                tx.prepare("DELETE FROM graph_node_properties WHERE node_id = ?1")?;
1603            let mut insert_property = tx.prepare(
1604                r#"
1605                INSERT INTO graph_node_properties (node_id, key, value)
1606                VALUES (?1, ?2, ?3)
1607                "#,
1608            )?;
1609            for node in &projection.nodes {
1610                insert_node.execute((
1611                    &node.id,
1612                    &node.kind,
1613                    &node.label,
1614                    to_json(&node.properties)?,
1615                    to_json(&node.provenance)?,
1616                    optional_to_json(&node.freshness)?,
1617                    row_hash(node)?,
1618                ))?;
1619                delete_properties.execute([&node.id])?;
1620                for (key, value) in &node.properties {
1621                    insert_property.execute((&node.id, key, value))?;
1622                }
1623            }
1624        }
1625        {
1626            let mut insert_edge = tx.prepare(
1627                r#"
1628                INSERT INTO graph_edges
1629                    (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1630                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1631                ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1632                    edge_key = excluded.edge_key,
1633                    properties_json = excluded.properties_json,
1634                    provenance_json = excluded.provenance_json,
1635                    freshness_json = excluded.freshness_json,
1636                    row_hash = excluded.row_hash,
1637                    source_watermark = excluded.source_watermark
1638                "#,
1639            )?;
1640            let mut delete_properties =
1641                tx.prepare("DELETE FROM graph_edge_properties WHERE edge_key = ?1")?;
1642            let mut insert_property = tx.prepare(
1643                r#"
1644                INSERT INTO graph_edge_properties (edge_key, key, value)
1645                VALUES (?1, ?2, ?3)
1646                "#,
1647            )?;
1648            for edge in &projection.edges {
1649                let edge_key = graph_edge_id(edge);
1650                insert_edge.execute((
1651                    &edge_key,
1652                    &edge.from_id,
1653                    &edge.to_id,
1654                    &edge.kind,
1655                    to_json(&edge.properties)?,
1656                    to_json(&edge.provenance)?,
1657                    optional_to_json(&edge.freshness)?,
1658                    row_hash(edge)?,
1659                ))?;
1660                delete_properties.execute([&edge_key])?;
1661                for (key, value) in &edge.properties {
1662                    insert_property.execute((&edge_key, key, value))?;
1663                }
1664            }
1665        }
1666        tx.commit()?;
1667        Ok(())
1668    }
1669
1670    pub fn projection_version(&self, scope: &str) -> Result<Option<SqliteProjectionVersion>> {
1671        self.conn
1672            .query_row(
1673                r#"
1674                SELECT projection_version, content_hash, source_watermark
1675                FROM graph_projection_versions
1676                WHERE scope = ?1
1677                "#,
1678                [scope],
1679                |row| {
1680                    Ok(SqliteProjectionVersion {
1681                        projection_version: row.get(0)?,
1682                        content_hash: row.get(1)?,
1683                        source_watermark: row.get(2)?,
1684                    })
1685                },
1686            )
1687            .optional()
1688            .map_err(Into::into)
1689    }
1690
1691    pub fn update_projection_source_watermark(
1692        &mut self,
1693        scope: &str,
1694        source_watermark: Option<String>,
1695    ) -> Result<()> {
1696        self.conn.execute(
1697            r#"
1698            UPDATE graph_projection_versions
1699            SET source_watermark = ?2
1700            WHERE scope = ?1
1701            "#,
1702            (scope, source_watermark),
1703        )?;
1704        Ok(())
1705    }
1706
1707    pub fn compact_storage(&mut self, scope: &str, prune_tombstones: bool) -> Result<usize> {
1708        let pruned_tombstones = if prune_tombstones {
1709            self.conn.execute("DELETE FROM graph_tombstones", [])?
1710        } else {
1711            0
1712        };
1713        self.conn.execute_batch(
1714            r#"
1715            PRAGMA wal_checkpoint(TRUNCATE);
1716            VACUUM;
1717            "#,
1718        )?;
1719        let nodes = self
1720            .conn
1721            .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
1722                row.get::<_, i64>(0)
1723            })?;
1724        let edges = self
1725            .conn
1726            .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
1727                row.get::<_, i64>(0)
1728            })?;
1729        let tombstone_nodes = self.conn.query_row(
1730            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1731            [],
1732            |row| row.get::<_, i64>(0),
1733        )?;
1734        let tombstone_edges = self.conn.query_row(
1735            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1736            [],
1737            |row| row.get::<_, i64>(0),
1738        )?;
1739        let file_size_bytes = sqlite_database_size_bytes(&self.conn)
1740            .ok()
1741            .map(|value| value as i64);
1742        let freelist_bytes = sqlite_database_freelist_bytes(&self.conn)
1743            .ok()
1744            .map(|value| value as i64);
1745        self.conn.execute(
1746            r#"
1747            INSERT INTO graph_operator_stats
1748                (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1749            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s', 'now'))
1750            ON CONFLICT(scope) DO UPDATE SET
1751                nodes = excluded.nodes,
1752                edges = excluded.edges,
1753                tombstone_nodes = excluded.tombstone_nodes,
1754                tombstone_edges = excluded.tombstone_edges,
1755                file_size_bytes = excluded.file_size_bytes,
1756                freelist_bytes = excluded.freelist_bytes,
1757                observed_at_unix = excluded.observed_at_unix
1758            "#,
1759            (
1760                scope,
1761                nodes,
1762                edges,
1763                tombstone_nodes,
1764                tombstone_edges,
1765                file_size_bytes,
1766                freelist_bytes,
1767            ),
1768        )?;
1769        Ok(pruned_tombstones)
1770    }
1771
1772    fn edges_between_nodes_inline(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
1773        let placeholders: Vec<&str> = node_ids.iter().map(|_| "?").collect();
1774        let in_clause = placeholders.join(", ");
1775        let sql = format!(
1776            "SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json \
1777             FROM graph_edges e \
1778             WHERE e.from_id IN ({in_clause}) \
1779               AND e.to_id IN ({in_clause}) \
1780             ORDER BY e.from_id, e.kind, e.to_id"
1781        );
1782        let values: Vec<Value> = node_ids
1783            .iter()
1784            .chain(node_ids.iter())
1785            .map(|id| Value::Text(id.clone()))
1786            .collect();
1787        let mut stmt = self.conn.prepare(&sql)?;
1788        collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
1789    }
1790}
1791
1792fn sqlite_query_plan(conn: &Connection, sql: &str, values: &[Value]) -> Result<Vec<String>> {
1793    let mut stmt = conn.prepare(&format!("EXPLAIN QUERY PLAN {sql}"))?;
1794    collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
1795        row.get::<_, String>(3)
1796    })?)
1797}
1798
/// Builds human-readable diagnostics for a query plan.
///
/// The first entry always echoes the full plan joined with `" | "`. It is
/// followed by one entry per expected index, in input order, saying whether
/// any plan row mentions that index name.
fn sqlite_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<String> {
    let header = format!(
        "sqlite query pushdown active; plan: {}",
        plan.join(" | ")
    );
    let per_index = expected_indexes.iter().map(|index_name| {
        let reported = plan.iter().any(|detail| detail.contains(*index_name));
        if reported {
            format!("sqlite query plan uses {index_name}")
        } else {
            format!(
                "sqlite query plan did not report {index_name}; inspect before changing graph property indexes"
            )
        }
    });
    std::iter::once(header).chain(per_index).collect()
}
1815
/// Compact, machine-readable counterpart to the string diagnostics produced
/// by `sqlite_query_plan_diagnostics`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TerseDiagnostic {
    /// Short status code. `terse_query_plan_diagnostics` emits `plan_active`,
    /// `idx_ok` and `idx_missing`.
    pub code: &'static str,
    /// Index name the code refers to; left out of serialized output when
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index: Option<String>,
}
1822
1823#[allow(dead_code)]
1824fn terse_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<TerseDiagnostic> {
1825    let mut diagnostics = vec![TerseDiagnostic {
1826        code: "plan_active",
1827        index: None,
1828    }];
1829    for expected_index in expected_indexes {
1830        if plan.iter().any(|row| row.contains(expected_index)) {
1831            diagnostics.push(TerseDiagnostic {
1832                code: "idx_ok",
1833                index: Some(expected_index.to_string()),
1834            });
1835        } else {
1836            diagnostics.push(TerseDiagnostic {
1837                code: "idx_missing",
1838                index: Some(expected_index.to_string()),
1839            });
1840        }
1841    }
1842    diagnostics
1843}
1844
1845fn push_sqlite_property_filter_exists(
1846    sql: &mut String,
1847    values: &mut Vec<Value>,
1848    node_alias: &str,
1849    filters: &[GraphPropertyFilter],
1850) {
1851    for (index, filter) in filters.iter().enumerate() {
1852        sql.push_str(&format!(
1853            r#"
1854            AND EXISTS (
1855                SELECT 1
1856                FROM graph_node_properties p{index} INDEXED BY idx_graph_node_properties_key_value_node
1857                WHERE p{index}.node_id = {node_alias}.id
1858                  AND p{index}.key = ?
1859                  AND p{index}.value = ?
1860            )
1861            "#
1862        ));
1863        values.push(Value::Text(filter.key.clone()));
1864        values.push(Value::Text(filter.value.clone()));
1865    }
1866}
1867
1868fn push_sqlite_edge_property_filter_exists(
1869    sql: &mut String,
1870    values: &mut Vec<Value>,
1871    edge_alias: &str,
1872    filters: &[GraphPropertyFilter],
1873) {
1874    for (index, filter) in filters.iter().enumerate() {
1875        sql.push_str(&format!(
1876            r#"
1877            AND EXISTS (
1878                SELECT 1
1879                FROM graph_edge_properties ep{index} INDEXED BY idx_graph_edge_properties_key_value_edge
1880                WHERE ep{index}.edge_key = {edge_alias}.edge_key
1881                  AND ep{index}.key = ?
1882                  AND ep{index}.value = ?
1883            )
1884            "#
1885        ));
1886        values.push(Value::Text(filter.key.clone()));
1887        values.push(Value::Text(filter.value.clone()));
1888    }
1889}
1890
/// Parameters for one `SELECT` branch of the incident-edge `UNION` query
/// built by `push_sqlite_incident_edge_branch`.
struct SqliteIncidentEdgeBranch<'a> {
    /// Index name interpolated into the branch's `INDEXED BY` clause.
    index_name: &'a str,
    /// `graph_edges` column compared against `node_id` (the callers in this
    /// file pass `from_id` or `to_id`).
    endpoint_column: &'a str,
    /// Node id bound to the endpoint comparison.
    node_id: &'a str,
    /// Optional edge kind; adds `AND e.kind = ?` when present.
    kind: Option<&'a str>,
    /// Property filters rendered as `EXISTS` subqueries against the edge.
    filters: &'a [GraphPropertyFilter],
    /// Optional exclusive lower bound on `edge_key` (`AND e.edge_key > ?`).
    cursor: Option<&'a str>,
}
1899
1900fn push_sqlite_incident_edge_branch(
1901    sql: &mut String,
1902    values: &mut Vec<Value>,
1903    branch: SqliteIncidentEdgeBranch<'_>,
1904) {
1905    sql.push_str(&format!(
1906        r#"
1907        SELECT
1908            e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
1909        FROM graph_edges e INDEXED BY {index_name}
1910        WHERE e.{endpoint_column} = ?
1911        "#,
1912        index_name = branch.index_name,
1913        endpoint_column = branch.endpoint_column,
1914    ));
1915    values.push(Value::Text(branch.node_id.to_string()));
1916    if let Some(kind) = branch.kind {
1917        sql.push_str(" AND e.kind = ?");
1918        values.push(Value::Text(kind.to_string()));
1919    }
1920    push_sqlite_edge_property_filter_exists(sql, values, "e", branch.filters);
1921    if let Some(cursor) = branch.cursor {
1922        sql.push_str(" AND e.edge_key > ?");
1923        values.push(Value::Text(cursor.to_string()));
1924    }
1925}
1926
1927fn sqlite_incident_edges_union_query(
1928    node_id: &str,
1929    kind: Option<&str>,
1930    filters: &[GraphPropertyFilter],
1931    cursor: Option<&str>,
1932    limit: Option<usize>,
1933) -> (String, Vec<Value>) {
1934    let mut sql = String::from(
1935        r#"
1936        SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1937        FROM (
1938        "#,
1939    );
1940    let mut values = Vec::new();
1941    push_sqlite_incident_edge_branch(
1942        &mut sql,
1943        &mut values,
1944        SqliteIncidentEdgeBranch {
1945            index_name: "idx_graph_edges_from_kind",
1946            endpoint_column: "from_id",
1947            node_id,
1948            kind,
1949            filters,
1950            cursor,
1951        },
1952    );
1953    sql.push_str(" UNION ");
1954    push_sqlite_incident_edge_branch(
1955        &mut sql,
1956        &mut values,
1957        SqliteIncidentEdgeBranch {
1958            index_name: "idx_graph_edges_to_kind",
1959            endpoint_column: "to_id",
1960            node_id,
1961            kind,
1962            filters,
1963            cursor,
1964        },
1965    );
1966    sql.push_str(
1967        r#"
1968        ) e
1969        ORDER BY e.edge_key
1970        "#,
1971    );
1972    if let Some(limit) = limit {
1973        sql.push_str(" LIMIT ?");
1974        values.push(Value::Integer(limit.saturating_add(1) as i64));
1975    }
1976    (sql, values)
1977}
1978
1979impl GraphStore for SqliteGraphStore {
1980    fn upsert_node(&self, node: &GraphNode) -> Result<()> {
1981        self.conn.execute(
1982            r#"
1983            INSERT INTO graph_nodes
1984                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1985            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1986            ON CONFLICT(id) DO UPDATE SET
1987                kind = excluded.kind,
1988                label = excluded.label,
1989                properties_json = excluded.properties_json,
1990                provenance_json = excluded.provenance_json,
1991                freshness_json = excluded.freshness_json,
1992                row_hash = excluded.row_hash,
1993                source_watermark = excluded.source_watermark
1994            "#,
1995            (
1996                &node.id,
1997                &node.kind,
1998                &node.label,
1999                to_json(&node.properties)?,
2000                to_json(&node.provenance)?,
2001                optional_to_json(&node.freshness)?,
2002                row_hash(node)?,
2003            ),
2004        )?;
2005        replace_node_properties(&self.conn, &node.id, &node.properties)?;
2006        Ok(())
2007    }
2008
2009    fn upsert_edge(&self, edge: &GraphEdge) -> Result<()> {
2010        let edge_key = graph_edge_id(edge);
2011        self.conn.execute(
2012            r#"
2013            INSERT INTO graph_edges
2014                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
2015            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
2016            ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
2017                edge_key = excluded.edge_key,
2018                properties_json = excluded.properties_json,
2019                provenance_json = excluded.provenance_json,
2020                freshness_json = excluded.freshness_json,
2021                row_hash = excluded.row_hash,
2022                source_watermark = excluded.source_watermark
2023            "#,
2024            (
2025                &edge_key,
2026                &edge.from_id,
2027                &edge.to_id,
2028                &edge.kind,
2029                to_json(&edge.properties)?,
2030                to_json(&edge.provenance)?,
2031                optional_to_json(&edge.freshness)?,
2032                row_hash(edge)?,
2033            ),
2034        )?;
2035        replace_edge_properties(&self.conn, &edge_key, &edge.properties)?;
2036        Ok(())
2037    }
2038
2039    fn delete_node(&self, id: &str) -> Result<usize> {
2040        self.conn
2041            .execute("DELETE FROM graph_nodes WHERE id = ?1", [id])
2042            .map_err(Into::into)
2043    }
2044
2045    fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize> {
2046        self.conn
2047            .execute(
2048                "DELETE FROM graph_edges WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3",
2049                (from_id, to_id, kind),
2050            )
2051            .map_err(Into::into)
2052    }
2053
2054    fn node(&self, id: &str) -> Result<Option<GraphNode>> {
2055        self.conn
2056            .query_row(
2057                r#"
2058                SELECT id, kind, label, properties_json, provenance_json, freshness_json
2059                FROM graph_nodes
2060                WHERE id = ?1
2061                "#,
2062                [id],
2063                node_from_row,
2064            )
2065            .optional()
2066            .map_err(Into::into)
2067    }
2068
2069    fn all_nodes(&self) -> Result<Vec<GraphNode>> {
2070        let mut stmt = self.conn.prepare(
2071            r#"
2072            SELECT id, kind, label, properties_json, provenance_json, freshness_json
2073            FROM graph_nodes
2074            ORDER BY id
2075            "#,
2076        )?;
2077        collect_rows(stmt.query_map([], node_from_row)?)
2078    }
2079
2080    fn all_edges(&self) -> Result<Vec<GraphEdge>> {
2081        let mut stmt = self.conn.prepare(
2082            r#"
2083            SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2084            FROM graph_edges
2085            ORDER BY from_id, kind, to_id
2086            "#,
2087        )?;
2088        collect_rows(stmt.query_map([], edge_from_row)?)
2089    }
2090
2091    fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
2092        self.conn
2093            .query_row(
2094                r#"
2095                SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2096                FROM graph_edges INDEXED BY idx_graph_edges_edge_key
2097                WHERE edge_key = ?1
2098                "#,
2099                [edge_id],
2100                edge_from_row,
2101            )
2102            .optional()
2103            .map_err(Into::into)
2104    }
2105
2106    fn graph_counts(&self) -> Result<(usize, usize)> {
2107        let nodes = self
2108            .conn
2109            .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
2110                row.get::<_, usize>(0)
2111            })?;
2112        let edges = self
2113            .conn
2114            .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2115                row.get::<_, usize>(0)
2116            })?;
2117        Ok((nodes, edges))
2118    }
2119
2120    fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
2121        match kind {
2122            Some(kind) => self
2123                .conn
2124                .query_row(
2125                    r#"
2126                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2127                    FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2128                    WHERE from_id <> to_id AND kind = ?1
2129                    ORDER BY from_id, kind, to_id
2130                    LIMIT 1
2131                    "#,
2132                    [kind],
2133                    edge_from_row,
2134                )
2135                .optional()
2136                .map_err(Into::into),
2137            None => self
2138                .conn
2139                .query_row(
2140                    r#"
2141                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2142                    FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2143                    WHERE from_id <> to_id
2144                    ORDER BY from_id, kind, to_id
2145                    LIMIT 1
2146                    "#,
2147                    [],
2148                    edge_from_row,
2149                )
2150                .optional()
2151                .map_err(Into::into),
2152        }
2153    }
2154
2155    fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
2156        self.conn
2157            .query_row(
2158                r#"
2159                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2160                       ep.key, ep.value
2161                FROM graph_edge_properties ep INDEXED BY idx_graph_edge_properties_key_value_edge
2162                JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2163                  ON e.edge_key = ep.edge_key
2164                WHERE e.from_id <> e.to_id
2165                ORDER BY ep.key, ep.value, ep.edge_key
2166                LIMIT 1
2167                "#,
2168                [],
2169                |row| {
2170                    Ok((
2171                        edge_from_row(row)?,
2172                        GraphPropertyFilter {
2173                            key: row.get(7)?,
2174                            value: row.get(8)?,
2175                        },
2176                    ))
2177                },
2178            )
2179            .optional()
2180            .map_err(Into::into)
2181    }
2182
2183    fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>> {
2184        let mut stmt = self.conn.prepare(
2185            r#"
2186            SELECT id, kind, label, properties_json, provenance_json, freshness_json
2187            FROM graph_nodes
2188            WHERE kind = ?1
2189            ORDER BY id
2190            "#,
2191        )?;
2192        collect_rows(stmt.query_map([kind], node_from_row)?)
2193    }
2194
2195    fn paged_nodes_by_kind(
2196        &self,
2197        kind: &str,
2198        options: GraphQueryOptions,
2199    ) -> Result<GraphPagedSubgraph> {
2200        let mut sql = String::from(
2201            r#"
2202            SELECT id, kind, label, properties_json, provenance_json, freshness_json
2203            FROM graph_nodes
2204            WHERE kind = ?
2205            "#,
2206        );
2207        let mut values = vec![Value::Text(kind.to_string())];
2208        push_sqlite_property_filter_exists(
2209            &mut sql,
2210            &mut values,
2211            "graph_nodes",
2212            &options.property_filters,
2213        );
2214        if let Some(cursor) = &options.cursor {
2215            sql.push_str(" AND id > ?");
2216            values.push(Value::Text(cursor.clone()));
2217        }
2218        sql.push_str(" ORDER BY id");
2219        if let Some(limit) = options.limit {
2220            sql.push_str(" LIMIT ?");
2221            values.push(Value::Integer(limit.saturating_add(1) as i64));
2222        }
2223
2224        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2225        let mut stmt = self.conn.prepare(&sql)?;
2226        let mut nodes =
2227            collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)?;
2228        let before_limit = nodes.len();
2229        let mut next_cursor = None;
2230        if let Some(limit) = options.limit
2231            && nodes.len() > limit
2232        {
2233            next_cursor = nodes
2234                .get(limit.saturating_sub(1))
2235                .map(|node| node.id.clone());
2236            nodes.truncate(limit);
2237        }
2238        let expected_indexes = if options.property_filters.is_empty() {
2239            vec!["idx_graph_nodes_kind"]
2240        } else {
2241            vec![
2242                "idx_graph_nodes_kind",
2243                "idx_graph_node_properties_key_value_node",
2244            ]
2245        };
2246        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2247        if !options.property_filters.is_empty() {
2248            diagnostics.push(
2249                "property filters were evaluated by SQLite materialized property rows before paging"
2250                    .to_string(),
2251            );
2252        }
2253        if options.cursor.is_some() {
2254            diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
2255        }
2256        if next_cursor.is_some() {
2257            diagnostics.push(
2258                "result was truncated; pass page.next_cursor as --cursor for the next page"
2259                    .to_string(),
2260            );
2261        }
2262        Ok(GraphPagedSubgraph {
2263            page: GraphQueryPage {
2264                cursor: options.cursor,
2265                limit: options.limit,
2266                next_cursor,
2267                returned_nodes: nodes.len(),
2268                returned_edges: 0,
2269                truncated: options.limit.is_some_and(|limit| before_limit > limit),
2270                diagnostics,
2271            },
2272            nodes,
2273            edges: Vec::new(),
2274        })
2275    }
2276
2277    fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2278        match kind {
2279            Some(kind) => {
2280                let mut stmt = self.conn.prepare(
2281                    r#"
2282                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2283                    FROM graph_edges
2284                    WHERE from_id = ?1 AND kind = ?2
2285                    ORDER BY to_id, kind
2286                    "#,
2287                )?;
2288                collect_rows(stmt.query_map((from_id, kind), edge_from_row)?)
2289            }
2290            None => {
2291                let mut stmt = self.conn.prepare(
2292                    r#"
2293                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2294                    FROM graph_edges
2295                    WHERE from_id = ?1
2296                    ORDER BY to_id, kind
2297                    "#,
2298                )?;
2299                collect_rows(stmt.query_map([from_id], edge_from_row)?)
2300            }
2301        }
2302    }
2303
2304    fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2305        let (sql, values) = sqlite_incident_edges_union_query(node_id, kind, &[], None, None);
2306        let mut stmt = self.conn.prepare(&sql)?;
2307        collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2308    }
2309
    /// Returns one page of edges ordered by edge key, optionally restricted
    /// to `kind`, with property filters, cursor and limit pushed into SQLite.
    ///
    /// When at least one property filter is given, the first filter drives
    /// the query from `graph_edge_properties` joined to `graph_edges`; any
    /// remaining filters are added as `EXISTS` subqueries. Without filters
    /// the query reads `graph_edges` directly.
    ///
    /// When a limit is set, one extra row is fetched to detect whether more
    /// results exist. If so the page is truncated to the limit and
    /// `next_cursor` is the id of the last edge kept. The page diagnostics
    /// describe the query plan and which pushdowns were applied.
    fn paged_edges(
        &self,
        kind: Option<&str>,
        options: GraphQueryOptions,
    ) -> Result<GraphPagedSubgraph> {
        let primary_property_filter = options.property_filters.first();
        // All placeholders are positional (`?`), so the order of pushes onto
        // `values` must match the order fragments are appended to `sql`.
        let mut values = Vec::new();
        let mut sql = if let Some(filter) = primary_property_filter {
            values.push(Value::Text(filter.key.clone()));
            values.push(Value::Text(filter.value.clone()));
            String::from(
                r#"
                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
                FROM graph_edge_properties ep0 INDEXED BY idx_graph_edge_properties_key_value_edge
                JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
                  ON e.edge_key = ep0.edge_key
                WHERE ep0.key = ?
                  AND ep0.value = ?
                "#,
            )
        } else {
            String::from(
                r#"
                SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
                FROM graph_edges e
                WHERE 1 = 1
                "#,
            )
        };
        if let Some(kind) = kind {
            sql.push_str(" AND e.kind = ?");
            values.push(Value::Text(kind.to_string()));
        }
        // Only the filters after the primary one become EXISTS subqueries.
        // The helper numbers its aliases from 0 again, so its first subquery
        // also uses `ep0`; that alias is declared inside the subquery's own
        // FROM clause and is separate from the outer `ep0` join above.
        push_sqlite_edge_property_filter_exists(
            &mut sql,
            &mut values,
            "e",
            if primary_property_filter.is_some() {
                &options.property_filters[1..]
            } else {
                &options.property_filters
            },
        );
        if let Some(cursor) = &options.cursor {
            if primary_property_filter.is_some() {
                sql.push_str(" AND ep0.edge_key > ?");
            } else {
                sql.push_str(" AND e.edge_key > ?");
            }
            values.push(Value::Text(cursor.clone()));
        }
        if primary_property_filter.is_some() {
            sql.push_str(" ORDER BY ep0.edge_key");
        } else {
            sql.push_str(" ORDER BY e.edge_key");
        }
        if let Some(limit) = options.limit {
            sql.push_str(" LIMIT ?");
            // Ask for one row beyond the limit to detect a following page.
            values.push(Value::Integer(limit.saturating_add(1) as i64));
        }

        // Separate count of property rows matching the primary filter; it is
        // only reported in the diagnostics and does not affect the page.
        let primary_property_row_count = if let Some(filter) = primary_property_filter {
            Some(self.conn.query_row(
                r#"
                SELECT COUNT(*)
                FROM graph_edge_properties INDEXED BY idx_graph_edge_properties_key_value_edge
                WHERE key = ?1 AND value = ?2
                "#,
                (&filter.key, &filter.value),
                |row| row.get::<_, usize>(0),
            )?)
        } else {
            None
        };
        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
        let mut stmt = self.conn.prepare(&sql)?;
        let mut edges =
            collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
        // Row count before truncation; used for the `truncated` flag below.
        let before_limit = edges.len();
        let mut next_cursor = None;
        if let Some(limit) = options.limit
            && edges.len() > limit
        {
            // The cursor is the id of the last edge that stays on this page.
            next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
            edges.truncate(limit);
        }
        let expected_indexes = if options.property_filters.is_empty() {
            vec!["idx_graph_edges_edge_key"]
        } else {
            vec![
                "idx_graph_edge_properties_key_value_edge",
                "idx_graph_edges_edge_key",
            ]
        };
        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
        if !options.property_filters.is_empty() {
            if let Some(row_count) = primary_property_row_count {
                diagnostics.push(format!(
                    "edge property primary filter matched {row_count} materialized row(s) before edge-kind/cursor paging"
                ));
            }
            diagnostics.push(
                "edge property scan drives from SQLite materialized property rows before joining graph_edges"
                    .to_string(),
            );
        }
        if options.cursor.is_some() {
            diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
        }
        if next_cursor.is_some() {
            diagnostics.push(
                "result was truncated; pass page.next_cursor as --cursor for the next page"
                    .to_string(),
            );
        }
        Ok(GraphPagedSubgraph {
            page: GraphQueryPage {
                cursor: options.cursor,
                limit: options.limit,
                next_cursor,
                returned_nodes: 0,
                returned_edges: edges.len(),
                truncated: options.limit.is_some_and(|limit| before_limit > limit),
                diagnostics,
            },
            nodes: Vec::new(),
            edges,
        })
    }
2439
2440    fn paged_incident_edges(
2441        &self,
2442        node_id: &str,
2443        kind: Option<&str>,
2444        options: GraphQueryOptions,
2445    ) -> Result<GraphPagedSubgraph> {
2446        let (sql, values) = sqlite_incident_edges_union_query(
2447            node_id,
2448            kind,
2449            &options.property_filters,
2450            options.cursor.as_deref(),
2451            options.limit,
2452        );
2453        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2454        let mut stmt = self.conn.prepare(&sql)?;
2455        let mut edges =
2456            collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2457        let before_limit = edges.len();
2458        let mut next_cursor = None;
2459        if let Some(limit) = options.limit
2460            && edges.len() > limit
2461        {
2462            next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2463            edges.truncate(limit);
2464        }
2465        let expected_indexes = if options.property_filters.is_empty() {
2466            vec!["idx_graph_edges_from_kind", "idx_graph_edges_to_kind"]
2467        } else {
2468            vec![
2469                "idx_graph_edges_from_kind",
2470                "idx_graph_edges_to_kind",
2471                "idx_graph_edge_properties_key_value_edge",
2472            ]
2473        };
2474        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2475        diagnostics.push(
2476            "incident edge scan uses UNION over from_id/to_id index probes instead of an OR predicate"
2477                .to_string(),
2478        );
2479        if !options.property_filters.is_empty() {
2480            diagnostics.push(
2481                "edge property filters were evaluated by SQLite materialized property rows before paging"
2482                    .to_string(),
2483            );
2484        }
2485        if options.cursor.is_some() {
2486            diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2487        }
2488        if next_cursor.is_some() {
2489            diagnostics.push(
2490                "result was truncated; pass page.next_cursor as --cursor for the next page"
2491                    .to_string(),
2492            );
2493        }
2494        Ok(GraphPagedSubgraph {
2495            page: GraphQueryPage {
2496                cursor: options.cursor,
2497                limit: options.limit,
2498                next_cursor,
2499                returned_nodes: 0,
2500                returned_edges: edges.len(),
2501                truncated: options.limit.is_some_and(|limit| before_limit > limit),
2502                diagnostics,
2503            },
2504            nodes: Vec::new(),
2505            edges,
2506        })
2507    }
2508
2509    fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
2510        if node_ids.is_empty() {
2511            return Ok(Vec::new());
2512        }
2513        if node_ids.len() <= 20 {
2514            return self.edges_between_nodes_inline(node_ids);
2515        }
2516        self.assert_not_in_temp_table_section();
2517        self.temp_table_active.set(true);
2518        let result = (|| -> Result<Vec<GraphEdge>> {
2519            let tx = self.conn.unchecked_transaction()?;
2520            tx.execute_batch(
2521                r#"
2522                CREATE TEMP TABLE IF NOT EXISTS _edges_between_ids (id TEXT PRIMARY KEY);
2523                DELETE FROM _edges_between_ids;
2524                "#,
2525            )?;
2526            for chunk in node_ids.iter().collect::<Vec<_>>().chunks(450) {
2527                let row_placeholders: Vec<String> =
2528                    chunk.iter().map(|_| "(?)".to_string()).collect();
2529                let placeholders = row_placeholders.join(", ");
2530                let sql = format!(
2531                    "INSERT OR IGNORE INTO _edges_between_ids (id) VALUES {placeholders}"
2532                );
2533                let values: Vec<Value> = chunk.iter().map(|id| Value::Text((*id).clone())).collect();
2534                tx.execute(&sql, params_from_iter(values.iter()))?;
2535            }
2536            let edges = {
2537                let mut stmt = tx.prepare(
2538                    r#"
2539                    SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2540                    FROM graph_edges e
2541                    WHERE EXISTS (SELECT 1 FROM _edges_between_ids f WHERE f.id = e.from_id)
2542                      AND EXISTS (SELECT 1 FROM _edges_between_ids t WHERE t.id = e.to_id)
2543                    ORDER BY e.from_id, e.kind, e.to_id
2544                    "#,
2545                )?;
2546                collect_rows(stmt.query_map([], edge_from_row)?)?
2547            };
2548            tx.finish()?;
2549            Ok(edges)
2550        })();
2551        self.temp_table_active.set(false);
2552        result
2553    }
2554
2555    fn ranked_neighborhood(
2556        &self,
2557        center_id: &str,
2558        options: &RankedNeighborhoodOptions,
2559    ) -> Result<Option<RankedNeighborhoodResult>> {
2560        if self.node(center_id)?.is_none() {
2561            return Ok(None);
2562        }
2563        let center = self.node(center_id)?.unwrap();
2564
2565        let score_expr = match options.scoring {
2566            tsift_core::NeighborhoodScoring::BreadthFirst => {
2567                "MAX(0, 120 - (walk.depth * 18))".to_string()
2568            }
2569            tsift_core::NeighborhoodScoring::EdgeKindWeighted => {
2570                "MAX(0, 120 - (walk.depth * 18)) + CASE walk.edge_kind \
2571                 WHEN 'semantic_relation' THEN 34 \
2572                 WHEN 'mentions_entity' THEN 28 \
2573                 WHEN 'mentions_concept' THEN 28 \
2574                 WHEN 'tagged_entity' THEN 28 \
2575                 WHEN 'tagged_concept' THEN 28 \
2576                 WHEN 'related_concept' THEN 28 \
2577                 WHEN 'mentions' THEN 22 \
2578                 WHEN 'calls' THEN 20 \
2579                 WHEN 'requests_context' THEN 18 \
2580                 WHEN 'scopes_context' THEN 18 \
2581                 WHEN 'scopes_source' THEN 18 \
2582                 WHEN 'explains_result' THEN 18 \
2583                 WHEN 'defines' THEN 12 \
2584                 WHEN 'contains' THEN 12 \
2585                 WHEN 'belongs_to' THEN 12 \
2586                 ELSE 8 END".to_string()
2587            }
2588            tsift_core::NeighborhoodScoring::DegreeWeighted => {
2589                "MAX(0, 120 - (walk.depth * 18)) + CASE \
2590                 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 3 THEN 20 \
2591                 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 10 THEN 10 \
2592                 ELSE 0 END"
2593                    .to_string()
2594            }
2595        };
2596
2597        let use_degree_cache = matches!(options.scoring, tsift_core::NeighborhoodScoring::DegreeWeighted);
2598        let degree_cte = if use_degree_cache {
2599            "degree_cache AS ( \
2600             SELECT id, (SELECT COUNT(*) FROM graph_edges e WHERE e.from_id = n.id OR e.to_id = n.id) AS degree \
2601             FROM graph_nodes n), "
2602        } else {
2603            ""
2604        };
2605        let mut sql = format!(
2606            r#"
2607            WITH {degree_cte}RECURSIVE walk(id, depth, edge_kind, score) AS (
2608                SELECT ?, 0, '', ?
2609                UNION
2610                SELECT e.to_id, walk.depth + 1, e.kind,
2611            "#,
2612        );
2613        sql.push_str(&format!("    {}\n", score_expr));
2614        sql.push_str(
2615            r#"
2616                FROM walk
2617                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2618                    ON e.from_id = walk.id
2619                WHERE walk.depth < ?
2620            "#,
2621        );
2622        let mut values = vec![
2623            Value::Text(center_id.to_string()),
2624            Value::Integer(i64::MAX),
2625            Value::Integer(options.depth as i64),
2626        ];
2627        if let Some(kind) = &options.edge_kind {
2628            sql.push_str(" AND e.kind = ?");
2629            values.push(Value::Text(kind.clone()));
2630        }
2631        sql.push_str(
2632            r#"
2633            ),
2634            scored_nodes AS (
2635                SELECT walk.id, walk.score,
2636                    n.kind AS node_kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
2637                FROM walk
2638                JOIN graph_nodes n ON n.id = walk.id
2639                GROUP BY walk.id
2640            ),
2641            ranked AS (
2642                SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
2643                FROM scored_nodes
2644                ORDER BY score DESC, id ASC
2645            ),
2646            kept AS (
2647                SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
2648                FROM ranked
2649                LIMIT ?
2650            ),
2651            total AS (
2652                SELECT COUNT(*) AS cnt FROM scored_nodes
2653            )
2654            SELECT
2655                'meta' AS row_type,
2656                (SELECT cnt FROM total) AS total_discovered,
2657                0 AS node_id, '' AS node_kind, '' AS node_label,
2658                '' AS node_props, '' AS node_prov, '' AS node_fresh,
2659                '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
2660                '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
2661            UNION ALL
2662            SELECT
2663                'node' AS row_type,
2664                0 AS total_discovered,
2665                k.id, k.node_kind, k.label, k.properties_json, k.provenance_json, k.freshness_json,
2666                '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
2667                '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
2668            FROM kept k
2669            UNION ALL
2670            SELECT
2671                'edge' AS row_type,
2672                0 AS total_discovered,
2673                '' AS node_id, '' AS node_kind, '' AS node_label,
2674                '' AS node_props, '' AS node_prov, '' AS node_fresh,
2675                e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2676            FROM graph_edges e
2677            WHERE EXISTS (SELECT 1 FROM kept k WHERE k.id = e.from_id)
2678              AND EXISTS (SELECT 1 FROM kept k2 WHERE k2.id = e.to_id)
2679            "#,
2680        );
2681        values.push(Value::Integer(options.max_nodes as i64));
2682
2683        let mut stmt = self.conn.prepare(&sql)?;
2684        let mut nodes = vec![center.clone()];
2685        let mut edges = Vec::new();
2686        let mut total_discovered = 0usize;
2687
2688        let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
2689            let row_type: String = row.get(0)?;
2690            match row_type.as_str() {
2691                "meta" => Ok(QueryResult::Meta {
2692                    total: row.get::<_, i64>(1)? as usize,
2693                }),
2694                "node" => Ok(QueryResult::Node(node_from_row_at(row, 2)?)),
2695                "edge" => Ok(QueryResult::Edge(edge_from_row_at(row, 8)?)),
2696                _ => Err(rusqlite::Error::InvalidQuery),
2697            }
2698        })?;
2699        for row_result in rows {
2700            match row_result? {
2701                QueryResult::Meta { total } => {
2702                    total_discovered = total;
2703                }
2704                QueryResult::Node(node) => {
2705                    if node.id != center_id {
2706                        nodes.push(node);
2707                    }
2708                }QueryResult::Edge(edge) => {
2709                    edges.push(edge);
2710                }
2711            }
2712        }
2713
2714        let total_discovered = total_discovered.max(nodes.len());
2715        let pruned_count = total_discovered.saturating_sub(nodes.len());
2716
2717        match options.property_mode {
2718            PropertyMode::Full => {}
2719            PropertyMode::Omit => {
2720                for n in &mut nodes {
2721                    n.properties.clear();
2722                }
2723                for e in &mut edges {
2724                    e.properties.clear();
2725                }
2726            }
2727            PropertyMode::Sample => {
2728                let mut seen_kinds = std::collections::BTreeSet::new();
2729                for n in &mut nodes {
2730                    if !seen_kinds.contains(&n.kind) {
2731                        seen_kinds.insert(n.kind.clone());
2732                    } else {
2733                        n.properties.clear();
2734                    }
2735                }
2736                for e in &mut edges {
2737                    e.properties.clear();
2738                }
2739            }
2740        }
2741
2742        Ok(Some(RankedNeighborhoodResult {
2743            nodes,
2744            edges,
2745            pruned_count,
2746            total_discovered,
2747        }))
2748    }
2749
2750    fn neighborhood(
2751        &self,
2752        center_id: &str,
2753        depth: usize,
2754        kind: Option<&str>,
2755    ) -> Result<Option<GraphSubgraph>> {
2756        self.paged_neighborhood(center_id, depth, kind, GraphQueryOptions::default())
2757            .map(|result| {
2758                result.map(|result| {
2759                    GraphSubgraph {
2760                        nodes: result.nodes,
2761                        edges: result.edges,
2762                    }
2763                    .sorted()
2764                })
2765            })
2766    }
2767
2768    fn paged_neighborhood(
2769        &self,
2770        center_id: &str,
2771        depth: usize,
2772        kind: Option<&str>,
2773        options: GraphQueryOptions,
2774    ) -> Result<Option<GraphPagedSubgraph>> {
2775        if self.node(center_id)?.is_none() {
2776            return Ok(None);
2777        }
2778        let mut sql = String::from(
2779            r#"
2780            WITH RECURSIVE walk(id, depth) AS (
2781                SELECT ?, 0
2782                UNION
2783                SELECT e.to_id, walk.depth + 1
2784                FROM walk
2785                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2786                    ON e.from_id = walk.id
2787                WHERE walk.depth < ?
2788            "#,
2789        );
2790        let mut values = vec![
2791            Value::Text(center_id.to_string()),
2792            Value::Integer(depth as i64),
2793        ];
2794        if let Some(kind) = kind {
2795            sql.push_str(" AND e.kind = ?");
2796            values.push(Value::Text(kind.to_string()));
2797        }
2798        sql.push_str(
2799            r#"
2800            ),
2801            filtered_nodes AS (
2802            SELECT DISTINCT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
2803            FROM walk
2804            JOIN graph_nodes n ON n.id = walk.id
2805            WHERE 1 = 1
2806            "#,
2807        );
2808        push_sqlite_property_filter_exists(&mut sql, &mut values, "n", &options.property_filters);
2809        if let Some(cursor) = &options.cursor {
2810            sql.push_str(" AND n.id > ?");
2811            values.push(Value::Text(cursor.clone()));
2812        }
2813        sql.push_str(
2814            r#"
2815            ),
2816            page_nodes AS (
2817                SELECT id, kind, label, properties_json, provenance_json, freshness_json
2818                FROM filtered_nodes
2819                ORDER BY id
2820            "#,
2821        );
2822        if let Some(limit) = options.limit {
2823            sql.push_str(" LIMIT ?");
2824            values.push(Value::Integer(limit.saturating_add(1) as i64));
2825        }
2826        sql.push_str(
2827            r#"
2828            ),
2829            walk_edges AS (
2830                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2831                FROM walk
2832                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2833                    ON e.from_id = walk.id
2834                WHERE walk.depth < ?
2835            "#,
2836        );
2837        values.push(Value::Integer(depth as i64));
2838        if let Some(kind) = kind {
2839            sql.push_str(" AND e.kind = ?");
2840            values.push(Value::Text(kind.to_string()));
2841        }
2842        sql.push_str(
2843            r#"
2844            )
2845            SELECT
2846                'node' AS row_type,
2847                p.id, p.kind, p.label, p.properties_json, p.provenance_json, p.freshness_json,
2848                NULL AS edge_key, NULL AS from_id, NULL AS to_id, NULL AS edge_kind,
2849                NULL AS edge_properties_json, NULL AS edge_provenance_json, NULL AS edge_freshness_json
2850            FROM page_nodes p
2851            UNION ALL
2852            SELECT DISTINCT
2853                'edge' AS row_type,
2854                NULL AS id, NULL AS kind, NULL AS label, NULL AS properties_json,
2855                NULL AS provenance_json, NULL AS freshness_json,
2856                e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2857            FROM walk_edges e
2858            WHERE e.from_id IN (SELECT id FROM page_nodes)
2859              AND e.to_id IN (SELECT id FROM page_nodes)
2860            "#,
2861        );
2862
2863        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2864        let mut stmt = self.conn.prepare(&sql)?;
2865        let mut nodes = Vec::new();
2866        let mut edges = Vec::new();
2867        let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
2868            let row_type: String = row.get(0)?;
2869            match row_type.as_str() {
2870                "node" => Ok((Some(node_from_row_at(row, 1)?), None)),
2871                "edge" => Ok((None, Some(edge_from_row_at(row, 7)?))),
2872                _ => Err(rusqlite::Error::InvalidQuery),
2873            }
2874        })?;
2875        for row in rows {
2876            let (node, edge) = row?;
2877            if let Some(node) = node {
2878                nodes.push(node);
2879            }
2880            if let Some(edge) = edge {
2881                edges.push(edge);
2882            }
2883        }
2884        nodes.sort_by(|left, right| left.id.cmp(&right.id));
2885        let before_limit = nodes.len();
2886        let mut next_cursor = None;
2887        if let Some(limit) = options.limit
2888            && nodes.len() > limit
2889        {
2890            next_cursor = nodes
2891                .get(limit.saturating_sub(1))
2892                .map(|node| node.id.clone());
2893            nodes.truncate(limit);
2894        }
2895        let node_ids = nodes
2896            .iter()
2897            .map(|node| node.id.as_str())
2898            .collect::<BTreeSet<_>>();
2899        edges.retain(|edge| {
2900            node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
2901        });
2902        edges.sort_by(|left, right| {
2903            left.from_id
2904                .cmp(&right.from_id)
2905                .then(left.kind.cmp(&right.kind))
2906                .then(left.to_id.cmp(&right.to_id))
2907        });
2908        let expected_indexes = if options.property_filters.is_empty() {
2909            vec!["idx_graph_edges_from_kind"]
2910        } else {
2911            vec![
2912                "idx_graph_edges_from_kind",
2913                "idx_graph_node_properties_key_value_node",
2914            ]
2915        };
2916        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2917        diagnostics.push(
2918            "neighborhood nodes and page edges share one recursive reachable-set CTE".to_string(),
2919        );
2920        if !options.property_filters.is_empty() {
2921            diagnostics.push(
2922                "property filters were evaluated by SQLite materialized property rows before paging"
2923                    .to_string(),
2924            );
2925        }
2926        if options.cursor.is_some() {
2927            diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
2928        }
2929        if next_cursor.is_some() {
2930            diagnostics.push(
2931                "result was truncated; pass page.next_cursor as --cursor for the next page"
2932                    .to_string(),
2933            );
2934        }
2935        Ok(Some(GraphPagedSubgraph {
2936            page: GraphQueryPage {
2937                cursor: options.cursor,
2938                limit: options.limit,
2939                next_cursor,
2940                returned_nodes: nodes.len(),
2941                returned_edges: edges.len(),
2942                truncated: options.limit.is_some_and(|limit| before_limit > limit),
2943                diagnostics,
2944            },
2945            nodes,
2946            edges,
2947        }))
2948    }
2949
2950    fn shortest_path(
2951        &self,
2952        from_id: &str,
2953        to_id: &str,
2954        kind: Option<&str>,
2955    ) -> Result<Option<GraphPath>> {
2956        self.shortest_path_with_max_hops(from_id, to_id, kind, None)
2957    }
2958
2959    fn shortest_path_with_max_hops(
2960        &self,
2961        from_id: &str,
2962        to_id: &str,
2963        kind: Option<&str>,
2964        max_hops: Option<usize>,
2965    ) -> Result<Option<GraphPath>> {
2966        if from_id == to_id {
2967            return Ok(Some(GraphPath {
2968                nodes: vec![from_id.to_string()],
2969                hops: 0,
2970            }));
2971        }
2972        let hop_limit = max_hops.unwrap_or(usize::MAX);
2973        if hop_limit == 0 {
2974            return Ok(None);
2975        }
2976
2977        self.assert_not_in_temp_table_section();
2978        self.temp_table_active.set(true);
2979        let result = (|| -> Result<Option<GraphPath>> {
2980            let call_id = BFS_CALL_ID.fetch_add(1, Ordering::Relaxed);
2981        let tbl = format!("_tsift_frontier_{call_id}");
2982
2983        let mut visited = BTreeSet::from([from_id.to_string()]);
2984        let mut parent = BTreeMap::<String, String>::from([(from_id.to_string(), String::new())]);
2985        let mut frontier = vec![from_id.to_string()];
2986        self.conn.execute_batch(&format!(
2987            r#"CREATE TEMP TABLE IF NOT EXISTS "{tbl}" (id TEXT PRIMARY KEY);
2988               DELETE FROM "{tbl}";"#,
2989        ))?;
2990        let select_sql = if kind.is_some() {
2991            format!(
2992                r#"SELECT e.from_id, e.to_id
2993                   FROM "{tbl}" f
2994                   JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2995                       ON e.from_id = f.id
2996                   WHERE e.kind = ?
2997                   ORDER BY e.from_id, e.to_id, e.kind"#,
2998            )
2999        } else {
3000            format!(
3001                r#"SELECT e.from_id, e.to_id
3002                   FROM "{tbl}" f
3003                   JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3004                       ON e.from_id = f.id
3005                   ORDER BY e.from_id, e.to_id, e.kind"#,
3006            )
3007        };
3008        let insert_sql = format!(r#"INSERT OR IGNORE INTO "{tbl}" (id) VALUES (?)"#);
3009        let delete_sql = format!(r#"DELETE FROM "{tbl}""#);
3010        let drop_sql = format!(r#"DROP TABLE IF EXISTS "{tbl}""#);
3011        let mut frontier_select_stmt = self.conn.prepare(&select_sql)?;
3012        let mut frontier_insert_stmt = self.conn.prepare(&insert_sql)?;
3013        let mut found_path: Option<GraphPath> = None;
3014        for _depth in 0..hop_limit {
3015            if frontier.is_empty() {
3016                break;
3017            }
3018            self.conn.execute(&delete_sql, [])?;
3019            for id in &frontier {
3020                frontier_insert_stmt.execute([id.as_str()])?;
3021            }
3022            let mut next_frontier = BTreeSet::new();
3023            let rows = if let Some(kind) = kind {
3024                collect_rows(frontier_select_stmt.query_map([kind], |row| {
3025                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3026                })?)?
3027            } else {
3028                collect_rows(frontier_select_stmt.query_map([], |row| {
3029                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3030                })?)?
3031            };
3032            for (from, next) in rows {
3033                if !visited.insert(next.clone()) {
3034                    continue;
3035                }
3036                parent.insert(next.clone(), from);
3037                if next == to_id {
3038                    let mut nodes = vec![to_id.to_string()];
3039                    let mut cursor = to_id;
3040                    while let Some(previous) = parent.get(cursor) {
3041                        if previous.is_empty() {
3042                            break;
3043                        }
3044                        nodes.push(previous.clone());
3045                        cursor = previous;
3046                    }
3047                    nodes.reverse();
3048                    found_path = Some(GraphPath {
3049                        hops: nodes.len().saturating_sub(1),
3050                        nodes,
3051                    });
3052                    break;
3053                }
3054                next_frontier.insert(next);
3055            }
3056            if found_path.is_some() {
3057                break;
3058            }
3059            frontier = next_frontier.into_iter().collect();
3060        }
3061        let _ = self.conn.execute_batch(&drop_sql);
3062        Ok(found_path)
3063        })();
3064        self.temp_table_active.set(false);
3065        result
3066    }
3067
3068    fn reachable_nodes_by_kind(
3069        &self,
3070        from_id: &str,
3071        kind: &str,
3072        depth: usize,
3073        limit: usize,
3074    ) -> Result<Vec<(GraphNode, GraphPath)>> {
3075        Ok(self
3076            .reachable_nodes_by_kinds(from_id, &[kind], depth, limit)?
3077            .remove(kind)
3078            .unwrap_or_default())
3079    }
3080
3081    fn reachable_nodes_by_kinds(
3082        &self,
3083        from_id: &str,
3084        kinds: &[&str],
3085        depth: usize,
3086        limit: usize,
3087    ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
3088        let mut requested = kinds
3089            .iter()
3090            .map(|kind| (*kind).to_string())
3091            .collect::<BTreeSet<_>>()
3092            .into_iter()
3093            .collect::<Vec<_>>();
3094        let mut results = requested
3095            .iter()
3096            .map(|kind| (kind.clone(), Vec::new()))
3097            .collect::<BTreeMap<_, _>>();
3098        if requested.is_empty() {
3099            return Ok(results);
3100        }
3101        requested.sort();
3102        let placeholders = std::iter::repeat_n("?", requested.len())
3103            .collect::<Vec<_>>()
3104            .join(", ");
3105        let mut sql = format!(
3106            r#"
3107            WITH RECURSIVE walk(id, depth, path) AS (
3108                SELECT ?, 0, char(31) || ? || char(31)
3109                UNION ALL
3110                SELECT e.to_id, walk.depth + 1, walk.path || e.to_id || char(31)
3111                FROM walk
3112                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3113                    ON e.from_id = walk.id
3114                WHERE walk.depth < ?
3115                  AND instr(walk.path, char(31) || e.to_id || char(31)) = 0
3116            ),
3117            ranked AS (
3118                SELECT
3119                    n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
3120                    walk.path, walk.depth,
3121                    ROW_NUMBER() OVER (PARTITION BY n.kind, n.id ORDER BY walk.depth, n.label, n.id) AS rn
3122                FROM walk
3123                JOIN graph_nodes n ON n.id = walk.id
3124                WHERE n.kind IN ({placeholders}) AND n.id <> ?
3125            ),
3126            kind_ranked AS (
3127                SELECT *,
3128                    ROW_NUMBER() OVER (PARTITION BY kind ORDER BY depth, label, id) AS kind_rank
3129                FROM ranked
3130                WHERE rn = 1
3131            )
3132            SELECT id, kind, label, properties_json, provenance_json, freshness_json, path, depth
3133            FROM kind_ranked
3134            "#,
3135        );
3136        let mut values = vec![
3137            Value::Text(from_id.to_string()),
3138            Value::Text(from_id.to_string()),
3139            Value::Integer(depth as i64),
3140        ];
3141        values.extend(requested.iter().cloned().map(Value::Text));
3142        values.push(Value::Text(from_id.to_string()));
3143        if limit > 0 && limit != usize::MAX {
3144            sql.push_str(" WHERE kind_rank <= ?");
3145            values.push(Value::Integer(limit as i64));
3146        }
3147        sql.push_str(" ORDER BY kind, depth, label, id");
3148        let mut stmt = self.conn.prepare(&sql)?;
3149        let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
3150            let node = node_from_row(row)?;
3151            let path: String = row.get(6)?;
3152            let hops: usize = row.get(7)?;
3153            Ok((
3154                node,
3155                GraphPath {
3156                    nodes: path
3157                        .split('\u{1f}')
3158                        .filter(|part| !part.is_empty())
3159                        .map(str::to_string)
3160                        .collect(),
3161                    hops,
3162                },
3163            ))
3164        })?)?;
3165        for (node, path) in rows {
3166            results
3167                .entry(node.kind.clone())
3168                .or_default()
3169                .push((node, path));
3170        }
3171        Ok(results)
3172    }
3173
3174    fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
3175        if let Some(node) = self.node(target)? {
3176            return Ok(Some(node));
3177        }
3178        if kinds.is_empty() {
3179            return Ok(None);
3180        }
3181
3182        let normalized = target.trim().trim_start_matches('#');
3183        let kind_placeholders = std::iter::repeat_n("?", kinds.len())
3184            .collect::<Vec<_>>()
3185            .join(", ");
3186        let kind_rank = kinds
3187            .iter()
3188            .enumerate()
3189            .map(|(rank, _)| format!("WHEN ? THEN {rank}"))
3190            .collect::<Vec<_>>()
3191            .join(" ");
3192        let sql = format!(
3193            r#"
3194            SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3195            FROM graph_nodes n
3196            WHERE n.kind IN ({kind_placeholders})
3197              AND (
3198                EXISTS (
3199                    SELECT 1
3200                    FROM graph_node_properties p_handle INDEXED BY idx_graph_node_properties_key_value_node
3201                    WHERE p_handle.node_id = n.id
3202                      AND p_handle.key = 'handle'
3203                      AND p_handle.value = ?
3204                )
3205                OR EXISTS (
3206                    SELECT 1
3207                    FROM graph_node_properties p_ref INDEXED BY idx_graph_node_properties_key_value_node
3208                    WHERE p_ref.node_id = n.id
3209                      AND p_ref.key = 'ref_id'
3210                      AND p_ref.value = ?
3211                )
3212                OR n.label = ?
3213                OR n.label = ?
3214              )
3215            ORDER BY CASE n.kind {kind_rank} ELSE 999 END, n.id
3216            LIMIT 1
3217            "#
3218        );
3219        let mut values = kinds
3220            .iter()
3221            .map(|kind| Value::Text((*kind).to_string()))
3222            .collect::<Vec<_>>();
3223        values.push(Value::Text(target.to_string()));
3224        values.push(Value::Text(normalized.to_string()));
3225        values.push(Value::Text(target.to_string()));
3226        values.push(Value::Text(format!("#{normalized}")));
3227        values.extend(kinds.iter().map(|kind| Value::Text((*kind).to_string())));
3228        self.conn
3229            .query_row(&sql, params_from_iter(values.iter()), node_from_row)
3230            .optional()
3231            .map_err(Into::into)
3232    }
3233}
3234
3235fn to_json<T: Serialize>(value: &T) -> Result<String> {
3236    serde_json::to_string(value).map_err(Into::into)
3237}
3238
3239fn row_hash<T: Serialize>(value: &T) -> Result<String> {
3240    let payload = serde_json::to_vec(value)?;
3241    Ok(blake3::hash(&payload).to_hex().to_string())
3242}
3243
3244fn optional_to_json<T: Serialize>(value: &Option<T>) -> Result<Option<String>> {
3245    value.as_ref().map(to_json).transpose()
3246}
3247
3248fn collect_rows<T>(
3249    rows: impl Iterator<Item = std::result::Result<T, rusqlite::Error>>,
3250) -> Result<Vec<T>> {
3251    rows.collect::<std::result::Result<Vec<_>, _>>()
3252        .map_err(Into::into)
3253}
3254
/// One item of a query result: a metadata record, a node, or an edge.
///
/// NOTE(review): the code that builds and consumes this enum is outside this excerpt, so the
/// exact meaning of `total` should be confirmed against those call sites.
enum QueryResult {
    /// Metadata record carrying a `total` count.
    Meta { total: usize },
    /// A graph node.
    Node(GraphNode),
    /// A graph edge.
    Edge(GraphEdge),
}
3260
3261fn node_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphNode> {
3262    let properties_col = offset + 3;
3263    let provenance_col = offset + 4;
3264    let freshness_col = offset + 5;
3265    let properties_json: String = row.get(properties_col)?;
3266    let provenance_json: String = row.get(provenance_col)?;
3267    let freshness_json: Option<String> = row.get(freshness_col)?;
3268    Ok(GraphNode {
3269        id: row.get(offset)?,
3270        kind: row.get(offset + 1)?,
3271        label: row.get(offset + 2)?,
3272        properties: from_json(properties_col, &properties_json)?,
3273        provenance: from_json(provenance_col, &provenance_json)?,
3274        freshness: optional_from_json(freshness_col, freshness_json)?,
3275    })
3276}
3277
3278fn node_from_row(row: &Row<'_>) -> rusqlite::Result<GraphNode> {
3279    node_from_row_at(row, 0)
3280}
3281
3282fn edge_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphEdge> {
3283    let properties_col = offset + 4;
3284    let provenance_col = offset + 5;
3285    let freshness_col = offset + 6;
3286    let properties_json: String = row.get(properties_col)?;
3287    let provenance_json: String = row.get(provenance_col)?;
3288    let freshness_json: Option<String> = row.get(freshness_col)?;
3289    Ok(GraphEdge {
3290        id: row.get(offset)?,
3291        from_id: row.get(offset + 1)?,
3292        to_id: row.get(offset + 2)?,
3293        kind: row.get(offset + 3)?,
3294        properties: from_json(properties_col, &properties_json)?,
3295        provenance: from_json(provenance_col, &provenance_json)?,
3296        freshness: optional_from_json(freshness_col, freshness_json)?,
3297    })
3298}
3299
3300fn edge_from_row(row: &Row<'_>) -> rusqlite::Result<GraphEdge> {
3301    edge_from_row_at(row, 0)
3302}
3303
3304fn from_json<T: DeserializeOwned>(column: usize, raw: &str) -> rusqlite::Result<T> {
3305    serde_json::from_str(raw)
3306        .map_err(|err| rusqlite::Error::FromSqlConversionFailure(column, Type::Text, Box::new(err)))
3307}
3308
3309fn optional_from_json<T: DeserializeOwned>(
3310    column: usize,
3311    raw: Option<String>,
3312) -> rusqlite::Result<Option<T>> {
3313    raw.map(|value| from_json(column, &value)).transpose()
3314}
3315
3316fn projection_version_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3317    nodes
3318        .iter()
3319        .find(|node| node.kind == "projection_meta")
3320        .and_then(|node| node.properties.get("projection_version").cloned())
3321}
3322
3323fn projection_hash_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3324    nodes
3325        .iter()
3326        .find(|node| node.kind == "projection_meta")
3327        .and_then(|node| node.properties.get("content_hash").cloned())
3328}
3329
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch. The seconds count is
/// converted with a checked conversion and saturates at `i64::MAX` instead of wrapping, which
/// a plain `as i64` cast would do for out-of-range values.
fn unix_now() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX)
        })
}
3336
3337fn sqlite_database_size_bytes(conn: &Connection) -> Result<u64> {
3338    let page_count: u64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
3339    let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3340    Ok(page_count.saturating_mul(page_size))
3341}
3342
3343fn sqlite_database_freelist_bytes(conn: &Connection) -> Result<u64> {
3344    let freelist_count: u64 = conn.query_row("PRAGMA freelist_count", [], |row| row.get(0))?;
3345    let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3346    Ok(freelist_count.saturating_mul(page_size))
3347}
3348
3349#[cfg(test)]
3350mod tests {
3351    use super::*;
3352
3353    fn sample_provenance() -> GraphProvenance {
3354        GraphProvenance::new("fixture", "src/lib.rs:1").with_content_hash("hash-1")
3355    }
3356
3357    fn sample_projection() -> GraphProjection {
3358        let source = sample_provenance();
3359        GraphProjection {
3360            nodes: vec![
3361                GraphNode::new("doc:livekit", "document", "LiveKit guide")
3362                    .with_property("domain", "livekit")
3363                    .with_provenance(source.clone())
3364                    .with_freshness(GraphFreshness::content_hash("node-hash")),
3365                GraphNode::new("topic:rooms", "topic", "Rooms"),
3366                GraphNode::new("topic:egress", "topic", "Egress"),
3367            ],
3368            edges: vec![
3369                GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
3370                    .with_property("confidence", "0.91")
3371                    .with_provenance(source.clone())
3372                    .with_freshness(GraphFreshness::content_hash("edge-hash")),
3373                GraphEdge::new("topic:rooms", "topic:egress", "related_to").with_provenance(source),
3374            ],
3375        }
3376    }
3377
3378    fn assert_projection_store_contract(store: &impl GraphStore) {
3379        let projection = sample_projection();
3380        projection.upsert_into(store).unwrap();
3381
3382        assert_eq!(
3383            store.node("doc:livekit").unwrap(),
3384            projection
3385                .nodes
3386                .iter()
3387                .find(|node| node.id == "doc:livekit")
3388                .cloned()
3389        );
3390        assert_eq!(
3391            store.nodes_by_kind("topic").unwrap(),
3392            vec![
3393                GraphNode::new("topic:egress", "topic", "Egress"),
3394                GraphNode::new("topic:rooms", "topic", "Rooms"),
3395            ]
3396        );
3397
3398        let mentions = store
3399            .outgoing_edges("doc:livekit", Some("mentions"))
3400            .unwrap();
3401        assert_eq!(mentions.len(), 1);
3402        assert_eq!(mentions[0].to_id, "topic:rooms");
3403        assert_eq!(
3404            mentions[0].properties.get("confidence"),
3405            Some(&"0.91".into())
3406        );
3407
3408        let path = store
3409            .shortest_path("doc:livekit", "topic:egress", None)
3410            .unwrap()
3411            .unwrap();
3412        assert_eq!(
3413            path.nodes,
3414            vec!["doc:livekit", "topic:rooms", "topic:egress"]
3415        );
3416    }
3417
3418    #[test]
3419    fn sqlite_store_round_trips_generic_nodes_edges() {
3420        let store = SqliteGraphStore::in_memory().unwrap();
3421        let source = sample_provenance();
3422        let node = GraphNode::new("doc:livekit", "document", "LiveKit guide")
3423            .with_property("domain", "livekit")
3424            .with_provenance(source.clone())
3425            .with_freshness(GraphFreshness::content_hash("node-hash"));
3426        let topic = GraphNode::new("topic:rooms", "topic", "Rooms");
3427        let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
3428            .with_property("confidence", "0.91")
3429            .with_provenance(source)
3430            .with_freshness(GraphFreshness::content_hash("edge-hash"));
3431
3432        store.upsert_node(&node).unwrap();
3433        store.upsert_node(&topic).unwrap();
3434        store.upsert_edge(&edge).unwrap();
3435
3436        assert_eq!(store.node("doc:livekit").unwrap(), Some(node));
3437        assert_eq!(store.nodes_by_kind("topic").unwrap(), vec![topic]);
3438        assert_eq!(store.all_nodes().unwrap().len(), 2);
3439        assert_eq!(store.all_edges().unwrap().len(), 1);
3440        assert_eq!(
3441            store
3442                .outgoing_edges("doc:livekit", Some("mentions"))
3443                .unwrap(),
3444            vec![edge]
3445        );
3446    }
3447
3448    #[test]
3449    fn sqlite_materializes_edge_properties_and_scans_first_class_edges() {
3450        let store = SqliteGraphStore::in_memory().unwrap();
3451        for node in [
3452            GraphNode::new("doc:livekit", "document", "LiveKit guide"),
3453            GraphNode::new("topic:rooms", "topic", "Rooms"),
3454            GraphNode::new("topic:egress", "topic", "Egress"),
3455        ] {
3456            store.upsert_node(&node).unwrap();
3457        }
3458        let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
3459            .with_property("confidence", "0.91");
3460        let edge_id = edge.id.clone();
3461        store.upsert_edge(&edge).unwrap();
3462        store
3463            .upsert_edge(
3464                &GraphEdge::new("topic:egress", "topic:rooms", "related_to")
3465                    .with_property("confidence", "0.42"),
3466            )
3467            .unwrap();
3468
3469        assert_eq!(store.edge(&edge_id).unwrap(), Some(edge));
3470        let mut expected_incident_ids = vec![
3471            GraphEdge::stable_id("doc:livekit", "topic:rooms", "mentions"),
3472            GraphEdge::stable_id("topic:egress", "topic:rooms", "related_to"),
3473        ];
3474        expected_incident_ids.sort();
3475        assert_eq!(
3476            store
3477                .incident_edges("topic:rooms", None)
3478                .unwrap()
3479                .into_iter()
3480                .map(|edge| edge.id)
3481                .collect::<Vec<_>>(),
3482            expected_incident_ids
3483        );
3484
3485        let page = store
3486            .paged_edges(
3487                Some("mentions"),
3488                GraphQueryOptions {
3489                    property_filters: vec![GraphPropertyFilter {
3490                        key: "confidence".to_string(),
3491                        value: "0.91".to_string(),
3492                    }],
3493                    ..GraphQueryOptions::default()
3494                },
3495            )
3496            .unwrap();
3497        assert_eq!(page.edges.len(), 1);
3498        assert_eq!(page.edges[0].id, edge_id);
3499        assert!(
3500            page.page
3501                .diagnostics
3502                .iter()
3503                .any(|diagnostic| diagnostic.contains("idx_graph_edge_properties_key_value_edge")),
3504            "{:?}",
3505            page.page.diagnostics
3506        );
3507        assert!(
3508            page.page
3509                .diagnostics
3510                .iter()
3511                .any(|diagnostic| diagnostic.contains("idx_graph_edges_edge_key")),
3512            "{:?}",
3513            page.page.diagnostics
3514        );
3515        assert!(
3516            page.page.diagnostics.iter().any(|diagnostic| diagnostic
3517                .contains("edge property primary filter matched 1 materialized row")),
3518            "{:?}",
3519            page.page.diagnostics
3520        );
3521        assert!(
3522            page.page
3523                .diagnostics
3524                .iter()
3525                .any(|diagnostic| diagnostic
3526                    .contains("drives from SQLite materialized property rows")),
3527            "{:?}",
3528            page.page.diagnostics
3529        );
3530
3531        let property_rows: usize = store
3532            .conn
3533            .query_row(
3534                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
3535                [],
3536                |row| row.get(0),
3537            )
3538            .unwrap();
3539        assert_eq!(property_rows, 2);
3540    }
3541
3542    #[test]
3543    fn graph_projection_round_trips_through_backend_agnostic_store_contract() {
3544        let sqlite = SqliteGraphStore::in_memory().unwrap();
3545        assert_projection_store_contract(&sqlite);
3546    }
3547
3548    #[test]
3549    fn graph_store_contract_covers_crud_neighborhood_and_ordering() {
3550        fn assert_crud_contract(store: &impl GraphStore) {
3551            let projection = sample_projection();
3552            projection.upsert_into(store).unwrap();
3553
3554            let neighborhood = store.neighborhood("doc:livekit", 2, None).unwrap().unwrap();
3555            assert_eq!(
3556                neighborhood
3557                    .nodes
3558                    .iter()
3559                    .map(|node| node.id.as_str())
3560                    .collect::<Vec<_>>(),
3561                vec!["doc:livekit", "topic:egress", "topic:rooms"]
3562            );
3563            assert_eq!(
3564                neighborhood
3565                    .edges
3566                    .iter()
3567                    .map(|edge| (
3568                        edge.from_id.as_str(),
3569                        edge.kind.as_str(),
3570                        edge.to_id.as_str()
3571                    ))
3572                    .collect::<Vec<_>>(),
3573                vec![
3574                    ("doc:livekit", "mentions", "topic:rooms"),
3575                    ("topic:rooms", "related_to", "topic:egress"),
3576                ]
3577            );
3578
3579            assert_eq!(
3580                store
3581                    .delete_edge("topic:rooms", "topic:egress", "related_to")
3582                    .unwrap(),
3583                1
3584            );
3585            assert!(
3586                store
3587                    .shortest_path("doc:livekit", "topic:egress", None)
3588                    .unwrap()
3589                    .is_none()
3590            );
3591            assert_eq!(store.delete_node("topic:rooms").unwrap(), 1);
3592            assert!(store.node("topic:rooms").unwrap().is_none());
3593            assert!(
3594                store
3595                    .outgoing_edges("doc:livekit", None)
3596                    .unwrap()
3597                    .is_empty()
3598            );
3599        }
3600
3601        assert_crud_contract(&SqliteGraphStore::in_memory().unwrap());
3602    }
3603
3604    #[test]
3605    fn sqlite_upsert_projection_batches_rows_and_properties() {
3606        let mut store = SqliteGraphStore::in_memory().unwrap();
3607        let mut projection = sample_projection();
3608        store.upsert_projection(&projection).unwrap();
3609
3610        let page = store
3611            .paged_nodes_by_kind(
3612                "document",
3613                GraphQueryOptions {
3614                    property_filters: vec![GraphPropertyFilter {
3615                        key: "domain".to_string(),
3616                        value: "livekit".to_string(),
3617                    }],
3618                    ..GraphQueryOptions::default()
3619                },
3620            )
3621            .unwrap();
3622        assert_eq!(page.nodes[0].id, "doc:livekit");
3623
3624        projection.nodes[0] = GraphNode::new("doc:livekit", "document", "LiveKit guide")
3625            .with_property("domain", "recording");
3626        store.upsert_projection(&projection).unwrap();
3627
3628        let old_property_count: usize = store
3629            .conn
3630            .query_row(
3631                "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain' AND value = 'livekit'",
3632                [],
3633                |row| row.get(0),
3634            )
3635            .unwrap();
3636        let updated_page = store
3637            .paged_nodes_by_kind(
3638                "document",
3639                GraphQueryOptions {
3640                    property_filters: vec![GraphPropertyFilter {
3641                        key: "domain".to_string(),
3642                        value: "recording".to_string(),
3643                    }],
3644                    ..GraphQueryOptions::default()
3645                },
3646            )
3647            .unwrap();
3648        assert_eq!(old_property_count, 0);
3649        assert_eq!(updated_page.nodes[0].id, "doc:livekit");
3650        let edge_property_count: usize = store
3651            .conn
3652            .query_row(
3653                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
3654                [],
3655                |row| row.get(0),
3656            )
3657            .unwrap();
3658        assert_eq!(edge_property_count, 1);
3659    }
3660
3661    #[test]
3662    fn sqlite_store_filters_edges_by_kind_and_paths() {
3663        let store = SqliteGraphStore::in_memory().unwrap();
3664        for id in ["a", "b", "c"] {
3665            store
3666                .upsert_node(&GraphNode::new(id, "symbol", id))
3667                .unwrap();
3668        }
3669        store
3670            .upsert_edge(&GraphEdge::new("a", "b", "calls"))
3671            .unwrap();
3672        store
3673            .upsert_edge(&GraphEdge::new("a", "c", "documents"))
3674            .unwrap();
3675        store
3676            .upsert_edge(&GraphEdge::new("b", "c", "calls"))
3677            .unwrap();
3678
3679        let calls = store.outgoing_edges("a", Some("calls")).unwrap();
3680        assert_eq!(calls.len(), 1);
3681        assert_eq!(calls[0].to_id, "b");
3682        assert_eq!(store.graph_counts().unwrap(), (3, 3));
3683        assert_eq!(
3684            store.sample_edge(Some("calls")).unwrap().unwrap().to_id,
3685            "b"
3686        );
3687
3688        let path = store
3689            .shortest_path("a", "c", Some("calls"))
3690            .unwrap()
3691            .unwrap();
3692        assert_eq!(path.nodes, vec!["a", "b", "c"]);
3693        assert_eq!(path.hops, 2);
3694
3695        assert!(
3696            store
3697                .shortest_path("c", "a", Some("calls"))
3698                .unwrap()
3699                .is_none()
3700        );
3701    }
3702
3703    #[test]
3704    fn sqlite_store_batches_edges_between_node_sets() {
3705        let store = SqliteGraphStore::in_memory().unwrap();
3706        for id in ["a", "b", "c", "outside"] {
3707            store
3708                .upsert_node(&GraphNode::new(id, "symbol", id))
3709                .unwrap();
3710        }
3711        for edge in [
3712            GraphEdge::new("a", "b", "calls"),
3713            GraphEdge::new("b", "c", "calls"),
3714            GraphEdge::new("a", "outside", "calls"),
3715            GraphEdge::new("outside", "c", "calls"),
3716        ] {
3717            store.upsert_edge(&edge).unwrap();
3718        }
3719
3720        let scoped = ["a".to_string(), "b".to_string(), "c".to_string()]
3721            .into_iter()
3722            .collect::<BTreeSet<_>>();
3723        let edge_keys = store
3724            .edges_between_nodes(&scoped)
3725            .unwrap()
3726            .into_iter()
3727            .map(|edge| (edge.from_id, edge.kind, edge.to_id))
3728            .collect::<Vec<_>>();
3729
3730        assert_eq!(
3731            edge_keys,
3732            vec![
3733                ("a".to_string(), "calls".to_string(), "b".to_string()),
3734                ("b".to_string(), "calls".to_string(), "c".to_string()),
3735            ]
3736        );
3737    }
3738
3739    #[test]
3740    fn sqlite_projection_refresh_tracks_versions_watermarks_and_tombstones() {
3741        let mut store = SqliteGraphStore::in_memory().unwrap();
3742        let mut projection = sample_projection();
3743        projection.nodes.push(
3744            GraphNode::new(
3745                "projection:fixture",
3746                "projection_meta",
3747                "fixture projection",
3748            )
3749            .with_property("projection_version", "fixture-v1")
3750            .with_property("content_hash", "hash-a"),
3751        );
3752        store
3753            .replace_projection_with_version(
3754                "root",
3755                &projection,
3756                Some("fixture-v1"),
3757                Some("commit-a".to_string()),
3758            )
3759            .unwrap();
3760
3761        projection.nodes.retain(|node| node.id != "topic:egress");
3762        projection.edges.retain(|edge| edge.to_id != "topic:egress");
3763        let refresh = store
3764            .replace_projection_with_version(
3765                "root",
3766                &projection,
3767                Some("fixture-v2"),
3768                Some("commit-b".to_string()),
3769            )
3770            .unwrap();
3771
3772        assert_eq!(refresh.projection_version, "fixture-v2");
3773        assert_eq!(refresh.source_watermark.as_deref(), Some("commit-b"));
3774        assert_eq!(refresh.tombstoned_nodes, vec!["topic:egress".to_string()]);
3775        assert_eq!(refresh.tombstoned_edges.len(), 1);
3776        assert_eq!(refresh.deleted_nodes, 1);
3777        assert_eq!(refresh.deleted_edges, 1);
3778        assert_eq!(refresh.unchanged_nodes, 3);
3779        assert_eq!(refresh.upserted_nodes, 0);
3780        assert_eq!(refresh.unchanged_properties, 4);
3781        assert_eq!(refresh.upserted_properties, 0);
3782        assert_eq!(refresh.deleted_properties, 0);
3783        assert!(
3784            refresh
3785                .phase_timings
3786                .iter()
3787                .any(|phase| phase.name == "sqlite_property_row_staging"),
3788            "{:?}",
3789            refresh.phase_timings
3790        );
3791        assert!(
3792            refresh
3793                .phase_timings
3794                .iter()
3795                .any(|phase| phase.name == "sqlite_edge_property_row_staging"),
3796            "{:?}",
3797            refresh.phase_timings
3798        );
3799        let version = store.projection_version("root").unwrap().unwrap();
3800        assert_eq!(version.projection_version, "fixture-v2");
3801        assert_eq!(version.source_watermark.as_deref(), Some("commit-b"));
3802        let cached_counts: (usize, usize, usize, usize) = store
3803            .conn
3804            .query_row(
3805                r#"
3806                SELECT nodes, edges, tombstone_nodes, tombstone_edges
3807                FROM graph_operator_stats
3808                WHERE scope = 'root'
3809                "#,
3810                [],
3811                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3812            )
3813            .unwrap();
3814        assert_eq!(cached_counts, (3, 1, 1, 1));
3815
3816        projection
3817            .nodes
3818            .push(GraphNode::new("topic:egress", "topic", "Egress"));
3819        let refresh = store
3820            .replace_projection_with_version(
3821                "root",
3822                &projection,
3823                Some("fixture-v3"),
3824                Some("commit-c".to_string()),
3825            )
3826            .unwrap();
3827        assert_eq!(refresh.pruned_tombstones, 1);
3828        assert_eq!(refresh.tombstoned_nodes, Vec::<String>::new());
3829
3830        projection.nodes.retain(|node| node.id != "topic:egress");
3831        store
3832            .replace_projection_with_version(
3833                "root",
3834                &projection,
3835                Some("fixture-v4"),
3836                Some("commit-d".to_string()),
3837            )
3838            .unwrap();
3839        assert_eq!(store.compact_storage("root", true).unwrap(), 2);
3840        let cached_counts: (usize, usize, usize, usize) = store
3841            .conn
3842            .query_row(
3843                r#"
3844                SELECT nodes, edges, tombstone_nodes, tombstone_edges
3845                FROM graph_operator_stats
3846                WHERE scope = 'root'
3847                "#,
3848                [],
3849                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3850            )
3851            .unwrap();
3852        assert_eq!(cached_counts, (3, 1, 0, 0));
3853    }
3854
3855    #[test]
3856    fn sqlite_shortest_path_uses_bounded_frontier() {
3857        let store = SqliteGraphStore::in_memory().unwrap();
3858        for idx in 0..80 {
3859            store
3860                .upsert_node(&GraphNode::new(
3861                    format!("node:{idx:02}"),
3862                    "symbol",
3863                    format!("node {idx:02}"),
3864                ))
3865                .unwrap();
3866        }
3867        for idx in 0..79 {
3868            store
3869                .upsert_edge(&GraphEdge::new(
3870                    format!("node:{idx:02}"),
3871                    format!("node:{:02}", idx + 1),
3872                    "calls",
3873                ))
3874                .unwrap();
3875        }
3876        store
3877            .upsert_edge(&GraphEdge::new("node:00", "node:79", "mentions"))
3878            .unwrap();
3879
3880        assert!(
3881            store
3882                .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(64))
3883                .unwrap()
3884                .is_none()
3885        );
3886        let path = store
3887            .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(79))
3888            .unwrap()
3889            .unwrap();
3890        assert_eq!(path.hops, 79);
3891        assert_eq!(path.nodes.first().map(String::as_str), Some("node:00"));
3892        assert_eq!(path.nodes.last().map(String::as_str), Some("node:79"));
3893
3894        let direct = store
3895            .shortest_path_with_max_hops("node:00", "node:79", Some("mentions"), Some(1))
3896            .unwrap()
3897            .unwrap();
3898        assert_eq!(direct.nodes, vec!["node:00", "node:79"]);
3899    }
3900
3901    #[test]
3902    fn sqlite_resolves_evidence_targets_with_indexed_properties() {
3903        let store = SqliteGraphStore::in_memory().unwrap();
3904        for node in [
3905            GraphNode::new("gbak-refresh", "backlog", "#refresh")
3906                .with_property("ref_id", "refresh")
3907                .with_property("handle", "backlog-handle"),
3908            GraphNode::new("gjob-refresh", "job_packet", "do #refresh")
3909                .with_property("ref_id", "refresh"),
3910            GraphNode::new("gwres-refresh", "worker_result", "completed #refresh")
3911                .with_property("ref_id", "refresh"),
3912        ] {
3913            store.upsert_node(&node).unwrap();
3914        }
3915
3916        let by_ref = store
3917            .resolve_evidence_target("#refresh", &["backlog", "job_packet", "worker_result"])
3918            .unwrap()
3919            .unwrap();
3920        assert_eq!(by_ref.id, "gbak-refresh");
3921        let by_handle = store
3922            .resolve_evidence_target("backlog-handle", &["backlog"])
3923            .unwrap()
3924            .unwrap();
3925        assert_eq!(by_handle.id, "gbak-refresh");
3926    }
3927
3928    #[test]
3929    fn sqlite_schema_migration_backfills_materialized_node_properties() {
3930        let conn = Connection::open_in_memory().unwrap();
3931        conn.execute_batch(
3932            r#"
3933            PRAGMA user_version = 2;
3934            CREATE TABLE graph_nodes (
3935                id TEXT PRIMARY KEY,
3936                kind TEXT NOT NULL,
3937                label TEXT NOT NULL,
3938                properties_json TEXT NOT NULL DEFAULT '{}',
3939                provenance_json TEXT NOT NULL DEFAULT '[]',
3940                freshness_json TEXT,
3941                row_hash TEXT,
3942                source_watermark TEXT
3943            );
3944            CREATE INDEX idx_graph_nodes_kind ON graph_nodes(kind);
3945            CREATE TABLE graph_edges (
3946                from_id TEXT NOT NULL,
3947                to_id TEXT NOT NULL,
3948                kind TEXT NOT NULL,
3949                properties_json TEXT NOT NULL DEFAULT '{}',
3950                provenance_json TEXT NOT NULL DEFAULT '[]',
3951                freshness_json TEXT,
3952                row_hash TEXT,
3953                source_watermark TEXT,
3954                PRIMARY KEY (from_id, to_id, kind)
3955            );
3956            CREATE INDEX idx_graph_edges_from_kind ON graph_edges(from_id, kind);
3957            CREATE INDEX idx_graph_edges_to_kind ON graph_edges(to_id, kind);
3958            CREATE TABLE graph_projection_versions (
3959                scope TEXT PRIMARY KEY,
3960                projection_version TEXT NOT NULL,
3961                content_hash TEXT,
3962                source_watermark TEXT,
3963                observed_at_unix INTEGER NOT NULL
3964            );
3965            CREATE TABLE graph_tombstones (
3966                row_key TEXT PRIMARY KEY,
3967                row_kind TEXT NOT NULL,
3968                deleted_at_unix INTEGER NOT NULL
3969            );
3970            INSERT INTO graph_nodes
3971                (id, kind, label, properties_json, provenance_json)
3972            VALUES
3973                ('topic:rooms', 'topic', 'Rooms', '{"domain":"livekit"}', '[]'),
3974                ('topic:egress', 'topic', 'Egress', '{"domain":"recording"}', '[]');
3975            INSERT INTO graph_edges
3976                (from_id, to_id, kind, properties_json, provenance_json)
3977            VALUES
3978                ('topic:rooms', 'topic:egress', 'mentions', '{"confidence":"0.91"}', '[]');
3979            "#,
3980        )
3981        .unwrap();
3982
3983        let store = SqliteGraphStore::from_connection(conn).unwrap();
3984        let version: i64 = store
3985            .conn
3986            .pragma_query_value(None, "user_version", |row| row.get(0))
3987            .unwrap();
3988        assert_eq!(version, SQLITE_GRAPH_SCHEMA_VERSION);
3989        let property_rows: usize = store
3990            .conn
3991            .query_row(
3992                "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain'",
3993                [],
3994                |row| row.get(0),
3995            )
3996            .unwrap();
3997        assert_eq!(property_rows, 2);
3998        let edge_property_rows: usize = store
3999            .conn
4000            .query_row(
4001                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
4002                [],
4003                |row| row.get(0),
4004            )
4005            .unwrap();
4006        assert_eq!(edge_property_rows, 1);
4007        let edge = store
4008            .edge(&GraphEdge::stable_id(
4009                "topic:rooms",
4010                "topic:egress",
4011                "mentions",
4012            ))
4013            .unwrap()
4014            .unwrap();
4015        assert_eq!(edge.properties.get("confidence"), Some(&"0.91".to_string()));
4016
4017        let page = store
4018            .paged_nodes_by_kind(
4019                "topic",
4020                GraphQueryOptions {
4021                    property_filters: vec![GraphPropertyFilter {
4022                        key: "domain".to_string(),
4023                        value: "livekit".to_string(),
4024                    }],
4025                    ..GraphQueryOptions::default()
4026                },
4027            )
4028            .unwrap();
4029        assert_eq!(page.nodes[0].id, "topic:rooms");
4030        assert!(
4031            page.page
4032                .diagnostics
4033                .iter()
4034                .any(|diagnostic| diagnostic.contains("idx_graph_node_properties_key_value_node")),
4035            "{:?}",
4036            page.page.diagnostics
4037        );
4038    }
4039
4040    #[test]
4041    fn sqlite_store_batches_reachable_nodes_by_kinds() {
4042        let store = SqliteGraphStore::in_memory().unwrap();
4043        for node in [
4044            GraphNode::new("start", "backlog", "start"),
4045            GraphNode::new("ctx", "worker_context", "context"),
4046            GraphNode::new("src", "source_handle", "source"),
4047            GraphNode::new("sem", "semantic_concept", "concept"),
4048        ] {
4049            store.upsert_node(&node).unwrap();
4050        }
4051        store
4052            .upsert_edge(&GraphEdge::new("start", "ctx", "has_context"))
4053            .unwrap();
4054        store
4055            .upsert_edge(&GraphEdge::new("ctx", "src", "uses_source"))
4056            .unwrap();
4057        store
4058            .upsert_edge(&GraphEdge::new("start", "sem", "mentions_concept"))
4059            .unwrap();
4060
4061        let rows = store
4062            .reachable_nodes_by_kinds(
4063                "start",
4064                &["worker_context", "source_handle", "semantic_concept"],
4065                2,
4066                8,
4067            )
4068            .unwrap();
4069        assert_eq!(rows["worker_context"][0].0.id, "ctx");
4070        assert_eq!(
4071            rows["source_handle"][0].1.nodes,
4072            vec!["start", "ctx", "src"]
4073        );
4074        assert_eq!(rows["semantic_concept"][0].1.hops, 1);
4075    }
4076
4077    #[test]
4078    fn sqlite_projection_refresh_handles_bulk_row_replacement() {
4079        let mut store = SqliteGraphStore::in_memory().unwrap();
4080        let source = GraphProvenance::new("fixture", "bulk");
4081        let mut projection = GraphProjection::default();
4082        for idx in 0..128 {
4083            projection.nodes.push(
4084                GraphNode::new(
4085                    format!("node:{idx:03}"),
4086                    if idx % 2 == 0 { "symbol" } else { "file" },
4087                    format!("bulk node {idx:03}"),
4088                )
4089                .with_property("ordinal", idx.to_string())
4090                .with_provenance(source.clone())
4091                .with_freshness(GraphFreshness::content_hash(format!("node-hash-{idx:03}"))),
4092            );
4093        }
4094        for idx in 0..127 {
4095            projection.edges.push(
4096                GraphEdge::new(
4097                    format!("node:{idx:03}"),
4098                    format!("node:{:03}", idx + 1),
4099                    "next",
4100                )
4101                .with_property("ordinal", idx.to_string())
4102                .with_provenance(source.clone())
4103                .with_freshness(GraphFreshness::content_hash(format!("edge-hash-{idx:03}"))),
4104            );
4105        }
4106
4107        store
4108            .replace_projection_with_version(
4109                "root",
4110                &projection,
4111                Some("bulk-v1"),
4112                Some("commit-a".to_string()),
4113            )
4114            .unwrap();
4115
4116        projection
4117            .nodes
4118            .retain(|node| !node.id.ends_with("000") && !node.id.ends_with("064"));
4119        projection.edges.retain(|edge| {
4120            !edge.from_id.ends_with("000")
4121                && !edge.to_id.ends_with("000")
4122                && !edge.from_id.ends_with("064")
4123                && !edge.to_id.ends_with("064")
4124        });
4125        let refresh = store
4126            .replace_projection_with_version(
4127                "root",
4128                &projection,
4129                Some("bulk-v2"),
4130                Some("commit-b".to_string()),
4131            )
4132            .unwrap();
4133
4134        assert_eq!(store.all_nodes().unwrap().len(), 126);
4135        assert_eq!(store.all_edges().unwrap().len(), 124);
4136        assert_eq!(
4137            refresh.tombstoned_nodes,
4138            vec!["node:000".to_string(), "node:064".to_string()]
4139        );
4140        assert_eq!(refresh.tombstoned_edges.len(), 3);
4141        assert_eq!(refresh.deleted_nodes, 2);
4142        assert_eq!(refresh.deleted_edges, 3);
4143        assert_eq!(refresh.unchanged_nodes, 126);
4144        assert_eq!(refresh.unchanged_edges, 124);
4145        assert_eq!(refresh.upserted_nodes, 0);
4146        assert_eq!(refresh.upserted_edges, 0);
4147        assert_eq!(refresh.unchanged_properties, 250);
4148        assert_eq!(refresh.upserted_properties, 0);
4149        assert!(
4150            refresh
4151                .phase_timings
4152                .iter()
4153                .any(|phase| phase.name == "sqlite_node_staging"
4154                    && phase.detail.contains("126 unchanged skipped")
4155                    && phase.detail.contains("multi-row chunks up to 500 rows")),
4156            "{:?}",
4157            refresh.phase_timings
4158        );
4159        assert!(
4160            refresh
4161                .phase_timings
4162                .iter()
4163                .any(|phase| phase.name == "sqlite_node_staging"
4164                    && phase.detail.contains("124 unchanged skipped")),
4165            "{:?}",
4166            refresh.phase_timings
4167        );
4168        let staged_node_properties: usize = store
4169            .conn
4170            .query_row(
4171                "SELECT COUNT(*) FROM temp.next_graph_node_properties",
4172                [],
4173                |row| row.get(0),
4174            )
4175            .unwrap();
4176        let staged_edge_properties: usize = store
4177            .conn
4178            .query_row(
4179                "SELECT COUNT(*) FROM temp.next_graph_edge_properties",
4180                [],
4181                |row| row.get(0),
4182            )
4183            .unwrap();
4184        assert_eq!(staged_node_properties, 0);
4185        assert_eq!(staged_edge_properties, 0);
4186        assert!(
4187            refresh
4188                .phase_timings
4189                .iter()
4190                .any(|phase| phase.name == "sqlite_property_row_staging"
4191                    && phase.detail.contains("new/changed node rows")),
4192            "{:?}",
4193            refresh.phase_timings
4194        );
4195        assert!(
4196            refresh
4197                .phase_timings
4198                .iter()
4199                .any(|phase| phase.name == "sqlite_edge_property_row_staging"
4200                    && phase.detail.contains("new/changed edge rows")),
4201            "{:?}",
4202            refresh.phase_timings
4203        );
4204        assert_eq!(
4205            store
4206                .projection_version("root")
4207                .unwrap()
4208                .unwrap()
4209                .source_watermark
4210                .as_deref(),
4211            Some("commit-b")
4212        );
4213    }
4214
4215    #[test]
4216    fn sqlite_reentrant_temp_table_guard_panics() {
4217        let store = SqliteGraphStore::in_memory().unwrap();
4218        store.temp_table_active.set(true);
4219        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
4220            store.assert_not_in_temp_table_section();
4221        }));
4222        assert!(result.is_err());
4223    }
4224
4225    #[test]
4226    fn sqlite_temp_table_guard_clears_after_method() {
4227        let mut store = SqliteGraphStore::in_memory().unwrap();
4228        let projection = GraphProjection {
4229            nodes: vec![],
4230            edges: vec![],
4231        };
4232        store.replace_projection(&projection).unwrap();
4233        assert!(!store.temp_table_active.get());
4234    }
4235
4236    #[test]
4237    fn derive_ontology_summarizes_types_and_relations() {
4238        let mut store = SqliteGraphStore::in_memory().unwrap();
4239        let seed = GraphProjection {
4240            nodes: vec![
4241                GraphNode::new("fn:a", "function", "a"),
4242                GraphNode::new("fn:b", "function", "b"),
4243                GraphNode::new("mod:m", "module", "m"),
4244            ],
4245            edges: vec![
4246                GraphEdge::new("fn:a", "fn:b", "calls"),
4247                GraphEdge::new("mod:m", "fn:a", "contains"),
4248            ],
4249        };
4250        store.upsert_projection(&seed).unwrap();
4251
4252        let onto = store.derive_ontology().unwrap();
4253        let type_kinds: std::collections::BTreeSet<_> =
4254            onto.nodes.iter().map(|n| n.label.clone()).collect();
4255        assert!(type_kinds.contains("function"));
4256        assert!(type_kinds.contains("module"));
4257        assert!(onto.nodes.iter().all(|n| n.kind == "ontology_type"));
4258
4259        let rel: std::collections::BTreeSet<_> = onto
4260            .edges
4261            .iter()
4262            .map(|e| (e.from_id.clone(), e.kind.clone(), e.to_id.clone()))
4263            .collect();
4264        assert!(rel.contains(&(
4265            "ontology_type:function".into(),
4266            "ontology_relation:calls".into(),
4267            "ontology_type:function".into()
4268        )));
4269        assert!(rel.contains(&(
4270            "ontology_type:module".into(),
4271            "ontology_relation:contains".into(),
4272            "ontology_type:function".into()
4273        )));
4274
4275        let function_node = onto.nodes.iter().find(|n| n.label == "function").unwrap();
4276        assert_eq!(function_node.properties.get("instance_count").unwrap(), "2");
4277
4278        // Idempotent: upserting the ontology then re-deriving excludes ontology rows.
4279        store.upsert_projection(&onto).unwrap();
4280        let onto2 = store.derive_ontology().unwrap();
4281        assert!(onto2.nodes.iter().all(|n| n.kind == "ontology_type"));
4282        assert_eq!(onto2.nodes.len(), onto.nodes.len());
4283        assert_eq!(onto2.edges.len(), onto.edges.len());
4284    }
4285}