Skip to main content

tsift_sqlite/
lib.rs

1use anyhow::{Context, Result, bail};
2use rusqlite::{
3    Connection, OpenFlags, OptionalExtension, Row, params_from_iter,
4    types::{Type, Value},
5};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use std::collections::{BTreeMap, BTreeSet};
8use std::ffi::OsString;
9use std::path::{Path, PathBuf};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12pub use tsift_core::{
13    ConvexEdgeRow, ConvexGraphClient, ConvexGraphStore, ConvexNodeRow, ConvexProjectionRows,
14    ConvexRowsGraphClient, GraphEdge, GraphFreshness, GraphNode, GraphPagedSubgraph, GraphPath,
15    GraphProjection, GraphPropertyFilter, GraphProvenance, GraphQueryOptions, GraphQueryPage,
16    GraphStore, GraphSubgraph, SQLITE_GRAPH_SCHEMA_VERSION, apply_graph_edge_query_page,
17    apply_graph_query_page, graph_edge_id, shortest_path_using_outgoing, stable_graph_edge_id,
18};
19
/// Describes which snapshot fallback was chosen after a read-only open hit a
/// locked database.
///
/// Variants are serialized with `snake_case` names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReadOnlyRecovery {
    /// Selected by `read_only_snapshot_recovery` when no `-wal`/`-shm` sidecar
    /// exists but a `-journal` sidecar does.
    SnapshotFallback,
    /// Selected by `read_only_snapshot_recovery` when a `-wal` or `-shm`
    /// sidecar exists next to the database.
    SnapshotFallbackWal,
}
26
27pub fn read_only_snapshot_recovery(
28    db_path: &Path,
29    err: &anyhow::Error,
30) -> Option<ReadOnlyRecovery> {
31    if !error_mentions_locked_db(err) {
32        return None;
33    }
34    if wal_sidecar_path(db_path).exists() || shared_memory_sidecar_path(db_path).exists() {
35        Some(ReadOnlyRecovery::SnapshotFallbackWal)
36    } else if rollback_journal_path(db_path).exists() {
37        Some(ReadOnlyRecovery::SnapshotFallback)
38    } else {
39        None
40    }
41}
42
/// Returns `db_path` with `-journal` appended to its final component
/// (`graph.db` becomes `graph.db-journal`).
pub fn rollback_journal_path(db_path: &Path) -> PathBuf {
    // Append to the raw OS string so the existing extension is preserved.
    let mut raw = db_path.as_os_str().to_owned();
    raw.push("-journal");
    raw.into()
}
48
/// Returns `db_path` with `-wal` appended to its final component
/// (`graph.db` becomes `graph.db-wal`).
pub fn wal_sidecar_path(db_path: &Path) -> PathBuf {
    // Append to the raw OS string so the existing extension is preserved.
    let mut raw = db_path.as_os_str().to_owned();
    raw.push("-wal");
    raw.into()
}
54
/// Returns `db_path` with `-shm` appended to its final component
/// (`graph.db` becomes `graph.db-shm`).
pub fn shared_memory_sidecar_path(db_path: &Path) -> PathBuf {
    // Append to the raw OS string so the existing extension is preserved.
    let mut raw = db_path.as_os_str().to_owned();
    raw.push("-shm");
    raw.into()
}
60
61pub fn copy_read_only_snapshot(
62    db_path: &Path,
63    default_stem: &str,
64) -> Result<(PathBuf, Vec<PathBuf>)> {
65    let snapshot_path = snapshot_copy_path(db_path, default_stem);
66    std::fs::copy(db_path, &snapshot_path).with_context(|| {
67        format!(
68            "copying locked db {} to snapshot {}",
69            db_path.display(),
70            snapshot_path.display()
71        )
72    })?;
73    let mut cleanup_paths = vec![snapshot_path.clone()];
74    copy_optional_snapshot_sidecar(
75        &wal_sidecar_path(db_path),
76        &wal_sidecar_path(&snapshot_path),
77        &mut cleanup_paths,
78    )?;
79    copy_optional_snapshot_sidecar(
80        &shared_memory_sidecar_path(db_path),
81        &shared_memory_sidecar_path(&snapshot_path),
82        &mut cleanup_paths,
83    )?;
84    Ok((snapshot_path, cleanup_paths))
85}
86
87pub fn error_mentions_locked_db(err: &anyhow::Error) -> bool {
88    err.chain()
89        .any(|cause| cause.to_string().contains("database is locked"))
90}
91
92fn copy_optional_snapshot_sidecar(
93    source_path: &Path,
94    snapshot_path: &Path,
95    cleanup_paths: &mut Vec<PathBuf>,
96) -> Result<()> {
97    match std::fs::copy(source_path, snapshot_path) {
98        Ok(_) => {
99            cleanup_paths.push(snapshot_path.to_path_buf());
100            Ok(())
101        }
102        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
103        Err(err) => Err(err).with_context(|| {
104            format!(
105                "copying SQLite sidecar {} to snapshot {}",
106                source_path.display(),
107                snapshot_path.display()
108            )
109        }),
110    }
111}
112
/// Builds the temp-directory path used for a snapshot copy of `db_path`.
///
/// The file name is `tsift-<stem>-<pid>-<nanos>.db`, where `<stem>` is the
/// UTF-8 file stem of `db_path` (or `default_stem` when there is none),
/// `<pid>` is the current process id, and `<nanos>` is the time since the Unix
/// epoch in nanoseconds (0 if the system clock reads before the epoch).
fn snapshot_copy_path(db_path: &Path, default_stem: &str) -> PathBuf {
    let stem = match db_path.file_stem().and_then(|raw| raw.to_str()) {
        Some(text) => text,
        None => default_stem,
    };
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|since_epoch| since_epoch.as_nanos())
        .unwrap_or(0);
    let pid = std::process::id();
    std::env::temp_dir().join(format!("tsift-{stem}-{pid}-{nanos}.db"))
}
/// Value written to (and then verified against) `PRAGMA wal_autocheckpoint`
/// when a writable graph connection is configured.
const SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES: i64 = 256;
/// Maximum number of rows bound into a single multi-row staging `INSERT`.
/// With the 8- and 9-column staging tables this is at most 450 bound
/// parameters per statement.
// NOTE(review): presumably sized to stay under SQLite's bound-parameter limit — confirm.
const SQLITE_GRAPH_STAGING_CHUNK_ROWS: usize = 50;
128
/// Graph store backed by a single SQLite connection.
///
/// Field order matters: Rust drops fields in declaration order, so `conn` is
/// closed before `_snapshot_copy` deletes the snapshot files it may be reading.
pub struct SqliteGraphStore {
    /// The underlying SQLite connection.
    conn: Connection,
    /// Deletes the temporary snapshot files on drop; `Some` only when the store
    /// was built from a snapshot-backed read-only connection.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Which snapshot fallback (if any) was used to open the store.
    read_only_recovery: Option<ReadOnlyRecovery>,
}
134
/// A read-only SQLite connection, optionally served from a temporary snapshot
/// copy of the database.
///
/// Field order matters: Rust drops fields in declaration order, so `conn` is
/// closed before `_snapshot_copy` deletes the snapshot files.
pub struct SqliteReadOnlyConnection {
    /// The read-only connection itself.
    conn: Connection,
    /// Deletes the snapshot files on drop; `None` when the live database was
    /// opened directly.
    _snapshot_copy: Option<SnapshotCopyGuard>,
    /// Which snapshot fallback was used; `None` when the live database was
    /// opened directly.
    recovery: Option<ReadOnlyRecovery>,
}

impl SqliteReadOnlyConnection {
    /// Borrows the underlying connection.
    pub fn conn(&self) -> &Connection {
        &self.conn
    }

    /// Returns the snapshot fallback used to open this connection, if any.
    pub fn recovery(&self) -> Option<ReadOnlyRecovery> {
        self.recovery
    }
}
150
/// Owns a set of temporary files and deletes them when dropped.
struct SnapshotCopyGuard {
    /// Files to remove on drop.
    paths: Vec<PathBuf>,
}

impl Drop for SnapshotCopyGuard {
    /// Removes every tracked file; failures (such as a file that is already
    /// gone) are ignored because cleanup is best-effort.
    fn drop(&mut self) {
        self.paths.iter().for_each(|stale| {
            std::fs::remove_file(stale).ok();
        });
    }
}
162
163fn configure_writable_graph_connection(conn: &Connection, db_path: &Path) -> Result<()> {
164    conn.busy_timeout(Duration::from_secs(5))?;
165    conn.pragma_update(None, "journal_mode", "WAL")?;
166    let mode: String = conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
167    if mode.to_lowercase() != "wal" {
168        bail!(
169            "graph substrate db {} requires WAL mode for concurrent reads, got {}",
170            db_path.display(),
171            mode
172        );
173    }
174    conn.pragma_update(
175        None,
176        "wal_autocheckpoint",
177        SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
178    )?;
179    let checkpoint_pages: i64 =
180        conn.query_row("PRAGMA wal_autocheckpoint", [], |row| row.get(0))?;
181    if checkpoint_pages != SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES {
182        bail!(
183            "graph substrate db {} requires wal_autocheckpoint={}, got {}",
184            db_path.display(),
185            SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
186            checkpoint_pages
187        );
188    }
189    Ok(())
190}
191
192fn sqlite_column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
193    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
194    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
195    for row in rows {
196        if row? == column {
197            return Ok(true);
198        }
199    }
200    Ok(false)
201}
202
203fn add_column_if_missing(
204    conn: &Connection,
205    table: &str,
206    column: &str,
207    definition: &str,
208) -> Result<()> {
209    if !sqlite_column_exists(conn, table, column)? {
210        conn.execute(
211            &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
212            [],
213        )?;
214    }
215    Ok(())
216}
217
218fn backfill_graph_edge_keys(conn: &Connection) -> Result<()> {
219    if !sqlite_column_exists(conn, "graph_edges", "edge_key")? {
220        return Ok(());
221    }
222    let rows = {
223        let mut stmt = conn.prepare(
224            r#"
225            SELECT from_id, to_id, kind
226            FROM graph_edges
227            WHERE edge_key IS NULL OR edge_key = ''
228            ORDER BY from_id, kind, to_id
229            "#,
230        )?;
231        collect_rows(stmt.query_map([], |row| {
232            Ok((
233                row.get::<_, String>(0)?,
234                row.get::<_, String>(1)?,
235                row.get::<_, String>(2)?,
236            ))
237        })?)?
238    };
239    let mut update = conn.prepare(
240        r#"
241        UPDATE graph_edges
242        SET edge_key = ?4
243        WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3
244        "#,
245    )?;
246    for (from_id, to_id, kind) in rows {
247        update.execute((
248            &from_id,
249            &to_id,
250            &kind,
251            stable_graph_edge_id(&from_id, &to_id, &kind),
252        ))?;
253    }
254    Ok(())
255}
256
257fn migrate_sqlite_graph_schema(conn: &Connection, old_version: i64) -> Result<()> {
258    if old_version < 2 {
259        add_column_if_missing(conn, "graph_nodes", "row_hash", "TEXT")?;
260        add_column_if_missing(conn, "graph_nodes", "source_watermark", "TEXT")?;
261        add_column_if_missing(conn, "graph_edges", "row_hash", "TEXT")?;
262        add_column_if_missing(conn, "graph_edges", "source_watermark", "TEXT")?;
263    }
264    if old_version < 3 {
265        rebuild_graph_node_properties(conn)?;
266    }
267    if old_version < 4 {
268        ensure_sqlite_graph_operator_stats_schema(conn)?;
269    }
270    if old_version < 5 {
271        add_column_if_missing(conn, "graph_edges", "edge_key", "TEXT")?;
272        backfill_graph_edge_keys(conn)?;
273        ensure_sqlite_graph_edge_properties_schema(conn)?;
274        rebuild_graph_edge_properties(conn)?;
275    }
276    Ok(())
277}
278
279fn ensure_sqlite_graph_operator_stats_schema(conn: &Connection) -> Result<()> {
280    conn.execute_batch(
281        r#"
282        CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
283            ON graph_nodes(kind, label, id);
284        CREATE TABLE IF NOT EXISTS graph_operator_stats (
285            scope TEXT PRIMARY KEY,
286            nodes INTEGER NOT NULL,
287            edges INTEGER NOT NULL,
288            tombstone_nodes INTEGER NOT NULL,
289            tombstone_edges INTEGER NOT NULL,
290            file_size_bytes INTEGER,
291            freelist_bytes INTEGER,
292            observed_at_unix INTEGER NOT NULL
293        );
294        "#,
295    )?;
296    Ok(())
297}
298
299fn ensure_sqlite_graph_edge_properties_schema(conn: &Connection) -> Result<()> {
300    conn.execute_batch(
301        r#"
302        CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_edge_key
303            ON graph_edges(edge_key);
304        CREATE TABLE IF NOT EXISTS graph_edge_properties (
305            edge_key TEXT NOT NULL,
306            key TEXT NOT NULL,
307            value TEXT NOT NULL,
308            PRIMARY KEY (edge_key, key),
309            FOREIGN KEY (edge_key) REFERENCES graph_edges(edge_key) ON DELETE CASCADE
310        );
311        CREATE INDEX IF NOT EXISTS idx_graph_edge_properties_key_value_edge
312            ON graph_edge_properties(key, value, edge_key);
313        "#,
314    )?;
315    Ok(())
316}
317
318fn replace_node_properties(
319    conn: &Connection,
320    node_id: &str,
321    properties: &BTreeMap<String, String>,
322) -> Result<()> {
323    conn.execute(
324        "DELETE FROM graph_node_properties WHERE node_id = ?1",
325        [node_id],
326    )?;
327    let mut insert = conn.prepare(
328        r#"
329        INSERT INTO graph_node_properties (node_id, key, value)
330        VALUES (?1, ?2, ?3)
331        ON CONFLICT(node_id, key) DO UPDATE SET
332            value = excluded.value
333        "#,
334    )?;
335    for (key, value) in properties {
336        insert.execute((node_id, key, value))?;
337    }
338    Ok(())
339}
340
341fn replace_edge_properties(
342    conn: &Connection,
343    edge_key: &str,
344    properties: &BTreeMap<String, String>,
345) -> Result<()> {
346    conn.execute(
347        "DELETE FROM graph_edge_properties WHERE edge_key = ?1",
348        [edge_key],
349    )?;
350    let mut insert = conn.prepare(
351        r#"
352        INSERT INTO graph_edge_properties (edge_key, key, value)
353        VALUES (?1, ?2, ?3)
354        ON CONFLICT(edge_key, key) DO UPDATE SET
355            value = excluded.value
356        "#,
357    )?;
358    for (key, value) in properties {
359        insert.execute((edge_key, key, value))?;
360    }
361    Ok(())
362}
363
364fn rebuild_graph_node_properties(conn: &Connection) -> Result<()> {
365    if !sqlite_column_exists(conn, "graph_nodes", "properties_json")? {
366        return Ok(());
367    }
368    conn.execute_batch(
369        r#"
370        DELETE FROM graph_node_properties;
371        INSERT INTO graph_node_properties (node_id, key, value)
372        SELECT graph_nodes.id, json_each.key, CAST(json_each.value AS TEXT)
373        FROM graph_nodes, json_each(graph_nodes.properties_json)
374        WHERE json_each.key IS NOT NULL
375          AND json_each.value IS NOT NULL
376        "#,
377    )?;
378    Ok(())
379}
380
381fn rebuild_graph_edge_properties(conn: &Connection) -> Result<()> {
382    if !sqlite_column_exists(conn, "graph_edges", "properties_json")?
383        || !sqlite_column_exists(conn, "graph_edges", "edge_key")?
384    {
385        return Ok(());
386    }
387    conn.execute_batch(
388        r#"
389        DELETE FROM graph_edge_properties;
390        INSERT INTO graph_edge_properties (edge_key, key, value)
391        SELECT graph_edges.edge_key, json_each.key, CAST(json_each.value AS TEXT)
392        FROM graph_edges, json_each(graph_edges.properties_json)
393        WHERE graph_edges.edge_key IS NOT NULL
394          AND graph_edges.edge_key <> ''
395          AND json_each.key IS NOT NULL
396          AND json_each.value IS NOT NULL
397        "#,
398    )?;
399    Ok(())
400}
401
402pub fn open_graph_read_only_connection(db_path: &Path) -> Result<SqliteReadOnlyConnection> {
403    let conn = Connection::open_with_flags(
404        db_path,
405        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
406    )
407    .with_context(|| format!("opening graph.db read-only: {}", db_path.display()))?;
408    conn.busy_timeout(Duration::from_secs(5))?;
409    Ok(SqliteReadOnlyConnection {
410        conn,
411        _snapshot_copy: None,
412        recovery: None,
413    })
414}
415
416pub fn open_graph_read_only_connection_resilient(
417    db_path: &Path,
418) -> Result<SqliteReadOnlyConnection> {
419    match open_graph_read_only_connection(db_path).and_then(|connection| {
420        connection
421            .conn
422            .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
423        Ok(connection)
424    }) {
425        Ok(connection) => Ok(connection),
426        Err(err) => match read_only_snapshot_recovery(db_path, &err) {
427            Some(recovery) => open_graph_read_only_snapshot(db_path, recovery),
428            None => Err(err),
429        },
430    }
431}
432
433fn open_graph_read_only_snapshot(
434    db_path: &Path,
435    recovery: ReadOnlyRecovery,
436) -> Result<SqliteReadOnlyConnection> {
437    let (snapshot_path, cleanup_paths) = copy_read_only_snapshot(db_path, "graph")?;
438    let conn = Connection::open_with_flags(
439        &snapshot_path,
440        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
441    )
442    .with_context(|| format!("opening graph.db snapshot {}", snapshot_path.display()))?;
443    conn.busy_timeout(Duration::from_secs(5))?;
444    Ok(SqliteReadOnlyConnection {
445        conn,
446        _snapshot_copy: Some(SnapshotCopyGuard {
447            paths: cleanup_paths,
448        }),
449        recovery: Some(recovery),
450    })
451}
452
/// Timing record for one named phase of a projection refresh.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefreshPhase {
    /// Phase identifier, e.g. `sqlite_node_staging`.
    pub name: String,
    /// Elapsed time of the phase in microseconds.
    pub duration_micros: u128,
    /// Free-form description of what the phase did.
    pub detail: String,
}
459
/// Report returned by `SqliteGraphStore::replace_projection_with_version`.
// NOTE(review): the code that fills in the counters is outside this view; the
// field names suggest per-refresh row counts and tombstone ids — confirm
// against the implementation before relying on exact semantics.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionRefresh {
    pub scope: String,
    pub projection_version: String,
    pub source_watermark: Option<String>,
    pub tombstoned_nodes: Vec<String>,
    pub tombstoned_edges: Vec<String>,
    pub upserted_nodes: usize,
    pub upserted_edges: usize,
    pub unchanged_nodes: usize,
    pub unchanged_edges: usize,
    pub upserted_properties: usize,
    pub unchanged_properties: usize,
    pub deleted_properties: usize,
    pub deleted_nodes: usize,
    pub deleted_edges: usize,
    pub pruned_tombstones: usize,
    pub file_size_bytes_before: Option<u64>,
    pub file_size_bytes_after: Option<u64>,
    /// One entry per timed refresh phase, in the order the phases ran.
    pub phase_timings: Vec<SqliteProjectionRefreshPhase>,
}
481
/// Version metadata for a stored projection.
// NOTE(review): presumably mirrors a row of `graph_projection_versions`
// (which has columns with the same names) — confirm against the reader code.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SqliteProjectionVersion {
    pub projection_version: String,
    pub content_hash: Option<String>,
    pub source_watermark: Option<String>,
}
488
489fn sqlite_refresh_phase_timing(
490    name: &str,
491    started: Instant,
492    detail: &str,
493) -> SqliteProjectionRefreshPhase {
494    SqliteProjectionRefreshPhase {
495        name: name.to_string(),
496        duration_micros: started.elapsed().as_micros(),
497        detail: detail.to_string(),
498    }
499}
500
/// Renders the `VALUES` placeholder list for a multi-row insert.
///
/// Produces `row_count` comma-separated groups, each holding `column_count`
/// comma-separated `?` markers, e.g. `(?, ?), (?, ?)` for 2 columns and 2
/// rows. Zero columns yields `()` per row; zero rows yields an empty string.
fn sqlite_graph_staging_placeholders(column_count: usize, row_count: usize) -> String {
    let one_row = format!("({})", vec!["?"; column_count].join(", "));
    vec![one_row.as_str(); row_count].join(", ")
}
514
515fn sqlite_stage_projection_nodes(
516    tx: &rusqlite::Transaction<'_>,
517    nodes: &[GraphNode],
518    source_watermark: Option<&str>,
519) -> Result<()> {
520    for chunk in nodes.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
521        let sql = format!(
522            r#"
523            INSERT INTO next_graph_nodes
524                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
525            VALUES {}
526            "#,
527            sqlite_graph_staging_placeholders(8, chunk.len())
528        );
529        let mut values = Vec::with_capacity(chunk.len() * 8);
530        for node in chunk {
531            values.push(Value::Text(node.id.clone()));
532            values.push(Value::Text(node.kind.clone()));
533            values.push(Value::Text(node.label.clone()));
534            values.push(Value::Text(to_json(&node.properties)?));
535            values.push(Value::Text(to_json(&node.provenance)?));
536            values.push(
537                optional_to_json(&node.freshness)?
538                    .map(Value::Text)
539                    .unwrap_or(Value::Null),
540            );
541            values.push(Value::Text(row_hash(node)?));
542            values.push(
543                source_watermark
544                    .map(|watermark| Value::Text(watermark.to_string()))
545                    .unwrap_or(Value::Null),
546            );
547        }
548        tx.execute(&sql, params_from_iter(values))?;
549    }
550    Ok(())
551}
552
553fn sqlite_stage_projection_edges(
554    tx: &rusqlite::Transaction<'_>,
555    edges: &[GraphEdge],
556    source_watermark: Option<&str>,
557) -> Result<()> {
558    for chunk in edges.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
559        let sql = format!(
560            r#"
561            INSERT INTO next_graph_edges
562                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
563            VALUES {}
564            "#,
565            sqlite_graph_staging_placeholders(9, chunk.len())
566        );
567        let mut values = Vec::with_capacity(chunk.len() * 9);
568        for edge in chunk {
569            values.push(Value::Text(graph_edge_id(edge)));
570            values.push(Value::Text(edge.from_id.clone()));
571            values.push(Value::Text(edge.to_id.clone()));
572            values.push(Value::Text(edge.kind.clone()));
573            values.push(Value::Text(to_json(&edge.properties)?));
574            values.push(Value::Text(to_json(&edge.provenance)?));
575            values.push(
576                optional_to_json(&edge.freshness)?
577                    .map(Value::Text)
578                    .unwrap_or(Value::Null),
579            );
580            values.push(Value::Text(row_hash(edge)?));
581            values.push(
582                source_watermark
583                    .map(|watermark| Value::Text(watermark.to_string()))
584                    .unwrap_or(Value::Null),
585            );
586        }
587        tx.execute(&sql, params_from_iter(values))?;
588    }
589    Ok(())
590}
591
592impl SqliteGraphStore {
593    pub fn open(db_path: &Path) -> Result<Self> {
594        if let Some(parent) = db_path.parent() {
595            std::fs::create_dir_all(parent)
596                .with_context(|| format!("creating graph substrate dir: {}", parent.display()))?;
597        }
598        let conn = Connection::open(db_path)
599            .with_context(|| format!("opening graph substrate db: {}", db_path.display()))?;
600        configure_writable_graph_connection(&conn, db_path)?;
601        Self::from_connection(conn)
602    }
603
604    pub fn in_memory() -> Result<Self> {
605        let conn = Connection::open_in_memory()?;
606        conn.busy_timeout(Duration::from_secs(5))?;
607        Self::from_connection(conn)
608    }
609
610    pub fn open_read_only(db_path: &Path) -> Result<Self> {
611        let connection = open_graph_read_only_connection(db_path)?;
612        Self::from_read_only_connection(connection)
613    }
614
615    pub fn open_read_only_resilient(db_path: &Path) -> Result<Self> {
616        let connection = open_graph_read_only_connection_resilient(db_path)?;
617        Self::from_read_only_connection(connection)
618    }
619
    /// Returns the snapshot fallback used to open this store, or `None` when
    /// no fallback was needed.
    pub fn read_only_recovery(&self) -> Option<ReadOnlyRecovery> {
        self.read_only_recovery
    }
623
624    pub fn has_user_triggers(&self) -> Result<bool> {
625        self.conn
626            .query_row(
627                r#"
628                SELECT EXISTS(
629                    SELECT 1
630                    FROM sqlite_master
631                    WHERE type = 'trigger'
632                      AND name NOT LIKE 'sqlite_%'
633                )
634                "#,
635                [],
636                |row| row.get::<_, bool>(0),
637            )
638            .map_err(Into::into)
639    }
640
    /// Wraps a writable connection, creating or migrating the graph schema.
    ///
    /// Steps, in order: enable foreign keys, reject databases whose
    /// `user_version` is newer than `SQLITE_GRAPH_SCHEMA_VERSION`, create any
    /// missing tables and indexes, then run the migrations and stamp the new
    /// `user_version` when the stored version is older.
    fn from_connection(conn: Connection) -> Result<Self> {
        conn.pragma_update(None, "foreign_keys", "ON")?;
        let user_version: i64 =
            conn.pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
        // Refuse to touch a database written by a newer schema version.
        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
            bail!(
                "graph.db schema version {} is newer than supported version {}",
                user_version,
                SQLITE_GRAPH_SCHEMA_VERSION
            );
        }
        // `IF NOT EXISTS` leaves the tables of an older database untouched;
        // columns added since are supplied by the migrations below.
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS graph_nodes (
                id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                label TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT
            );
            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind
                ON graph_nodes(kind);
            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
                ON graph_nodes(kind, label, id);

            CREATE TABLE IF NOT EXISTS graph_edges (
                edge_key TEXT NOT NULL UNIQUE,
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT,
                PRIMARY KEY (from_id, to_id, kind),
                FOREIGN KEY (from_id) REFERENCES graph_nodes(id) ON DELETE CASCADE,
                FOREIGN KEY (to_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_edges_from_kind
                ON graph_edges(from_id, kind);
            CREATE INDEX IF NOT EXISTS idx_graph_edges_to_kind
                ON graph_edges(to_id, kind);

            CREATE TABLE IF NOT EXISTS graph_projection_versions (
                scope TEXT PRIMARY KEY,
                projection_version TEXT NOT NULL,
                content_hash TEXT,
                source_watermark TEXT,
                observed_at_unix INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS graph_tombstones (
                row_key TEXT PRIMARY KEY,
                row_kind TEXT NOT NULL,
                deleted_at_unix INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS graph_node_properties (
                node_id TEXT NOT NULL,
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                PRIMARY KEY (node_id, key),
                FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_graph_node_properties_key_value_node
                ON graph_node_properties(key, value, node_id);

            CREATE TABLE IF NOT EXISTS graph_operator_stats (
                scope TEXT PRIMARY KEY,
                nodes INTEGER NOT NULL,
                edges INTEGER NOT NULL,
                tombstone_nodes INTEGER NOT NULL,
                tombstone_edges INTEGER NOT NULL,
                file_size_bytes INTEGER,
                freelist_bytes INTEGER,
                observed_at_unix INTEGER NOT NULL
            );
            "#,
        )?;
        // Only stamp the new version after every migration step succeeded.
        if user_version < SQLITE_GRAPH_SCHEMA_VERSION {
            migrate_sqlite_graph_schema(&conn, user_version)?;
            conn.pragma_update(None, "user_version", SQLITE_GRAPH_SCHEMA_VERSION)?;
        }
        Ok(Self {
            conn,
            _snapshot_copy: None,
            read_only_recovery: None,
        })
    }
734
735    fn from_read_only_connection(connection: SqliteReadOnlyConnection) -> Result<Self> {
736        connection.conn.pragma_update(None, "foreign_keys", "ON")?;
737        let user_version: i64 =
738            connection
739                .conn
740                .pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
741        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
742            bail!(
743                "graph.db schema version {} is newer than supported version {}",
744                user_version,
745                SQLITE_GRAPH_SCHEMA_VERSION
746            );
747        }
748        connection
749            .conn
750            .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
751        Ok(Self {
752            conn: connection.conn,
753            _snapshot_copy: connection._snapshot_copy,
754            read_only_recovery: connection.recovery,
755        })
756    }
757
758    pub fn replace_projection(&mut self, projection: &GraphProjection) -> Result<()> {
759        self.replace_projection_with_version("root", projection, None, None)
760            .map(|_| ())
761    }
762
763    pub fn replace_projection_with_version(
764        &mut self,
765        scope: impl Into<String>,
766        projection: &GraphProjection,
767        projection_version: Option<&str>,
768        source_watermark: Option<String>,
769    ) -> Result<SqliteProjectionRefresh> {
770        let scope = scope.into();
771        let projection_version = projection_version
772            .map(str::to_string)
773            .or_else(|| projection_version_from_nodes(&projection.nodes))
774            .unwrap_or_else(|| "unversioned".to_string());
775        let projection_hash = projection_hash_from_nodes(&projection.nodes);
776        let observed_at_unix = unix_now();
777        let file_size_bytes_before = sqlite_database_size_bytes(&self.conn).ok();
778        let force_refresh_writes = self.has_user_triggers().unwrap_or(true);
779        let mut phase_timings = Vec::new();
780
781        let tx = self.conn.transaction()?;
782        let started = Instant::now();
783        tx.execute_batch(
784            r#"
785            CREATE TEMP TABLE IF NOT EXISTS next_graph_nodes (
786                id TEXT PRIMARY KEY,
787                kind TEXT NOT NULL,
788                label TEXT NOT NULL,
789                properties_json TEXT NOT NULL,
790                provenance_json TEXT NOT NULL,
791                freshness_json TEXT,
792                row_hash TEXT NOT NULL,
793                source_watermark TEXT
794            );
795            CREATE TEMP TABLE IF NOT EXISTS next_graph_edges (
796                edge_key TEXT PRIMARY KEY,
797                from_id TEXT NOT NULL,
798                to_id TEXT NOT NULL,
799                kind TEXT NOT NULL,
800                properties_json TEXT NOT NULL,
801                provenance_json TEXT NOT NULL,
802                freshness_json TEXT,
803                row_hash TEXT NOT NULL,
804                source_watermark TEXT
805            );
806            CREATE INDEX IF NOT EXISTS temp.idx_next_graph_edges_from_to_kind
807                ON next_graph_edges(from_id, to_id, kind);
808            CREATE TEMP TABLE IF NOT EXISTS next_graph_node_properties (
809                node_id TEXT NOT NULL,
810                key TEXT NOT NULL,
811                value TEXT NOT NULL,
812                PRIMARY KEY (node_id, key)
813            );
814            CREATE TEMP TABLE IF NOT EXISTS next_graph_edge_properties (
815                edge_key TEXT NOT NULL,
816                key TEXT NOT NULL,
817                value TEXT NOT NULL,
818                PRIMARY KEY (edge_key, key)
819            );
820            CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_nodes (
821                id TEXT PRIMARY KEY
822            );
823            CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_edges (
824                edge_key TEXT PRIMARY KEY
825            );
826            DELETE FROM next_graph_nodes;
827            DELETE FROM next_graph_edges;
828            DELETE FROM next_graph_node_properties;
829            DELETE FROM next_graph_edge_properties;
830            DELETE FROM next_graph_changed_nodes;
831            DELETE FROM next_graph_changed_edges;
832            "#,
833        )?;
834        phase_timings.push(sqlite_refresh_phase_timing(
835            "sqlite_temp_table_prepare",
836            started,
837            "create and clear refresh staging tables before row loading",
838        ));
839        {
840            let started = Instant::now();
841            sqlite_stage_projection_nodes(&tx, &projection.nodes, source_watermark.as_deref())?;
842            phase_timings.push(sqlite_refresh_phase_timing(
843                "sqlite_node_staging",
844                started,
845                &format!(
846                    "bulk stage {} graph_nodes rows into temp table using multi-row chunks up to {} rows before delta comparison",
847                    projection.nodes.len(),
848                    SQLITE_GRAPH_STAGING_CHUNK_ROWS
849                ),
850            ));
851        }
852        {
853            let started = Instant::now();
854            sqlite_stage_projection_edges(&tx, &projection.edges, source_watermark.as_deref())?;
855            phase_timings.push(sqlite_refresh_phase_timing(
856                "sqlite_edge_staging",
857                started,
858                &format!(
859                    "bulk stage {} graph_edges rows into temp table using multi-row chunks up to {} rows before delta comparison",
860                    projection.edges.len(),
861                    SQLITE_GRAPH_STAGING_CHUNK_ROWS
862                ),
863            ));
864        }
865        {
866            let started = Instant::now();
867            let changed_nodes_sql = if force_refresh_writes {
868                r#"
869                INSERT INTO next_graph_changed_nodes (id)
870                SELECT id
871                FROM next_graph_nodes
872                "#
873            } else {
874                r#"
875                INSERT INTO next_graph_changed_nodes (id)
876                SELECT n.id
877                FROM next_graph_nodes n
878                LEFT JOIN graph_nodes g ON g.id = n.id
879                WHERE g.id IS NULL OR g.row_hash IS NOT n.row_hash
880                "#
881            };
882            tx.execute(changed_nodes_sql, [])?;
883            tx.execute_batch(
884                r#"
885                INSERT INTO next_graph_node_properties (node_id, key, value)
886                SELECT n.id, json_each.key, CAST(json_each.value AS TEXT)
887                FROM next_graph_nodes n
888                JOIN next_graph_changed_nodes c ON c.id = n.id,
889                     json_each(n.properties_json)
890                WHERE json_each.key IS NOT NULL
891                  AND json_each.value IS NOT NULL
892                "#,
893            )?;
894            phase_timings.push(sqlite_refresh_phase_timing(
895                "sqlite_property_row_staging",
896                started,
897                "derive materialized node property rows only for new/changed node rows; unchanged row-hash owners reuse existing property rows",
898            ));
899        }
900        {
901            let started = Instant::now();
902            let changed_edges_sql = if force_refresh_writes {
903                r#"
904                INSERT INTO next_graph_changed_edges (edge_key)
905                SELECT edge_key
906                FROM next_graph_edges
907                "#
908            } else {
909                r#"
910                INSERT INTO next_graph_changed_edges (edge_key)
911                SELECT n.edge_key
912                FROM next_graph_edges n
913                LEFT JOIN graph_edges g ON g.edge_key = n.edge_key
914                WHERE g.edge_key IS NULL OR g.row_hash IS NOT n.row_hash
915                "#
916            };
917            tx.execute(changed_edges_sql, [])?;
918            tx.execute_batch(
919                r#"
920                INSERT INTO next_graph_edge_properties (edge_key, key, value)
921                SELECT e.edge_key, json_each.key, CAST(json_each.value AS TEXT)
922                FROM next_graph_edges e
923                JOIN next_graph_changed_edges c ON c.edge_key = e.edge_key,
924                     json_each(e.properties_json)
925                WHERE json_each.key IS NOT NULL
926                  AND json_each.value IS NOT NULL
927                "#,
928            )?;
929            phase_timings.push(sqlite_refresh_phase_timing(
930                "sqlite_edge_property_row_staging",
931                started,
932                "derive materialized edge property rows only for new/changed edge rows; unchanged row-hash owners reuse existing property rows",
933            ));
934        }
935
936        let delta_started = Instant::now();
937        let tombstoned_nodes = {
938            let mut stmt = tx.prepare(
939                r#"
940                SELECT g.id
941                FROM graph_nodes g
942                LEFT JOIN next_graph_nodes n ON n.id = g.id
943                WHERE n.id IS NULL
944                ORDER BY g.id
945                "#,
946            )?;
947            collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
948        };
949        let tombstoned_edges = {
950            let mut stmt = tx.prepare(
951                r#"
952                SELECT g.edge_key
953                FROM graph_edges g
954                LEFT JOIN next_graph_edges n
955                    ON n.edge_key = g.edge_key
956                WHERE n.edge_key IS NULL
957                ORDER BY g.edge_key
958                "#,
959            )?;
960            collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
961        };
962        let unchanged_nodes: usize = tx.query_row(
963            r#"
964            SELECT COUNT(*)
965            FROM next_graph_nodes n
966            JOIN graph_nodes g ON g.id = n.id
967            WHERE g.row_hash = n.row_hash
968            "#,
969            [],
970            |row| row.get(0),
971        )?;
972        let unchanged_edges: usize = tx.query_row(
973            r#"
974            SELECT COUNT(*)
975            FROM next_graph_edges n
976            JOIN graph_edges g
977                ON g.edge_key = n.edge_key
978            WHERE g.row_hash = n.row_hash
979            "#,
980            [],
981            |row| row.get(0),
982        )?;
983        let reused_owner_node_properties: usize = tx.query_row(
984            r#"
985            SELECT COUNT(*)
986            FROM graph_node_properties g
987            JOIN next_graph_nodes n ON n.id = g.node_id
988            LEFT JOIN next_graph_changed_nodes c ON c.id = n.id
989            WHERE c.id IS NULL
990            "#,
991            [],
992            |row| row.get(0),
993        )?;
994        let reused_owner_edge_properties: usize = tx.query_row(
995            r#"
996            SELECT COUNT(*)
997            FROM graph_edge_properties g
998            JOIN next_graph_edges n ON n.edge_key = g.edge_key
999            LEFT JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1000            WHERE c.edge_key IS NULL
1001            "#,
1002            [],
1003            |row| row.get(0),
1004        )?;
1005        let unchanged_changed_node_properties: usize = tx.query_row(
1006            r#"
1007            SELECT COUNT(*)
1008            FROM next_graph_node_properties n
1009            JOIN graph_node_properties g
1010                ON g.node_id = n.node_id AND g.key = n.key
1011            WHERE g.value = n.value
1012            "#,
1013            [],
1014            |row| row.get(0),
1015        )?;
1016        let unchanged_changed_edge_properties: usize = tx.query_row(
1017            r#"
1018            SELECT COUNT(*)
1019            FROM next_graph_edge_properties n
1020            JOIN graph_edge_properties g
1021                ON g.edge_key = n.edge_key AND g.key = n.key
1022            WHERE g.value = n.value
1023            "#,
1024            [],
1025            |row| row.get(0),
1026        )?;
1027        let unchanged_properties = reused_owner_node_properties
1028            + reused_owner_edge_properties
1029            + unchanged_changed_node_properties
1030            + unchanged_changed_edge_properties;
1031
1032        let deleted_edges = tx.execute(
1033            r#"
1034            DELETE FROM graph_edges
1035            WHERE NOT EXISTS (
1036                SELECT 1
1037                FROM next_graph_edges n
1038                WHERE n.edge_key = graph_edges.edge_key
1039            )
1040            "#,
1041            [],
1042        )?;
1043        let deleted_nodes = tx.execute(
1044            r#"
1045            DELETE FROM graph_nodes
1046            WHERE NOT EXISTS (
1047                SELECT 1
1048                FROM next_graph_nodes n
1049                WHERE n.id = graph_nodes.id
1050            )
1051            "#,
1052            [],
1053        )?;
1054
1055        let upsert_nodes_sql = r#"
1056            INSERT INTO graph_nodes
1057                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1058            SELECT
1059                n.id,
1060                n.kind,
1061                n.label,
1062                n.properties_json,
1063                n.provenance_json,
1064                n.freshness_json,
1065                n.row_hash,
1066                n.source_watermark
1067            FROM next_graph_nodes n
1068            JOIN next_graph_changed_nodes c ON c.id = n.id
1069            WHERE true
1070            ON CONFLICT(id) DO UPDATE SET
1071                kind = excluded.kind,
1072                label = excluded.label,
1073                properties_json = excluded.properties_json,
1074                provenance_json = excluded.provenance_json,
1075                freshness_json = excluded.freshness_json,
1076                row_hash = excluded.row_hash,
1077                source_watermark = excluded.source_watermark
1078            WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1079            "#;
1080        tx.execute(upsert_nodes_sql, [])?;
1081        let upsert_edges_sql = r#"
1082            INSERT INTO graph_edges
1083                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1084            SELECT
1085                n.edge_key,
1086                n.from_id,
1087                n.to_id,
1088                n.kind,
1089                n.properties_json,
1090                n.provenance_json,
1091                n.freshness_json,
1092                n.row_hash,
1093                n.source_watermark
1094            FROM next_graph_edges n
1095            JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1096            WHERE true
1097            ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1098                edge_key = excluded.edge_key,
1099                properties_json = excluded.properties_json,
1100                provenance_json = excluded.provenance_json,
1101                freshness_json = excluded.freshness_json,
1102                row_hash = excluded.row_hash,
1103                source_watermark = excluded.source_watermark
1104            WHERE graph_edges.row_hash IS NOT excluded.row_hash
1105            "#;
1106        tx.execute(upsert_edges_sql, [])?;
1107        let deleted_node_properties = tx.execute(
1108            r#"
1109            DELETE FROM graph_node_properties
1110            WHERE EXISTS (
1111                SELECT 1
1112                FROM next_graph_changed_nodes c
1113                WHERE c.id = graph_node_properties.node_id
1114            )
1115              AND NOT EXISTS (
1116                SELECT 1
1117                FROM next_graph_node_properties n
1118                WHERE n.node_id = graph_node_properties.node_id
1119                  AND n.key = graph_node_properties.key
1120            )
1121            "#,
1122            [],
1123        )?;
1124        let deleted_edge_properties = tx.execute(
1125            r#"
1126            DELETE FROM graph_edge_properties
1127            WHERE EXISTS (
1128                SELECT 1
1129                FROM next_graph_changed_edges c
1130                WHERE c.edge_key = graph_edge_properties.edge_key
1131            )
1132              AND NOT EXISTS (
1133                SELECT 1
1134                FROM next_graph_edge_properties n
1135                WHERE n.edge_key = graph_edge_properties.edge_key
1136                  AND n.key = graph_edge_properties.key
1137            )
1138            "#,
1139            [],
1140        )?;
1141        let deleted_properties = deleted_node_properties + deleted_edge_properties;
1142        let upsert_properties_sql = r#"
1143            INSERT INTO graph_node_properties (node_id, key, value)
1144            SELECT n.node_id, n.key, n.value
1145            FROM next_graph_node_properties n
1146            LEFT JOIN graph_node_properties g
1147                ON g.node_id = n.node_id AND g.key = n.key
1148            WHERE g.node_id IS NULL OR g.value IS NOT n.value
1149            ON CONFLICT(node_id, key) DO UPDATE SET
1150                value = excluded.value
1151            WHERE graph_node_properties.value IS NOT excluded.value
1152            "#;
1153        let upserted_node_properties = tx.execute(upsert_properties_sql, [])?;
1154        let upsert_edge_properties_sql = r#"
1155            INSERT INTO graph_edge_properties (edge_key, key, value)
1156            SELECT n.edge_key, n.key, n.value
1157            FROM next_graph_edge_properties n
1158            LEFT JOIN graph_edge_properties g
1159                ON g.edge_key = n.edge_key AND g.key = n.key
1160            WHERE g.edge_key IS NULL OR g.value IS NOT n.value
1161            ON CONFLICT(edge_key, key) DO UPDATE SET
1162                value = excluded.value
1163            WHERE graph_edge_properties.value IS NOT excluded.value
1164            "#;
1165        let upserted_edge_properties = tx.execute(upsert_edge_properties_sql, [])?;
1166        let upserted_properties = upserted_node_properties + upserted_edge_properties;
1167        tx.execute(
1168            r#"
1169            INSERT INTO graph_projection_versions
1170                (scope, projection_version, content_hash, source_watermark, observed_at_unix)
1171            VALUES (?1, ?2, ?3, ?4, ?5)
1172            ON CONFLICT(scope) DO UPDATE SET
1173                projection_version = excluded.projection_version,
1174                content_hash = excluded.content_hash,
1175                source_watermark = excluded.source_watermark,
1176                observed_at_unix = excluded.observed_at_unix
1177            "#,
1178            (
1179                &scope,
1180                &projection_version,
1181                &projection_hash,
1182                &source_watermark,
1183                observed_at_unix,
1184            ),
1185        )?;
1186        let pruned_node_tombstones = tx.execute(
1187            r#"
1188            DELETE FROM graph_tombstones
1189            WHERE row_kind = 'node'
1190              AND EXISTS (
1191                SELECT 1
1192                FROM next_graph_nodes n
1193                WHERE n.id = substr(graph_tombstones.row_key, 6)
1194              )
1195            "#,
1196            [],
1197        )?;
1198        let pruned_edge_tombstones = tx.execute(
1199            r#"
1200            DELETE FROM graph_tombstones
1201            WHERE row_kind = 'edge'
1202              AND EXISTS (
1203                SELECT 1
1204                FROM next_graph_edges n
1205                WHERE n.edge_key = substr(graph_tombstones.row_key, 6)
1206              )
1207            "#,
1208            [],
1209        )?;
1210        {
1211            let mut insert_node_tombstone = tx.prepare(
1212                r#"
1213                INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1214                VALUES (?1, 'node', ?2)
1215                ON CONFLICT(row_key) DO UPDATE SET
1216                    row_kind = excluded.row_kind,
1217                    deleted_at_unix = excluded.deleted_at_unix
1218                "#,
1219            )?;
1220            for id in &tombstoned_nodes {
1221                insert_node_tombstone.execute((format!("node:{id}"), observed_at_unix))?;
1222            }
1223        }
1224        {
1225            let mut insert_edge_tombstone = tx.prepare(
1226                r#"
1227                INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1228                VALUES (?1, 'edge', ?2)
1229                ON CONFLICT(row_key) DO UPDATE SET
1230                    row_kind = excluded.row_kind,
1231                    deleted_at_unix = excluded.deleted_at_unix
1232                "#,
1233            )?;
1234            for key in &tombstoned_edges {
1235                insert_edge_tombstone.execute((format!("edge:{key}"), observed_at_unix))?;
1236            }
1237        }
1238        let tombstone_node_count: usize = tx.query_row(
1239            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1240            [],
1241            |row| row.get(0),
1242        )?;
1243        let tombstone_edge_count: usize = tx.query_row(
1244            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1245            [],
1246            |row| row.get(0),
1247        )?;
1248        tx.execute(
1249            r#"
1250            INSERT INTO graph_operator_stats
1251                (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1252            VALUES (?1, ?2, ?3, ?4, ?5, NULL, NULL, ?6)
1253            ON CONFLICT(scope) DO UPDATE SET
1254                nodes = excluded.nodes,
1255                edges = excluded.edges,
1256                tombstone_nodes = excluded.tombstone_nodes,
1257                tombstone_edges = excluded.tombstone_edges,
1258                file_size_bytes = excluded.file_size_bytes,
1259                freelist_bytes = excluded.freelist_bytes,
1260                observed_at_unix = excluded.observed_at_unix
1261            "#,
1262            (
1263                &scope,
1264                projection.nodes.len() as i64,
1265                projection.edges.len() as i64,
1266                tombstone_node_count as i64,
1267                tombstone_edge_count as i64,
1268                observed_at_unix,
1269            ),
1270        )?;
1271        phase_timings.push(sqlite_refresh_phase_timing(
1272            "sqlite_delta_write",
1273            delta_started,
1274            "apply row/property deltas, projection metadata, tombstones, and cached operator counts",
1275        ));
1276        let commit_started = Instant::now();
1277        tx.commit()?;
1278        phase_timings.push(sqlite_refresh_phase_timing(
1279            "sqlite_commit",
1280            commit_started,
1281            "commit refresh transaction and publish old-or-new graph visibility",
1282        ));
1283        let file_size_bytes_after = sqlite_database_size_bytes(&self.conn).ok();
1284        let freelist_bytes_after = sqlite_database_freelist_bytes(&self.conn).ok();
1285        let stats_started = Instant::now();
1286        self.conn.execute(
1287            r#"
1288            UPDATE graph_operator_stats
1289            SET file_size_bytes = ?2,
1290                freelist_bytes = ?3,
1291                observed_at_unix = ?4
1292            WHERE scope = ?1
1293            "#,
1294            (
1295                &scope,
1296                file_size_bytes_after.map(|value| value as i64),
1297                freelist_bytes_after.map(|value| value as i64),
1298                unix_now(),
1299            ),
1300        )?;
1301        phase_timings.push(sqlite_refresh_phase_timing(
1302            "sqlite_stats_cache_update",
1303            stats_started,
1304            "persist post-commit file and freelist proof for status/doctor",
1305        ));
1306        Ok(SqliteProjectionRefresh {
1307            scope,
1308            projection_version,
1309            source_watermark,
1310            upserted_nodes: projection.nodes.len().saturating_sub(unchanged_nodes),
1311            upserted_edges: projection.edges.len().saturating_sub(unchanged_edges),
1312            unchanged_nodes,
1313            unchanged_edges,
1314            upserted_properties,
1315            unchanged_properties,
1316            deleted_properties,
1317            deleted_nodes,
1318            deleted_edges,
1319            pruned_tombstones: pruned_node_tombstones + pruned_edge_tombstones,
1320            file_size_bytes_before,
1321            file_size_bytes_after,
1322            tombstoned_nodes,
1323            tombstoned_edges,
1324            phase_timings,
1325        })
1326    }
1327
1328    pub fn upsert_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1329        let tx = self.conn.transaction()?;
1330        {
1331            let mut insert_node = tx.prepare(
1332                r#"
1333                INSERT INTO graph_nodes
1334                    (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1335                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1336                ON CONFLICT(id) DO UPDATE SET
1337                    kind = excluded.kind,
1338                    label = excluded.label,
1339                    properties_json = excluded.properties_json,
1340                    provenance_json = excluded.provenance_json,
1341                    freshness_json = excluded.freshness_json,
1342                    row_hash = excluded.row_hash,
1343                    source_watermark = excluded.source_watermark
1344                "#,
1345            )?;
1346            let mut delete_properties =
1347                tx.prepare("DELETE FROM graph_node_properties WHERE node_id = ?1")?;
1348            let mut insert_property = tx.prepare(
1349                r#"
1350                INSERT INTO graph_node_properties (node_id, key, value)
1351                VALUES (?1, ?2, ?3)
1352                "#,
1353            )?;
1354            for node in &projection.nodes {
1355                insert_node.execute((
1356                    &node.id,
1357                    &node.kind,
1358                    &node.label,
1359                    to_json(&node.properties)?,
1360                    to_json(&node.provenance)?,
1361                    optional_to_json(&node.freshness)?,
1362                    row_hash(node)?,
1363                ))?;
1364                delete_properties.execute([&node.id])?;
1365                for (key, value) in &node.properties {
1366                    insert_property.execute((&node.id, key, value))?;
1367                }
1368            }
1369        }
1370        {
1371            let mut insert_edge = tx.prepare(
1372                r#"
1373                INSERT INTO graph_edges
1374                    (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1375                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1376                ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1377                    edge_key = excluded.edge_key,
1378                    properties_json = excluded.properties_json,
1379                    provenance_json = excluded.provenance_json,
1380                    freshness_json = excluded.freshness_json,
1381                    row_hash = excluded.row_hash,
1382                    source_watermark = excluded.source_watermark
1383                "#,
1384            )?;
1385            let mut delete_properties =
1386                tx.prepare("DELETE FROM graph_edge_properties WHERE edge_key = ?1")?;
1387            let mut insert_property = tx.prepare(
1388                r#"
1389                INSERT INTO graph_edge_properties (edge_key, key, value)
1390                VALUES (?1, ?2, ?3)
1391                "#,
1392            )?;
1393            for edge in &projection.edges {
1394                let edge_key = graph_edge_id(edge);
1395                insert_edge.execute((
1396                    &edge_key,
1397                    &edge.from_id,
1398                    &edge.to_id,
1399                    &edge.kind,
1400                    to_json(&edge.properties)?,
1401                    to_json(&edge.provenance)?,
1402                    optional_to_json(&edge.freshness)?,
1403                    row_hash(edge)?,
1404                ))?;
1405                delete_properties.execute([&edge_key])?;
1406                for (key, value) in &edge.properties {
1407                    insert_property.execute((&edge_key, key, value))?;
1408                }
1409            }
1410        }
1411        tx.commit()?;
1412        Ok(())
1413    }
1414
1415    pub fn projection_version(&self, scope: &str) -> Result<Option<SqliteProjectionVersion>> {
1416        self.conn
1417            .query_row(
1418                r#"
1419                SELECT projection_version, content_hash, source_watermark
1420                FROM graph_projection_versions
1421                WHERE scope = ?1
1422                "#,
1423                [scope],
1424                |row| {
1425                    Ok(SqliteProjectionVersion {
1426                        projection_version: row.get(0)?,
1427                        content_hash: row.get(1)?,
1428                        source_watermark: row.get(2)?,
1429                    })
1430                },
1431            )
1432            .optional()
1433            .map_err(Into::into)
1434    }
1435
1436    pub fn update_projection_source_watermark(
1437        &mut self,
1438        scope: &str,
1439        source_watermark: Option<String>,
1440    ) -> Result<()> {
1441        self.conn.execute(
1442            r#"
1443            UPDATE graph_projection_versions
1444            SET source_watermark = ?2
1445            WHERE scope = ?1
1446            "#,
1447            (scope, source_watermark),
1448        )?;
1449        Ok(())
1450    }
1451
1452    pub fn compact_storage(&mut self, scope: &str, prune_tombstones: bool) -> Result<usize> {
1453        let pruned_tombstones = if prune_tombstones {
1454            self.conn.execute("DELETE FROM graph_tombstones", [])?
1455        } else {
1456            0
1457        };
1458        self.conn.execute_batch(
1459            r#"
1460            PRAGMA wal_checkpoint(TRUNCATE);
1461            VACUUM;
1462            "#,
1463        )?;
1464        let nodes = self
1465            .conn
1466            .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
1467                row.get::<_, i64>(0)
1468            })?;
1469        let edges = self
1470            .conn
1471            .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
1472                row.get::<_, i64>(0)
1473            })?;
1474        let tombstone_nodes = self.conn.query_row(
1475            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1476            [],
1477            |row| row.get::<_, i64>(0),
1478        )?;
1479        let tombstone_edges = self.conn.query_row(
1480            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1481            [],
1482            |row| row.get::<_, i64>(0),
1483        )?;
1484        let file_size_bytes = sqlite_database_size_bytes(&self.conn)
1485            .ok()
1486            .map(|value| value as i64);
1487        let freelist_bytes = sqlite_database_freelist_bytes(&self.conn)
1488            .ok()
1489            .map(|value| value as i64);
1490        self.conn.execute(
1491            r#"
1492            INSERT INTO graph_operator_stats
1493                (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1494            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s', 'now'))
1495            ON CONFLICT(scope) DO UPDATE SET
1496                nodes = excluded.nodes,
1497                edges = excluded.edges,
1498                tombstone_nodes = excluded.tombstone_nodes,
1499                tombstone_edges = excluded.tombstone_edges,
1500                file_size_bytes = excluded.file_size_bytes,
1501                freelist_bytes = excluded.freelist_bytes,
1502                observed_at_unix = excluded.observed_at_unix
1503            "#,
1504            (
1505                scope,
1506                nodes,
1507                edges,
1508                tombstone_nodes,
1509                tombstone_edges,
1510                file_size_bytes,
1511                freelist_bytes,
1512            ),
1513        )?;
1514        Ok(pruned_tombstones)
1515    }
1516}
1517
1518fn sqlite_query_plan(conn: &Connection, sql: &str, values: &[Value]) -> Result<Vec<String>> {
1519    let mut stmt = conn.prepare(&format!("EXPLAIN QUERY PLAN {sql}"))?;
1520    collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
1521        row.get::<_, String>(3)
1522    })?)
1523}
1524
/// Builds human-readable diagnostics for a query plan.
///
/// The first entry echoes the whole plan joined with `" | "`. It is followed
/// by one entry per name in `expected_indexes`, in order, stating whether any
/// plan row contains that name as a substring.
fn sqlite_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<String> {
    let plan_summary = format!(
        "sqlite query pushdown active; plan: {}",
        plan.join(" | ")
    );
    let index_notes = expected_indexes.iter().map(|&expected_index| {
        let reported = plan.iter().any(|detail| detail.contains(expected_index));
        if reported {
            format!("sqlite query plan uses {expected_index}")
        } else {
            format!(
                "sqlite query plan did not report {expected_index}; inspect before changing graph property indexes"
            )
        }
    });
    std::iter::once(plan_summary).chain(index_notes).collect()
}
1541
1542fn push_sqlite_property_filter_exists(
1543    sql: &mut String,
1544    values: &mut Vec<Value>,
1545    node_alias: &str,
1546    filters: &[GraphPropertyFilter],
1547) {
1548    for (index, filter) in filters.iter().enumerate() {
1549        sql.push_str(&format!(
1550            r#"
1551            AND EXISTS (
1552                SELECT 1
1553                FROM graph_node_properties p{index} INDEXED BY idx_graph_node_properties_key_value_node
1554                WHERE p{index}.node_id = {node_alias}.id
1555                  AND p{index}.key = ?
1556                  AND p{index}.value = ?
1557            )
1558            "#
1559        ));
1560        values.push(Value::Text(filter.key.clone()));
1561        values.push(Value::Text(filter.value.clone()));
1562    }
1563}
1564
1565fn push_sqlite_edge_property_filter_exists(
1566    sql: &mut String,
1567    values: &mut Vec<Value>,
1568    edge_alias: &str,
1569    filters: &[GraphPropertyFilter],
1570) {
1571    for (index, filter) in filters.iter().enumerate() {
1572        sql.push_str(&format!(
1573            r#"
1574            AND EXISTS (
1575                SELECT 1
1576                FROM graph_edge_properties ep{index} INDEXED BY idx_graph_edge_properties_key_value_edge
1577                WHERE ep{index}.edge_key = {edge_alias}.edge_key
1578                  AND ep{index}.key = ?
1579                  AND ep{index}.value = ?
1580            )
1581            "#
1582        ));
1583        values.push(Value::Text(filter.key.clone()));
1584        values.push(Value::Text(filter.value.clone()));
1585    }
1586}
1587
1588struct SqliteIncidentEdgeBranch<'a> {
1589    index_name: &'a str,
1590    endpoint_column: &'a str,
1591    node_id: &'a str,
1592    kind: Option<&'a str>,
1593    filters: &'a [GraphPropertyFilter],
1594    cursor: Option<&'a str>,
1595}
1596
1597fn push_sqlite_incident_edge_branch(
1598    sql: &mut String,
1599    values: &mut Vec<Value>,
1600    branch: SqliteIncidentEdgeBranch<'_>,
1601) {
1602    sql.push_str(&format!(
1603        r#"
1604        SELECT
1605            e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
1606        FROM graph_edges e INDEXED BY {index_name}
1607        WHERE e.{endpoint_column} = ?
1608        "#,
1609        index_name = branch.index_name,
1610        endpoint_column = branch.endpoint_column,
1611    ));
1612    values.push(Value::Text(branch.node_id.to_string()));
1613    if let Some(kind) = branch.kind {
1614        sql.push_str(" AND e.kind = ?");
1615        values.push(Value::Text(kind.to_string()));
1616    }
1617    push_sqlite_edge_property_filter_exists(sql, values, "e", branch.filters);
1618    if let Some(cursor) = branch.cursor {
1619        sql.push_str(" AND e.edge_key > ?");
1620        values.push(Value::Text(cursor.to_string()));
1621    }
1622}
1623
1624fn sqlite_incident_edges_union_query(
1625    node_id: &str,
1626    kind: Option<&str>,
1627    filters: &[GraphPropertyFilter],
1628    cursor: Option<&str>,
1629    limit: Option<usize>,
1630) -> (String, Vec<Value>) {
1631    let mut sql = String::from(
1632        r#"
1633        SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1634        FROM (
1635        "#,
1636    );
1637    let mut values = Vec::new();
1638    push_sqlite_incident_edge_branch(
1639        &mut sql,
1640        &mut values,
1641        SqliteIncidentEdgeBranch {
1642            index_name: "idx_graph_edges_from_kind",
1643            endpoint_column: "from_id",
1644            node_id,
1645            kind,
1646            filters,
1647            cursor,
1648        },
1649    );
1650    sql.push_str(" UNION ");
1651    push_sqlite_incident_edge_branch(
1652        &mut sql,
1653        &mut values,
1654        SqliteIncidentEdgeBranch {
1655            index_name: "idx_graph_edges_to_kind",
1656            endpoint_column: "to_id",
1657            node_id,
1658            kind,
1659            filters,
1660            cursor,
1661        },
1662    );
1663    sql.push_str(
1664        r#"
1665        ) e
1666        ORDER BY e.edge_key
1667        "#,
1668    );
1669    if let Some(limit) = limit {
1670        sql.push_str(" LIMIT ?");
1671        values.push(Value::Integer(limit.saturating_add(1) as i64));
1672    }
1673    (sql, values)
1674}
1675
1676impl GraphStore for SqliteGraphStore {
1677    fn upsert_node(&self, node: &GraphNode) -> Result<()> {
1678        self.conn.execute(
1679            r#"
1680            INSERT INTO graph_nodes
1681                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1682            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1683            ON CONFLICT(id) DO UPDATE SET
1684                kind = excluded.kind,
1685                label = excluded.label,
1686                properties_json = excluded.properties_json,
1687                provenance_json = excluded.provenance_json,
1688                freshness_json = excluded.freshness_json,
1689                row_hash = excluded.row_hash,
1690                source_watermark = excluded.source_watermark
1691            "#,
1692            (
1693                &node.id,
1694                &node.kind,
1695                &node.label,
1696                to_json(&node.properties)?,
1697                to_json(&node.provenance)?,
1698                optional_to_json(&node.freshness)?,
1699                row_hash(node)?,
1700            ),
1701        )?;
1702        replace_node_properties(&self.conn, &node.id, &node.properties)?;
1703        Ok(())
1704    }
1705
1706    fn upsert_edge(&self, edge: &GraphEdge) -> Result<()> {
1707        let edge_key = graph_edge_id(edge);
1708        self.conn.execute(
1709            r#"
1710            INSERT INTO graph_edges
1711                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1712            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1713            ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1714                edge_key = excluded.edge_key,
1715                properties_json = excluded.properties_json,
1716                provenance_json = excluded.provenance_json,
1717                freshness_json = excluded.freshness_json,
1718                row_hash = excluded.row_hash,
1719                source_watermark = excluded.source_watermark
1720            "#,
1721            (
1722                &edge_key,
1723                &edge.from_id,
1724                &edge.to_id,
1725                &edge.kind,
1726                to_json(&edge.properties)?,
1727                to_json(&edge.provenance)?,
1728                optional_to_json(&edge.freshness)?,
1729                row_hash(edge)?,
1730            ),
1731        )?;
1732        replace_edge_properties(&self.conn, &edge_key, &edge.properties)?;
1733        Ok(())
1734    }
1735
1736    fn delete_node(&self, id: &str) -> Result<usize> {
1737        self.conn
1738            .execute("DELETE FROM graph_nodes WHERE id = ?1", [id])
1739            .map_err(Into::into)
1740    }
1741
1742    fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize> {
1743        self.conn
1744            .execute(
1745                "DELETE FROM graph_edges WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3",
1746                (from_id, to_id, kind),
1747            )
1748            .map_err(Into::into)
1749    }
1750
1751    fn node(&self, id: &str) -> Result<Option<GraphNode>> {
1752        self.conn
1753            .query_row(
1754                r#"
1755                SELECT id, kind, label, properties_json, provenance_json, freshness_json
1756                FROM graph_nodes
1757                WHERE id = ?1
1758                "#,
1759                [id],
1760                node_from_row,
1761            )
1762            .optional()
1763            .map_err(Into::into)
1764    }
1765
1766    fn all_nodes(&self) -> Result<Vec<GraphNode>> {
1767        let mut stmt = self.conn.prepare(
1768            r#"
1769            SELECT id, kind, label, properties_json, provenance_json, freshness_json
1770            FROM graph_nodes
1771            ORDER BY id
1772            "#,
1773        )?;
1774        collect_rows(stmt.query_map([], node_from_row)?)
1775    }
1776
1777    fn all_edges(&self) -> Result<Vec<GraphEdge>> {
1778        let mut stmt = self.conn.prepare(
1779            r#"
1780            SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1781            FROM graph_edges
1782            ORDER BY from_id, kind, to_id
1783            "#,
1784        )?;
1785        collect_rows(stmt.query_map([], edge_from_row)?)
1786    }
1787
1788    fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
1789        self.conn
1790            .query_row(
1791                r#"
1792                SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1793                FROM graph_edges INDEXED BY idx_graph_edges_edge_key
1794                WHERE edge_key = ?1
1795                "#,
1796                [edge_id],
1797                edge_from_row,
1798            )
1799            .optional()
1800            .map_err(Into::into)
1801    }
1802
1803    fn graph_counts(&self) -> Result<(usize, usize)> {
1804        let nodes = self
1805            .conn
1806            .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
1807                row.get::<_, usize>(0)
1808            })?;
1809        let edges = self
1810            .conn
1811            .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
1812                row.get::<_, usize>(0)
1813            })?;
1814        Ok((nodes, edges))
1815    }
1816
1817    fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
1818        match kind {
1819            Some(kind) => self
1820                .conn
1821                .query_row(
1822                    r#"
1823                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1824                    FROM graph_edges INDEXED BY idx_graph_edges_from_kind
1825                    WHERE from_id <> to_id AND kind = ?1
1826                    ORDER BY from_id, kind, to_id
1827                    LIMIT 1
1828                    "#,
1829                    [kind],
1830                    edge_from_row,
1831                )
1832                .optional()
1833                .map_err(Into::into),
1834            None => self
1835                .conn
1836                .query_row(
1837                    r#"
1838                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1839                    FROM graph_edges INDEXED BY idx_graph_edges_from_kind
1840                    WHERE from_id <> to_id
1841                    ORDER BY from_id, kind, to_id
1842                    LIMIT 1
1843                    "#,
1844                    [],
1845                    edge_from_row,
1846                )
1847                .optional()
1848                .map_err(Into::into),
1849        }
1850    }
1851
1852    fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
1853        self.conn
1854            .query_row(
1855                r#"
1856                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
1857                       ep.key, ep.value
1858                FROM graph_edge_properties ep INDEXED BY idx_graph_edge_properties_key_value_edge
1859                JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
1860                  ON e.edge_key = ep.edge_key
1861                WHERE e.from_id <> e.to_id
1862                ORDER BY ep.key, ep.value, ep.edge_key
1863                LIMIT 1
1864                "#,
1865                [],
1866                |row| {
1867                    Ok((
1868                        edge_from_row(row)?,
1869                        GraphPropertyFilter {
1870                            key: row.get(7)?,
1871                            value: row.get(8)?,
1872                        },
1873                    ))
1874                },
1875            )
1876            .optional()
1877            .map_err(Into::into)
1878    }
1879
1880    fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>> {
1881        let mut stmt = self.conn.prepare(
1882            r#"
1883            SELECT id, kind, label, properties_json, provenance_json, freshness_json
1884            FROM graph_nodes
1885            WHERE kind = ?1
1886            ORDER BY id
1887            "#,
1888        )?;
1889        collect_rows(stmt.query_map([kind], node_from_row)?)
1890    }
1891
1892    fn paged_nodes_by_kind(
1893        &self,
1894        kind: &str,
1895        options: GraphQueryOptions,
1896    ) -> Result<GraphPagedSubgraph> {
1897        let mut sql = String::from(
1898            r#"
1899            SELECT id, kind, label, properties_json, provenance_json, freshness_json
1900            FROM graph_nodes
1901            WHERE kind = ?
1902            "#,
1903        );
1904        let mut values = vec![Value::Text(kind.to_string())];
1905        push_sqlite_property_filter_exists(
1906            &mut sql,
1907            &mut values,
1908            "graph_nodes",
1909            &options.property_filters,
1910        );
1911        if let Some(cursor) = &options.cursor {
1912            sql.push_str(" AND id > ?");
1913            values.push(Value::Text(cursor.clone()));
1914        }
1915        sql.push_str(" ORDER BY id");
1916        if let Some(limit) = options.limit {
1917            sql.push_str(" LIMIT ?");
1918            values.push(Value::Integer(limit.saturating_add(1) as i64));
1919        }
1920
1921        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
1922        let mut stmt = self.conn.prepare(&sql)?;
1923        let mut nodes =
1924            collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)?;
1925        let before_limit = nodes.len();
1926        let mut next_cursor = None;
1927        if let Some(limit) = options.limit
1928            && nodes.len() > limit
1929        {
1930            next_cursor = nodes
1931                .get(limit.saturating_sub(1))
1932                .map(|node| node.id.clone());
1933            nodes.truncate(limit);
1934        }
1935        let expected_indexes = if options.property_filters.is_empty() {
1936            vec!["idx_graph_nodes_kind"]
1937        } else {
1938            vec![
1939                "idx_graph_nodes_kind",
1940                "idx_graph_node_properties_key_value_node",
1941            ]
1942        };
1943        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
1944        if !options.property_filters.is_empty() {
1945            diagnostics.push(
1946                "property filters were evaluated by SQLite materialized property rows before paging"
1947                    .to_string(),
1948            );
1949        }
1950        if options.cursor.is_some() {
1951            diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
1952        }
1953        if next_cursor.is_some() {
1954            diagnostics.push(
1955                "result was truncated; pass page.next_cursor as --cursor for the next page"
1956                    .to_string(),
1957            );
1958        }
1959        Ok(GraphPagedSubgraph {
1960            page: GraphQueryPage {
1961                cursor: options.cursor,
1962                limit: options.limit,
1963                next_cursor,
1964                returned_nodes: nodes.len(),
1965                returned_edges: 0,
1966                truncated: options.limit.is_some_and(|limit| before_limit > limit),
1967                diagnostics,
1968            },
1969            nodes,
1970            edges: Vec::new(),
1971        })
1972    }
1973
1974    fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
1975        match kind {
1976            Some(kind) => {
1977                let mut stmt = self.conn.prepare(
1978                    r#"
1979                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1980                    FROM graph_edges
1981                    WHERE from_id = ?1 AND kind = ?2
1982                    ORDER BY to_id, kind
1983                    "#,
1984                )?;
1985                collect_rows(stmt.query_map((from_id, kind), edge_from_row)?)
1986            }
1987            None => {
1988                let mut stmt = self.conn.prepare(
1989                    r#"
1990                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1991                    FROM graph_edges
1992                    WHERE from_id = ?1
1993                    ORDER BY to_id, kind
1994                    "#,
1995                )?;
1996                collect_rows(stmt.query_map([from_id], edge_from_row)?)
1997            }
1998        }
1999    }
2000
2001    fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2002        let (sql, values) = sqlite_incident_edges_union_query(node_id, kind, &[], None, None);
2003        let mut stmt = self.conn.prepare(&sql)?;
2004        collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2005    }
2006
2007    fn paged_edges(
2008        &self,
2009        kind: Option<&str>,
2010        options: GraphQueryOptions,
2011    ) -> Result<GraphPagedSubgraph> {
2012        let primary_property_filter = options.property_filters.first();
2013        let mut values = Vec::new();
2014        let mut sql = if let Some(filter) = primary_property_filter {
2015            values.push(Value::Text(filter.key.clone()));
2016            values.push(Value::Text(filter.value.clone()));
2017            String::from(
2018                r#"
2019                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2020                FROM graph_edge_properties ep0 INDEXED BY idx_graph_edge_properties_key_value_edge
2021                JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2022                  ON e.edge_key = ep0.edge_key
2023                WHERE ep0.key = ?
2024                  AND ep0.value = ?
2025                "#,
2026            )
2027        } else {
2028            String::from(
2029                r#"
2030                SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2031                FROM graph_edges e
2032                WHERE 1 = 1
2033                "#,
2034            )
2035        };
2036        if let Some(kind) = kind {
2037            sql.push_str(" AND e.kind = ?");
2038            values.push(Value::Text(kind.to_string()));
2039        }
2040        push_sqlite_edge_property_filter_exists(
2041            &mut sql,
2042            &mut values,
2043            "e",
2044            if primary_property_filter.is_some() {
2045                &options.property_filters[1..]
2046            } else {
2047                &options.property_filters
2048            },
2049        );
2050        if let Some(cursor) = &options.cursor {
2051            if primary_property_filter.is_some() {
2052                sql.push_str(" AND ep0.edge_key > ?");
2053            } else {
2054                sql.push_str(" AND e.edge_key > ?");
2055            }
2056            values.push(Value::Text(cursor.clone()));
2057        }
2058        if primary_property_filter.is_some() {
2059            sql.push_str(" ORDER BY ep0.edge_key");
2060        } else {
2061            sql.push_str(" ORDER BY e.edge_key");
2062        }
2063        if let Some(limit) = options.limit {
2064            sql.push_str(" LIMIT ?");
2065            values.push(Value::Integer(limit.saturating_add(1) as i64));
2066        }
2067
2068        let primary_property_row_count = if let Some(filter) = primary_property_filter {
2069            Some(self.conn.query_row(
2070                r#"
2071                SELECT COUNT(*)
2072                FROM graph_edge_properties INDEXED BY idx_graph_edge_properties_key_value_edge
2073                WHERE key = ?1 AND value = ?2
2074                "#,
2075                (&filter.key, &filter.value),
2076                |row| row.get::<_, usize>(0),
2077            )?)
2078        } else {
2079            None
2080        };
2081        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2082        let mut stmt = self.conn.prepare(&sql)?;
2083        let mut edges =
2084            collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2085        let before_limit = edges.len();
2086        let mut next_cursor = None;
2087        if let Some(limit) = options.limit
2088            && edges.len() > limit
2089        {
2090            next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2091            edges.truncate(limit);
2092        }
2093        let expected_indexes = if options.property_filters.is_empty() {
2094            vec!["idx_graph_edges_edge_key"]
2095        } else {
2096            vec![
2097                "idx_graph_edge_properties_key_value_edge",
2098                "idx_graph_edges_edge_key",
2099            ]
2100        };
2101        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2102        if !options.property_filters.is_empty() {
2103            if let Some(row_count) = primary_property_row_count {
2104                diagnostics.push(format!(
2105                    "edge property primary filter matched {row_count} materialized row(s) before edge-kind/cursor paging"
2106                ));
2107            }
2108            diagnostics.push(
2109                "edge property scan drives from SQLite materialized property rows before joining graph_edges"
2110                    .to_string(),
2111            );
2112        }
2113        if options.cursor.is_some() {
2114            diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2115        }
2116        if next_cursor.is_some() {
2117            diagnostics.push(
2118                "result was truncated; pass page.next_cursor as --cursor for the next page"
2119                    .to_string(),
2120            );
2121        }
2122        Ok(GraphPagedSubgraph {
2123            page: GraphQueryPage {
2124                cursor: options.cursor,
2125                limit: options.limit,
2126                next_cursor,
2127                returned_nodes: 0,
2128                returned_edges: edges.len(),
2129                truncated: options.limit.is_some_and(|limit| before_limit > limit),
2130                diagnostics,
2131            },
2132            nodes: Vec::new(),
2133            edges,
2134        })
2135    }
2136
2137    fn paged_incident_edges(
2138        &self,
2139        node_id: &str,
2140        kind: Option<&str>,
2141        options: GraphQueryOptions,
2142    ) -> Result<GraphPagedSubgraph> {
2143        let (sql, values) = sqlite_incident_edges_union_query(
2144            node_id,
2145            kind,
2146            &options.property_filters,
2147            options.cursor.as_deref(),
2148            options.limit,
2149        );
2150        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2151        let mut stmt = self.conn.prepare(&sql)?;
2152        let mut edges =
2153            collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2154        let before_limit = edges.len();
2155        let mut next_cursor = None;
2156        if let Some(limit) = options.limit
2157            && edges.len() > limit
2158        {
2159            next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2160            edges.truncate(limit);
2161        }
2162        let expected_indexes = if options.property_filters.is_empty() {
2163            vec!["idx_graph_edges_from_kind", "idx_graph_edges_to_kind"]
2164        } else {
2165            vec![
2166                "idx_graph_edges_from_kind",
2167                "idx_graph_edges_to_kind",
2168                "idx_graph_edge_properties_key_value_edge",
2169            ]
2170        };
2171        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2172        diagnostics.push(
2173            "incident edge scan uses UNION over from_id/to_id index probes instead of an OR predicate"
2174                .to_string(),
2175        );
2176        if !options.property_filters.is_empty() {
2177            diagnostics.push(
2178                "edge property filters were evaluated by SQLite materialized property rows before paging"
2179                    .to_string(),
2180            );
2181        }
2182        if options.cursor.is_some() {
2183            diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2184        }
2185        if next_cursor.is_some() {
2186            diagnostics.push(
2187                "result was truncated; pass page.next_cursor as --cursor for the next page"
2188                    .to_string(),
2189            );
2190        }
2191        Ok(GraphPagedSubgraph {
2192            page: GraphQueryPage {
2193                cursor: options.cursor,
2194                limit: options.limit,
2195                next_cursor,
2196                returned_nodes: 0,
2197                returned_edges: edges.len(),
2198                truncated: options.limit.is_some_and(|limit| before_limit > limit),
2199                diagnostics,
2200            },
2201            nodes: Vec::new(),
2202            edges,
2203        })
2204    }
2205
2206    fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
2207        if node_ids.is_empty() {
2208            return Ok(Vec::new());
2209        }
2210        let node_ids = node_ids.iter().cloned().collect::<Vec<_>>();
2211        let mut edges = BTreeMap::<(String, String, String), GraphEdge>::new();
2212        for from_chunk in node_ids.chunks(450) {
2213            let from_placeholders = std::iter::repeat_n("?", from_chunk.len())
2214                .collect::<Vec<_>>()
2215                .join(", ");
2216            for to_chunk in node_ids.chunks(450) {
2217                let to_placeholders = std::iter::repeat_n("?", to_chunk.len())
2218                    .collect::<Vec<_>>()
2219                    .join(", ");
2220                let sql = format!(
2221                    r#"
2222                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2223                    FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2224                    WHERE from_id IN ({from_placeholders})
2225                      AND to_id IN ({to_placeholders})
2226                    ORDER BY from_id, kind, to_id
2227                    "#
2228                );
2229                let mut values = from_chunk
2230                    .iter()
2231                    .cloned()
2232                    .map(Value::Text)
2233                    .collect::<Vec<_>>();
2234                values.extend(to_chunk.iter().cloned().map(Value::Text));
2235                let mut stmt = self.conn.prepare(&sql)?;
2236                for edge in
2237                    collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?
2238                {
2239                    edges
2240                        .entry((edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone()))
2241                        .or_insert(edge);
2242                }
2243            }
2244        }
2245        Ok(edges.into_values().collect())
2246    }
2247
2248    fn neighborhood(
2249        &self,
2250        center_id: &str,
2251        depth: usize,
2252        kind: Option<&str>,
2253    ) -> Result<Option<GraphSubgraph>> {
2254        self.paged_neighborhood(center_id, depth, kind, GraphQueryOptions::default())
2255            .map(|result| {
2256                result.map(|result| {
2257                    GraphSubgraph {
2258                        nodes: result.nodes,
2259                        edges: result.edges,
2260                    }
2261                    .sorted()
2262                })
2263            })
2264    }
2265
2266    fn paged_neighborhood(
2267        &self,
2268        center_id: &str,
2269        depth: usize,
2270        kind: Option<&str>,
2271        options: GraphQueryOptions,
2272    ) -> Result<Option<GraphPagedSubgraph>> {
2273        if self.node(center_id)?.is_none() {
2274            return Ok(None);
2275        }
2276        let mut sql = String::from(
2277            r#"
2278            WITH RECURSIVE walk(id, depth) AS (
2279                SELECT ?, 0
2280                UNION
2281                SELECT e.to_id, walk.depth + 1
2282                FROM walk
2283                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2284                    ON e.from_id = walk.id
2285                WHERE walk.depth < ?
2286            "#,
2287        );
2288        let mut values = vec![
2289            Value::Text(center_id.to_string()),
2290            Value::Integer(depth as i64),
2291        ];
2292        if let Some(kind) = kind {
2293            sql.push_str(" AND e.kind = ?");
2294            values.push(Value::Text(kind.to_string()));
2295        }
2296        sql.push_str(
2297            r#"
2298            ),
2299            filtered_nodes AS (
2300            SELECT DISTINCT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
2301            FROM walk
2302            JOIN graph_nodes n ON n.id = walk.id
2303            WHERE 1 = 1
2304            "#,
2305        );
2306        push_sqlite_property_filter_exists(&mut sql, &mut values, "n", &options.property_filters);
2307        if let Some(cursor) = &options.cursor {
2308            sql.push_str(" AND n.id > ?");
2309            values.push(Value::Text(cursor.clone()));
2310        }
2311        sql.push_str(
2312            r#"
2313            ),
2314            page_nodes AS (
2315                SELECT id, kind, label, properties_json, provenance_json, freshness_json
2316                FROM filtered_nodes
2317                ORDER BY id
2318            "#,
2319        );
2320        if let Some(limit) = options.limit {
2321            sql.push_str(" LIMIT ?");
2322            values.push(Value::Integer(limit.saturating_add(1) as i64));
2323        }
2324        sql.push_str(
2325            r#"
2326            ),
2327            walk_edges AS (
2328                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2329                FROM walk
2330                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2331                    ON e.from_id = walk.id
2332                WHERE walk.depth < ?
2333            "#,
2334        );
2335        values.push(Value::Integer(depth as i64));
2336        if let Some(kind) = kind {
2337            sql.push_str(" AND e.kind = ?");
2338            values.push(Value::Text(kind.to_string()));
2339        }
2340        sql.push_str(
2341            r#"
2342            )
2343            SELECT
2344                'node' AS row_type,
2345                p.id, p.kind, p.label, p.properties_json, p.provenance_json, p.freshness_json,
2346                NULL AS edge_key, NULL AS from_id, NULL AS to_id, NULL AS edge_kind,
2347                NULL AS edge_properties_json, NULL AS edge_provenance_json, NULL AS edge_freshness_json
2348            FROM page_nodes p
2349            UNION ALL
2350            SELECT DISTINCT
2351                'edge' AS row_type,
2352                NULL AS id, NULL AS kind, NULL AS label, NULL AS properties_json,
2353                NULL AS provenance_json, NULL AS freshness_json,
2354                e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2355            FROM walk_edges e
2356            WHERE e.from_id IN (SELECT id FROM page_nodes)
2357              AND e.to_id IN (SELECT id FROM page_nodes)
2358            "#,
2359        );
2360
2361        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2362        let mut stmt = self.conn.prepare(&sql)?;
2363        let mut nodes = Vec::new();
2364        let mut edges = Vec::new();
2365        let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
2366            let row_type: String = row.get(0)?;
2367            match row_type.as_str() {
2368                "node" => Ok((Some(node_from_row_at(row, 1)?), None)),
2369                "edge" => Ok((None, Some(edge_from_row_at(row, 7)?))),
2370                _ => Err(rusqlite::Error::InvalidQuery),
2371            }
2372        })?;
2373        for row in rows {
2374            let (node, edge) = row?;
2375            if let Some(node) = node {
2376                nodes.push(node);
2377            }
2378            if let Some(edge) = edge {
2379                edges.push(edge);
2380            }
2381        }
2382        nodes.sort_by(|left, right| left.id.cmp(&right.id));
2383        let before_limit = nodes.len();
2384        let mut next_cursor = None;
2385        if let Some(limit) = options.limit
2386            && nodes.len() > limit
2387        {
2388            next_cursor = nodes
2389                .get(limit.saturating_sub(1))
2390                .map(|node| node.id.clone());
2391            nodes.truncate(limit);
2392        }
2393        let node_ids = nodes
2394            .iter()
2395            .map(|node| node.id.as_str())
2396            .collect::<BTreeSet<_>>();
2397        edges.retain(|edge| {
2398            node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
2399        });
2400        edges.sort_by(|left, right| {
2401            left.from_id
2402                .cmp(&right.from_id)
2403                .then(left.kind.cmp(&right.kind))
2404                .then(left.to_id.cmp(&right.to_id))
2405        });
2406        let expected_indexes = if options.property_filters.is_empty() {
2407            vec!["idx_graph_edges_from_kind"]
2408        } else {
2409            vec![
2410                "idx_graph_edges_from_kind",
2411                "idx_graph_node_properties_key_value_node",
2412            ]
2413        };
2414        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2415        diagnostics.push(
2416            "neighborhood nodes and page edges share one recursive reachable-set CTE".to_string(),
2417        );
2418        if !options.property_filters.is_empty() {
2419            diagnostics.push(
2420                "property filters were evaluated by SQLite materialized property rows before paging"
2421                    .to_string(),
2422            );
2423        }
2424        if options.cursor.is_some() {
2425            diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
2426        }
2427        if next_cursor.is_some() {
2428            diagnostics.push(
2429                "result was truncated; pass page.next_cursor as --cursor for the next page"
2430                    .to_string(),
2431            );
2432        }
2433        Ok(Some(GraphPagedSubgraph {
2434            page: GraphQueryPage {
2435                cursor: options.cursor,
2436                limit: options.limit,
2437                next_cursor,
2438                returned_nodes: nodes.len(),
2439                returned_edges: edges.len(),
2440                truncated: options.limit.is_some_and(|limit| before_limit > limit),
2441                diagnostics,
2442            },
2443            nodes,
2444            edges,
2445        }))
2446    }
2447
2448    fn shortest_path(
2449        &self,
2450        from_id: &str,
2451        to_id: &str,
2452        kind: Option<&str>,
2453    ) -> Result<Option<GraphPath>> {
2454        self.shortest_path_with_max_hops(from_id, to_id, kind, None)
2455    }
2456
2457    fn shortest_path_with_max_hops(
2458        &self,
2459        from_id: &str,
2460        to_id: &str,
2461        kind: Option<&str>,
2462        max_hops: Option<usize>,
2463    ) -> Result<Option<GraphPath>> {
2464        if from_id == to_id {
2465            return Ok(Some(GraphPath {
2466                nodes: vec![from_id.to_string()],
2467                hops: 0,
2468            }));
2469        }
2470        let hop_limit = max_hops.unwrap_or(usize::MAX);
2471        if hop_limit == 0 {
2472            return Ok(None);
2473        }
2474
2475        let mut visited = BTreeSet::from([from_id.to_string()]);
2476        let mut parent = BTreeMap::<String, String>::from([(from_id.to_string(), String::new())]);
2477        let mut frontier = vec![from_id.to_string()];
2478        let mut single_frontier_stmt = if kind.is_none() {
2479            Some(self.conn.prepare(
2480                r#"
2481                SELECT from_id, to_id
2482                FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2483                WHERE from_id = ?1
2484                ORDER BY from_id, to_id, kind
2485                "#,
2486            )?)
2487        } else {
2488            None
2489        };
2490        let mut single_frontier_kind_stmt = if kind.is_some() {
2491            Some(self.conn.prepare(
2492                r#"
2493                SELECT from_id, to_id
2494                FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2495                WHERE from_id = ?1 AND kind = ?2
2496                ORDER BY from_id, to_id, kind
2497                "#,
2498            )?)
2499        } else {
2500            None
2501        };
2502        for _depth in 0..hop_limit {
2503            if frontier.is_empty() {
2504                break;
2505            }
2506            let mut next_frontier = BTreeSet::new();
2507            for chunk in frontier.chunks(256) {
2508                let edges = if chunk.len() == 1 {
2509                    match kind {
2510                        Some(kind) => {
2511                            let stmt = single_frontier_kind_stmt
2512                                .as_mut()
2513                                .context("single-frontier kind statement missing")?;
2514                            collect_rows(stmt.query_map((chunk[0].as_str(), kind), |row| {
2515                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2516                            })?)?
2517                        }
2518                        None => {
2519                            let stmt = single_frontier_stmt
2520                                .as_mut()
2521                                .context("single-frontier statement missing")?;
2522                            collect_rows(stmt.query_map([chunk[0].as_str()], |row| {
2523                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2524                            })?)?
2525                        }
2526                    }
2527                } else {
2528                    let placeholders = std::iter::repeat_n("?", chunk.len())
2529                        .collect::<Vec<_>>()
2530                        .join(", ");
2531                    let mut sql = format!(
2532                        r#"
2533                        SELECT from_id, to_id
2534                        FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2535                        WHERE from_id IN ({placeholders})
2536                        "#
2537                    );
2538                    let mut values = chunk.iter().cloned().map(Value::Text).collect::<Vec<_>>();
2539                    if let Some(kind) = kind {
2540                        sql.push_str(" AND kind = ?");
2541                        values.push(Value::Text(kind.to_string()));
2542                    }
2543                    sql.push_str(" ORDER BY from_id, to_id, kind");
2544                    let mut stmt = self.conn.prepare(&sql)?;
2545                    collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2546                        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2547                    })?)?
2548                };
2549                for (from, next) in edges {
2550                    if !visited.insert(next.clone()) {
2551                        continue;
2552                    }
2553                    parent.insert(next.clone(), from);
2554                    if next == to_id {
2555                        let mut nodes = vec![to_id.to_string()];
2556                        let mut cursor = to_id;
2557                        while let Some(previous) = parent.get(cursor) {
2558                            if previous.is_empty() {
2559                                break;
2560                            }
2561                            nodes.push(previous.clone());
2562                            cursor = previous;
2563                        }
2564                        nodes.reverse();
2565                        return Ok(Some(GraphPath {
2566                            hops: nodes.len().saturating_sub(1),
2567                            nodes,
2568                        }));
2569                    }
2570                    next_frontier.insert(next);
2571                }
2572            }
2573            frontier = next_frontier.into_iter().collect();
2574        }
2575        Ok(None)
2576    }
2577
2578    fn reachable_nodes_by_kind(
2579        &self,
2580        from_id: &str,
2581        kind: &str,
2582        depth: usize,
2583        limit: usize,
2584    ) -> Result<Vec<(GraphNode, GraphPath)>> {
2585        Ok(self
2586            .reachable_nodes_by_kinds(from_id, &[kind], depth, limit)?
2587            .remove(kind)
2588            .unwrap_or_default())
2589    }
2590
2591    fn reachable_nodes_by_kinds(
2592        &self,
2593        from_id: &str,
2594        kinds: &[&str],
2595        depth: usize,
2596        limit: usize,
2597    ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
2598        let mut requested = kinds
2599            .iter()
2600            .map(|kind| (*kind).to_string())
2601            .collect::<BTreeSet<_>>()
2602            .into_iter()
2603            .collect::<Vec<_>>();
2604        let mut results = requested
2605            .iter()
2606            .map(|kind| (kind.clone(), Vec::new()))
2607            .collect::<BTreeMap<_, _>>();
2608        if requested.is_empty() {
2609            return Ok(results);
2610        }
2611        requested.sort();
2612        let placeholders = std::iter::repeat_n("?", requested.len())
2613            .collect::<Vec<_>>()
2614            .join(", ");
2615        let mut sql = format!(
2616            r#"
2617            WITH RECURSIVE walk(id, depth, path) AS (
2618                SELECT ?, 0, char(31) || ? || char(31)
2619                UNION ALL
2620                SELECT e.to_id, walk.depth + 1, walk.path || e.to_id || char(31)
2621                FROM walk
2622                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2623                    ON e.from_id = walk.id
2624                WHERE walk.depth < ?
2625                  AND instr(walk.path, char(31) || e.to_id || char(31)) = 0
2626            ),
2627            ranked AS (
2628                SELECT
2629                    n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
2630                    walk.path, walk.depth,
2631                    ROW_NUMBER() OVER (PARTITION BY n.kind, n.id ORDER BY walk.depth, n.label, n.id) AS rn
2632                FROM walk
2633                JOIN graph_nodes n ON n.id = walk.id
2634                WHERE n.kind IN ({placeholders}) AND n.id <> ?
2635            ),
2636            kind_ranked AS (
2637                SELECT *,
2638                    ROW_NUMBER() OVER (PARTITION BY kind ORDER BY depth, label, id) AS kind_rank
2639                FROM ranked
2640                WHERE rn = 1
2641            )
2642            SELECT id, kind, label, properties_json, provenance_json, freshness_json, path, depth
2643            FROM kind_ranked
2644            "#,
2645        );
2646        let mut values = vec![
2647            Value::Text(from_id.to_string()),
2648            Value::Text(from_id.to_string()),
2649            Value::Integer(depth as i64),
2650        ];
2651        values.extend(requested.iter().cloned().map(Value::Text));
2652        values.push(Value::Text(from_id.to_string()));
2653        if limit > 0 && limit != usize::MAX {
2654            sql.push_str(" WHERE kind_rank <= ?");
2655            values.push(Value::Integer(limit as i64));
2656        }
2657        sql.push_str(" ORDER BY kind, depth, label, id");
2658        let mut stmt = self.conn.prepare(&sql)?;
2659        let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
2660            let node = node_from_row(row)?;
2661            let path: String = row.get(6)?;
2662            let hops: usize = row.get(7)?;
2663            Ok((
2664                node,
2665                GraphPath {
2666                    nodes: path
2667                        .split('\u{1f}')
2668                        .filter(|part| !part.is_empty())
2669                        .map(str::to_string)
2670                        .collect(),
2671                    hops,
2672                },
2673            ))
2674        })?)?;
2675        for (node, path) in rows {
2676            results
2677                .entry(node.kind.clone())
2678                .or_default()
2679                .push((node, path));
2680        }
2681        Ok(results)
2682    }
2683
2684    fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
2685        if let Some(node) = self.node(target)? {
2686            return Ok(Some(node));
2687        }
2688        if kinds.is_empty() {
2689            return Ok(None);
2690        }
2691
2692        let normalized = target.trim().trim_start_matches('#');
2693        let kind_placeholders = std::iter::repeat_n("?", kinds.len())
2694            .collect::<Vec<_>>()
2695            .join(", ");
2696        let kind_rank = kinds
2697            .iter()
2698            .enumerate()
2699            .map(|(rank, _)| format!("WHEN ? THEN {rank}"))
2700            .collect::<Vec<_>>()
2701            .join(" ");
2702        let sql = format!(
2703            r#"
2704            SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
2705            FROM graph_nodes n
2706            WHERE n.kind IN ({kind_placeholders})
2707              AND (
2708                EXISTS (
2709                    SELECT 1
2710                    FROM graph_node_properties p_handle INDEXED BY idx_graph_node_properties_key_value_node
2711                    WHERE p_handle.node_id = n.id
2712                      AND p_handle.key = 'handle'
2713                      AND p_handle.value = ?
2714                )
2715                OR EXISTS (
2716                    SELECT 1
2717                    FROM graph_node_properties p_ref INDEXED BY idx_graph_node_properties_key_value_node
2718                    WHERE p_ref.node_id = n.id
2719                      AND p_ref.key = 'ref_id'
2720                      AND p_ref.value = ?
2721                )
2722                OR n.label = ?
2723                OR n.label = ?
2724              )
2725            ORDER BY CASE n.kind {kind_rank} ELSE 999 END, n.id
2726            LIMIT 1
2727            "#
2728        );
2729        let mut values = kinds
2730            .iter()
2731            .map(|kind| Value::Text((*kind).to_string()))
2732            .collect::<Vec<_>>();
2733        values.push(Value::Text(target.to_string()));
2734        values.push(Value::Text(normalized.to_string()));
2735        values.push(Value::Text(target.to_string()));
2736        values.push(Value::Text(format!("#{normalized}")));
2737        values.extend(kinds.iter().map(|kind| Value::Text((*kind).to_string())));
2738        self.conn
2739            .query_row(&sql, params_from_iter(values.iter()), node_from_row)
2740            .optional()
2741            .map_err(Into::into)
2742    }
2743}
2744
2745fn to_json<T: Serialize>(value: &T) -> Result<String> {
2746    serde_json::to_string(value).map_err(Into::into)
2747}
2748
2749fn row_hash<T: Serialize>(value: &T) -> Result<String> {
2750    let payload = serde_json::to_vec(value)?;
2751    Ok(blake3::hash(&payload).to_hex().to_string())
2752}
2753
2754fn optional_to_json<T: Serialize>(value: &Option<T>) -> Result<Option<String>> {
2755    value.as_ref().map(to_json).transpose()
2756}
2757
2758fn collect_rows<T>(
2759    rows: impl Iterator<Item = std::result::Result<T, rusqlite::Error>>,
2760) -> Result<Vec<T>> {
2761    rows.collect::<std::result::Result<Vec<_>, _>>()
2762        .map_err(Into::into)
2763}
2764
2765fn node_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphNode> {
2766    let properties_col = offset + 3;
2767    let provenance_col = offset + 4;
2768    let freshness_col = offset + 5;
2769    let properties_json: String = row.get(properties_col)?;
2770    let provenance_json: String = row.get(provenance_col)?;
2771    let freshness_json: Option<String> = row.get(freshness_col)?;
2772    Ok(GraphNode {
2773        id: row.get(offset)?,
2774        kind: row.get(offset + 1)?,
2775        label: row.get(offset + 2)?,
2776        properties: from_json(properties_col, &properties_json)?,
2777        provenance: from_json(provenance_col, &provenance_json)?,
2778        freshness: optional_from_json(freshness_col, freshness_json)?,
2779    })
2780}
2781
2782fn node_from_row(row: &Row<'_>) -> rusqlite::Result<GraphNode> {
2783    node_from_row_at(row, 0)
2784}
2785
2786fn edge_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphEdge> {
2787    let properties_col = offset + 4;
2788    let provenance_col = offset + 5;
2789    let freshness_col = offset + 6;
2790    let properties_json: String = row.get(properties_col)?;
2791    let provenance_json: String = row.get(provenance_col)?;
2792    let freshness_json: Option<String> = row.get(freshness_col)?;
2793    Ok(GraphEdge {
2794        id: row.get(offset)?,
2795        from_id: row.get(offset + 1)?,
2796        to_id: row.get(offset + 2)?,
2797        kind: row.get(offset + 3)?,
2798        properties: from_json(properties_col, &properties_json)?,
2799        provenance: from_json(provenance_col, &provenance_json)?,
2800        freshness: optional_from_json(freshness_col, freshness_json)?,
2801    })
2802}
2803
2804fn edge_from_row(row: &Row<'_>) -> rusqlite::Result<GraphEdge> {
2805    edge_from_row_at(row, 0)
2806}
2807
2808fn from_json<T: DeserializeOwned>(column: usize, raw: &str) -> rusqlite::Result<T> {
2809    serde_json::from_str(raw)
2810        .map_err(|err| rusqlite::Error::FromSqlConversionFailure(column, Type::Text, Box::new(err)))
2811}
2812
2813fn optional_from_json<T: DeserializeOwned>(
2814    column: usize,
2815    raw: Option<String>,
2816) -> rusqlite::Result<Option<T>> {
2817    raw.map(|value| from_json(column, &value)).transpose()
2818}
2819
2820fn projection_version_from_nodes(nodes: &[GraphNode]) -> Option<String> {
2821    nodes
2822        .iter()
2823        .find(|node| node.kind == "projection_meta")
2824        .and_then(|node| node.properties.get("projection_version").cloned())
2825}
2826
2827fn projection_hash_from_nodes(nodes: &[GraphNode]) -> Option<String> {
2828    nodes
2829        .iter()
2830        .find(|node| node.kind == "projection_meta")
2831        .and_then(|node| node.properties.get("content_hash").cloned())
2832}
2833
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before the epoch.
fn unix_now() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
2840
2841fn sqlite_database_size_bytes(conn: &Connection) -> Result<u64> {
2842    let page_count: u64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
2843    let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
2844    Ok(page_count.saturating_mul(page_size))
2845}
2846
2847fn sqlite_database_freelist_bytes(conn: &Connection) -> Result<u64> {
2848    let freelist_count: u64 = conn.query_row("PRAGMA freelist_count", [], |row| row.get(0))?;
2849    let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
2850    Ok(freelist_count.saturating_mul(page_size))
2851}
2852
2853#[cfg(test)]
2854mod tests {
2855    use super::*;
2856
2857    fn sample_provenance() -> GraphProvenance {
2858        GraphProvenance::new("fixture", "src/lib.rs:1").with_content_hash("hash-1")
2859    }
2860
2861    fn sample_projection() -> GraphProjection {
2862        let source = sample_provenance();
2863        GraphProjection {
2864            nodes: vec![
2865                GraphNode::new("doc:livekit", "document", "LiveKit guide")
2866                    .with_property("domain", "livekit")
2867                    .with_provenance(source.clone())
2868                    .with_freshness(GraphFreshness::content_hash("node-hash")),
2869                GraphNode::new("topic:rooms", "topic", "Rooms"),
2870                GraphNode::new("topic:egress", "topic", "Egress"),
2871            ],
2872            edges: vec![
2873                GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
2874                    .with_property("confidence", "0.91")
2875                    .with_provenance(source.clone())
2876                    .with_freshness(GraphFreshness::content_hash("edge-hash")),
2877                GraphEdge::new("topic:rooms", "topic:egress", "related_to").with_provenance(source),
2878            ],
2879        }
2880    }
2881
2882    fn assert_projection_store_contract(store: &impl GraphStore) {
2883        let projection = sample_projection();
2884        projection.upsert_into(store).unwrap();
2885
2886        assert_eq!(
2887            store.node("doc:livekit").unwrap(),
2888            projection
2889                .nodes
2890                .iter()
2891                .find(|node| node.id == "doc:livekit")
2892                .cloned()
2893        );
2894        assert_eq!(
2895            store.nodes_by_kind("topic").unwrap(),
2896            vec![
2897                GraphNode::new("topic:egress", "topic", "Egress"),
2898                GraphNode::new("topic:rooms", "topic", "Rooms"),
2899            ]
2900        );
2901
2902        let mentions = store
2903            .outgoing_edges("doc:livekit", Some("mentions"))
2904            .unwrap();
2905        assert_eq!(mentions.len(), 1);
2906        assert_eq!(mentions[0].to_id, "topic:rooms");
2907        assert_eq!(
2908            mentions[0].properties.get("confidence"),
2909            Some(&"0.91".into())
2910        );
2911
2912        let path = store
2913            .shortest_path("doc:livekit", "topic:egress", None)
2914            .unwrap()
2915            .unwrap();
2916        assert_eq!(
2917            path.nodes,
2918            vec!["doc:livekit", "topic:rooms", "topic:egress"]
2919        );
2920    }
2921
#[test]
fn sqlite_store_round_trips_generic_nodes_edges() {
    let store = SqliteGraphStore::in_memory().unwrap();
    let source = sample_provenance();

    let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
        .with_property("confidence", "0.91")
        .with_provenance(source.clone())
        .with_freshness(GraphFreshness::content_hash("edge-hash"));
    let topic = GraphNode::new("topic:rooms", "topic", "Rooms");
    let node = GraphNode::new("doc:livekit", "document", "LiveKit guide")
        .with_property("domain", "livekit")
        .with_provenance(source)
        .with_freshness(GraphFreshness::content_hash("node-hash"));

    // Write nodes before the edge that connects them.
    for written in [&node, &topic] {
        store.upsert_node(written).unwrap();
    }
    store.upsert_edge(&edge).unwrap();

    // Everything read back must equal what was written.
    assert_eq!(store.node("doc:livekit").unwrap(), Some(node));
    assert_eq!(store.nodes_by_kind("topic").unwrap(), vec![topic]);
    assert_eq!(store.all_nodes().unwrap().len(), 2);
    assert_eq!(store.all_edges().unwrap().len(), 1);
    let mentions = store
        .outgoing_edges("doc:livekit", Some("mentions"))
        .unwrap();
    assert_eq!(mentions, vec![edge]);
}
2951
#[test]
fn sqlite_materializes_edge_properties_and_scans_first_class_edges() {
    let store = SqliteGraphStore::in_memory().unwrap();
    for (id, kind, label) in [
        ("doc:livekit", "document", "LiveKit guide"),
        ("topic:rooms", "topic", "Rooms"),
        ("topic:egress", "topic", "Egress"),
    ] {
        store.upsert_node(&GraphNode::new(id, kind, label)).unwrap();
    }

    let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
        .with_property("confidence", "0.91");
    let edge_id = edge.id.clone();
    store.upsert_edge(&edge).unwrap();
    let related = GraphEdge::new("topic:egress", "topic:rooms", "related_to")
        .with_property("confidence", "0.42");
    store.upsert_edge(&related).unwrap();

    // Lookup by id returns the edge exactly as written.
    assert_eq!(store.edge(&edge_id).unwrap(), Some(edge));

    // Both edges touch `topic:rooms`; the store reports them ordered by id.
    let mut expected_incident_ids = vec![
        GraphEdge::stable_id("doc:livekit", "topic:rooms", "mentions"),
        GraphEdge::stable_id("topic:egress", "topic:rooms", "related_to"),
    ];
    expected_incident_ids.sort();
    let incident_ids = store
        .incident_edges("topic:rooms", None)
        .unwrap()
        .into_iter()
        .map(|edge| edge.id)
        .collect::<Vec<_>>();
    assert_eq!(incident_ids, expected_incident_ids);

    let confidence_filter = GraphPropertyFilter {
        key: "confidence".to_string(),
        value: "0.91".to_string(),
    };
    let page = store
        .paged_edges(
            Some("mentions"),
            GraphQueryOptions {
                property_filters: vec![confidence_filter],
                ..GraphQueryOptions::default()
            },
        )
        .unwrap();
    assert_eq!(page.edges.len(), 1);
    assert_eq!(page.edges[0].id, edge_id);

    // Each of these fragments must appear in at least one diagnostic line.
    let diagnostics = &page.page.diagnostics;
    for needle in [
        "idx_graph_edge_properties_key_value_edge",
        "idx_graph_edges_edge_key",
        "edge property primary filter matched 1 materialized row",
        "drives from SQLite materialized property rows",
    ] {
        assert!(
            diagnostics
                .iter()
                .any(|diagnostic| diagnostic.contains(needle)),
            "{:?}",
            diagnostics
        );
    }

    let property_rows: usize = store
        .conn
        .query_row(
            "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(property_rows, 2);
}
3045
#[test]
fn graph_projection_round_trips_through_backend_agnostic_store_contract() {
    // Run the backend-agnostic contract against a fresh in-memory SQLite store.
    let store = SqliteGraphStore::in_memory().unwrap();
    assert_projection_store_contract(&store);
}
3051
#[test]
fn graph_store_contract_covers_crud_neighborhood_and_ordering() {
    /// Exercises neighborhood ordering and delete semantics through the trait only.
    fn assert_crud_contract(store: &impl GraphStore) {
        sample_projection().upsert_into(store).unwrap();

        let neighborhood = store.neighborhood("doc:livekit", 2, None).unwrap().unwrap();
        let node_ids = neighborhood
            .nodes
            .iter()
            .map(|node| node.id.as_str())
            .collect::<Vec<_>>();
        assert_eq!(
            node_ids,
            vec!["doc:livekit", "topic:egress", "topic:rooms"]
        );
        let edge_triples = neighborhood
            .edges
            .iter()
            .map(|edge| {
                (
                    edge.from_id.as_str(),
                    edge.kind.as_str(),
                    edge.to_id.as_str(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(
            edge_triples,
            vec![
                ("doc:livekit", "mentions", "topic:rooms"),
                ("topic:rooms", "related_to", "topic:egress"),
            ]
        );

        // Removing the only edge into `topic:egress` makes it unreachable.
        let removed_edges = store
            .delete_edge("topic:rooms", "topic:egress", "related_to")
            .unwrap();
        assert_eq!(removed_edges, 1);
        let path_after_delete = store
            .shortest_path("doc:livekit", "topic:egress", None)
            .unwrap();
        assert!(path_after_delete.is_none());

        // Deleting a node also leaves the document without outgoing edges.
        assert_eq!(store.delete_node("topic:rooms").unwrap(), 1);
        assert!(store.node("topic:rooms").unwrap().is_none());
        let remaining = store.outgoing_edges("doc:livekit", None).unwrap();
        assert!(remaining.is_empty());
    }

    let store = SqliteGraphStore::in_memory().unwrap();
    assert_crud_contract(&store);
}
3107
#[test]
fn sqlite_upsert_projection_batches_rows_and_properties() {
    /// Query options that keep only nodes whose `domain` property equals `value`.
    fn domain_filter(value: &str) -> GraphQueryOptions {
        GraphQueryOptions {
            property_filters: vec![GraphPropertyFilter {
                key: "domain".to_string(),
                value: value.to_string(),
            }],
            ..GraphQueryOptions::default()
        }
    }

    let mut store = SqliteGraphStore::in_memory().unwrap();
    let mut projection = sample_projection();
    store.upsert_projection(&projection).unwrap();

    let page = store
        .paged_nodes_by_kind("document", domain_filter("livekit"))
        .unwrap();
    assert_eq!(page.nodes[0].id, "doc:livekit");

    // Re-upsert the document with a different `domain` value.
    projection.nodes[0] = GraphNode::new("doc:livekit", "document", "LiveKit guide")
        .with_property("domain", "recording");
    store.upsert_projection(&projection).unwrap();

    let old_property_count: usize = store
        .conn
        .query_row(
            "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain' AND value = 'livekit'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    let updated_page = store
        .paged_nodes_by_kind("document", domain_filter("recording"))
        .unwrap();
    // The stale materialized property row is gone and the new value is queryable.
    assert_eq!(old_property_count, 0);
    assert_eq!(updated_page.nodes[0].id, "doc:livekit");

    let edge_property_count: usize = store
        .conn
        .query_row(
            "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(edge_property_count, 1);
}
3164
#[test]
fn sqlite_store_filters_edges_by_kind_and_paths() {
    let store = SqliteGraphStore::in_memory().unwrap();
    for id in ["a", "b", "c"] {
        let symbol = GraphNode::new(id, "symbol", id);
        store.upsert_node(&symbol).unwrap();
    }
    // a -calls-> b -calls-> c, plus a direct a -documents-> c shortcut.
    for (from, to, kind) in [
        ("a", "b", "calls"),
        ("a", "c", "documents"),
        ("b", "c", "calls"),
    ] {
        store.upsert_edge(&GraphEdge::new(from, to, kind)).unwrap();
    }

    let calls = store.outgoing_edges("a", Some("calls")).unwrap();
    assert_eq!(calls.len(), 1);
    assert_eq!(calls[0].to_id, "b");
    assert_eq!(store.graph_counts().unwrap(), (3, 3));
    let sampled = store.sample_edge(Some("calls")).unwrap().unwrap();
    assert_eq!(sampled.to_id, "b");

    // Restricting to `calls` forces the two-hop route even though a one-hop
    // `documents` edge exists.
    let forward = store.shortest_path("a", "c", Some("calls")).unwrap();
    let path = forward.unwrap();
    assert_eq!(path.nodes, vec!["a", "b", "c"]);
    assert_eq!(path.hops, 2);

    // Edges are directed: there is no way back from c to a.
    let backward = store.shortest_path("c", "a", Some("calls")).unwrap();
    assert!(backward.is_none());
}
3206
#[test]
fn sqlite_store_batches_edges_between_node_sets() {
    let store = SqliteGraphStore::in_memory().unwrap();
    for id in ["a", "b", "c", "outside"] {
        let symbol = GraphNode::new(id, "symbol", id);
        store.upsert_node(&symbol).unwrap();
    }
    for (from, to) in [("a", "b"), ("b", "c"), ("a", "outside"), ("outside", "c")] {
        store.upsert_edge(&GraphEdge::new(from, to, "calls")).unwrap();
    }

    // Only edges with both endpoints inside the scoped set may be returned.
    let scoped: BTreeSet<String> = ["a", "b", "c"].into_iter().map(str::to_string).collect();
    let edge_keys = store
        .edges_between_nodes(&scoped)
        .unwrap()
        .into_iter()
        .map(|edge| (edge.from_id, edge.kind, edge.to_id))
        .collect::<Vec<_>>();

    let expected = [("a", "calls", "b"), ("b", "calls", "c")]
        .into_iter()
        .map(|(from, kind, to)| (from.to_string(), kind.to_string(), to.to_string()))
        .collect::<Vec<_>>();
    assert_eq!(edge_keys, expected);
}
3242
3243    #[test]
3244    fn sqlite_projection_refresh_tracks_versions_watermarks_and_tombstones() {
3245        let mut store = SqliteGraphStore::in_memory().unwrap();
3246        let mut projection = sample_projection();
3247        projection.nodes.push(
3248            GraphNode::new(
3249                "projection:fixture",
3250                "projection_meta",
3251                "fixture projection",
3252            )
3253            .with_property("projection_version", "fixture-v1")
3254            .with_property("content_hash", "hash-a"),
3255        );
3256        store
3257            .replace_projection_with_version(
3258                "root",
3259                &projection,
3260                Some("fixture-v1"),
3261                Some("commit-a".to_string()),
3262            )
3263            .unwrap();
3264
3265        projection.nodes.retain(|node| node.id != "topic:egress");
3266        projection.edges.retain(|edge| edge.to_id != "topic:egress");
3267        let refresh = store
3268            .replace_projection_with_version(
3269                "root",
3270                &projection,
3271                Some("fixture-v2"),
3272                Some("commit-b".to_string()),
3273            )
3274            .unwrap();
3275
3276        assert_eq!(refresh.projection_version, "fixture-v2");
3277        assert_eq!(refresh.source_watermark.as_deref(), Some("commit-b"));
3278        assert_eq!(refresh.tombstoned_nodes, vec!["topic:egress".to_string()]);
3279        assert_eq!(refresh.tombstoned_edges.len(), 1);
3280        assert_eq!(refresh.deleted_nodes, 1);
3281        assert_eq!(refresh.deleted_edges, 1);
3282        assert_eq!(refresh.unchanged_nodes, 3);
3283        assert_eq!(refresh.upserted_nodes, 0);
3284        assert_eq!(refresh.unchanged_properties, 4);
3285        assert_eq!(refresh.upserted_properties, 0);
3286        assert_eq!(refresh.deleted_properties, 0);
3287        assert!(
3288            refresh
3289                .phase_timings
3290                .iter()
3291                .any(|phase| phase.name == "sqlite_property_row_staging"),
3292            "{:?}",
3293            refresh.phase_timings
3294        );
3295        assert!(
3296            refresh
3297                .phase_timings
3298                .iter()
3299                .any(|phase| phase.name == "sqlite_edge_property_row_staging"),
3300            "{:?}",
3301            refresh.phase_timings
3302        );
3303        let version = store.projection_version("root").unwrap().unwrap();
3304        assert_eq!(version.projection_version, "fixture-v2");
3305        assert_eq!(version.source_watermark.as_deref(), Some("commit-b"));
3306        let cached_counts: (usize, usize, usize, usize) = store
3307            .conn
3308            .query_row(
3309                r#"
3310                SELECT nodes, edges, tombstone_nodes, tombstone_edges
3311                FROM graph_operator_stats
3312                WHERE scope = 'root'
3313                "#,
3314                [],
3315                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3316            )
3317            .unwrap();
3318        assert_eq!(cached_counts, (3, 1, 1, 1));
3319
3320        projection
3321            .nodes
3322            .push(GraphNode::new("topic:egress", "topic", "Egress"));
3323        let refresh = store
3324            .replace_projection_with_version(
3325                "root",
3326                &projection,
3327                Some("fixture-v3"),
3328                Some("commit-c".to_string()),
3329            )
3330            .unwrap();
3331        assert_eq!(refresh.pruned_tombstones, 1);
3332        assert_eq!(refresh.tombstoned_nodes, Vec::<String>::new());
3333
3334        projection.nodes.retain(|node| node.id != "topic:egress");
3335        store
3336            .replace_projection_with_version(
3337                "root",
3338                &projection,
3339                Some("fixture-v4"),
3340                Some("commit-d".to_string()),
3341            )
3342            .unwrap();
3343        assert_eq!(store.compact_storage("root", true).unwrap(), 2);
3344        let cached_counts: (usize, usize, usize, usize) = store
3345            .conn
3346            .query_row(
3347                r#"
3348                SELECT nodes, edges, tombstone_nodes, tombstone_edges
3349                FROM graph_operator_stats
3350                WHERE scope = 'root'
3351                "#,
3352                [],
3353                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3354            )
3355            .unwrap();
3356        assert_eq!(cached_counts, (3, 1, 0, 0));
3357    }
3358
3359    #[test]
3360    fn sqlite_shortest_path_uses_bounded_frontier() {
3361        let store = SqliteGraphStore::in_memory().unwrap();
3362        for idx in 0..80 {
3363            store
3364                .upsert_node(&GraphNode::new(
3365                    format!("node:{idx:02}"),
3366                    "symbol",
3367                    format!("node {idx:02}"),
3368                ))
3369                .unwrap();
3370        }
3371        for idx in 0..79 {
3372            store
3373                .upsert_edge(&GraphEdge::new(
3374                    format!("node:{idx:02}"),
3375                    format!("node:{:02}", idx + 1),
3376                    "calls",
3377                ))
3378                .unwrap();
3379        }
3380        store
3381            .upsert_edge(&GraphEdge::new("node:00", "node:79", "mentions"))
3382            .unwrap();
3383
3384        assert!(
3385            store
3386                .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(64))
3387                .unwrap()
3388                .is_none()
3389        );
3390        let path = store
3391            .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(79))
3392            .unwrap()
3393            .unwrap();
3394        assert_eq!(path.hops, 79);
3395        assert_eq!(path.nodes.first().map(String::as_str), Some("node:00"));
3396        assert_eq!(path.nodes.last().map(String::as_str), Some("node:79"));
3397
3398        let direct = store
3399            .shortest_path_with_max_hops("node:00", "node:79", Some("mentions"), Some(1))
3400            .unwrap()
3401            .unwrap();
3402        assert_eq!(direct.nodes, vec!["node:00", "node:79"]);
3403    }
3404
3405    #[test]
3406    fn sqlite_resolves_evidence_targets_with_indexed_properties() {
3407        let store = SqliteGraphStore::in_memory().unwrap();
3408        for node in [
3409            GraphNode::new("gbak-refresh", "backlog", "#refresh")
3410                .with_property("ref_id", "refresh")
3411                .with_property("handle", "backlog-handle"),
3412            GraphNode::new("gjob-refresh", "job_packet", "do #refresh")
3413                .with_property("ref_id", "refresh"),
3414            GraphNode::new("gwres-refresh", "worker_result", "completed #refresh")
3415                .with_property("ref_id", "refresh"),
3416        ] {
3417            store.upsert_node(&node).unwrap();
3418        }
3419
3420        let by_ref = store
3421            .resolve_evidence_target("#refresh", &["backlog", "job_packet", "worker_result"])
3422            .unwrap()
3423            .unwrap();
3424        assert_eq!(by_ref.id, "gbak-refresh");
3425        let by_handle = store
3426            .resolve_evidence_target("backlog-handle", &["backlog"])
3427            .unwrap()
3428            .unwrap();
3429        assert_eq!(by_handle.id, "gbak-refresh");
3430    }
3431
3432    #[test]
3433    fn sqlite_schema_migration_backfills_materialized_node_properties() {
3434        let conn = Connection::open_in_memory().unwrap();
3435        conn.execute_batch(
3436            r#"
3437            PRAGMA user_version = 2;
3438            CREATE TABLE graph_nodes (
3439                id TEXT PRIMARY KEY,
3440                kind TEXT NOT NULL,
3441                label TEXT NOT NULL,
3442                properties_json TEXT NOT NULL DEFAULT '{}',
3443                provenance_json TEXT NOT NULL DEFAULT '[]',
3444                freshness_json TEXT,
3445                row_hash TEXT,
3446                source_watermark TEXT
3447            );
3448            CREATE INDEX idx_graph_nodes_kind ON graph_nodes(kind);
3449            CREATE TABLE graph_edges (
3450                from_id TEXT NOT NULL,
3451                to_id TEXT NOT NULL,
3452                kind TEXT NOT NULL,
3453                properties_json TEXT NOT NULL DEFAULT '{}',
3454                provenance_json TEXT NOT NULL DEFAULT '[]',
3455                freshness_json TEXT,
3456                row_hash TEXT,
3457                source_watermark TEXT,
3458                PRIMARY KEY (from_id, to_id, kind)
3459            );
3460            CREATE INDEX idx_graph_edges_from_kind ON graph_edges(from_id, kind);
3461            CREATE INDEX idx_graph_edges_to_kind ON graph_edges(to_id, kind);
3462            CREATE TABLE graph_projection_versions (
3463                scope TEXT PRIMARY KEY,
3464                projection_version TEXT NOT NULL,
3465                content_hash TEXT,
3466                source_watermark TEXT,
3467                observed_at_unix INTEGER NOT NULL
3468            );
3469            CREATE TABLE graph_tombstones (
3470                row_key TEXT PRIMARY KEY,
3471                row_kind TEXT NOT NULL,
3472                deleted_at_unix INTEGER NOT NULL
3473            );
3474            INSERT INTO graph_nodes
3475                (id, kind, label, properties_json, provenance_json)
3476            VALUES
3477                ('topic:rooms', 'topic', 'Rooms', '{"domain":"livekit"}', '[]'),
3478                ('topic:egress', 'topic', 'Egress', '{"domain":"recording"}', '[]');
3479            INSERT INTO graph_edges
3480                (from_id, to_id, kind, properties_json, provenance_json)
3481            VALUES
3482                ('topic:rooms', 'topic:egress', 'mentions', '{"confidence":"0.91"}', '[]');
3483            "#,
3484        )
3485        .unwrap();
3486
3487        let store = SqliteGraphStore::from_connection(conn).unwrap();
3488        let version: i64 = store
3489            .conn
3490            .pragma_query_value(None, "user_version", |row| row.get(0))
3491            .unwrap();
3492        assert_eq!(version, SQLITE_GRAPH_SCHEMA_VERSION);
3493        let property_rows: usize = store
3494            .conn
3495            .query_row(
3496                "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain'",
3497                [],
3498                |row| row.get(0),
3499            )
3500            .unwrap();
3501        assert_eq!(property_rows, 2);
3502        let edge_property_rows: usize = store
3503            .conn
3504            .query_row(
3505                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
3506                [],
3507                |row| row.get(0),
3508            )
3509            .unwrap();
3510        assert_eq!(edge_property_rows, 1);
3511        let edge = store
3512            .edge(&GraphEdge::stable_id(
3513                "topic:rooms",
3514                "topic:egress",
3515                "mentions",
3516            ))
3517            .unwrap()
3518            .unwrap();
3519        assert_eq!(edge.properties.get("confidence"), Some(&"0.91".to_string()));
3520
3521        let page = store
3522            .paged_nodes_by_kind(
3523                "topic",
3524                GraphQueryOptions {
3525                    property_filters: vec![GraphPropertyFilter {
3526                        key: "domain".to_string(),
3527                        value: "livekit".to_string(),
3528                    }],
3529                    ..GraphQueryOptions::default()
3530                },
3531            )
3532            .unwrap();
3533        assert_eq!(page.nodes[0].id, "topic:rooms");
3534        assert!(
3535            page.page
3536                .diagnostics
3537                .iter()
3538                .any(|diagnostic| diagnostic.contains("idx_graph_node_properties_key_value_node")),
3539            "{:?}",
3540            page.page.diagnostics
3541        );
3542    }
3543
3544    #[test]
3545    fn sqlite_store_batches_reachable_nodes_by_kinds() {
3546        let store = SqliteGraphStore::in_memory().unwrap();
3547        for node in [
3548            GraphNode::new("start", "backlog", "start"),
3549            GraphNode::new("ctx", "worker_context", "context"),
3550            GraphNode::new("src", "source_handle", "source"),
3551            GraphNode::new("sem", "semantic_concept", "concept"),
3552        ] {
3553            store.upsert_node(&node).unwrap();
3554        }
3555        store
3556            .upsert_edge(&GraphEdge::new("start", "ctx", "has_context"))
3557            .unwrap();
3558        store
3559            .upsert_edge(&GraphEdge::new("ctx", "src", "uses_source"))
3560            .unwrap();
3561        store
3562            .upsert_edge(&GraphEdge::new("start", "sem", "mentions_concept"))
3563            .unwrap();
3564
3565        let rows = store
3566            .reachable_nodes_by_kinds(
3567                "start",
3568                &["worker_context", "source_handle", "semantic_concept"],
3569                2,
3570                8,
3571            )
3572            .unwrap();
3573        assert_eq!(rows["worker_context"][0].0.id, "ctx");
3574        assert_eq!(
3575            rows["source_handle"][0].1.nodes,
3576            vec!["start", "ctx", "src"]
3577        );
3578        assert_eq!(rows["semantic_concept"][0].1.hops, 1);
3579    }
3580
3581    #[test]
3582    fn sqlite_projection_refresh_handles_bulk_row_replacement() {
3583        let mut store = SqliteGraphStore::in_memory().unwrap();
3584        let source = GraphProvenance::new("fixture", "bulk");
3585        let mut projection = GraphProjection::default();
3586        for idx in 0..128 {
3587            projection.nodes.push(
3588                GraphNode::new(
3589                    format!("node:{idx:03}"),
3590                    if idx % 2 == 0 { "symbol" } else { "file" },
3591                    format!("bulk node {idx:03}"),
3592                )
3593                .with_property("ordinal", idx.to_string())
3594                .with_provenance(source.clone())
3595                .with_freshness(GraphFreshness::content_hash(format!("node-hash-{idx:03}"))),
3596            );
3597        }
3598        for idx in 0..127 {
3599            projection.edges.push(
3600                GraphEdge::new(
3601                    format!("node:{idx:03}"),
3602                    format!("node:{:03}", idx + 1),
3603                    "next",
3604                )
3605                .with_property("ordinal", idx.to_string())
3606                .with_provenance(source.clone())
3607                .with_freshness(GraphFreshness::content_hash(format!("edge-hash-{idx:03}"))),
3608            );
3609        }
3610
3611        store
3612            .replace_projection_with_version(
3613                "root",
3614                &projection,
3615                Some("bulk-v1"),
3616                Some("commit-a".to_string()),
3617            )
3618            .unwrap();
3619
3620        projection
3621            .nodes
3622            .retain(|node| !node.id.ends_with("000") && !node.id.ends_with("064"));
3623        projection.edges.retain(|edge| {
3624            !edge.from_id.ends_with("000")
3625                && !edge.to_id.ends_with("000")
3626                && !edge.from_id.ends_with("064")
3627                && !edge.to_id.ends_with("064")
3628        });
3629        let refresh = store
3630            .replace_projection_with_version(
3631                "root",
3632                &projection,
3633                Some("bulk-v2"),
3634                Some("commit-b".to_string()),
3635            )
3636            .unwrap();
3637
3638        assert_eq!(store.all_nodes().unwrap().len(), 126);
3639        assert_eq!(store.all_edges().unwrap().len(), 124);
3640        assert_eq!(
3641            refresh.tombstoned_nodes,
3642            vec!["node:000".to_string(), "node:064".to_string()]
3643        );
3644        assert_eq!(refresh.tombstoned_edges.len(), 3);
3645        assert_eq!(refresh.deleted_nodes, 2);
3646        assert_eq!(refresh.deleted_edges, 3);
3647        assert_eq!(refresh.unchanged_nodes, 126);
3648        assert_eq!(refresh.unchanged_edges, 124);
3649        assert_eq!(refresh.upserted_nodes, 0);
3650        assert_eq!(refresh.upserted_edges, 0);
3651        assert_eq!(refresh.unchanged_properties, 250);
3652        assert_eq!(refresh.upserted_properties, 0);
3653        assert!(
3654            refresh
3655                .phase_timings
3656                .iter()
3657                .any(|phase| phase.name == "sqlite_node_staging"
3658                    && phase.detail.contains("bulk stage 126 graph_nodes rows")
3659                    && phase.detail.contains("multi-row chunks up to 50 rows")),
3660            "{:?}",
3661            refresh.phase_timings
3662        );
3663        assert!(
3664            refresh
3665                .phase_timings
3666                .iter()
3667                .any(|phase| phase.name == "sqlite_edge_staging"
3668                    && phase.detail.contains("bulk stage 124 graph_edges rows")
3669                    && phase.detail.contains("multi-row chunks up to 50 rows")),
3670            "{:?}",
3671            refresh.phase_timings
3672        );
3673        let staged_node_properties: usize = store
3674            .conn
3675            .query_row(
3676                "SELECT COUNT(*) FROM temp.next_graph_node_properties",
3677                [],
3678                |row| row.get(0),
3679            )
3680            .unwrap();
3681        let staged_edge_properties: usize = store
3682            .conn
3683            .query_row(
3684                "SELECT COUNT(*) FROM temp.next_graph_edge_properties",
3685                [],
3686                |row| row.get(0),
3687            )
3688            .unwrap();
3689        assert_eq!(staged_node_properties, 0);
3690        assert_eq!(staged_edge_properties, 0);
3691        assert!(
3692            refresh
3693                .phase_timings
3694                .iter()
3695                .any(|phase| phase.name == "sqlite_property_row_staging"
3696                    && phase.detail.contains("new/changed node rows")),
3697            "{:?}",
3698            refresh.phase_timings
3699        );
3700        assert!(
3701            refresh
3702                .phase_timings
3703                .iter()
3704                .any(|phase| phase.name == "sqlite_edge_property_row_staging"
3705                    && phase.detail.contains("new/changed edge rows")),
3706            "{:?}",
3707            refresh.phase_timings
3708        );
3709        assert_eq!(
3710            store
3711                .projection_version("root")
3712                .unwrap()
3713                .unwrap()
3714                .source_watermark
3715                .as_deref(),
3716            Some("commit-b")
3717        );
3718    }
3719}