Skip to main content

tsift_sqlite/
lib.rs

1use anyhow::{Context, Result, bail};
2use rusqlite::{
3    Connection, OpenFlags, OptionalExtension, Row, params_from_iter,
4    types::{Type, Value},
5};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use std::collections::{BTreeMap, BTreeSet};
8use std::cell::Cell;
9use std::ffi::OsString;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14pub use tsift_core::{
15    ConvexEdgeRow, ConvexGraphClient, ConvexGraphStore, ConvexNodeRow, ConvexProjectionRows,
16    ConvexRowsGraphClient, GraphEdge, GraphFreshness, GraphNode, GraphPagedSubgraph, GraphPath,
17    GraphProjection, GraphPropertyFilter, GraphProvenance, GraphQueryOptions, GraphQueryPage,
18    GraphStore, GraphSubgraph, PropertyMode, RankedNeighborhoodOptions, RankedNeighborhoodResult,
19    SQLITE_GRAPH_SCHEMA_VERSION, TerseGraphEdge, TerseGraphNode,
20    apply_graph_edge_query_page,
21    apply_graph_query_page, graph_edge_id, shortest_path_using_outgoing, stable_graph_edge_id,
22};
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
25#[serde(rename_all = "snake_case")]
26pub enum ReadOnlyRecovery {
27    SnapshotFallback,
28    SnapshotFallbackWal,
29}
30
31pub fn read_only_snapshot_recovery(
32    db_path: &Path,
33    err: &anyhow::Error,
34) -> Option<ReadOnlyRecovery> {
35    if !error_mentions_locked_db(err) {
36        return None;
37    }
38    if wal_sidecar_path(db_path).exists() || shared_memory_sidecar_path(db_path).exists() {
39        Some(ReadOnlyRecovery::SnapshotFallbackWal)
40    } else if rollback_journal_path(db_path).exists() {
41        Some(ReadOnlyRecovery::SnapshotFallback)
42    } else {
43        None
44    }
45}
46
/// Path of SQLite's rollback journal for `db_path`: the full database path with
/// `-journal` appended (e.g. `graph.db` -> `graph.db-journal`).
pub fn rollback_journal_path(db_path: &Path) -> PathBuf {
    let mut name = OsString::from(db_path);
    name.push("-journal");
    name.into()
}
52
/// Path of SQLite's write-ahead log for `db_path`: the full database path with
/// `-wal` appended (e.g. `graph.db` -> `graph.db-wal`).
pub fn wal_sidecar_path(db_path: &Path) -> PathBuf {
    let mut raw = db_path.to_path_buf().into_os_string();
    raw.push("-wal");
    PathBuf::from(raw)
}
58
/// Path of SQLite's WAL shared-memory index for `db_path`: the full database
/// path with `-shm` appended (e.g. `graph.db` -> `graph.db-shm`).
pub fn shared_memory_sidecar_path(db_path: &Path) -> PathBuf {
    const SUFFIX: &str = "-shm";
    let base = db_path.as_os_str();
    let mut joined = OsString::with_capacity(base.len() + SUFFIX.len());
    joined.push(base);
    joined.push(SUFFIX);
    PathBuf::from(joined)
}
64
65pub fn copy_read_only_snapshot(
66    db_path: &Path,
67    default_stem: &str,
68) -> Result<(PathBuf, Vec<PathBuf>)> {
69    let snapshot_path = snapshot_copy_path(db_path, default_stem);
70    std::fs::copy(db_path, &snapshot_path).with_context(|| {
71        format!(
72            "copying locked db {} to snapshot {}",
73            db_path.display(),
74            snapshot_path.display()
75        )
76    })?;
77    let mut cleanup_paths = vec![snapshot_path.clone()];
78    copy_optional_snapshot_sidecar(
79        &wal_sidecar_path(db_path),
80        &wal_sidecar_path(&snapshot_path),
81        &mut cleanup_paths,
82    )?;
83    copy_optional_snapshot_sidecar(
84        &shared_memory_sidecar_path(db_path),
85        &shared_memory_sidecar_path(&snapshot_path),
86        &mut cleanup_paths,
87    )?;
88    Ok((snapshot_path, cleanup_paths))
89}
90
91pub fn error_mentions_locked_db(err: &anyhow::Error) -> bool {
92    err.chain()
93        .any(|cause| cause.to_string().contains("database is locked"))
94}
95
96fn copy_optional_snapshot_sidecar(
97    source_path: &Path,
98    snapshot_path: &Path,
99    cleanup_paths: &mut Vec<PathBuf>,
100) -> Result<()> {
101    match std::fs::copy(source_path, snapshot_path) {
102        Ok(_) => {
103            cleanup_paths.push(snapshot_path.to_path_buf());
104            Ok(())
105        }
106        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
107        Err(err) => Err(err).with_context(|| {
108            format!(
109                "copying SQLite sidecar {} to snapshot {}",
110                source_path.display(),
111                snapshot_path.display()
112            )
113        }),
114    }
115}
116
/// Builds a unique temp-dir path for a snapshot copy of `db_path`.
///
/// The file name is `tsift-{stem}-{pid}-{nanos}-{seq}.db`, where:
/// - `stem` is `db_path`'s UTF-8 file stem, or `default_stem` if there is none;
/// - `pid` is the current process id;
/// - `nanos` is the wall-clock time since the Unix epoch (0 if the clock reads
///   earlier than the epoch);
/// - `seq` is a per-process counter.
fn snapshot_copy_path(db_path: &Path, default_stem: &str) -> PathBuf {
    // Wall-clock resolution is coarse on some platforms, so pid + nanos alone
    // can repeat for two snapshots taken by the same process in quick
    // succession. A repeat would make one snapshot overwrite, and later
    // delete, another live one. The counter keeps names unique per process.
    static SNAPSHOT_SEQUENCE: AtomicU64 = AtomicU64::new(0);
    let sequence = SNAPSHOT_SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO)
        .as_nanos();
    let stem = db_path
        .file_stem()
        .and_then(|stem| stem.to_str())
        .unwrap_or(default_stem);
    let mut file_name = OsString::from(format!(
        "tsift-{stem}-{}-{nanos}-{sequence}",
        std::process::id()
    ));
    file_name.push(".db");
    std::env::temp_dir().join(file_name)
}
/// `PRAGMA wal_autocheckpoint` page count required on writable graph connections.
const SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES: i64 = 4_096;
/// Maximum rows bound into one multi-row `INSERT ... VALUES` while staging.
const SQLITE_GRAPH_STAGING_CHUNK_ROWS: usize = 500;
132
133/// SQLite-backed graph store.
134///
135/// # Temp-table invariant
136///
137/// Several methods — `replace_projection_with_version`, `upsert_projection`,
138/// `edges_between_nodes`, and `breadth_first_search` — create connection-scoped
139/// `TEMP TABLE` staging areas inside the underlying SQLite connection. Two
140/// concurrent calls on the same `SqliteGraphStore` (or sharing the same
141/// connection through `SqliteReadOnlyConnection`) would collide on those temp
142/// table names and produce incorrect results or errors.
143///
144/// **Only one temp-table-using method may be active at a time per connection.**
145pub struct SqliteGraphStore {
146    conn: Connection,
147    _snapshot_copy: Option<SnapshotCopyGuard>,
148    read_only_recovery: Option<ReadOnlyRecovery>,
149    temp_table_active: Cell<bool>,
150}
151
152/// Read-only handle to a SQLite graph database connection.
153///
154/// # Temp-table invariant
155///
156/// When this connection is unwrapped into a `SqliteGraphStore` via
157/// `SqliteGraphStore::from_read_only_connection`, the same temp-table
158/// concurrency rules apply: only one temp-table-using method
159/// (`replace_projection_with_version`, `upsert_projection`,
160/// `edges_between_nodes`, `breadth_first_search`) may be active at a time
161/// per connection.
162pub struct SqliteReadOnlyConnection {
163    conn: Connection,
164    _snapshot_copy: Option<SnapshotCopyGuard>,
165    recovery: Option<ReadOnlyRecovery>,
166}
167
/// Process-wide counter starting at 0.
///
/// NOTE(review): its users are not visible here. Presumably it hands out
/// per-call ids for `breadth_first_search`; confirm.
static BFS_CALL_ID: AtomicU64 = AtomicU64::new(0);
169
170impl SqliteReadOnlyConnection {
171    pub fn conn(&self) -> &Connection {
172        &self.conn
173    }
174
175    pub fn recovery(&self) -> Option<ReadOnlyRecovery> {
176        self.recovery
177    }
178}
179
/// Owns temp files created for a snapshot-recovered open and deletes them on drop.
struct SnapshotCopyGuard {
    /// Files to delete, in this order. As built by `copy_read_only_snapshot`,
    /// the snapshot database comes first.
    paths: Vec<PathBuf>,
}

impl Drop for SnapshotCopyGuard {
    fn drop(&mut self) {
        // Best effort: a file that is already gone, or cannot be removed, must
        // not turn a drop into a panic.
        self.paths.iter().for_each(|path| {
            let _ = std::fs::remove_file(path);
        });
    }
}
191
192fn configure_writable_graph_connection(conn: &Connection, db_path: &Path) -> Result<()> {
193    conn.busy_timeout(Duration::from_secs(5))?;
194    conn.pragma_update(None, "journal_mode", "WAL")?;
195    let mode: String = conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))?;
196    if mode.to_lowercase() != "wal" {
197        bail!(
198            "graph substrate db {} requires WAL mode for concurrent reads, got {}",
199            db_path.display(),
200            mode
201        );
202    }
203    conn.pragma_update(
204        None,
205        "wal_autocheckpoint",
206        SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
207    )?;
208    let checkpoint_pages: i64 =
209        conn.query_row("PRAGMA wal_autocheckpoint", [], |row| row.get(0))?;
210    if checkpoint_pages != SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES {
211        bail!(
212            "graph substrate db {} requires wal_autocheckpoint={}, got {}",
213            db_path.display(),
214            SQLITE_GRAPH_WAL_AUTOCHECKPOINT_PAGES,
215            checkpoint_pages
216        );
217    }
218    Ok(())
219}
220
221fn sqlite_column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
222    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
223    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
224    for row in rows {
225        if row? == column {
226            return Ok(true);
227        }
228    }
229    Ok(false)
230}
231
232fn add_column_if_missing(
233    conn: &Connection,
234    table: &str,
235    column: &str,
236    definition: &str,
237) -> Result<()> {
238    if !sqlite_column_exists(conn, table, column)? {
239        conn.execute(
240            &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
241            [],
242        )?;
243    }
244    Ok(())
245}
246
247fn backfill_graph_edge_keys(conn: &Connection) -> Result<()> {
248    if !sqlite_column_exists(conn, "graph_edges", "edge_key")? {
249        return Ok(());
250    }
251    let rows = {
252        let mut stmt = conn.prepare(
253            r#"
254            SELECT from_id, to_id, kind
255            FROM graph_edges
256            WHERE edge_key IS NULL OR edge_key = ''
257            ORDER BY from_id, kind, to_id
258            "#,
259        )?;
260        collect_rows(stmt.query_map([], |row| {
261            Ok((
262                row.get::<_, String>(0)?,
263                row.get::<_, String>(1)?,
264                row.get::<_, String>(2)?,
265            ))
266        })?)?
267    };
268    let mut update = conn.prepare(
269        r#"
270        UPDATE graph_edges
271        SET edge_key = ?4
272        WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3
273        "#,
274    )?;
275    for (from_id, to_id, kind) in rows {
276        update.execute((
277            &from_id,
278            &to_id,
279            &kind,
280            stable_graph_edge_id(&from_id, &to_id, &kind),
281        ))?;
282    }
283    Ok(())
284}
285
286fn migrate_sqlite_graph_schema(conn: &Connection, old_version: i64) -> Result<()> {
287    if old_version < 2 {
288        add_column_if_missing(conn, "graph_nodes", "row_hash", "TEXT")?;
289        add_column_if_missing(conn, "graph_nodes", "source_watermark", "TEXT")?;
290        add_column_if_missing(conn, "graph_edges", "row_hash", "TEXT")?;
291        add_column_if_missing(conn, "graph_edges", "source_watermark", "TEXT")?;
292    }
293    if old_version < 3 {
294        rebuild_graph_node_properties(conn)?;
295    }
296    if old_version < 4 {
297        ensure_sqlite_graph_operator_stats_schema(conn)?;
298    }
299    if old_version < 5 {
300        add_column_if_missing(conn, "graph_edges", "edge_key", "TEXT")?;
301        backfill_graph_edge_keys(conn)?;
302        ensure_sqlite_graph_edge_properties_schema(conn)?;
303        rebuild_graph_edge_properties(conn)?;
304    }
305    Ok(())
306}
307
308fn ensure_sqlite_graph_operator_stats_schema(conn: &Connection) -> Result<()> {
309    conn.execute_batch(
310        r#"
311        CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
312            ON graph_nodes(kind, label, id);
313        CREATE TABLE IF NOT EXISTS graph_operator_stats (
314            scope TEXT PRIMARY KEY,
315            nodes INTEGER NOT NULL,
316            edges INTEGER NOT NULL,
317            tombstone_nodes INTEGER NOT NULL,
318            tombstone_edges INTEGER NOT NULL,
319            file_size_bytes INTEGER,
320            freelist_bytes INTEGER,
321            observed_at_unix INTEGER NOT NULL
322        );
323        "#,
324    )?;
325    Ok(())
326}
327
328fn ensure_sqlite_graph_edge_properties_schema(conn: &Connection) -> Result<()> {
329    conn.execute_batch(
330        r#"
331        CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_edge_key
332            ON graph_edges(edge_key);
333        CREATE TABLE IF NOT EXISTS graph_edge_properties (
334            edge_key TEXT NOT NULL,
335            key TEXT NOT NULL,
336            value TEXT NOT NULL,
337            PRIMARY KEY (edge_key, key),
338            FOREIGN KEY (edge_key) REFERENCES graph_edges(edge_key) ON DELETE CASCADE
339        );
340        CREATE INDEX IF NOT EXISTS idx_graph_edge_properties_key_value_edge
341            ON graph_edge_properties(key, value, edge_key);
342        "#,
343    )?;
344    Ok(())
345}
346
347fn replace_node_properties(
348    conn: &Connection,
349    node_id: &str,
350    properties: &BTreeMap<String, String>,
351) -> Result<()> {
352    conn.execute(
353        "DELETE FROM graph_node_properties WHERE node_id = ?1",
354        [node_id],
355    )?;
356    let mut insert = conn.prepare(
357        r#"
358        INSERT INTO graph_node_properties (node_id, key, value)
359        VALUES (?1, ?2, ?3)
360        ON CONFLICT(node_id, key) DO UPDATE SET
361            value = excluded.value
362        "#,
363    )?;
364    for (key, value) in properties {
365        insert.execute((node_id, key, value))?;
366    }
367    Ok(())
368}
369
370fn replace_edge_properties(
371    conn: &Connection,
372    edge_key: &str,
373    properties: &BTreeMap<String, String>,
374) -> Result<()> {
375    conn.execute(
376        "DELETE FROM graph_edge_properties WHERE edge_key = ?1",
377        [edge_key],
378    )?;
379    let mut insert = conn.prepare(
380        r#"
381        INSERT INTO graph_edge_properties (edge_key, key, value)
382        VALUES (?1, ?2, ?3)
383        ON CONFLICT(edge_key, key) DO UPDATE SET
384            value = excluded.value
385        "#,
386    )?;
387    for (key, value) in properties {
388        insert.execute((edge_key, key, value))?;
389    }
390    Ok(())
391}
392
393fn rebuild_graph_node_properties(conn: &Connection) -> Result<()> {
394    if !sqlite_column_exists(conn, "graph_nodes", "properties_json")? {
395        return Ok(());
396    }
397    conn.execute_batch(
398        r#"
399        DELETE FROM graph_node_properties;
400        INSERT INTO graph_node_properties (node_id, key, value)
401        SELECT graph_nodes.id, json_each.key, CAST(json_each.value AS TEXT)
402        FROM graph_nodes, json_each(graph_nodes.properties_json)
403        WHERE json_each.key IS NOT NULL
404          AND json_each.value IS NOT NULL
405        "#,
406    )?;
407    Ok(())
408}
409
410fn rebuild_graph_edge_properties(conn: &Connection) -> Result<()> {
411    if !sqlite_column_exists(conn, "graph_edges", "properties_json")?
412        || !sqlite_column_exists(conn, "graph_edges", "edge_key")?
413    {
414        return Ok(());
415    }
416    conn.execute_batch(
417        r#"
418        DELETE FROM graph_edge_properties;
419        INSERT INTO graph_edge_properties (edge_key, key, value)
420        SELECT graph_edges.edge_key, json_each.key, CAST(json_each.value AS TEXT)
421        FROM graph_edges, json_each(graph_edges.properties_json)
422        WHERE graph_edges.edge_key IS NOT NULL
423          AND graph_edges.edge_key <> ''
424          AND json_each.key IS NOT NULL
425          AND json_each.value IS NOT NULL
426        "#,
427    )?;
428    Ok(())
429}
430
431pub fn open_graph_read_only_connection(db_path: &Path) -> Result<SqliteReadOnlyConnection> {
432    let conn = Connection::open_with_flags(
433        db_path,
434        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
435    )
436    .with_context(|| format!("opening graph.db read-only: {}", db_path.display()))?;
437    conn.busy_timeout(Duration::from_secs(5))?;
438    Ok(SqliteReadOnlyConnection {
439        conn,
440        _snapshot_copy: None,
441        recovery: None,
442    })
443}
444
445pub fn open_graph_read_only_connection_resilient(
446    db_path: &Path,
447) -> Result<SqliteReadOnlyConnection> {
448    match open_graph_read_only_connection(db_path).and_then(|connection| {
449        connection
450            .conn
451            .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
452        Ok(connection)
453    }) {
454        Ok(connection) => Ok(connection),
455        Err(err) => match read_only_snapshot_recovery(db_path, &err) {
456            Some(recovery) => open_graph_read_only_snapshot(db_path, recovery),
457            None => Err(err),
458        },
459    }
460}
461
462fn open_graph_read_only_snapshot(
463    db_path: &Path,
464    recovery: ReadOnlyRecovery,
465) -> Result<SqliteReadOnlyConnection> {
466    let (snapshot_path, cleanup_paths) = copy_read_only_snapshot(db_path, "graph")?;
467    let conn = Connection::open_with_flags(
468        &snapshot_path,
469        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
470    )
471    .with_context(|| format!("opening graph.db snapshot {}", snapshot_path.display()))?;
472    conn.busy_timeout(Duration::from_secs(5))?;
473    Ok(SqliteReadOnlyConnection {
474        conn,
475        _snapshot_copy: Some(SnapshotCopyGuard {
476            paths: cleanup_paths,
477        }),
478        recovery: Some(recovery),
479    })
480}
481
482#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
483pub struct SqliteProjectionRefreshPhase {
484    pub name: String,
485    pub duration_micros: u128,
486    pub detail: String,
487}
488
489#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
490pub struct SqliteProjectionRefresh {
491    pub scope: String,
492    pub projection_version: String,
493    pub source_watermark: Option<String>,
494    pub tombstoned_nodes: Vec<String>,
495    pub tombstoned_edges: Vec<String>,
496    pub upserted_nodes: usize,
497    pub upserted_edges: usize,
498    pub unchanged_nodes: usize,
499    pub unchanged_edges: usize,
500    pub upserted_properties: usize,
501    pub unchanged_properties: usize,
502    pub deleted_properties: usize,
503    pub deleted_nodes: usize,
504    pub deleted_edges: usize,
505    pub pruned_tombstones: usize,
506    pub file_size_bytes_before: Option<u64>,
507    pub file_size_bytes_after: Option<u64>,
508    pub phase_timings: Vec<SqliteProjectionRefreshPhase>,
509}
510
511#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
512pub struct SqliteProjectionVersion {
513    pub projection_version: String,
514    pub content_hash: Option<String>,
515    pub source_watermark: Option<String>,
516}
517
518fn sqlite_refresh_phase_timing(
519    name: &str,
520    started: Instant,
521    detail: &str,
522) -> SqliteProjectionRefreshPhase {
523    SqliteProjectionRefreshPhase {
524        name: name.to_string(),
525        duration_micros: started.elapsed().as_micros(),
526        detail: detail.to_string(),
527    }
528}
529
/// Builds the `VALUES` list for a multi-row insert.
///
/// The result is `row_count` tuples of `column_count` `?` placeholders, e.g.
/// `(2, 3)` -> `"(?, ?), (?, ?), (?, ?)"`. It is empty when `row_count` is 0.
fn sqlite_graph_staging_placeholders(column_count: usize, row_count: usize) -> String {
    let tuple = format!("({})", vec!["?"; column_count].join(", "));
    vec![tuple.as_str(); row_count].join(", ")
}
543
544fn sqlite_stage_all_ids(
545    tx: &rusqlite::Transaction<'_>,
546    nodes: &[GraphNode],
547    edges: &[GraphEdge],
548) -> Result<()> {
549    for chunk in nodes.iter().map(|n| &n.id).collect::<Vec<_>>().chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
550        let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
551        let sql = format!("INSERT OR IGNORE INTO next_graph_all_node_ids (id) VALUES {}", placeholders.join(", "));
552        let values: Vec<Value> = chunk.iter().map(|id| Value::Text((*id).clone())).collect();
553        tx.execute(&sql, params_from_iter(values.iter()))?;
554    }
555    for chunk in edges.iter().map(graph_edge_id).collect::<Vec<_>>().chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
556        let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
557        let sql = format!("INSERT OR IGNORE INTO next_graph_all_edge_keys (edge_key) VALUES {}", placeholders.join(", "));
558        let values: Vec<Value> = chunk.iter().map(|ek| Value::Text(ek.to_string())).collect();
559        tx.execute(&sql, params_from_iter(values.iter()))?;
560    }
561    Ok(())
562}
563
564fn sqlite_stage_projection_nodes(
565    tx: &rusqlite::Transaction<'_>,
566    nodes: &[&GraphNode],
567    source_watermark: Option<&str>,
568) -> Result<()> {
569    for chunk in nodes.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
570        let sql = format!(
571            r#"
572            INSERT INTO next_graph_nodes
573                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
574            VALUES {}
575            "#,
576            sqlite_graph_staging_placeholders(8, chunk.len())
577        );
578        let mut values = Vec::with_capacity(chunk.len() * 8);
579        for node in chunk {
580            values.push(Value::Text(node.id.clone()));
581            values.push(Value::Text(node.kind.clone()));
582            values.push(Value::Text(node.label.clone()));
583            values.push(Value::Text(to_json(&node.properties)?));
584            values.push(Value::Text(to_json(&node.provenance)?));
585            values.push(
586                optional_to_json(&node.freshness)?
587                    .map(Value::Text)
588                    .unwrap_or(Value::Null),
589            );
590            values.push(Value::Text(row_hash(node)?));
591            values.push(
592                source_watermark
593                    .map(|watermark| Value::Text(watermark.to_string()))
594                    .unwrap_or(Value::Null),
595            );
596        }
597        tx.execute(&sql, params_from_iter(values))?;
598    }
599    Ok(())
600}
601
602fn sqlite_stage_projection_edges(
603    tx: &rusqlite::Transaction<'_>,
604    edges: &[&GraphEdge],
605    source_watermark: Option<&str>,
606) -> Result<()> {
607    for chunk in edges.chunks(SQLITE_GRAPH_STAGING_CHUNK_ROWS) {
608        let sql = format!(
609            r#"
610            INSERT INTO next_graph_edges
611                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
612            VALUES {}
613            "#,
614            sqlite_graph_staging_placeholders(9, chunk.len())
615        );
616        let mut values = Vec::with_capacity(chunk.len() * 9);
617        for edge in chunk {
618            values.push(Value::Text(graph_edge_id(edge)));
619            values.push(Value::Text(edge.from_id.clone()));
620            values.push(Value::Text(edge.to_id.clone()));
621            values.push(Value::Text(edge.kind.clone()));
622            values.push(Value::Text(to_json(&edge.properties)?));
623            values.push(Value::Text(to_json(&edge.provenance)?));
624            values.push(
625                optional_to_json(&edge.freshness)?
626                    .map(Value::Text)
627                    .unwrap_or(Value::Null),
628            );
629            values.push(Value::Text(row_hash(edge)?));
630            values.push(
631                source_watermark
632                    .map(|watermark| Value::Text(watermark.to_string()))
633                    .unwrap_or(Value::Null),
634            );
635        }
636        tx.execute(&sql, params_from_iter(values))?;
637    }
638    Ok(())
639}
640
641impl SqliteGraphStore {
642    fn assert_not_in_temp_table_section(&self) {
643        if self.temp_table_active.get() {
644            panic!("SqliteGraphStore: re-entrant temp-table call detected — only one temp-table-using method may be active at a time per connection");
645        }
646    }
647
648    pub fn open(db_path: &Path) -> Result<Self> {
649        if let Some(parent) = db_path.parent() {
650            std::fs::create_dir_all(parent)
651                .with_context(|| format!("creating graph substrate dir: {}", parent.display()))?;
652        }
653        let conn = Connection::open(db_path)
654            .with_context(|| format!("opening graph substrate db: {}", db_path.display()))?;
655        configure_writable_graph_connection(&conn, db_path)?;
656        Self::from_connection(conn)
657    }
658
659    pub fn in_memory() -> Result<Self> {
660        let conn = Connection::open_in_memory()?;
661        conn.busy_timeout(Duration::from_secs(5))?;
662        Self::from_connection(conn)
663    }
664
665    pub fn open_read_only(db_path: &Path) -> Result<Self> {
666        let connection = open_graph_read_only_connection(db_path)?;
667        Self::from_read_only_connection(connection)
668    }
669
670    pub fn open_read_only_resilient(db_path: &Path) -> Result<Self> {
671        let connection = open_graph_read_only_connection_resilient(db_path)?;
672        Self::from_read_only_connection(connection)
673    }
674
675    pub fn read_only_recovery(&self) -> Option<ReadOnlyRecovery> {
676        self.read_only_recovery
677    }
678
679    pub fn has_user_triggers(&self) -> Result<bool> {
680        self.conn
681            .query_row(
682                r#"
683                SELECT EXISTS(
684                    SELECT 1
685                    FROM sqlite_master
686                    WHERE type = 'trigger'
687                      AND name NOT LIKE 'sqlite_%'
688                )
689                "#,
690                [],
691                |row| row.get::<_, bool>(0),
692            )
693            .map_err(Into::into)
694    }
695
696    fn from_connection(conn: Connection) -> Result<Self> {
697        conn.pragma_update(None, "foreign_keys", "ON")?;
698        let user_version: i64 =
699            conn.pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
700        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
701            bail!(
702                "graph.db schema version {} is newer than supported version {}",
703                user_version,
704                SQLITE_GRAPH_SCHEMA_VERSION
705            );
706        }
707        conn.execute_batch(
708            r#"
709            CREATE TABLE IF NOT EXISTS graph_nodes (
710                id TEXT PRIMARY KEY,
711                kind TEXT NOT NULL,
712                label TEXT NOT NULL,
713                properties_json TEXT NOT NULL DEFAULT '{}',
714                provenance_json TEXT NOT NULL DEFAULT '[]',
715                freshness_json TEXT,
716                row_hash TEXT,
717                source_watermark TEXT
718            );
719            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind
720                ON graph_nodes(kind);
721            CREATE INDEX IF NOT EXISTS idx_graph_nodes_kind_label
722                ON graph_nodes(kind, label, id);
723
724            CREATE TABLE IF NOT EXISTS graph_edges (
725                edge_key TEXT NOT NULL UNIQUE,
726                from_id TEXT NOT NULL,
727                to_id TEXT NOT NULL,
728                kind TEXT NOT NULL,
729                properties_json TEXT NOT NULL DEFAULT '{}',
730                provenance_json TEXT NOT NULL DEFAULT '[]',
731                freshness_json TEXT,
732                row_hash TEXT,
733                source_watermark TEXT,
734                PRIMARY KEY (from_id, to_id, kind),
735                FOREIGN KEY (from_id) REFERENCES graph_nodes(id) ON DELETE CASCADE,
736                FOREIGN KEY (to_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
737            );
738            CREATE INDEX IF NOT EXISTS idx_graph_edges_from_kind
739                ON graph_edges(from_id, kind);
740            CREATE INDEX IF NOT EXISTS idx_graph_edges_to_kind
741                ON graph_edges(to_id, kind);
742
743            CREATE TABLE IF NOT EXISTS graph_projection_versions (
744                scope TEXT PRIMARY KEY,
745                projection_version TEXT NOT NULL,
746                content_hash TEXT,
747                source_watermark TEXT,
748                observed_at_unix INTEGER NOT NULL
749            );
750
751            CREATE TABLE IF NOT EXISTS graph_tombstones (
752                row_key TEXT PRIMARY KEY,
753                row_kind TEXT NOT NULL,
754                deleted_at_unix INTEGER NOT NULL
755            );
756
757            CREATE TABLE IF NOT EXISTS graph_node_properties (
758                node_id TEXT NOT NULL,
759                key TEXT NOT NULL,
760                value TEXT NOT NULL,
761                PRIMARY KEY (node_id, key),
762                FOREIGN KEY (node_id) REFERENCES graph_nodes(id) ON DELETE CASCADE
763            );
764            CREATE INDEX IF NOT EXISTS idx_graph_node_properties_key_value_node
765                ON graph_node_properties(key, value, node_id);
766
767            CREATE TABLE IF NOT EXISTS graph_operator_stats (
768                scope TEXT PRIMARY KEY,
769                nodes INTEGER NOT NULL,
770                edges INTEGER NOT NULL,
771                tombstone_nodes INTEGER NOT NULL,
772                tombstone_edges INTEGER NOT NULL,
773                file_size_bytes INTEGER,
774                freelist_bytes INTEGER,
775                observed_at_unix INTEGER NOT NULL
776            );
777            "#,
778        )?;
779        if user_version < SQLITE_GRAPH_SCHEMA_VERSION {
780            migrate_sqlite_graph_schema(&conn, user_version)?;
781            conn.pragma_update(None, "user_version", SQLITE_GRAPH_SCHEMA_VERSION)?;
782        }
783        Ok(Self {
784            conn,
785            _snapshot_copy: None,
786            read_only_recovery: None,
787            temp_table_active: Cell::new(false),
788        })
789    }
790
791    fn from_read_only_connection(connection: SqliteReadOnlyConnection) -> Result<Self> {
792        connection.conn.pragma_update(None, "foreign_keys", "ON")?;
793        let user_version: i64 =
794            connection
795                .conn
796                .pragma_query_value(None, "user_version", |row| row.get::<_, i64>(0))?;
797        if user_version > SQLITE_GRAPH_SCHEMA_VERSION {
798            bail!(
799                "graph.db schema version {} is newer than supported version {}",
800                user_version,
801                SQLITE_GRAPH_SCHEMA_VERSION
802            );
803        }
804        connection
805            .conn
806            .query_row("SELECT COUNT(*) FROM sqlite_master", [], |_row| Ok(()))?;
807        Ok(Self {
808            conn: connection.conn,
809            _snapshot_copy: connection._snapshot_copy,
810            read_only_recovery: connection.recovery,
811            temp_table_active: Cell::new(false),
812        })
813    }
814
815    pub fn replace_projection(&mut self, projection: &GraphProjection) -> Result<()> {
816        self.replace_projection_with_version("root", projection, None, None)
817            .map(|_| ())
818    }
819
820    pub fn replace_projection_with_version(
821        &mut self,
822        scope: impl Into<String>,
823        projection: &GraphProjection,
824        projection_version: Option<&str>,
825        source_watermark: Option<String>,
826    ) -> Result<SqliteProjectionRefresh> {
827        self.assert_not_in_temp_table_section();
828        self.temp_table_active.set(true);
829        let scope = scope.into();
830        let result = self.replace_projection_with_version_fallible(scope, projection, projection_version, source_watermark);
831        self.temp_table_active.set(false);
832        if let Ok(ref refresh) = result {
833            let total_rows = refresh.upserted_nodes + refresh.upserted_edges;
834            let autocheckpoint = if total_rows > 10000 {
835                8192
836            } else if total_rows > 1000 {
837                4096
838            } else {
839                2048
840            };
841            let _ = self.conn.pragma_update(None, "wal_autocheckpoint", autocheckpoint);
842            let _ = self.conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)");
843        }
844        result
845    }
846
847    fn replace_projection_with_version_fallible(
848        &mut self,
849        scope: String,
850        projection: &GraphProjection,
851        projection_version: Option<&str>,
852        source_watermark: Option<String>,
853    ) -> Result<SqliteProjectionRefresh> {
854        let projection_version = projection_version
855            .map(str::to_string)
856            .or_else(|| projection_version_from_nodes(&projection.nodes))
857            .unwrap_or_else(|| "unversioned".to_string());
858        let projection_hash = projection_hash_from_nodes(&projection.nodes);
859        let observed_at_unix = unix_now();
860        let file_size_bytes_before = sqlite_database_size_bytes(&self.conn).ok();
861        let force_refresh_writes = self.has_user_triggers().unwrap_or(true);
862        let mut phase_timings = Vec::new();
863
864        let tx = self.conn.transaction()?;
865        let started = Instant::now();
866        tx.execute_batch(
867            r#"
868            CREATE TEMP TABLE IF NOT EXISTS next_graph_nodes (
869                id TEXT PRIMARY KEY,
870                kind TEXT NOT NULL,
871                label TEXT NOT NULL,
872                properties_json TEXT NOT NULL,
873                provenance_json TEXT NOT NULL,
874                freshness_json TEXT,
875                row_hash TEXT NOT NULL,
876                source_watermark TEXT
877            );
878            CREATE TEMP TABLE IF NOT EXISTS next_graph_edges (
879                edge_key TEXT PRIMARY KEY,
880                from_id TEXT NOT NULL,
881                to_id TEXT NOT NULL,
882                kind TEXT NOT NULL,
883                properties_json TEXT NOT NULL,
884                provenance_json TEXT NOT NULL,
885                freshness_json TEXT,
886                row_hash TEXT NOT NULL,
887                source_watermark TEXT
888            );
889            CREATE INDEX IF NOT EXISTS temp.idx_next_graph_edges_from_to_kind
890                ON next_graph_edges(from_id, to_id, kind);
891            CREATE TEMP TABLE IF NOT EXISTS next_graph_node_properties (
892                node_id TEXT NOT NULL,
893                key TEXT NOT NULL,
894                value TEXT NOT NULL,
895                PRIMARY KEY (node_id, key)
896            );
897            CREATE TEMP TABLE IF NOT EXISTS next_graph_edge_properties (
898                edge_key TEXT NOT NULL,
899                key TEXT NOT NULL,
900                value TEXT NOT NULL,
901                PRIMARY KEY (edge_key, key)
902            );
903            CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_nodes (
904                id TEXT PRIMARY KEY
905            );
906            CREATE TEMP TABLE IF NOT EXISTS next_graph_changed_edges (
907                edge_key TEXT PRIMARY KEY
908            );
909            CREATE TEMP TABLE IF NOT EXISTS next_graph_all_node_ids (
910                id TEXT PRIMARY KEY
911            );
912            CREATE TEMP TABLE IF NOT EXISTS next_graph_all_edge_keys (
913                edge_key TEXT PRIMARY KEY
914            );
915            DELETE FROM next_graph_nodes;
916            DELETE FROM next_graph_edges;
917            DELETE FROM next_graph_node_properties;
918            DELETE FROM next_graph_edge_properties;
919            DELETE FROM next_graph_changed_nodes;
920            DELETE FROM next_graph_changed_edges;
921            DELETE FROM next_graph_all_node_ids;
922            DELETE FROM next_graph_all_edge_keys;
923            "#,
924        )?;
925        phase_timings.push(sqlite_refresh_phase_timing(
926            "sqlite_temp_table_prepare",
927            started,
928            "create and clear refresh staging tables before row loading",
929        ));
930        let (changed_nodes, changed_edges, skipped_nodes, skipped_edges) = if force_refresh_writes {
931            (projection.nodes.iter().collect(), projection.edges.iter().collect(), 0usize, 0usize)
932        } else {
933            let existing_node_hashes: BTreeMap<String, String> = {
934                let mut stmt = tx.prepare("SELECT id, row_hash FROM graph_nodes WHERE row_hash IS NOT NULL")?;
935                let rows = stmt.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)))?;
936                collect_rows(rows)?.into_iter().collect()
937            };
938            let existing_edge_hashes: BTreeMap<String, String> = {
939                let mut stmt = tx.prepare("SELECT edge_key, row_hash FROM graph_edges WHERE row_hash IS NOT NULL")?;
940                let rows = stmt.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)))?;
941                collect_rows(rows)?.into_iter().collect()
942            };
943            let cn: Vec<&GraphNode> = projection.nodes.iter().filter(|n| {
944                let hash = row_hash(*n).ok();
945                hash.as_ref().is_none_or(|h| existing_node_hashes.get(&n.id) != Some(h))
946            }).collect();
947            let ce: Vec<&GraphEdge> = projection.edges.iter().filter(|e| {
948                let hash = row_hash(*e).ok();
949                let ek = graph_edge_id(e);
950                hash.as_ref().is_none_or(|h| existing_edge_hashes.get(&ek) != Some(h))
951            }).collect();
952            let sn = projection.nodes.len() - cn.len();
953            let se = projection.edges.len() - ce.len();
954            (cn, ce, sn, se)
955        };
956        {
957            let started = Instant::now();
958            sqlite_stage_all_ids(&tx, &projection.nodes, &projection.edges)?;
959            sqlite_stage_projection_nodes(&tx, &changed_nodes, source_watermark.as_deref())?;
960            sqlite_stage_projection_edges(&tx, &changed_edges, source_watermark.as_deref())?;
961            phase_timings.push(sqlite_refresh_phase_timing(
962                "sqlite_node_staging",
963                started,
964                &format!(
965                    "pre-filtered staging: {} nodes ({} unchanged skipped), {} edges ({} unchanged skipped) into temp tables using multi-row chunks up to {} rows",
966                    changed_nodes.len(),
967                    skipped_nodes,
968                    changed_edges.len(),
969                    skipped_edges,
970                    SQLITE_GRAPH_STAGING_CHUNK_ROWS
971                ),
972            ));
973        }
974        {
975            let started = Instant::now();
976            let changed_nodes_sql = if force_refresh_writes {
977                r#"
978                INSERT INTO next_graph_changed_nodes (id)
979                SELECT id
980                FROM next_graph_nodes
981                "#
982            } else {
983                r#"
984                INSERT INTO next_graph_changed_nodes (id)
985                SELECT n.id
986                FROM next_graph_nodes n
987                LEFT JOIN graph_nodes g ON g.id = n.id
988                WHERE g.id IS NULL OR g.row_hash IS NOT n.row_hash
989                "#
990            };
991            tx.execute(changed_nodes_sql, [])?;
992            tx.execute_batch(
993                r#"
994                INSERT INTO next_graph_node_properties (node_id, key, value)
995                SELECT n.id, json_each.key, CAST(json_each.value AS TEXT)
996                FROM next_graph_nodes n
997                JOIN next_graph_changed_nodes c ON c.id = n.id,
998                     json_each(n.properties_json)
999                WHERE json_each.key IS NOT NULL
1000                  AND json_each.value IS NOT NULL
1001                "#,
1002            )?;
1003            phase_timings.push(sqlite_refresh_phase_timing(
1004                "sqlite_property_row_staging",
1005                started,
1006                "derive materialized node property rows only for new/changed node rows; unchanged row-hash owners reuse existing property rows",
1007            ));
1008        }
1009        {
1010            let started = Instant::now();
1011            let changed_edges_sql = if force_refresh_writes {
1012                r#"
1013                INSERT INTO next_graph_changed_edges (edge_key)
1014                SELECT edge_key
1015                FROM next_graph_edges
1016                "#
1017            } else {
1018                r#"
1019                INSERT INTO next_graph_changed_edges (edge_key)
1020                SELECT n.edge_key
1021                FROM next_graph_edges n
1022                LEFT JOIN graph_edges g ON g.edge_key = n.edge_key
1023                WHERE g.edge_key IS NULL OR g.row_hash IS NOT n.row_hash
1024                "#
1025            };
1026            tx.execute(changed_edges_sql, [])?;
1027            tx.execute_batch(
1028                r#"
1029                INSERT INTO next_graph_edge_properties (edge_key, key, value)
1030                SELECT e.edge_key, json_each.key, CAST(json_each.value AS TEXT)
1031                FROM next_graph_edges e
1032                JOIN next_graph_changed_edges c ON c.edge_key = e.edge_key,
1033                     json_each(e.properties_json)
1034                WHERE json_each.key IS NOT NULL
1035                  AND json_each.value IS NOT NULL
1036                "#,
1037            )?;
1038            phase_timings.push(sqlite_refresh_phase_timing(
1039                "sqlite_edge_property_row_staging",
1040                started,
1041                "derive materialized edge property rows only for new/changed edge rows; unchanged row-hash owners reuse existing property rows",
1042            ));
1043        }
1044
1045        let delta_started = Instant::now();
1046        let tombstoned_nodes = {
1047            let sql = if force_refresh_writes {
1048                r#"
1049                SELECT g.id
1050                FROM graph_nodes g
1051                LEFT JOIN next_graph_nodes n ON n.id = g.id
1052                WHERE n.id IS NULL
1053                ORDER BY g.id
1054                "#
1055            } else {
1056                r#"
1057                SELECT g.id
1058                FROM graph_nodes g
1059                LEFT JOIN next_graph_all_node_ids n ON n.id = g.id
1060                WHERE n.id IS NULL
1061                ORDER BY g.id
1062                "#
1063            };
1064            let mut stmt = tx.prepare(sql)?;
1065            collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1066        };
1067        let tombstoned_edges = {
1068            let sql = if force_refresh_writes {
1069                r#"
1070                SELECT g.edge_key
1071                FROM graph_edges g
1072                LEFT JOIN next_graph_edges n
1073                    ON n.edge_key = g.edge_key
1074                WHERE n.edge_key IS NULL
1075                ORDER BY g.edge_key
1076                "#
1077            } else {
1078                r#"
1079                SELECT g.edge_key
1080                FROM graph_edges g
1081                LEFT JOIN next_graph_all_edge_keys n
1082                    ON n.edge_key = g.edge_key
1083                WHERE n.edge_key IS NULL
1084                ORDER BY g.edge_key
1085                "#
1086            };
1087            let mut stmt = tx.prepare(sql)?;
1088            collect_rows(stmt.query_map([], |row| row.get::<_, String>(0))?)?
1089        };
1090        let unchanged_nodes: usize = if force_refresh_writes {
1091            tx.query_row(
1092                r#"
1093                SELECT COUNT(*)
1094                FROM next_graph_nodes n
1095                JOIN graph_nodes g ON g.id = n.id
1096                WHERE g.row_hash = n.row_hash
1097                "#,
1098                [],
1099                |row| row.get(0),
1100            )?
1101        } else {
1102            skipped_nodes
1103        };
1104        let unchanged_edges: usize = if force_refresh_writes {
1105            tx.query_row(
1106                r#"
1107                SELECT COUNT(*)
1108                FROM next_graph_edges n
1109                JOIN graph_edges g
1110                    ON g.edge_key = n.edge_key
1111                WHERE g.row_hash = n.row_hash
1112                "#,
1113                [],
1114                |row| row.get(0),
1115            )?
1116        } else {
1117            skipped_edges
1118        };
1119        let reused_owner_node_properties: usize = if force_refresh_writes {
1120            tx.query_row(
1121                r#"
1122                SELECT COUNT(*)
1123                FROM graph_node_properties g
1124                JOIN next_graph_nodes n ON n.id = g.node_id
1125                LEFT JOIN next_graph_changed_nodes c ON c.id = n.id
1126                WHERE c.id IS NULL
1127                "#,
1128                [],
1129                |row| row.get(0),
1130            )?
1131        } else {
1132            tx.query_row(
1133                r#"
1134                SELECT COUNT(*)
1135                FROM graph_node_properties g
1136                JOIN next_graph_all_node_ids a ON a.id = g.node_id
1137                LEFT JOIN next_graph_changed_nodes c ON c.id = a.id
1138                WHERE c.id IS NULL
1139                "#,
1140                [],
1141                |row| row.get(0),
1142            )?
1143        };
1144        let reused_owner_edge_properties: usize = if force_refresh_writes {
1145            tx.query_row(
1146                r#"
1147                SELECT COUNT(*)
1148                FROM graph_edge_properties g
1149                JOIN next_graph_edges n ON n.edge_key = g.edge_key
1150                LEFT JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1151                WHERE c.edge_key IS NULL
1152                "#,
1153                [],
1154                |row| row.get(0),
1155            )?
1156        } else {
1157            tx.query_row(
1158                r#"
1159                SELECT COUNT(*)
1160                FROM graph_edge_properties g
1161                JOIN next_graph_all_edge_keys a ON a.edge_key = g.edge_key
1162                LEFT JOIN next_graph_changed_edges c ON c.edge_key = a.edge_key
1163                WHERE c.edge_key IS NULL
1164                "#,
1165                [],
1166                |row| row.get(0),
1167            )?
1168        };
1169        let unchanged_changed_node_properties: usize = tx.query_row(
1170            r#"
1171            SELECT COUNT(*)
1172            FROM next_graph_node_properties n
1173            JOIN graph_node_properties g
1174                ON g.node_id = n.node_id AND g.key = n.key
1175            WHERE g.value = n.value
1176            "#,
1177            [],
1178            |row| row.get(0),
1179        )?;
1180        let unchanged_changed_edge_properties: usize = tx.query_row(
1181            r#"
1182            SELECT COUNT(*)
1183            FROM next_graph_edge_properties n
1184            JOIN graph_edge_properties g
1185                ON g.edge_key = n.edge_key AND g.key = n.key
1186            WHERE g.value = n.value
1187            "#,
1188            [],
1189            |row| row.get(0),
1190        )?;
1191        let unchanged_properties = reused_owner_node_properties
1192            + reused_owner_edge_properties
1193            + unchanged_changed_node_properties
1194            + unchanged_changed_edge_properties;
1195
1196        let deleted_edges = if force_refresh_writes {
1197            tx.execute(
1198                r#"
1199                DELETE FROM graph_edges
1200                WHERE NOT EXISTS (
1201                    SELECT 1
1202                    FROM next_graph_edges n
1203                    WHERE n.edge_key = graph_edges.edge_key
1204                )
1205                "#,
1206                [],
1207            )?
1208        } else {
1209            tx.execute(
1210                r#"
1211                DELETE FROM graph_edges
1212                WHERE NOT EXISTS (
1213                    SELECT 1
1214                    FROM next_graph_all_edge_keys n
1215                    WHERE n.edge_key = graph_edges.edge_key
1216                )
1217                "#,
1218                [],
1219            )?
1220        };
1221        let deleted_nodes = if force_refresh_writes {
1222            tx.execute(
1223                r#"
1224                DELETE FROM graph_nodes
1225                WHERE NOT EXISTS (
1226                    SELECT 1
1227                    FROM next_graph_nodes n
1228                    WHERE n.id = graph_nodes.id
1229                )
1230                "#,
1231                [],
1232            )?
1233        } else {
1234            tx.execute(
1235                r#"
1236                DELETE FROM graph_nodes
1237                WHERE NOT EXISTS (
1238                    SELECT 1
1239                    FROM next_graph_all_node_ids n
1240                    WHERE n.id = graph_nodes.id
1241                )
1242                "#,
1243                [],
1244            )?
1245        };
1246
1247        let upsert_nodes_sql = r#"
1248            INSERT INTO graph_nodes
1249                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1250            SELECT
1251                n.id,
1252                n.kind,
1253                n.label,
1254                n.properties_json,
1255                n.provenance_json,
1256                n.freshness_json,
1257                n.row_hash,
1258                n.source_watermark
1259            FROM next_graph_nodes n
1260            JOIN next_graph_changed_nodes c ON c.id = n.id
1261            WHERE true
1262            ON CONFLICT(id) DO UPDATE SET
1263                kind = excluded.kind,
1264                label = excluded.label,
1265                properties_json = excluded.properties_json,
1266                provenance_json = excluded.provenance_json,
1267                freshness_json = excluded.freshness_json,
1268                row_hash = excluded.row_hash,
1269                source_watermark = excluded.source_watermark
1270            WHERE graph_nodes.row_hash IS NOT excluded.row_hash
1271            "#;
1272        tx.execute(upsert_nodes_sql, [])?;
1273        let upsert_edges_sql = r#"
1274            INSERT INTO graph_edges
1275                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1276            SELECT
1277                n.edge_key,
1278                n.from_id,
1279                n.to_id,
1280                n.kind,
1281                n.properties_json,
1282                n.provenance_json,
1283                n.freshness_json,
1284                n.row_hash,
1285                n.source_watermark
1286            FROM next_graph_edges n
1287            JOIN next_graph_changed_edges c ON c.edge_key = n.edge_key
1288            WHERE true
1289            ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1290                edge_key = excluded.edge_key,
1291                properties_json = excluded.properties_json,
1292                provenance_json = excluded.provenance_json,
1293                freshness_json = excluded.freshness_json,
1294                row_hash = excluded.row_hash,
1295                source_watermark = excluded.source_watermark
1296            WHERE graph_edges.row_hash IS NOT excluded.row_hash
1297            "#;
1298        tx.execute(upsert_edges_sql, [])?;
1299        let deleted_node_properties = tx.execute(
1300            r#"
1301            DELETE FROM graph_node_properties
1302            WHERE EXISTS (
1303                SELECT 1
1304                FROM next_graph_changed_nodes c
1305                WHERE c.id = graph_node_properties.node_id
1306            )
1307              AND NOT EXISTS (
1308                SELECT 1
1309                FROM next_graph_node_properties n
1310                WHERE n.node_id = graph_node_properties.node_id
1311                  AND n.key = graph_node_properties.key
1312            )
1313            "#,
1314            [],
1315        )?;
1316        let deleted_edge_properties = tx.execute(
1317            r#"
1318            DELETE FROM graph_edge_properties
1319            WHERE EXISTS (
1320                SELECT 1
1321                FROM next_graph_changed_edges c
1322                WHERE c.edge_key = graph_edge_properties.edge_key
1323            )
1324              AND NOT EXISTS (
1325                SELECT 1
1326                FROM next_graph_edge_properties n
1327                WHERE n.edge_key = graph_edge_properties.edge_key
1328                  AND n.key = graph_edge_properties.key
1329            )
1330            "#,
1331            [],
1332        )?;
1333        let deleted_properties = deleted_node_properties + deleted_edge_properties;
1334        let upsert_properties_sql = r#"
1335            INSERT INTO graph_node_properties (node_id, key, value)
1336            SELECT n.node_id, n.key, n.value
1337            FROM next_graph_node_properties n
1338            LEFT JOIN graph_node_properties g
1339                ON g.node_id = n.node_id AND g.key = n.key
1340            WHERE g.node_id IS NULL OR g.value IS NOT n.value
1341            ON CONFLICT(node_id, key) DO UPDATE SET
1342                value = excluded.value
1343            WHERE graph_node_properties.value IS NOT excluded.value
1344            "#;
1345        let upserted_node_properties = tx.execute(upsert_properties_sql, [])?;
1346        let upsert_edge_properties_sql = r#"
1347            INSERT INTO graph_edge_properties (edge_key, key, value)
1348            SELECT n.edge_key, n.key, n.value
1349            FROM next_graph_edge_properties n
1350            LEFT JOIN graph_edge_properties g
1351                ON g.edge_key = n.edge_key AND g.key = n.key
1352            WHERE g.edge_key IS NULL OR g.value IS NOT n.value
1353            ON CONFLICT(edge_key, key) DO UPDATE SET
1354                value = excluded.value
1355            WHERE graph_edge_properties.value IS NOT excluded.value
1356            "#;
1357        let upserted_edge_properties = tx.execute(upsert_edge_properties_sql, [])?;
1358        let upserted_properties = upserted_node_properties + upserted_edge_properties;
1359        tx.execute(
1360            r#"
1361            INSERT INTO graph_projection_versions
1362                (scope, projection_version, content_hash, source_watermark, observed_at_unix)
1363            VALUES (?1, ?2, ?3, ?4, ?5)
1364            ON CONFLICT(scope) DO UPDATE SET
1365                projection_version = excluded.projection_version,
1366                content_hash = excluded.content_hash,
1367                source_watermark = excluded.source_watermark,
1368                observed_at_unix = excluded.observed_at_unix
1369            "#,
1370            (
1371                &scope,
1372                &projection_version,
1373                &projection_hash,
1374                &source_watermark,
1375                observed_at_unix,
1376            ),
1377        )?;
1378        let pruned_node_tombstones = tx.execute(
1379            r#"
1380            DELETE FROM graph_tombstones
1381            WHERE row_kind = 'node'
1382              AND EXISTS (
1383                SELECT 1
1384                FROM next_graph_nodes n
1385                WHERE n.id = substr(graph_tombstones.row_key, 6)
1386              )
1387            "#,
1388            [],
1389        )?;
1390        let pruned_edge_tombstones = tx.execute(
1391            r#"
1392            DELETE FROM graph_tombstones
1393            WHERE row_kind = 'edge'
1394              AND EXISTS (
1395                SELECT 1
1396                FROM next_graph_edges n
1397                WHERE n.edge_key = substr(graph_tombstones.row_key, 6)
1398              )
1399            "#,
1400            [],
1401        )?;
1402        {
1403            let mut insert_node_tombstone = tx.prepare(
1404                r#"
1405                INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1406                VALUES (?1, 'node', ?2)
1407                ON CONFLICT(row_key) DO UPDATE SET
1408                    row_kind = excluded.row_kind,
1409                    deleted_at_unix = excluded.deleted_at_unix
1410                "#,
1411            )?;
1412            for id in &tombstoned_nodes {
1413                insert_node_tombstone.execute((format!("node:{id}"), observed_at_unix))?;
1414            }
1415        }
1416        {
1417            let mut insert_edge_tombstone = tx.prepare(
1418                r#"
1419                INSERT INTO graph_tombstones (row_key, row_kind, deleted_at_unix)
1420                VALUES (?1, 'edge', ?2)
1421                ON CONFLICT(row_key) DO UPDATE SET
1422                    row_kind = excluded.row_kind,
1423                    deleted_at_unix = excluded.deleted_at_unix
1424                "#,
1425            )?;
1426            for key in &tombstoned_edges {
1427                insert_edge_tombstone.execute((format!("edge:{key}"), observed_at_unix))?;
1428            }
1429        }
1430        let tombstone_node_count: usize = tx.query_row(
1431            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1432            [],
1433            |row| row.get(0),
1434        )?;
1435        let tombstone_edge_count: usize = tx.query_row(
1436            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1437            [],
1438            |row| row.get(0),
1439        )?;
1440        tx.execute(
1441            r#"
1442            INSERT INTO graph_operator_stats
1443                (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1444            VALUES (?1, ?2, ?3, ?4, ?5, NULL, NULL, ?6)
1445            ON CONFLICT(scope) DO UPDATE SET
1446                nodes = excluded.nodes,
1447                edges = excluded.edges,
1448                tombstone_nodes = excluded.tombstone_nodes,
1449                tombstone_edges = excluded.tombstone_edges,
1450                file_size_bytes = excluded.file_size_bytes,
1451                freelist_bytes = excluded.freelist_bytes,
1452                observed_at_unix = excluded.observed_at_unix
1453            "#,
1454            (
1455                &scope,
1456                projection.nodes.len() as i64,
1457                projection.edges.len() as i64,
1458                tombstone_node_count as i64,
1459                tombstone_edge_count as i64,
1460                observed_at_unix,
1461            ),
1462        )?;
1463        phase_timings.push(sqlite_refresh_phase_timing(
1464            "sqlite_delta_write",
1465            delta_started,
1466            "apply row/property deltas, projection metadata, tombstones, and cached operator counts",
1467        ));
1468        let commit_started = Instant::now();
1469        tx.commit()?;
1470        phase_timings.push(sqlite_refresh_phase_timing(
1471            "sqlite_commit",
1472            commit_started,
1473            "commit refresh transaction and publish old-or-new graph visibility",
1474        ));
1475        let file_size_bytes_after = sqlite_database_size_bytes(&self.conn).ok();
1476        let freelist_bytes_after = sqlite_database_freelist_bytes(&self.conn).ok();
1477        let stats_started = Instant::now();
1478        self.conn.execute(
1479            r#"
1480            UPDATE graph_operator_stats
1481            SET file_size_bytes = ?2,
1482                freelist_bytes = ?3,
1483                observed_at_unix = ?4
1484            WHERE scope = ?1
1485            "#,
1486            (
1487                &scope,
1488                file_size_bytes_after.map(|value| value as i64),
1489                freelist_bytes_after.map(|value| value as i64),
1490                unix_now(),
1491            ),
1492        )?;
1493        phase_timings.push(sqlite_refresh_phase_timing(
1494            "sqlite_stats_cache_update",
1495            stats_started,
1496            "persist post-commit file and freelist proof for status/doctor",
1497        ));
1498        Ok(SqliteProjectionRefresh {
1499            scope,
1500            projection_version,
1501            source_watermark,
1502            upserted_nodes: projection.nodes.len().saturating_sub(unchanged_nodes),
1503            upserted_edges: projection.edges.len().saturating_sub(unchanged_edges),
1504            unchanged_nodes,
1505            unchanged_edges,
1506            upserted_properties,
1507            unchanged_properties,
1508            deleted_properties,
1509            deleted_nodes,
1510            deleted_edges,
1511            pruned_tombstones: pruned_node_tombstones + pruned_edge_tombstones,
1512            file_size_bytes_before,
1513            file_size_bytes_after,
1514            tombstoned_nodes,
1515            tombstoned_edges,
1516            phase_timings,
1517        })
1518    }
1519
1520    pub fn upsert_projection(&mut self, projection: &GraphProjection) -> Result<()> {
1521        let tx = self.conn.transaction()?;
1522        {
1523            let mut insert_node = tx.prepare(
1524                r#"
1525                INSERT INTO graph_nodes
1526                    (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1527                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1528                ON CONFLICT(id) DO UPDATE SET
1529                    kind = excluded.kind,
1530                    label = excluded.label,
1531                    properties_json = excluded.properties_json,
1532                    provenance_json = excluded.provenance_json,
1533                    freshness_json = excluded.freshness_json,
1534                    row_hash = excluded.row_hash,
1535                    source_watermark = excluded.source_watermark
1536                "#,
1537            )?;
1538            let mut delete_properties =
1539                tx.prepare("DELETE FROM graph_node_properties WHERE node_id = ?1")?;
1540            let mut insert_property = tx.prepare(
1541                r#"
1542                INSERT INTO graph_node_properties (node_id, key, value)
1543                VALUES (?1, ?2, ?3)
1544                "#,
1545            )?;
1546            for node in &projection.nodes {
1547                insert_node.execute((
1548                    &node.id,
1549                    &node.kind,
1550                    &node.label,
1551                    to_json(&node.properties)?,
1552                    to_json(&node.provenance)?,
1553                    optional_to_json(&node.freshness)?,
1554                    row_hash(node)?,
1555                ))?;
1556                delete_properties.execute([&node.id])?;
1557                for (key, value) in &node.properties {
1558                    insert_property.execute((&node.id, key, value))?;
1559                }
1560            }
1561        }
1562        {
1563            let mut insert_edge = tx.prepare(
1564                r#"
1565                INSERT INTO graph_edges
1566                    (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1567                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1568                ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1569                    edge_key = excluded.edge_key,
1570                    properties_json = excluded.properties_json,
1571                    provenance_json = excluded.provenance_json,
1572                    freshness_json = excluded.freshness_json,
1573                    row_hash = excluded.row_hash,
1574                    source_watermark = excluded.source_watermark
1575                "#,
1576            )?;
1577            let mut delete_properties =
1578                tx.prepare("DELETE FROM graph_edge_properties WHERE edge_key = ?1")?;
1579            let mut insert_property = tx.prepare(
1580                r#"
1581                INSERT INTO graph_edge_properties (edge_key, key, value)
1582                VALUES (?1, ?2, ?3)
1583                "#,
1584            )?;
1585            for edge in &projection.edges {
1586                let edge_key = graph_edge_id(edge);
1587                insert_edge.execute((
1588                    &edge_key,
1589                    &edge.from_id,
1590                    &edge.to_id,
1591                    &edge.kind,
1592                    to_json(&edge.properties)?,
1593                    to_json(&edge.provenance)?,
1594                    optional_to_json(&edge.freshness)?,
1595                    row_hash(edge)?,
1596                ))?;
1597                delete_properties.execute([&edge_key])?;
1598                for (key, value) in &edge.properties {
1599                    insert_property.execute((&edge_key, key, value))?;
1600                }
1601            }
1602        }
1603        tx.commit()?;
1604        Ok(())
1605    }
1606
1607    pub fn projection_version(&self, scope: &str) -> Result<Option<SqliteProjectionVersion>> {
1608        self.conn
1609            .query_row(
1610                r#"
1611                SELECT projection_version, content_hash, source_watermark
1612                FROM graph_projection_versions
1613                WHERE scope = ?1
1614                "#,
1615                [scope],
1616                |row| {
1617                    Ok(SqliteProjectionVersion {
1618                        projection_version: row.get(0)?,
1619                        content_hash: row.get(1)?,
1620                        source_watermark: row.get(2)?,
1621                    })
1622                },
1623            )
1624            .optional()
1625            .map_err(Into::into)
1626    }
1627
1628    pub fn update_projection_source_watermark(
1629        &mut self,
1630        scope: &str,
1631        source_watermark: Option<String>,
1632    ) -> Result<()> {
1633        self.conn.execute(
1634            r#"
1635            UPDATE graph_projection_versions
1636            SET source_watermark = ?2
1637            WHERE scope = ?1
1638            "#,
1639            (scope, source_watermark),
1640        )?;
1641        Ok(())
1642    }
1643
1644    pub fn compact_storage(&mut self, scope: &str, prune_tombstones: bool) -> Result<usize> {
1645        let pruned_tombstones = if prune_tombstones {
1646            self.conn.execute("DELETE FROM graph_tombstones", [])?
1647        } else {
1648            0
1649        };
1650        self.conn.execute_batch(
1651            r#"
1652            PRAGMA wal_checkpoint(TRUNCATE);
1653            VACUUM;
1654            "#,
1655        )?;
1656        let nodes = self
1657            .conn
1658            .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
1659                row.get::<_, i64>(0)
1660            })?;
1661        let edges = self
1662            .conn
1663            .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
1664                row.get::<_, i64>(0)
1665            })?;
1666        let tombstone_nodes = self.conn.query_row(
1667            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'node'",
1668            [],
1669            |row| row.get::<_, i64>(0),
1670        )?;
1671        let tombstone_edges = self.conn.query_row(
1672            "SELECT COUNT(*) FROM graph_tombstones WHERE row_kind = 'edge'",
1673            [],
1674            |row| row.get::<_, i64>(0),
1675        )?;
1676        let file_size_bytes = sqlite_database_size_bytes(&self.conn)
1677            .ok()
1678            .map(|value| value as i64);
1679        let freelist_bytes = sqlite_database_freelist_bytes(&self.conn)
1680            .ok()
1681            .map(|value| value as i64);
1682        self.conn.execute(
1683            r#"
1684            INSERT INTO graph_operator_stats
1685                (scope, nodes, edges, tombstone_nodes, tombstone_edges, file_size_bytes, freelist_bytes, observed_at_unix)
1686            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s', 'now'))
1687            ON CONFLICT(scope) DO UPDATE SET
1688                nodes = excluded.nodes,
1689                edges = excluded.edges,
1690                tombstone_nodes = excluded.tombstone_nodes,
1691                tombstone_edges = excluded.tombstone_edges,
1692                file_size_bytes = excluded.file_size_bytes,
1693                freelist_bytes = excluded.freelist_bytes,
1694                observed_at_unix = excluded.observed_at_unix
1695            "#,
1696            (
1697                scope,
1698                nodes,
1699                edges,
1700                tombstone_nodes,
1701                tombstone_edges,
1702                file_size_bytes,
1703                freelist_bytes,
1704            ),
1705        )?;
1706        Ok(pruned_tombstones)
1707    }
1708
1709    fn edges_between_nodes_inline(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
1710        let placeholders: Vec<&str> = node_ids.iter().map(|_| "?").collect();
1711        let in_clause = placeholders.join(", ");
1712        let sql = format!(
1713            "SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json \
1714             FROM graph_edges e \
1715             WHERE e.from_id IN ({in_clause}) \
1716               AND e.to_id IN ({in_clause}) \
1717             ORDER BY e.from_id, e.kind, e.to_id"
1718        );
1719        let values: Vec<Value> = node_ids
1720            .iter()
1721            .chain(node_ids.iter())
1722            .map(|id| Value::Text(id.clone()))
1723            .collect();
1724        let mut stmt = self.conn.prepare(&sql)?;
1725        collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
1726    }
1727}
1728
1729fn sqlite_query_plan(conn: &Connection, sql: &str, values: &[Value]) -> Result<Vec<String>> {
1730    let mut stmt = conn.prepare(&format!("EXPLAIN QUERY PLAN {sql}"))?;
1731    collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
1732        row.get::<_, String>(3)
1733    })?)
1734}
1735
/// Summarizes an `EXPLAIN QUERY PLAN` result as human-readable page diagnostics.
///
/// The first entry echoes the whole plan, with rows joined by `" | "`. Then each name in
/// `expected_indexes` gets one entry saying whether any plan row mentions that index.
/// An index counts as mentioned only when it appears as a whole identifier token. For
/// example, `idx_graph_nodes_kind` is not reported as used merely because a plan row
/// names a longer index that starts with the same text.
fn sqlite_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<String> {
    // True when `index` occurs in `row` as a complete identifier token. Plan rows
    // typically read like `SEARCH e USING INDEX idx_name (col=?)`. An empty `index`, or
    // one with characters that cannot appear in an unquoted identifier, falls back to a
    // plain substring search.
    fn row_mentions_index(row: &str, index: &str) -> bool {
        let is_ident_char = |c: char| c.is_ascii_alphanumeric() || c == '_';
        if index.is_empty() || !index.chars().all(is_ident_char) {
            return row.contains(index);
        }
        row.split(|c: char| !is_ident_char(c))
            .any(|token| token == index)
    }

    let mut diagnostics = Vec::with_capacity(expected_indexes.len() + 1);
    diagnostics.push(format!(
        "sqlite query pushdown active; plan: {}",
        plan.join(" | ")
    ));
    for expected_index in expected_indexes {
        if plan.iter().any(|row| row_mentions_index(row, expected_index)) {
            diagnostics.push(format!("sqlite query plan uses {expected_index}"));
        } else {
            diagnostics.push(format!(
                "sqlite query plan did not report {expected_index}; inspect before changing graph property indexes"
            ));
        }
    }
    diagnostics
}
1752
1753#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1754pub struct TerseDiagnostic {
1755    pub code: &'static str,
1756    #[serde(skip_serializing_if = "Option::is_none")]
1757    pub index: Option<String>,
1758}
1759
1760#[allow(dead_code)]
1761fn terse_query_plan_diagnostics(plan: &[String], expected_indexes: &[&str]) -> Vec<TerseDiagnostic> {
1762    let mut diagnostics = vec![TerseDiagnostic {
1763        code: "plan_active",
1764        index: None,
1765    }];
1766    for expected_index in expected_indexes {
1767        if plan.iter().any(|row| row.contains(expected_index)) {
1768            diagnostics.push(TerseDiagnostic {
1769                code: "idx_ok",
1770                index: Some(expected_index.to_string()),
1771            });
1772        } else {
1773            diagnostics.push(TerseDiagnostic {
1774                code: "idx_missing",
1775                index: Some(expected_index.to_string()),
1776            });
1777        }
1778    }
1779    diagnostics
1780}
1781
1782fn push_sqlite_property_filter_exists(
1783    sql: &mut String,
1784    values: &mut Vec<Value>,
1785    node_alias: &str,
1786    filters: &[GraphPropertyFilter],
1787) {
1788    for (index, filter) in filters.iter().enumerate() {
1789        sql.push_str(&format!(
1790            r#"
1791            AND EXISTS (
1792                SELECT 1
1793                FROM graph_node_properties p{index} INDEXED BY idx_graph_node_properties_key_value_node
1794                WHERE p{index}.node_id = {node_alias}.id
1795                  AND p{index}.key = ?
1796                  AND p{index}.value = ?
1797            )
1798            "#
1799        ));
1800        values.push(Value::Text(filter.key.clone()));
1801        values.push(Value::Text(filter.value.clone()));
1802    }
1803}
1804
1805fn push_sqlite_edge_property_filter_exists(
1806    sql: &mut String,
1807    values: &mut Vec<Value>,
1808    edge_alias: &str,
1809    filters: &[GraphPropertyFilter],
1810) {
1811    for (index, filter) in filters.iter().enumerate() {
1812        sql.push_str(&format!(
1813            r#"
1814            AND EXISTS (
1815                SELECT 1
1816                FROM graph_edge_properties ep{index} INDEXED BY idx_graph_edge_properties_key_value_edge
1817                WHERE ep{index}.edge_key = {edge_alias}.edge_key
1818                  AND ep{index}.key = ?
1819                  AND ep{index}.value = ?
1820            )
1821            "#
1822        ));
1823        values.push(Value::Text(filter.key.clone()));
1824        values.push(Value::Text(filter.value.clone()));
1825    }
1826}
1827
1828struct SqliteIncidentEdgeBranch<'a> {
1829    index_name: &'a str,
1830    endpoint_column: &'a str,
1831    node_id: &'a str,
1832    kind: Option<&'a str>,
1833    filters: &'a [GraphPropertyFilter],
1834    cursor: Option<&'a str>,
1835}
1836
1837fn push_sqlite_incident_edge_branch(
1838    sql: &mut String,
1839    values: &mut Vec<Value>,
1840    branch: SqliteIncidentEdgeBranch<'_>,
1841) {
1842    sql.push_str(&format!(
1843        r#"
1844        SELECT
1845            e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
1846        FROM graph_edges e INDEXED BY {index_name}
1847        WHERE e.{endpoint_column} = ?
1848        "#,
1849        index_name = branch.index_name,
1850        endpoint_column = branch.endpoint_column,
1851    ));
1852    values.push(Value::Text(branch.node_id.to_string()));
1853    if let Some(kind) = branch.kind {
1854        sql.push_str(" AND e.kind = ?");
1855        values.push(Value::Text(kind.to_string()));
1856    }
1857    push_sqlite_edge_property_filter_exists(sql, values, "e", branch.filters);
1858    if let Some(cursor) = branch.cursor {
1859        sql.push_str(" AND e.edge_key > ?");
1860        values.push(Value::Text(cursor.to_string()));
1861    }
1862}
1863
1864fn sqlite_incident_edges_union_query(
1865    node_id: &str,
1866    kind: Option<&str>,
1867    filters: &[GraphPropertyFilter],
1868    cursor: Option<&str>,
1869    limit: Option<usize>,
1870) -> (String, Vec<Value>) {
1871    let mut sql = String::from(
1872        r#"
1873        SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
1874        FROM (
1875        "#,
1876    );
1877    let mut values = Vec::new();
1878    push_sqlite_incident_edge_branch(
1879        &mut sql,
1880        &mut values,
1881        SqliteIncidentEdgeBranch {
1882            index_name: "idx_graph_edges_from_kind",
1883            endpoint_column: "from_id",
1884            node_id,
1885            kind,
1886            filters,
1887            cursor,
1888        },
1889    );
1890    sql.push_str(" UNION ");
1891    push_sqlite_incident_edge_branch(
1892        &mut sql,
1893        &mut values,
1894        SqliteIncidentEdgeBranch {
1895            index_name: "idx_graph_edges_to_kind",
1896            endpoint_column: "to_id",
1897            node_id,
1898            kind,
1899            filters,
1900            cursor,
1901        },
1902    );
1903    sql.push_str(
1904        r#"
1905        ) e
1906        ORDER BY e.edge_key
1907        "#,
1908    );
1909    if let Some(limit) = limit {
1910        sql.push_str(" LIMIT ?");
1911        values.push(Value::Integer(limit.saturating_add(1) as i64));
1912    }
1913    (sql, values)
1914}
1915
1916impl GraphStore for SqliteGraphStore {
1917    fn upsert_node(&self, node: &GraphNode) -> Result<()> {
1918        self.conn.execute(
1919            r#"
1920            INSERT INTO graph_nodes
1921                (id, kind, label, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1922            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL)
1923            ON CONFLICT(id) DO UPDATE SET
1924                kind = excluded.kind,
1925                label = excluded.label,
1926                properties_json = excluded.properties_json,
1927                provenance_json = excluded.provenance_json,
1928                freshness_json = excluded.freshness_json,
1929                row_hash = excluded.row_hash,
1930                source_watermark = excluded.source_watermark
1931            "#,
1932            (
1933                &node.id,
1934                &node.kind,
1935                &node.label,
1936                to_json(&node.properties)?,
1937                to_json(&node.provenance)?,
1938                optional_to_json(&node.freshness)?,
1939                row_hash(node)?,
1940            ),
1941        )?;
1942        replace_node_properties(&self.conn, &node.id, &node.properties)?;
1943        Ok(())
1944    }
1945
1946    fn upsert_edge(&self, edge: &GraphEdge) -> Result<()> {
1947        let edge_key = graph_edge_id(edge);
1948        self.conn.execute(
1949            r#"
1950            INSERT INTO graph_edges
1951                (edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json, row_hash, source_watermark)
1952            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, NULL)
1953            ON CONFLICT(from_id, to_id, kind) DO UPDATE SET
1954                edge_key = excluded.edge_key,
1955                properties_json = excluded.properties_json,
1956                provenance_json = excluded.provenance_json,
1957                freshness_json = excluded.freshness_json,
1958                row_hash = excluded.row_hash,
1959                source_watermark = excluded.source_watermark
1960            "#,
1961            (
1962                &edge_key,
1963                &edge.from_id,
1964                &edge.to_id,
1965                &edge.kind,
1966                to_json(&edge.properties)?,
1967                to_json(&edge.provenance)?,
1968                optional_to_json(&edge.freshness)?,
1969                row_hash(edge)?,
1970            ),
1971        )?;
1972        replace_edge_properties(&self.conn, &edge_key, &edge.properties)?;
1973        Ok(())
1974    }
1975
1976    fn delete_node(&self, id: &str) -> Result<usize> {
1977        self.conn
1978            .execute("DELETE FROM graph_nodes WHERE id = ?1", [id])
1979            .map_err(Into::into)
1980    }
1981
1982    fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize> {
1983        self.conn
1984            .execute(
1985                "DELETE FROM graph_edges WHERE from_id = ?1 AND to_id = ?2 AND kind = ?3",
1986                (from_id, to_id, kind),
1987            )
1988            .map_err(Into::into)
1989    }
1990
1991    fn node(&self, id: &str) -> Result<Option<GraphNode>> {
1992        self.conn
1993            .query_row(
1994                r#"
1995                SELECT id, kind, label, properties_json, provenance_json, freshness_json
1996                FROM graph_nodes
1997                WHERE id = ?1
1998                "#,
1999                [id],
2000                node_from_row,
2001            )
2002            .optional()
2003            .map_err(Into::into)
2004    }
2005
2006    fn all_nodes(&self) -> Result<Vec<GraphNode>> {
2007        let mut stmt = self.conn.prepare(
2008            r#"
2009            SELECT id, kind, label, properties_json, provenance_json, freshness_json
2010            FROM graph_nodes
2011            ORDER BY id
2012            "#,
2013        )?;
2014        collect_rows(stmt.query_map([], node_from_row)?)
2015    }
2016
2017    fn all_edges(&self) -> Result<Vec<GraphEdge>> {
2018        let mut stmt = self.conn.prepare(
2019            r#"
2020            SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2021            FROM graph_edges
2022            ORDER BY from_id, kind, to_id
2023            "#,
2024        )?;
2025        collect_rows(stmt.query_map([], edge_from_row)?)
2026    }
2027
2028    fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
2029        self.conn
2030            .query_row(
2031                r#"
2032                SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2033                FROM graph_edges INDEXED BY idx_graph_edges_edge_key
2034                WHERE edge_key = ?1
2035                "#,
2036                [edge_id],
2037                edge_from_row,
2038            )
2039            .optional()
2040            .map_err(Into::into)
2041    }
2042
2043    fn graph_counts(&self) -> Result<(usize, usize)> {
2044        let nodes = self
2045            .conn
2046            .query_row("SELECT COUNT(*) FROM graph_nodes", [], |row| {
2047                row.get::<_, usize>(0)
2048            })?;
2049        let edges = self
2050            .conn
2051            .query_row("SELECT COUNT(*) FROM graph_edges", [], |row| {
2052                row.get::<_, usize>(0)
2053            })?;
2054        Ok((nodes, edges))
2055    }
2056
2057    fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
2058        match kind {
2059            Some(kind) => self
2060                .conn
2061                .query_row(
2062                    r#"
2063                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2064                    FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2065                    WHERE from_id <> to_id AND kind = ?1
2066                    ORDER BY from_id, kind, to_id
2067                    LIMIT 1
2068                    "#,
2069                    [kind],
2070                    edge_from_row,
2071                )
2072                .optional()
2073                .map_err(Into::into),
2074            None => self
2075                .conn
2076                .query_row(
2077                    r#"
2078                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2079                    FROM graph_edges INDEXED BY idx_graph_edges_from_kind
2080                    WHERE from_id <> to_id
2081                    ORDER BY from_id, kind, to_id
2082                    LIMIT 1
2083                    "#,
2084                    [],
2085                    edge_from_row,
2086                )
2087                .optional()
2088                .map_err(Into::into),
2089        }
2090    }
2091
2092    fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
2093        self.conn
2094            .query_row(
2095                r#"
2096                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json,
2097                       ep.key, ep.value
2098                FROM graph_edge_properties ep INDEXED BY idx_graph_edge_properties_key_value_edge
2099                JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
2100                  ON e.edge_key = ep.edge_key
2101                WHERE e.from_id <> e.to_id
2102                ORDER BY ep.key, ep.value, ep.edge_key
2103                LIMIT 1
2104                "#,
2105                [],
2106                |row| {
2107                    Ok((
2108                        edge_from_row(row)?,
2109                        GraphPropertyFilter {
2110                            key: row.get(7)?,
2111                            value: row.get(8)?,
2112                        },
2113                    ))
2114                },
2115            )
2116            .optional()
2117            .map_err(Into::into)
2118    }
2119
2120    fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>> {
2121        let mut stmt = self.conn.prepare(
2122            r#"
2123            SELECT id, kind, label, properties_json, provenance_json, freshness_json
2124            FROM graph_nodes
2125            WHERE kind = ?1
2126            ORDER BY id
2127            "#,
2128        )?;
2129        collect_rows(stmt.query_map([kind], node_from_row)?)
2130    }
2131
2132    fn paged_nodes_by_kind(
2133        &self,
2134        kind: &str,
2135        options: GraphQueryOptions,
2136    ) -> Result<GraphPagedSubgraph> {
2137        let mut sql = String::from(
2138            r#"
2139            SELECT id, kind, label, properties_json, provenance_json, freshness_json
2140            FROM graph_nodes
2141            WHERE kind = ?
2142            "#,
2143        );
2144        let mut values = vec![Value::Text(kind.to_string())];
2145        push_sqlite_property_filter_exists(
2146            &mut sql,
2147            &mut values,
2148            "graph_nodes",
2149            &options.property_filters,
2150        );
2151        if let Some(cursor) = &options.cursor {
2152            sql.push_str(" AND id > ?");
2153            values.push(Value::Text(cursor.clone()));
2154        }
2155        sql.push_str(" ORDER BY id");
2156        if let Some(limit) = options.limit {
2157            sql.push_str(" LIMIT ?");
2158            values.push(Value::Integer(limit.saturating_add(1) as i64));
2159        }
2160
2161        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2162        let mut stmt = self.conn.prepare(&sql)?;
2163        let mut nodes =
2164            collect_rows(stmt.query_map(params_from_iter(values.iter()), node_from_row)?)?;
2165        let before_limit = nodes.len();
2166        let mut next_cursor = None;
2167        if let Some(limit) = options.limit
2168            && nodes.len() > limit
2169        {
2170            next_cursor = nodes
2171                .get(limit.saturating_sub(1))
2172                .map(|node| node.id.clone());
2173            nodes.truncate(limit);
2174        }
2175        let expected_indexes = if options.property_filters.is_empty() {
2176            vec!["idx_graph_nodes_kind"]
2177        } else {
2178            vec![
2179                "idx_graph_nodes_kind",
2180                "idx_graph_node_properties_key_value_node",
2181            ]
2182        };
2183        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2184        if !options.property_filters.is_empty() {
2185            diagnostics.push(
2186                "property filters were evaluated by SQLite materialized property rows before paging"
2187                    .to_string(),
2188            );
2189        }
2190        if options.cursor.is_some() {
2191            diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
2192        }
2193        if next_cursor.is_some() {
2194            diagnostics.push(
2195                "result was truncated; pass page.next_cursor as --cursor for the next page"
2196                    .to_string(),
2197            );
2198        }
2199        Ok(GraphPagedSubgraph {
2200            page: GraphQueryPage {
2201                cursor: options.cursor,
2202                limit: options.limit,
2203                next_cursor,
2204                returned_nodes: nodes.len(),
2205                returned_edges: 0,
2206                truncated: options.limit.is_some_and(|limit| before_limit > limit),
2207                diagnostics,
2208            },
2209            nodes,
2210            edges: Vec::new(),
2211        })
2212    }
2213
2214    fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2215        match kind {
2216            Some(kind) => {
2217                let mut stmt = self.conn.prepare(
2218                    r#"
2219                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2220                    FROM graph_edges
2221                    WHERE from_id = ?1 AND kind = ?2
2222                    ORDER BY to_id, kind
2223                    "#,
2224                )?;
2225                collect_rows(stmt.query_map((from_id, kind), edge_from_row)?)
2226            }
2227            None => {
2228                let mut stmt = self.conn.prepare(
2229                    r#"
2230                    SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
2231                    FROM graph_edges
2232                    WHERE from_id = ?1
2233                    ORDER BY to_id, kind
2234                    "#,
2235                )?;
2236                collect_rows(stmt.query_map([from_id], edge_from_row)?)
2237            }
2238        }
2239    }
2240
2241    fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
2242        let (sql, values) = sqlite_incident_edges_union_query(node_id, kind, &[], None, None);
2243        let mut stmt = self.conn.prepare(&sql)?;
2244        collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)
2245    }
2246
    /// Pages through `graph_edges`, optionally restricted to one `kind`.
    ///
    /// Edges must also carry every `(key, value)` pair in `options.property_filters`.
    ///
    /// With at least one property filter, the first filter drives the scan. The query
    /// starts from `graph_edge_properties` (alias `ep0`, forced onto
    /// `idx_graph_edge_properties_key_value_edge`) and joins `graph_edges` by `edge_key`.
    /// The remaining filters become correlated `EXISTS` subqueries. Without filters the
    /// query reads `graph_edges` directly.
    ///
    /// Rows are ordered by edge key. `options.cursor` is an exclusive lower bound on that
    /// key, and `options.limit` caps the page size.
    ///
    /// The page diagnostics start with the SQLite `EXPLAIN QUERY PLAN` summary and then
    /// note filtering, cursoring and truncation. When more rows exist than
    /// `options.limit`, `truncated` is set. For a non-zero limit, `next_cursor` is then
    /// the `graph_edge_id` of the last edge kept.
    fn paged_edges(
        &self,
        kind: Option<&str>,
        options: GraphQueryOptions,
    ) -> Result<GraphPagedSubgraph> {
        let primary_property_filter = options.property_filters.first();
        // `values` must receive parameters in the same order their `?` placeholders are
        // appended to `sql` below.
        let mut values = Vec::new();
        let mut sql = if let Some(filter) = primary_property_filter {
            values.push(Value::Text(filter.key.clone()));
            values.push(Value::Text(filter.value.clone()));
            String::from(
                r#"
                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
                FROM graph_edge_properties ep0 INDEXED BY idx_graph_edge_properties_key_value_edge
                JOIN graph_edges e INDEXED BY idx_graph_edges_edge_key
                  ON e.edge_key = ep0.edge_key
                WHERE ep0.key = ?
                  AND ep0.value = ?
                "#,
            )
        } else {
            String::from(
                r#"
                SELECT edge_key, from_id, to_id, kind, properties_json, provenance_json, freshness_json
                FROM graph_edges e
                WHERE 1 = 1
                "#,
            )
        };
        if let Some(kind) = kind {
            sql.push_str(" AND e.kind = ?");
            values.push(Value::Text(kind.to_string()));
        }
        // The primary filter is already enforced by the `ep0` join, so only the remaining
        // filters become EXISTS subqueries. Their own `ep{n}` aliases are scoped to each
        // subquery and do not clash with the outer `ep0`.
        push_sqlite_edge_property_filter_exists(
            &mut sql,
            &mut values,
            "e",
            if primary_property_filter.is_some() {
                &options.property_filters[1..]
            } else {
                &options.property_filters
            },
        );
        // Through the join `ep0.edge_key` equals `e.edge_key`, so cursoring and ordering on
        // either column select the same rows in the same order.
        if let Some(cursor) = &options.cursor {
            if primary_property_filter.is_some() {
                sql.push_str(" AND ep0.edge_key > ?");
            } else {
                sql.push_str(" AND e.edge_key > ?");
            }
            values.push(Value::Text(cursor.clone()));
        }
        if primary_property_filter.is_some() {
            sql.push_str(" ORDER BY ep0.edge_key");
        } else {
            sql.push_str(" ORDER BY e.edge_key");
        }
        // Ask for one row beyond the limit; getting it back means another page exists.
        if let Some(limit) = options.limit {
            sql.push_str(" LIMIT ?");
            values.push(Value::Integer(limit.saturating_add(1) as i64));
        }

        // Diagnostic only: number of property rows matching the primary filter, counted
        // before kind, cursor, remaining filters and limit are applied.
        let primary_property_row_count = if let Some(filter) = primary_property_filter {
            Some(self.conn.query_row(
                r#"
                SELECT COUNT(*)
                FROM graph_edge_properties INDEXED BY idx_graph_edge_properties_key_value_edge
                WHERE key = ?1 AND value = ?2
                "#,
                (&filter.key, &filter.value),
                |row| row.get::<_, usize>(0),
            )?)
        } else {
            None
        };
        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
        let mut stmt = self.conn.prepare(&sql)?;
        let mut edges =
            collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
        let before_limit = edges.len();
        let mut next_cursor = None;
        // Drop the look-ahead row; the cursor points at the last edge that stays on the page.
        if let Some(limit) = options.limit
            && edges.len() > limit
        {
            next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
            edges.truncate(limit);
        }
        let expected_indexes = if options.property_filters.is_empty() {
            vec!["idx_graph_edges_edge_key"]
        } else {
            vec![
                "idx_graph_edge_properties_key_value_edge",
                "idx_graph_edges_edge_key",
            ]
        };
        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
        if !options.property_filters.is_empty() {
            if let Some(row_count) = primary_property_row_count {
                diagnostics.push(format!(
                    "edge property primary filter matched {row_count} materialized row(s) before edge-kind/cursor paging"
                ));
            }
            diagnostics.push(
                "edge property scan drives from SQLite materialized property rows before joining graph_edges"
                    .to_string(),
            );
        }
        if options.cursor.is_some() {
            diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
        }
        if next_cursor.is_some() {
            diagnostics.push(
                "result was truncated; pass page.next_cursor as --cursor for the next page"
                    .to_string(),
            );
        }
        Ok(GraphPagedSubgraph {
            page: GraphQueryPage {
                cursor: options.cursor,
                limit: options.limit,
                next_cursor,
                returned_nodes: 0,
                returned_edges: edges.len(),
                truncated: options.limit.is_some_and(|limit| before_limit > limit),
                diagnostics,
            },
            nodes: Vec::new(),
            edges,
        })
    }
2376
2377    fn paged_incident_edges(
2378        &self,
2379        node_id: &str,
2380        kind: Option<&str>,
2381        options: GraphQueryOptions,
2382    ) -> Result<GraphPagedSubgraph> {
2383        let (sql, values) = sqlite_incident_edges_union_query(
2384            node_id,
2385            kind,
2386            &options.property_filters,
2387            options.cursor.as_deref(),
2388            options.limit,
2389        );
2390        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2391        let mut stmt = self.conn.prepare(&sql)?;
2392        let mut edges =
2393            collect_rows(stmt.query_map(params_from_iter(values.iter()), edge_from_row)?)?;
2394        let before_limit = edges.len();
2395        let mut next_cursor = None;
2396        if let Some(limit) = options.limit
2397            && edges.len() > limit
2398        {
2399            next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
2400            edges.truncate(limit);
2401        }
2402        let expected_indexes = if options.property_filters.is_empty() {
2403            vec!["idx_graph_edges_from_kind", "idx_graph_edges_to_kind"]
2404        } else {
2405            vec![
2406                "idx_graph_edges_from_kind",
2407                "idx_graph_edges_to_kind",
2408                "idx_graph_edge_properties_key_value_edge",
2409            ]
2410        };
2411        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2412        diagnostics.push(
2413            "incident edge scan uses UNION over from_id/to_id index probes instead of an OR predicate"
2414                .to_string(),
2415        );
2416        if !options.property_filters.is_empty() {
2417            diagnostics.push(
2418                "edge property filters were evaluated by SQLite materialized property rows before paging"
2419                    .to_string(),
2420            );
2421        }
2422        if options.cursor.is_some() {
2423            diagnostics.push("cursor is exclusive and pushed into SQLite by edge id".to_string());
2424        }
2425        if next_cursor.is_some() {
2426            diagnostics.push(
2427                "result was truncated; pass page.next_cursor as --cursor for the next page"
2428                    .to_string(),
2429            );
2430        }
2431        Ok(GraphPagedSubgraph {
2432            page: GraphQueryPage {
2433                cursor: options.cursor,
2434                limit: options.limit,
2435                next_cursor,
2436                returned_nodes: 0,
2437                returned_edges: edges.len(),
2438                truncated: options.limit.is_some_and(|limit| before_limit > limit),
2439                diagnostics,
2440            },
2441            nodes: Vec::new(),
2442            edges,
2443        })
2444    }
2445
2446    fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
2447        if node_ids.is_empty() {
2448            return Ok(Vec::new());
2449        }
2450        if node_ids.len() <= 20 {
2451            return self.edges_between_nodes_inline(node_ids);
2452        }
2453        self.assert_not_in_temp_table_section();
2454        self.temp_table_active.set(true);
2455        let result = (|| -> Result<Vec<GraphEdge>> {
2456            let tx = self.conn.unchecked_transaction()?;
2457            tx.execute_batch(
2458                r#"
2459                CREATE TEMP TABLE IF NOT EXISTS _edges_between_ids (id TEXT PRIMARY KEY);
2460                DELETE FROM _edges_between_ids;
2461                "#,
2462            )?;
2463            for chunk in node_ids.iter().collect::<Vec<_>>().chunks(450) {
2464                let row_placeholders: Vec<String> =
2465                    chunk.iter().map(|_| "(?)".to_string()).collect();
2466                let placeholders = row_placeholders.join(", ");
2467                let sql = format!(
2468                    "INSERT OR IGNORE INTO _edges_between_ids (id) VALUES {placeholders}"
2469                );
2470                let values: Vec<Value> = chunk.iter().map(|id| Value::Text((*id).clone())).collect();
2471                tx.execute(&sql, params_from_iter(values.iter()))?;
2472            }
2473            let edges = {
2474                let mut stmt = tx.prepare(
2475                    r#"
2476                    SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2477                    FROM graph_edges e
2478                    WHERE EXISTS (SELECT 1 FROM _edges_between_ids f WHERE f.id = e.from_id)
2479                      AND EXISTS (SELECT 1 FROM _edges_between_ids t WHERE t.id = e.to_id)
2480                    ORDER BY e.from_id, e.kind, e.to_id
2481                    "#,
2482                )?;
2483                collect_rows(stmt.query_map([], edge_from_row)?)?
2484            };
2485            tx.finish()?;
2486            Ok(edges)
2487        })();
2488        self.temp_table_active.set(false);
2489        result
2490    }
2491
2492    fn ranked_neighborhood(
2493        &self,
2494        center_id: &str,
2495        options: &RankedNeighborhoodOptions,
2496    ) -> Result<Option<RankedNeighborhoodResult>> {
2497        if self.node(center_id)?.is_none() {
2498            return Ok(None);
2499        }
2500        let center = self.node(center_id)?.unwrap();
2501
2502        let score_expr = match options.scoring {
2503            tsift_core::NeighborhoodScoring::BreadthFirst => {
2504                "MAX(0, 120 - (walk.depth * 18))".to_string()
2505            }
2506            tsift_core::NeighborhoodScoring::EdgeKindWeighted => {
2507                "MAX(0, 120 - (walk.depth * 18)) + CASE walk.edge_kind \
2508                 WHEN 'semantic_relation' THEN 34 \
2509                 WHEN 'mentions_entity' THEN 28 \
2510                 WHEN 'mentions_concept' THEN 28 \
2511                 WHEN 'tagged_entity' THEN 28 \
2512                 WHEN 'tagged_concept' THEN 28 \
2513                 WHEN 'related_concept' THEN 28 \
2514                 WHEN 'mentions' THEN 22 \
2515                 WHEN 'calls' THEN 20 \
2516                 WHEN 'requests_context' THEN 18 \
2517                 WHEN 'scopes_context' THEN 18 \
2518                 WHEN 'scopes_source' THEN 18 \
2519                 WHEN 'explains_result' THEN 18 \
2520                 WHEN 'defines' THEN 12 \
2521                 WHEN 'contains' THEN 12 \
2522                 WHEN 'belongs_to' THEN 12 \
2523                 ELSE 8 END".to_string()
2524            }
2525            tsift_core::NeighborhoodScoring::DegreeWeighted => {
2526                "MAX(0, 120 - (walk.depth * 18)) + CASE \
2527                 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 3 THEN 20 \
2528                 WHEN COALESCE((SELECT degree FROM degree_cache dc WHERE dc.id = walk.id), 0) <= 10 THEN 10 \
2529                 ELSE 0 END"
2530                    .to_string()
2531            }
2532        };
2533
2534        let use_degree_cache = matches!(options.scoring, tsift_core::NeighborhoodScoring::DegreeWeighted);
2535        let degree_cte = if use_degree_cache {
2536            "degree_cache AS ( \
2537             SELECT id, (SELECT COUNT(*) FROM graph_edges e WHERE e.from_id = n.id OR e.to_id = n.id) AS degree \
2538             FROM graph_nodes n), "
2539        } else {
2540            ""
2541        };
2542        let mut sql = format!(
2543            r#"
2544            WITH {degree_cte}RECURSIVE walk(id, depth, edge_kind, score) AS (
2545                SELECT ?, 0, '', ?
2546                UNION
2547                SELECT e.to_id, walk.depth + 1, e.kind,
2548            "#,
2549        );
2550        sql.push_str(&format!("    {}\n", score_expr));
2551        sql.push_str(
2552            r#"
2553                FROM walk
2554                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2555                    ON e.from_id = walk.id
2556                WHERE walk.depth < ?
2557            "#,
2558        );
2559        let mut values = vec![
2560            Value::Text(center_id.to_string()),
2561            Value::Integer(i64::MAX),
2562            Value::Integer(options.depth as i64),
2563        ];
2564        if let Some(kind) = &options.edge_kind {
2565            sql.push_str(" AND e.kind = ?");
2566            values.push(Value::Text(kind.clone()));
2567        }
2568        sql.push_str(
2569            r#"
2570            ),
2571            scored_nodes AS (
2572                SELECT walk.id, walk.score,
2573                    n.kind AS node_kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
2574                FROM walk
2575                JOIN graph_nodes n ON n.id = walk.id
2576                GROUP BY walk.id
2577            ),
2578            ranked AS (
2579                SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
2580                FROM scored_nodes
2581                ORDER BY score DESC, id ASC
2582            ),
2583            kept AS (
2584                SELECT id, score, node_kind, label, properties_json, provenance_json, freshness_json
2585                FROM ranked
2586                LIMIT ?
2587            ),
2588            total AS (
2589                SELECT COUNT(*) AS cnt FROM scored_nodes
2590            )
2591            SELECT
2592                'meta' AS row_type,
2593                (SELECT cnt FROM total) AS total_discovered,
2594                0 AS node_id, '' AS node_kind, '' AS node_label,
2595                '' AS node_props, '' AS node_prov, '' AS node_fresh,
2596                '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
2597                '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
2598            UNION ALL
2599            SELECT
2600                'node' AS row_type,
2601                0 AS total_discovered,
2602                k.id, k.node_kind, k.label, k.properties_json, k.provenance_json, k.freshness_json,
2603                '' AS edge_key, '' AS edge_from, '' AS edge_to, '' AS edge_kind_col,
2604                '' AS edge_props, '' AS edge_prov, '' AS edge_fresh
2605            FROM kept k
2606            UNION ALL
2607            SELECT
2608                'edge' AS row_type,
2609                0 AS total_discovered,
2610                '' AS node_id, '' AS node_kind, '' AS node_label,
2611                '' AS node_props, '' AS node_prov, '' AS node_fresh,
2612                e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2613            FROM graph_edges e
2614            WHERE EXISTS (SELECT 1 FROM kept k WHERE k.id = e.from_id)
2615              AND EXISTS (SELECT 1 FROM kept k2 WHERE k2.id = e.to_id)
2616            "#,
2617        );
2618        values.push(Value::Integer(options.max_nodes as i64));
2619
2620        let mut stmt = self.conn.prepare(&sql)?;
2621        let mut nodes = vec![center.clone()];
2622        let mut edges = Vec::new();
2623        let mut total_discovered = 0usize;
2624
2625        let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
2626            let row_type: String = row.get(0)?;
2627            match row_type.as_str() {
2628                "meta" => Ok(QueryResult::Meta {
2629                    total: row.get::<_, i64>(1)? as usize,
2630                }),
2631                "node" => Ok(QueryResult::Node(node_from_row_at(row, 2)?)),
2632                "edge" => Ok(QueryResult::Edge(edge_from_row_at(row, 8)?)),
2633                _ => Err(rusqlite::Error::InvalidQuery),
2634            }
2635        })?;
2636        for row_result in rows {
2637            match row_result? {
2638                QueryResult::Meta { total } => {
2639                    total_discovered = total;
2640                }
2641                QueryResult::Node(node) => {
2642                    if node.id != center_id {
2643                        nodes.push(node);
2644                    }
2645                }QueryResult::Edge(edge) => {
2646                    edges.push(edge);
2647                }
2648            }
2649        }
2650
2651        let total_discovered = total_discovered.max(nodes.len());
2652        let pruned_count = total_discovered.saturating_sub(nodes.len());
2653
2654        match options.property_mode {
2655            PropertyMode::Full => {}
2656            PropertyMode::Omit => {
2657                for n in &mut nodes {
2658                    n.properties.clear();
2659                }
2660                for e in &mut edges {
2661                    e.properties.clear();
2662                }
2663            }
2664            PropertyMode::Sample => {
2665                let mut seen_kinds = std::collections::BTreeSet::new();
2666                for n in &mut nodes {
2667                    if !seen_kinds.contains(&n.kind) {
2668                        seen_kinds.insert(n.kind.clone());
2669                    } else {
2670                        n.properties.clear();
2671                    }
2672                }
2673                for e in &mut edges {
2674                    e.properties.clear();
2675                }
2676            }
2677        }
2678
2679        Ok(Some(RankedNeighborhoodResult {
2680            nodes,
2681            edges,
2682            pruned_count,
2683            total_discovered,
2684        }))
2685    }
2686
2687    fn neighborhood(
2688        &self,
2689        center_id: &str,
2690        depth: usize,
2691        kind: Option<&str>,
2692    ) -> Result<Option<GraphSubgraph>> {
2693        self.paged_neighborhood(center_id, depth, kind, GraphQueryOptions::default())
2694            .map(|result| {
2695                result.map(|result| {
2696                    GraphSubgraph {
2697                        nodes: result.nodes,
2698                        edges: result.edges,
2699                    }
2700                    .sorted()
2701                })
2702            })
2703    }
2704
2705    fn paged_neighborhood(
2706        &self,
2707        center_id: &str,
2708        depth: usize,
2709        kind: Option<&str>,
2710        options: GraphQueryOptions,
2711    ) -> Result<Option<GraphPagedSubgraph>> {
2712        if self.node(center_id)?.is_none() {
2713            return Ok(None);
2714        }
2715        let mut sql = String::from(
2716            r#"
2717            WITH RECURSIVE walk(id, depth) AS (
2718                SELECT ?, 0
2719                UNION
2720                SELECT e.to_id, walk.depth + 1
2721                FROM walk
2722                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2723                    ON e.from_id = walk.id
2724                WHERE walk.depth < ?
2725            "#,
2726        );
2727        let mut values = vec![
2728            Value::Text(center_id.to_string()),
2729            Value::Integer(depth as i64),
2730        ];
2731        if let Some(kind) = kind {
2732            sql.push_str(" AND e.kind = ?");
2733            values.push(Value::Text(kind.to_string()));
2734        }
2735        sql.push_str(
2736            r#"
2737            ),
2738            filtered_nodes AS (
2739            SELECT DISTINCT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
2740            FROM walk
2741            JOIN graph_nodes n ON n.id = walk.id
2742            WHERE 1 = 1
2743            "#,
2744        );
2745        push_sqlite_property_filter_exists(&mut sql, &mut values, "n", &options.property_filters);
2746        if let Some(cursor) = &options.cursor {
2747            sql.push_str(" AND n.id > ?");
2748            values.push(Value::Text(cursor.clone()));
2749        }
2750        sql.push_str(
2751            r#"
2752            ),
2753            page_nodes AS (
2754                SELECT id, kind, label, properties_json, provenance_json, freshness_json
2755                FROM filtered_nodes
2756                ORDER BY id
2757            "#,
2758        );
2759        if let Some(limit) = options.limit {
2760            sql.push_str(" LIMIT ?");
2761            values.push(Value::Integer(limit.saturating_add(1) as i64));
2762        }
2763        sql.push_str(
2764            r#"
2765            ),
2766            walk_edges AS (
2767                SELECT e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2768                FROM walk
2769                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2770                    ON e.from_id = walk.id
2771                WHERE walk.depth < ?
2772            "#,
2773        );
2774        values.push(Value::Integer(depth as i64));
2775        if let Some(kind) = kind {
2776            sql.push_str(" AND e.kind = ?");
2777            values.push(Value::Text(kind.to_string()));
2778        }
2779        sql.push_str(
2780            r#"
2781            )
2782            SELECT
2783                'node' AS row_type,
2784                p.id, p.kind, p.label, p.properties_json, p.provenance_json, p.freshness_json,
2785                NULL AS edge_key, NULL AS from_id, NULL AS to_id, NULL AS edge_kind,
2786                NULL AS edge_properties_json, NULL AS edge_provenance_json, NULL AS edge_freshness_json
2787            FROM page_nodes p
2788            UNION ALL
2789            SELECT DISTINCT
2790                'edge' AS row_type,
2791                NULL AS id, NULL AS kind, NULL AS label, NULL AS properties_json,
2792                NULL AS provenance_json, NULL AS freshness_json,
2793                e.edge_key, e.from_id, e.to_id, e.kind, e.properties_json, e.provenance_json, e.freshness_json
2794            FROM walk_edges e
2795            WHERE e.from_id IN (SELECT id FROM page_nodes)
2796              AND e.to_id IN (SELECT id FROM page_nodes)
2797            "#,
2798        );
2799
2800        let plan = sqlite_query_plan(&self.conn, &sql, &values)?;
2801        let mut stmt = self.conn.prepare(&sql)?;
2802        let mut nodes = Vec::new();
2803        let mut edges = Vec::new();
2804        let rows = stmt.query_map(params_from_iter(values.iter()), |row| {
2805            let row_type: String = row.get(0)?;
2806            match row_type.as_str() {
2807                "node" => Ok((Some(node_from_row_at(row, 1)?), None)),
2808                "edge" => Ok((None, Some(edge_from_row_at(row, 7)?))),
2809                _ => Err(rusqlite::Error::InvalidQuery),
2810            }
2811        })?;
2812        for row in rows {
2813            let (node, edge) = row?;
2814            if let Some(node) = node {
2815                nodes.push(node);
2816            }
2817            if let Some(edge) = edge {
2818                edges.push(edge);
2819            }
2820        }
2821        nodes.sort_by(|left, right| left.id.cmp(&right.id));
2822        let before_limit = nodes.len();
2823        let mut next_cursor = None;
2824        if let Some(limit) = options.limit
2825            && nodes.len() > limit
2826        {
2827            next_cursor = nodes
2828                .get(limit.saturating_sub(1))
2829                .map(|node| node.id.clone());
2830            nodes.truncate(limit);
2831        }
2832        let node_ids = nodes
2833            .iter()
2834            .map(|node| node.id.as_str())
2835            .collect::<BTreeSet<_>>();
2836        edges.retain(|edge| {
2837            node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
2838        });
2839        edges.sort_by(|left, right| {
2840            left.from_id
2841                .cmp(&right.from_id)
2842                .then(left.kind.cmp(&right.kind))
2843                .then(left.to_id.cmp(&right.to_id))
2844        });
2845        let expected_indexes = if options.property_filters.is_empty() {
2846            vec!["idx_graph_edges_from_kind"]
2847        } else {
2848            vec![
2849                "idx_graph_edges_from_kind",
2850                "idx_graph_node_properties_key_value_node",
2851            ]
2852        };
2853        let mut diagnostics = sqlite_query_plan_diagnostics(&plan, &expected_indexes);
2854        diagnostics.push(
2855            "neighborhood nodes and page edges share one recursive reachable-set CTE".to_string(),
2856        );
2857        if !options.property_filters.is_empty() {
2858            diagnostics.push(
2859                "property filters were evaluated by SQLite materialized property rows before paging"
2860                    .to_string(),
2861            );
2862        }
2863        if options.cursor.is_some() {
2864            diagnostics.push("cursor is exclusive and pushed into SQLite by node id".to_string());
2865        }
2866        if next_cursor.is_some() {
2867            diagnostics.push(
2868                "result was truncated; pass page.next_cursor as --cursor for the next page"
2869                    .to_string(),
2870            );
2871        }
2872        Ok(Some(GraphPagedSubgraph {
2873            page: GraphQueryPage {
2874                cursor: options.cursor,
2875                limit: options.limit,
2876                next_cursor,
2877                returned_nodes: nodes.len(),
2878                returned_edges: edges.len(),
2879                truncated: options.limit.is_some_and(|limit| before_limit > limit),
2880                diagnostics,
2881            },
2882            nodes,
2883            edges,
2884        }))
2885    }
2886
2887    fn shortest_path(
2888        &self,
2889        from_id: &str,
2890        to_id: &str,
2891        kind: Option<&str>,
2892    ) -> Result<Option<GraphPath>> {
2893        self.shortest_path_with_max_hops(from_id, to_id, kind, None)
2894    }
2895
2896    fn shortest_path_with_max_hops(
2897        &self,
2898        from_id: &str,
2899        to_id: &str,
2900        kind: Option<&str>,
2901        max_hops: Option<usize>,
2902    ) -> Result<Option<GraphPath>> {
2903        if from_id == to_id {
2904            return Ok(Some(GraphPath {
2905                nodes: vec![from_id.to_string()],
2906                hops: 0,
2907            }));
2908        }
2909        let hop_limit = max_hops.unwrap_or(usize::MAX);
2910        if hop_limit == 0 {
2911            return Ok(None);
2912        }
2913
2914        self.assert_not_in_temp_table_section();
2915        self.temp_table_active.set(true);
2916        let result = (|| -> Result<Option<GraphPath>> {
2917            let call_id = BFS_CALL_ID.fetch_add(1, Ordering::Relaxed);
2918        let tbl = format!("_tsift_frontier_{call_id}");
2919
2920        let mut visited = BTreeSet::from([from_id.to_string()]);
2921        let mut parent = BTreeMap::<String, String>::from([(from_id.to_string(), String::new())]);
2922        let mut frontier = vec![from_id.to_string()];
2923        self.conn.execute_batch(&format!(
2924            r#"CREATE TEMP TABLE IF NOT EXISTS "{tbl}" (id TEXT PRIMARY KEY);
2925               DELETE FROM "{tbl}";"#,
2926        ))?;
2927        let select_sql = if kind.is_some() {
2928            format!(
2929                r#"SELECT e.from_id, e.to_id
2930                   FROM "{tbl}" f
2931                   JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2932                       ON e.from_id = f.id
2933                   WHERE e.kind = ?
2934                   ORDER BY e.from_id, e.to_id, e.kind"#,
2935            )
2936        } else {
2937            format!(
2938                r#"SELECT e.from_id, e.to_id
2939                   FROM "{tbl}" f
2940                   JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
2941                       ON e.from_id = f.id
2942                   ORDER BY e.from_id, e.to_id, e.kind"#,
2943            )
2944        };
2945        let insert_sql = format!(r#"INSERT OR IGNORE INTO "{tbl}" (id) VALUES (?)"#);
2946        let delete_sql = format!(r#"DELETE FROM "{tbl}""#);
2947        let drop_sql = format!(r#"DROP TABLE IF EXISTS "{tbl}""#);
2948        let mut frontier_select_stmt = self.conn.prepare(&select_sql)?;
2949        let mut frontier_insert_stmt = self.conn.prepare(&insert_sql)?;
2950        let mut found_path: Option<GraphPath> = None;
2951        for _depth in 0..hop_limit {
2952            if frontier.is_empty() {
2953                break;
2954            }
2955            self.conn.execute(&delete_sql, [])?;
2956            for id in &frontier {
2957                frontier_insert_stmt.execute([id.as_str()])?;
2958            }
2959            let mut next_frontier = BTreeSet::new();
2960            let rows = if let Some(kind) = kind {
2961                collect_rows(frontier_select_stmt.query_map([kind], |row| {
2962                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2963                })?)?
2964            } else {
2965                collect_rows(frontier_select_stmt.query_map([], |row| {
2966                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
2967                })?)?
2968            };
2969            for (from, next) in rows {
2970                if !visited.insert(next.clone()) {
2971                    continue;
2972                }
2973                parent.insert(next.clone(), from);
2974                if next == to_id {
2975                    let mut nodes = vec![to_id.to_string()];
2976                    let mut cursor = to_id;
2977                    while let Some(previous) = parent.get(cursor) {
2978                        if previous.is_empty() {
2979                            break;
2980                        }
2981                        nodes.push(previous.clone());
2982                        cursor = previous;
2983                    }
2984                    nodes.reverse();
2985                    found_path = Some(GraphPath {
2986                        hops: nodes.len().saturating_sub(1),
2987                        nodes,
2988                    });
2989                    break;
2990                }
2991                next_frontier.insert(next);
2992            }
2993            if found_path.is_some() {
2994                break;
2995            }
2996            frontier = next_frontier.into_iter().collect();
2997        }
2998        let _ = self.conn.execute_batch(&drop_sql);
2999        Ok(found_path)
3000        })();
3001        self.temp_table_active.set(false);
3002        result
3003    }
3004
3005    fn reachable_nodes_by_kind(
3006        &self,
3007        from_id: &str,
3008        kind: &str,
3009        depth: usize,
3010        limit: usize,
3011    ) -> Result<Vec<(GraphNode, GraphPath)>> {
3012        Ok(self
3013            .reachable_nodes_by_kinds(from_id, &[kind], depth, limit)?
3014            .remove(kind)
3015            .unwrap_or_default())
3016    }
3017
3018    fn reachable_nodes_by_kinds(
3019        &self,
3020        from_id: &str,
3021        kinds: &[&str],
3022        depth: usize,
3023        limit: usize,
3024    ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
3025        let mut requested = kinds
3026            .iter()
3027            .map(|kind| (*kind).to_string())
3028            .collect::<BTreeSet<_>>()
3029            .into_iter()
3030            .collect::<Vec<_>>();
3031        let mut results = requested
3032            .iter()
3033            .map(|kind| (kind.clone(), Vec::new()))
3034            .collect::<BTreeMap<_, _>>();
3035        if requested.is_empty() {
3036            return Ok(results);
3037        }
3038        requested.sort();
3039        let placeholders = std::iter::repeat_n("?", requested.len())
3040            .collect::<Vec<_>>()
3041            .join(", ");
3042        let mut sql = format!(
3043            r#"
3044            WITH RECURSIVE walk(id, depth, path) AS (
3045                SELECT ?, 0, char(31) || ? || char(31)
3046                UNION ALL
3047                SELECT e.to_id, walk.depth + 1, walk.path || e.to_id || char(31)
3048                FROM walk
3049                JOIN graph_edges e INDEXED BY idx_graph_edges_from_kind
3050                    ON e.from_id = walk.id
3051                WHERE walk.depth < ?
3052                  AND instr(walk.path, char(31) || e.to_id || char(31)) = 0
3053            ),
3054            ranked AS (
3055                SELECT
3056                    n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json,
3057                    walk.path, walk.depth,
3058                    ROW_NUMBER() OVER (PARTITION BY n.kind, n.id ORDER BY walk.depth, n.label, n.id) AS rn
3059                FROM walk
3060                JOIN graph_nodes n ON n.id = walk.id
3061                WHERE n.kind IN ({placeholders}) AND n.id <> ?
3062            ),
3063            kind_ranked AS (
3064                SELECT *,
3065                    ROW_NUMBER() OVER (PARTITION BY kind ORDER BY depth, label, id) AS kind_rank
3066                FROM ranked
3067                WHERE rn = 1
3068            )
3069            SELECT id, kind, label, properties_json, provenance_json, freshness_json, path, depth
3070            FROM kind_ranked
3071            "#,
3072        );
3073        let mut values = vec![
3074            Value::Text(from_id.to_string()),
3075            Value::Text(from_id.to_string()),
3076            Value::Integer(depth as i64),
3077        ];
3078        values.extend(requested.iter().cloned().map(Value::Text));
3079        values.push(Value::Text(from_id.to_string()));
3080        if limit > 0 && limit != usize::MAX {
3081            sql.push_str(" WHERE kind_rank <= ?");
3082            values.push(Value::Integer(limit as i64));
3083        }
3084        sql.push_str(" ORDER BY kind, depth, label, id");
3085        let mut stmt = self.conn.prepare(&sql)?;
3086        let rows = collect_rows(stmt.query_map(params_from_iter(values.iter()), |row| {
3087            let node = node_from_row(row)?;
3088            let path: String = row.get(6)?;
3089            let hops: usize = row.get(7)?;
3090            Ok((
3091                node,
3092                GraphPath {
3093                    nodes: path
3094                        .split('\u{1f}')
3095                        .filter(|part| !part.is_empty())
3096                        .map(str::to_string)
3097                        .collect(),
3098                    hops,
3099                },
3100            ))
3101        })?)?;
3102        for (node, path) in rows {
3103            results
3104                .entry(node.kind.clone())
3105                .or_default()
3106                .push((node, path));
3107        }
3108        Ok(results)
3109    }
3110
3111    fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
3112        if let Some(node) = self.node(target)? {
3113            return Ok(Some(node));
3114        }
3115        if kinds.is_empty() {
3116            return Ok(None);
3117        }
3118
3119        let normalized = target.trim().trim_start_matches('#');
3120        let kind_placeholders = std::iter::repeat_n("?", kinds.len())
3121            .collect::<Vec<_>>()
3122            .join(", ");
3123        let kind_rank = kinds
3124            .iter()
3125            .enumerate()
3126            .map(|(rank, _)| format!("WHEN ? THEN {rank}"))
3127            .collect::<Vec<_>>()
3128            .join(" ");
3129        let sql = format!(
3130            r#"
3131            SELECT n.id, n.kind, n.label, n.properties_json, n.provenance_json, n.freshness_json
3132            FROM graph_nodes n
3133            WHERE n.kind IN ({kind_placeholders})
3134              AND (
3135                EXISTS (
3136                    SELECT 1
3137                    FROM graph_node_properties p_handle INDEXED BY idx_graph_node_properties_key_value_node
3138                    WHERE p_handle.node_id = n.id
3139                      AND p_handle.key = 'handle'
3140                      AND p_handle.value = ?
3141                )
3142                OR EXISTS (
3143                    SELECT 1
3144                    FROM graph_node_properties p_ref INDEXED BY idx_graph_node_properties_key_value_node
3145                    WHERE p_ref.node_id = n.id
3146                      AND p_ref.key = 'ref_id'
3147                      AND p_ref.value = ?
3148                )
3149                OR n.label = ?
3150                OR n.label = ?
3151              )
3152            ORDER BY CASE n.kind {kind_rank} ELSE 999 END, n.id
3153            LIMIT 1
3154            "#
3155        );
3156        let mut values = kinds
3157            .iter()
3158            .map(|kind| Value::Text((*kind).to_string()))
3159            .collect::<Vec<_>>();
3160        values.push(Value::Text(target.to_string()));
3161        values.push(Value::Text(normalized.to_string()));
3162        values.push(Value::Text(target.to_string()));
3163        values.push(Value::Text(format!("#{normalized}")));
3164        values.extend(kinds.iter().map(|kind| Value::Text((*kind).to_string())));
3165        self.conn
3166            .query_row(&sql, params_from_iter(values.iter()), node_from_row)
3167            .optional()
3168            .map_err(Into::into)
3169    }
3170}
3171
3172fn to_json<T: Serialize>(value: &T) -> Result<String> {
3173    serde_json::to_string(value).map_err(Into::into)
3174}
3175
3176fn row_hash<T: Serialize>(value: &T) -> Result<String> {
3177    let payload = serde_json::to_vec(value)?;
3178    Ok(blake3::hash(&payload).to_hex().to_string())
3179}
3180
3181fn optional_to_json<T: Serialize>(value: &Option<T>) -> Result<Option<String>> {
3182    value.as_ref().map(to_json).transpose()
3183}
3184
3185fn collect_rows<T>(
3186    rows: impl Iterator<Item = std::result::Result<T, rusqlite::Error>>,
3187) -> Result<Vec<T>> {
3188    rows.collect::<std::result::Result<Vec<_>, _>>()
3189        .map_err(Into::into)
3190}
3191
/// One item of a graph query result: a metadata entry, a node, or an edge.
///
/// NOTE(review): the producers/consumers of this enum are not visible in this chunk; the
/// meaning of `Meta` below is inferred from its field name — confirm against its users.
enum QueryResult {
    /// Metadata entry; `total` is presumably a total match count — TODO confirm.
    Meta { total: usize },
    /// A decoded graph node.
    Node(GraphNode),
    /// A decoded graph edge.
    Edge(GraphEdge),
}
3197
3198fn node_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphNode> {
3199    let properties_col = offset + 3;
3200    let provenance_col = offset + 4;
3201    let freshness_col = offset + 5;
3202    let properties_json: String = row.get(properties_col)?;
3203    let provenance_json: String = row.get(provenance_col)?;
3204    let freshness_json: Option<String> = row.get(freshness_col)?;
3205    Ok(GraphNode {
3206        id: row.get(offset)?,
3207        kind: row.get(offset + 1)?,
3208        label: row.get(offset + 2)?,
3209        properties: from_json(properties_col, &properties_json)?,
3210        provenance: from_json(provenance_col, &provenance_json)?,
3211        freshness: optional_from_json(freshness_col, freshness_json)?,
3212    })
3213}
3214
3215fn node_from_row(row: &Row<'_>) -> rusqlite::Result<GraphNode> {
3216    node_from_row_at(row, 0)
3217}
3218
3219fn edge_from_row_at(row: &Row<'_>, offset: usize) -> rusqlite::Result<GraphEdge> {
3220    let properties_col = offset + 4;
3221    let provenance_col = offset + 5;
3222    let freshness_col = offset + 6;
3223    let properties_json: String = row.get(properties_col)?;
3224    let provenance_json: String = row.get(provenance_col)?;
3225    let freshness_json: Option<String> = row.get(freshness_col)?;
3226    Ok(GraphEdge {
3227        id: row.get(offset)?,
3228        from_id: row.get(offset + 1)?,
3229        to_id: row.get(offset + 2)?,
3230        kind: row.get(offset + 3)?,
3231        properties: from_json(properties_col, &properties_json)?,
3232        provenance: from_json(provenance_col, &provenance_json)?,
3233        freshness: optional_from_json(freshness_col, freshness_json)?,
3234    })
3235}
3236
3237fn edge_from_row(row: &Row<'_>) -> rusqlite::Result<GraphEdge> {
3238    edge_from_row_at(row, 0)
3239}
3240
3241fn from_json<T: DeserializeOwned>(column: usize, raw: &str) -> rusqlite::Result<T> {
3242    serde_json::from_str(raw)
3243        .map_err(|err| rusqlite::Error::FromSqlConversionFailure(column, Type::Text, Box::new(err)))
3244}
3245
3246fn optional_from_json<T: DeserializeOwned>(
3247    column: usize,
3248    raw: Option<String>,
3249) -> rusqlite::Result<Option<T>> {
3250    raw.map(|value| from_json(column, &value)).transpose()
3251}
3252
3253fn projection_version_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3254    nodes
3255        .iter()
3256        .find(|node| node.kind == "projection_meta")
3257        .and_then(|node| node.properties.get("projection_version").cloned())
3258}
3259
3260fn projection_hash_from_nodes(nodes: &[GraphNode]) -> Option<String> {
3261    nodes
3262        .iter()
3263        .find(|node| node.kind == "projection_meta")
3264        .and_then(|node| node.properties.get("content_hash").cloned())
3265}
3266
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// This is best-effort and never fails:
/// - if the system clock reports a time before the epoch, `0` is returned;
/// - a seconds value that does not fit in `i64` saturates to `i64::MAX` instead of wrapping
///   to a negative number, as an `as` cast would.
fn unix_now() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX))
        .unwrap_or_default()
}
3273
3274fn sqlite_database_size_bytes(conn: &Connection) -> Result<u64> {
3275    let page_count: u64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
3276    let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3277    Ok(page_count.saturating_mul(page_size))
3278}
3279
3280fn sqlite_database_freelist_bytes(conn: &Connection) -> Result<u64> {
3281    let freelist_count: u64 = conn.query_row("PRAGMA freelist_count", [], |row| row.get(0))?;
3282    let page_size: u64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
3283    Ok(freelist_count.saturating_mul(page_size))
3284}
3285
3286#[cfg(test)]
3287mod tests {
3288    use super::*;
3289
3290    fn sample_provenance() -> GraphProvenance {
3291        GraphProvenance::new("fixture", "src/lib.rs:1").with_content_hash("hash-1")
3292    }
3293
3294    fn sample_projection() -> GraphProjection {
3295        let source = sample_provenance();
3296        GraphProjection {
3297            nodes: vec![
3298                GraphNode::new("doc:livekit", "document", "LiveKit guide")
3299                    .with_property("domain", "livekit")
3300                    .with_provenance(source.clone())
3301                    .with_freshness(GraphFreshness::content_hash("node-hash")),
3302                GraphNode::new("topic:rooms", "topic", "Rooms"),
3303                GraphNode::new("topic:egress", "topic", "Egress"),
3304            ],
3305            edges: vec![
3306                GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
3307                    .with_property("confidence", "0.91")
3308                    .with_provenance(source.clone())
3309                    .with_freshness(GraphFreshness::content_hash("edge-hash")),
3310                GraphEdge::new("topic:rooms", "topic:egress", "related_to").with_provenance(source),
3311            ],
3312        }
3313    }
3314
    /// Backend-agnostic contract check run against any [`GraphStore`].
    ///
    /// Loads [`sample_projection`] into `store`, then asserts node lookup, lookup by kind,
    /// kind-filtered outgoing edges (including edge properties), and shortest-path traversal.
    fn assert_projection_store_contract(store: &impl GraphStore) {
        let projection = sample_projection();
        projection.upsert_into(store).unwrap();

        // The stored document node must round-trip exactly as it was built in the fixture.
        assert_eq!(
            store.node("doc:livekit").unwrap(),
            projection
                .nodes
                .iter()
                .find(|node| node.id == "doc:livekit")
                .cloned()
        );
        // The fixture inserts rooms before egress, but egress is expected first: this assumes
        // the backend orders `nodes_by_kind` results by id.
        assert_eq!(
            store.nodes_by_kind("topic").unwrap(),
            vec![
                GraphNode::new("topic:egress", "topic", "Egress"),
                GraphNode::new("topic:rooms", "topic", "Rooms"),
            ]
        );

        let mentions = store
            .outgoing_edges("doc:livekit", Some("mentions"))
            .unwrap();
        assert_eq!(mentions.len(), 1);
        assert_eq!(mentions[0].to_id, "topic:rooms");
        assert_eq!(
            mentions[0].properties.get("confidence"),
            Some(&"0.91".into())
        );

        // Two-hop path across both fixture edges, with no edge-kind filter.
        let path = store
            .shortest_path("doc:livekit", "topic:egress", None)
            .unwrap()
            .unwrap();
        assert_eq!(
            path.nodes,
            vec!["doc:livekit", "topic:rooms", "topic:egress"]
        );
    }
3354
    /// Individually upserted nodes and edges (with properties, provenance and freshness)
    /// read back unchanged through the SQLite store's lookup APIs.
    #[test]
    fn sqlite_store_round_trips_generic_nodes_edges() {
        let store = SqliteGraphStore::in_memory().unwrap();
        let source = sample_provenance();
        let node = GraphNode::new("doc:livekit", "document", "LiveKit guide")
            .with_property("domain", "livekit")
            .with_provenance(source.clone())
            .with_freshness(GraphFreshness::content_hash("node-hash"));
        let topic = GraphNode::new("topic:rooms", "topic", "Rooms");
        let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
            .with_property("confidence", "0.91")
            .with_provenance(source)
            .with_freshness(GraphFreshness::content_hash("edge-hash"));

        store.upsert_node(&node).unwrap();
        store.upsert_node(&topic).unwrap();
        store.upsert_edge(&edge).unwrap();

        // Full structural equality: JSON-encoded columns must decode back to the same values.
        assert_eq!(store.node("doc:livekit").unwrap(), Some(node));
        assert_eq!(store.nodes_by_kind("topic").unwrap(), vec![topic]);
        assert_eq!(store.all_nodes().unwrap().len(), 2);
        assert_eq!(store.all_edges().unwrap().len(), 1);
        assert_eq!(
            store
                .outgoing_edges("doc:livekit", Some("mentions"))
                .unwrap(),
            vec![edge]
        );
    }
3384
    /// Edge properties are materialized into `graph_edge_properties`, and edges are
    /// addressable by their stable id.
    ///
    /// A property-filtered paged edge scan reports (via diagnostics) that it is driven by the
    /// materialized property index.
    #[test]
    fn sqlite_materializes_edge_properties_and_scans_first_class_edges() {
        let store = SqliteGraphStore::in_memory().unwrap();
        for node in [
            GraphNode::new("doc:livekit", "document", "LiveKit guide"),
            GraphNode::new("topic:rooms", "topic", "Rooms"),
            GraphNode::new("topic:egress", "topic", "Egress"),
        ] {
            store.upsert_node(&node).unwrap();
        }
        let edge = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
            .with_property("confidence", "0.91");
        let edge_id = edge.id.clone();
        store.upsert_edge(&edge).unwrap();
        // A second edge with the same property key but a different value, so the filter
        // below must actually discriminate on the value.
        store
            .upsert_edge(
                &GraphEdge::new("topic:egress", "topic:rooms", "related_to")
                    .with_property("confidence", "0.42"),
            )
            .unwrap();

        assert_eq!(store.edge(&edge_id).unwrap(), Some(edge));
        // Both edges touch topic:rooms (one incoming from each side); results are expected
        // in sorted id order.
        let mut expected_incident_ids = vec![
            GraphEdge::stable_id("doc:livekit", "topic:rooms", "mentions"),
            GraphEdge::stable_id("topic:egress", "topic:rooms", "related_to"),
        ];
        expected_incident_ids.sort();
        assert_eq!(
            store
                .incident_edges("topic:rooms", None)
                .unwrap()
                .into_iter()
                .map(|edge| edge.id)
                .collect::<Vec<_>>(),
            expected_incident_ids
        );

        let page = store
            .paged_edges(
                Some("mentions"),
                GraphQueryOptions {
                    property_filters: vec![GraphPropertyFilter {
                        key: "confidence".to_string(),
                        value: "0.91".to_string(),
                    }],
                    ..GraphQueryOptions::default()
                },
            )
            .unwrap();
        assert_eq!(page.edges.len(), 1);
        assert_eq!(page.edges[0].id, edge_id);
        // The remaining assertions pin the query-plan diagnostics: the property key/value
        // index and the edge-key index must be named, plus the materialized-row driver notes.
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic.contains("idx_graph_edge_properties_key_value_edge")),
            "{:?}",
            page.page.diagnostics
        );
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic.contains("idx_graph_edges_edge_key")),
            "{:?}",
            page.page.diagnostics
        );
        assert!(
            page.page.diagnostics.iter().any(|diagnostic| diagnostic
                .contains("edge property primary filter matched 1 materialized row")),
            "{:?}",
            page.page.diagnostics
        );
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic
                    .contains("drives from SQLite materialized property rows")),
            "{:?}",
            page.page.diagnostics
        );

        // One materialized `confidence` row per edge.
        let property_rows: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(property_rows, 2);
    }
3478
    /// Runs the backend-agnostic projection contract against the in-memory SQLite store.
    #[test]
    fn graph_projection_round_trips_through_backend_agnostic_store_contract() {
        let sqlite = SqliteGraphStore::in_memory().unwrap();
        assert_projection_store_contract(&sqlite);
    }
3484
    /// Backend-agnostic CRUD contract, exercised on SQLite. It covers:
    /// - neighborhood expansion and its result ordering;
    /// - edge deletion breaking a path;
    /// - node deletion removing the node's incident edges.
    #[test]
    fn graph_store_contract_covers_crud_neighborhood_and_ordering() {
        fn assert_crud_contract(store: &impl GraphStore) {
            let projection = sample_projection();
            projection.upsert_into(store).unwrap();

            // Depth-2 neighborhood of the document reaches both topics; nodes are expected in
            // id order and edges in (from_id, kind, to_id) order.
            let neighborhood = store.neighborhood("doc:livekit", 2, None).unwrap().unwrap();
            assert_eq!(
                neighborhood
                    .nodes
                    .iter()
                    .map(|node| node.id.as_str())
                    .collect::<Vec<_>>(),
                vec!["doc:livekit", "topic:egress", "topic:rooms"]
            );
            assert_eq!(
                neighborhood
                    .edges
                    .iter()
                    .map(|edge| (
                        edge.from_id.as_str(),
                        edge.kind.as_str(),
                        edge.to_id.as_str()
                    ))
                    .collect::<Vec<_>>(),
                vec![
                    ("doc:livekit", "mentions", "topic:rooms"),
                    ("topic:rooms", "related_to", "topic:egress"),
                ]
            );

            // Removing the only rooms -> egress edge must disconnect egress from the document.
            assert_eq!(
                store
                    .delete_edge("topic:rooms", "topic:egress", "related_to")
                    .unwrap(),
                1
            );
            assert!(
                store
                    .shortest_path("doc:livekit", "topic:egress", None)
                    .unwrap()
                    .is_none()
            );
            // Deleting topic:rooms is expected to also drop the `mentions` edge pointing at it,
            // leaving the document with no outgoing edges.
            assert_eq!(store.delete_node("topic:rooms").unwrap(), 1);
            assert!(store.node("topic:rooms").unwrap().is_none());
            assert!(
                store
                    .outgoing_edges("doc:livekit", None)
                    .unwrap()
                    .is_empty()
            );
        }

        assert_crud_contract(&SqliteGraphStore::in_memory().unwrap());
    }
3540
    /// Re-upserting a projection replaces a node's materialized property rows instead of
    /// accumulating stale ones. Edge property rows are not duplicated by the second upsert.
    #[test]
    fn sqlite_upsert_projection_batches_rows_and_properties() {
        let mut store = SqliteGraphStore::in_memory().unwrap();
        let mut projection = sample_projection();
        store.upsert_projection(&projection).unwrap();

        let page = store
            .paged_nodes_by_kind(
                "document",
                GraphQueryOptions {
                    property_filters: vec![GraphPropertyFilter {
                        key: "domain".to_string(),
                        value: "livekit".to_string(),
                    }],
                    ..GraphQueryOptions::default()
                },
            )
            .unwrap();
        assert_eq!(page.nodes[0].id, "doc:livekit");

        // Same node id, new `domain` value.
        projection.nodes[0] = GraphNode::new("doc:livekit", "document", "LiveKit guide")
            .with_property("domain", "recording");
        store.upsert_projection(&projection).unwrap();

        // The old `domain = livekit` row must be gone...
        let old_property_count: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain' AND value = 'livekit'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        // ...and the node must now be findable by the new value.
        let updated_page = store
            .paged_nodes_by_kind(
                "document",
                GraphQueryOptions {
                    property_filters: vec![GraphPropertyFilter {
                        key: "domain".to_string(),
                        value: "recording".to_string(),
                    }],
                    ..GraphQueryOptions::default()
                },
            )
            .unwrap();
        assert_eq!(old_property_count, 0);
        assert_eq!(updated_page.nodes[0].id, "doc:livekit");
        // The fixture's single `confidence` edge property is still stored exactly once.
        let edge_property_count: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(edge_property_count, 1);
    }
3597
    /// Edge-kind filters restrict both outgoing-edge lookups and shortest-path traversal.
    /// Paths are directed.
    #[test]
    fn sqlite_store_filters_edges_by_kind_and_paths() {
        let store = SqliteGraphStore::in_memory().unwrap();
        for id in ["a", "b", "c"] {
            store
                .upsert_node(&GraphNode::new(id, "symbol", id))
                .unwrap();
        }
        // a -calls-> b -calls-> c, plus a direct a -documents-> c shortcut of another kind.
        store
            .upsert_edge(&GraphEdge::new("a", "b", "calls"))
            .unwrap();
        store
            .upsert_edge(&GraphEdge::new("a", "c", "documents"))
            .unwrap();
        store
            .upsert_edge(&GraphEdge::new("b", "c", "calls"))
            .unwrap();

        let calls = store.outgoing_edges("a", Some("calls")).unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].to_id, "b");
        // (node count, edge count) across all kinds.
        assert_eq!(store.graph_counts().unwrap(), (3, 3));
        assert_eq!(
            store.sample_edge(Some("calls")).unwrap().unwrap().to_id,
            "b"
        );

        // Restricted to `calls`, the `documents` shortcut is ignored, so the path takes 2 hops.
        let path = store
            .shortest_path("a", "c", Some("calls"))
            .unwrap()
            .unwrap();
        assert_eq!(path.nodes, vec!["a", "b", "c"]);
        assert_eq!(path.hops, 2);

        // No reverse edges exist, so c cannot reach a.
        assert!(
            store
                .shortest_path("c", "a", Some("calls"))
                .unwrap()
                .is_none()
        );
    }
3639
    /// `edges_between_nodes` returns only edges whose two endpoints both lie in the given set.
    #[test]
    fn sqlite_store_batches_edges_between_node_sets() {
        let store = SqliteGraphStore::in_memory().unwrap();
        for id in ["a", "b", "c", "outside"] {
            store
                .upsert_node(&GraphNode::new(id, "symbol", id))
                .unwrap();
        }
        for edge in [
            GraphEdge::new("a", "b", "calls"),
            GraphEdge::new("b", "c", "calls"),
            GraphEdge::new("a", "outside", "calls"),
            GraphEdge::new("outside", "c", "calls"),
        ] {
            store.upsert_edge(&edge).unwrap();
        }

        // `outside` is excluded, so the two edges touching it must not be returned.
        let scoped = ["a".to_string(), "b".to_string(), "c".to_string()]
            .into_iter()
            .collect::<BTreeSet<_>>();
        let edge_keys = store
            .edges_between_nodes(&scoped)
            .unwrap()
            .into_iter()
            .map(|edge| (edge.from_id, edge.kind, edge.to_id))
            .collect::<Vec<_>>();

        assert_eq!(
            edge_keys,
            vec![
                ("a".to_string(), "calls".to_string(), "b".to_string()),
                ("b".to_string(), "calls".to_string(), "c".to_string()),
            ]
        );
    }
3675
    /// Versioned projection replacement over four refreshes of scope `root`. It checks:
    /// - version/watermark bookkeeping and tombstoning of removed rows;
    /// - unchanged-row and unchanged-property accounting, and phase timing names;
    /// - the cached counts in `graph_operator_stats`;
    /// - tombstone pruning when a row reappears, and storage compaction.
    #[test]
    fn sqlite_projection_refresh_tracks_versions_watermarks_and_tombstones() {
        let mut store = SqliteGraphStore::in_memory().unwrap();
        let mut projection = sample_projection();
        projection.nodes.push(
            GraphNode::new(
                "projection:fixture",
                "projection_meta",
                "fixture projection",
            )
            .with_property("projection_version", "fixture-v1")
            .with_property("content_hash", "hash-a"),
        );
        // v1: initial load.
        store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v1"),
                Some("commit-a".to_string()),
            )
            .unwrap();

        // v2: drop topic:egress and the edge pointing at it; everything else is unchanged.
        projection.nodes.retain(|node| node.id != "topic:egress");
        projection.edges.retain(|edge| edge.to_id != "topic:egress");
        let refresh = store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v2"),
                Some("commit-b".to_string()),
            )
            .unwrap();

        assert_eq!(refresh.projection_version, "fixture-v2");
        assert_eq!(refresh.source_watermark.as_deref(), Some("commit-b"));
        assert_eq!(refresh.tombstoned_nodes, vec!["topic:egress".to_string()]);
        assert_eq!(refresh.tombstoned_edges.len(), 1);
        assert_eq!(refresh.deleted_nodes, 1);
        assert_eq!(refresh.deleted_edges, 1);
        // The three surviving nodes are identical to v1, so nothing is rewritten.
        assert_eq!(refresh.unchanged_nodes, 3);
        assert_eq!(refresh.upserted_nodes, 0);
        assert_eq!(refresh.unchanged_properties, 4);
        assert_eq!(refresh.upserted_properties, 0);
        assert_eq!(refresh.deleted_properties, 0);
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_property_row_staging"),
            "{:?}",
            refresh.phase_timings
        );
        assert!(
            refresh
                .phase_timings
                .iter()
                .any(|phase| phase.name == "sqlite_edge_property_row_staging"),
            "{:?}",
            refresh.phase_timings
        );
        let version = store.projection_version("root").unwrap().unwrap();
        assert_eq!(version.projection_version, "fixture-v2");
        assert_eq!(version.source_watermark.as_deref(), Some("commit-b"));
        // Cached stats: (live nodes, live edges, node tombstones, edge tombstones).
        let cached_counts: (usize, usize, usize, usize) = store
            .conn
            .query_row(
                r#"
                SELECT nodes, edges, tombstone_nodes, tombstone_edges
                FROM graph_operator_stats
                WHERE scope = 'root'
                "#,
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )
            .unwrap();
        assert_eq!(cached_counts, (3, 1, 1, 1));

        // v3: topic:egress comes back, so its node tombstone is pruned.
        projection
            .nodes
            .push(GraphNode::new("topic:egress", "topic", "Egress"));
        let refresh = store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v3"),
                Some("commit-c".to_string()),
            )
            .unwrap();
        assert_eq!(refresh.pruned_tombstones, 1);
        assert_eq!(refresh.tombstoned_nodes, Vec::<String>::new());

        // v4: remove topic:egress again, then compact.
        projection.nodes.retain(|node| node.id != "topic:egress");
        store
            .replace_projection_with_version(
                "root",
                &projection,
                Some("fixture-v4"),
                Some("commit-d".to_string()),
            )
            .unwrap();
        // NOTE(review): the meaning of the returned count (2) and of the `true` flag is not
        // visible here; presumably rows purged by compaction — confirm against compact_storage.
        assert_eq!(store.compact_storage("root", true).unwrap(), 2);
        // After compaction the cached tombstone counters are back to zero.
        let cached_counts: (usize, usize, usize, usize) = store
            .conn
            .query_row(
                r#"
                SELECT nodes, edges, tombstone_nodes, tombstone_edges
                FROM graph_operator_stats
                WHERE scope = 'root'
                "#,
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )
            .unwrap();
        assert_eq!(cached_counts, (3, 1, 0, 0));
    }
3791
    /// `shortest_path_with_max_hops` honors its hop bound on an 80-node chain.
    ///
    /// The 79-hop `calls` path is not found with a limit of 64 but is found with a limit of 79.
    /// A direct `mentions` shortcut is used only when that edge kind is requested.
    #[test]
    fn sqlite_shortest_path_uses_bounded_frontier() {
        let store = SqliteGraphStore::in_memory().unwrap();
        for idx in 0..80 {
            store
                .upsert_node(&GraphNode::new(
                    format!("node:{idx:02}"),
                    "symbol",
                    format!("node {idx:02}"),
                ))
                .unwrap();
        }
        // Linear chain node:00 -calls-> node:01 -> ... -> node:79 (79 edges).
        for idx in 0..79 {
            store
                .upsert_edge(&GraphEdge::new(
                    format!("node:{idx:02}"),
                    format!("node:{:02}", idx + 1),
                    "calls",
                ))
                .unwrap();
        }
        store
            .upsert_edge(&GraphEdge::new("node:00", "node:79", "mentions"))
            .unwrap();

        // Hop limit below the chain length: no path.
        assert!(
            store
                .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(64))
                .unwrap()
                .is_none()
        );
        let path = store
            .shortest_path_with_max_hops("node:00", "node:79", Some("calls"), Some(79))
            .unwrap()
            .unwrap();
        assert_eq!(path.hops, 79);
        assert_eq!(path.nodes.first().map(String::as_str), Some("node:00"));
        assert_eq!(path.nodes.last().map(String::as_str), Some("node:79"));

        let direct = store
            .shortest_path_with_max_hops("node:00", "node:79", Some("mentions"), Some(1))
            .unwrap()
            .unwrap();
        assert_eq!(direct.nodes, vec!["node:00", "node:79"]);
    }
3837
    /// `resolve_evidence_target` resolves both `#ref` style targets (via `ref_id`) and bare
    /// handles (via `handle`).
    ///
    /// When several kinds match, the node whose kind is listed first in the kinds argument
    /// is expected to win.
    #[test]
    fn sqlite_resolves_evidence_targets_with_indexed_properties() {
        let store = SqliteGraphStore::in_memory().unwrap();
        // All three nodes share `ref_id = refresh`; only the backlog node has a `handle`.
        for node in [
            GraphNode::new("gbak-refresh", "backlog", "#refresh")
                .with_property("ref_id", "refresh")
                .with_property("handle", "backlog-handle"),
            GraphNode::new("gjob-refresh", "job_packet", "do #refresh")
                .with_property("ref_id", "refresh"),
            GraphNode::new("gwres-refresh", "worker_result", "completed #refresh")
                .with_property("ref_id", "refresh"),
        ] {
            store.upsert_node(&node).unwrap();
        }

        // Presumably "#refresh" is normalized to "refresh" for the ref_id match; the backlog
        // node wins because "backlog" is ranked first in the kinds list.
        let by_ref = store
            .resolve_evidence_target("#refresh", &["backlog", "job_packet", "worker_result"])
            .unwrap()
            .unwrap();
        assert_eq!(by_ref.id, "gbak-refresh");
        let by_handle = store
            .resolve_evidence_target("backlog-handle", &["backlog"])
            .unwrap()
            .unwrap();
        assert_eq!(by_handle.id, "gbak-refresh");
    }
3864
    /// Opening a legacy `user_version = 2` database migrates it to
    /// `SQLITE_GRAPH_SCHEMA_VERSION`.
    ///
    /// The migration backfills `graph_node_properties` / `graph_edge_properties` from the
    /// stored JSON columns and makes edges addressable by stable id.
    #[test]
    fn sqlite_schema_migration_backfills_materialized_node_properties() {
        let conn = Connection::open_in_memory().unwrap();
        // Hand-built v2 schema: no materialized property tables, and edges keyed only by
        // (from_id, to_id, kind) with no id column.
        conn.execute_batch(
            r#"
            PRAGMA user_version = 2;
            CREATE TABLE graph_nodes (
                id TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                label TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT
            );
            CREATE INDEX idx_graph_nodes_kind ON graph_nodes(kind);
            CREATE TABLE graph_edges (
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                provenance_json TEXT NOT NULL DEFAULT '[]',
                freshness_json TEXT,
                row_hash TEXT,
                source_watermark TEXT,
                PRIMARY KEY (from_id, to_id, kind)
            );
            CREATE INDEX idx_graph_edges_from_kind ON graph_edges(from_id, kind);
            CREATE INDEX idx_graph_edges_to_kind ON graph_edges(to_id, kind);
            CREATE TABLE graph_projection_versions (
                scope TEXT PRIMARY KEY,
                projection_version TEXT NOT NULL,
                content_hash TEXT,
                source_watermark TEXT,
                observed_at_unix INTEGER NOT NULL
            );
            CREATE TABLE graph_tombstones (
                row_key TEXT PRIMARY KEY,
                row_kind TEXT NOT NULL,
                deleted_at_unix INTEGER NOT NULL
            );
            INSERT INTO graph_nodes
                (id, kind, label, properties_json, provenance_json)
            VALUES
                ('topic:rooms', 'topic', 'Rooms', '{"domain":"livekit"}', '[]'),
                ('topic:egress', 'topic', 'Egress', '{"domain":"recording"}', '[]');
            INSERT INTO graph_edges
                (from_id, to_id, kind, properties_json, provenance_json)
            VALUES
                ('topic:rooms', 'topic:egress', 'mentions', '{"confidence":"0.91"}', '[]');
            "#,
        )
        .unwrap();

        // Wrapping the connection is what triggers the migration.
        let store = SqliteGraphStore::from_connection(conn).unwrap();
        let version: i64 = store
            .conn
            .pragma_query_value(None, "user_version", |row| row.get(0))
            .unwrap();
        assert_eq!(version, SQLITE_GRAPH_SCHEMA_VERSION);
        // One backfilled `domain` row per legacy node.
        let property_rows: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_node_properties WHERE key = 'domain'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(property_rows, 2);
        let edge_property_rows: usize = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM graph_edge_properties WHERE key = 'confidence'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(edge_property_rows, 1);
        // The legacy edge is now retrievable by its stable id.
        let edge = store
            .edge(&GraphEdge::stable_id(
                "topic:rooms",
                "topic:egress",
                "mentions",
            ))
            .unwrap()
            .unwrap();
        assert_eq!(edge.properties.get("confidence"), Some(&"0.91".to_string()));

        // Property-filtered node queries work on migrated data and report the property index.
        let page = store
            .paged_nodes_by_kind(
                "topic",
                GraphQueryOptions {
                    property_filters: vec![GraphPropertyFilter {
                        key: "domain".to_string(),
                        value: "livekit".to_string(),
                    }],
                    ..GraphQueryOptions::default()
                },
            )
            .unwrap();
        assert_eq!(page.nodes[0].id, "topic:rooms");
        assert!(
            page.page
                .diagnostics
                .iter()
                .any(|diagnostic| diagnostic.contains("idx_graph_node_properties_key_value_node")),
            "{:?}",
            page.page.diagnostics
        );
    }
3976
    /// `reachable_nodes_by_kinds` groups reachable nodes by requested kind and returns each
    /// with the path used to reach it.
    ///
    /// The test covers both one- and two-hop reachability.
    #[test]
    fn sqlite_store_batches_reachable_nodes_by_kinds() {
        let store = SqliteGraphStore::in_memory().unwrap();
        for node in [
            GraphNode::new("start", "backlog", "start"),
            GraphNode::new("ctx", "worker_context", "context"),
            GraphNode::new("src", "source_handle", "source"),
            GraphNode::new("sem", "semantic_concept", "concept"),
        ] {
            store.upsert_node(&node).unwrap();
        }
        // start -> ctx -> src (two hops) and start -> sem (one hop).
        store
            .upsert_edge(&GraphEdge::new("start", "ctx", "has_context"))
            .unwrap();
        store
            .upsert_edge(&GraphEdge::new("ctx", "src", "uses_source"))
            .unwrap();
        store
            .upsert_edge(&GraphEdge::new("start", "sem", "mentions_concept"))
            .unwrap();

        // NOTE(review): `2` and `8` are presumably the max hop depth and a per-kind result
        // limit — confirm against reachable_nodes_by_kinds.
        let rows = store
            .reachable_nodes_by_kinds(
                "start",
                &["worker_context", "source_handle", "semantic_concept"],
                2,
                8,
            )
            .unwrap();
        // Each entry is (node, path); the path lists every node from the start.
        assert_eq!(rows["worker_context"][0].0.id, "ctx");
        assert_eq!(
            rows["source_handle"][0].1.nodes,
            vec!["start", "ctx", "src"]
        );
        assert_eq!(rows["semantic_concept"][0].1.hops, 1);
    }
4013
4014    #[test]
4015    fn sqlite_projection_refresh_handles_bulk_row_replacement() {
4016        let mut store = SqliteGraphStore::in_memory().unwrap();
4017        let source = GraphProvenance::new("fixture", "bulk");
4018        let mut projection = GraphProjection::default();
4019        for idx in 0..128 {
4020            projection.nodes.push(
4021                GraphNode::new(
4022                    format!("node:{idx:03}"),
4023                    if idx % 2 == 0 { "symbol" } else { "file" },
4024                    format!("bulk node {idx:03}"),
4025                )
4026                .with_property("ordinal", idx.to_string())
4027                .with_provenance(source.clone())
4028                .with_freshness(GraphFreshness::content_hash(format!("node-hash-{idx:03}"))),
4029            );
4030        }
4031        for idx in 0..127 {
4032            projection.edges.push(
4033                GraphEdge::new(
4034                    format!("node:{idx:03}"),
4035                    format!("node:{:03}", idx + 1),
4036                    "next",
4037                )
4038                .with_property("ordinal", idx.to_string())
4039                .with_provenance(source.clone())
4040                .with_freshness(GraphFreshness::content_hash(format!("edge-hash-{idx:03}"))),
4041            );
4042        }
4043
4044        store
4045            .replace_projection_with_version(
4046                "root",
4047                &projection,
4048                Some("bulk-v1"),
4049                Some("commit-a".to_string()),
4050            )
4051            .unwrap();
4052
4053        projection
4054            .nodes
4055            .retain(|node| !node.id.ends_with("000") && !node.id.ends_with("064"));
4056        projection.edges.retain(|edge| {
4057            !edge.from_id.ends_with("000")
4058                && !edge.to_id.ends_with("000")
4059                && !edge.from_id.ends_with("064")
4060                && !edge.to_id.ends_with("064")
4061        });
4062        let refresh = store
4063            .replace_projection_with_version(
4064                "root",
4065                &projection,
4066                Some("bulk-v2"),
4067                Some("commit-b".to_string()),
4068            )
4069            .unwrap();
4070
4071        assert_eq!(store.all_nodes().unwrap().len(), 126);
4072        assert_eq!(store.all_edges().unwrap().len(), 124);
4073        assert_eq!(
4074            refresh.tombstoned_nodes,
4075            vec!["node:000".to_string(), "node:064".to_string()]
4076        );
4077        assert_eq!(refresh.tombstoned_edges.len(), 3);
4078        assert_eq!(refresh.deleted_nodes, 2);
4079        assert_eq!(refresh.deleted_edges, 3);
4080        assert_eq!(refresh.unchanged_nodes, 126);
4081        assert_eq!(refresh.unchanged_edges, 124);
4082        assert_eq!(refresh.upserted_nodes, 0);
4083        assert_eq!(refresh.upserted_edges, 0);
4084        assert_eq!(refresh.unchanged_properties, 250);
4085        assert_eq!(refresh.upserted_properties, 0);
4086        assert!(
4087            refresh
4088                .phase_timings
4089                .iter()
4090                .any(|phase| phase.name == "sqlite_node_staging"
4091                    && phase.detail.contains("126 unchanged skipped")
4092                    && phase.detail.contains("multi-row chunks up to 500 rows")),
4093            "{:?}",
4094            refresh.phase_timings
4095        );
4096        assert!(
4097            refresh
4098                .phase_timings
4099                .iter()
4100                .any(|phase| phase.name == "sqlite_node_staging"
4101                    && phase.detail.contains("124 unchanged skipped")),
4102            "{:?}",
4103            refresh.phase_timings
4104        );
4105        let staged_node_properties: usize = store
4106            .conn
4107            .query_row(
4108                "SELECT COUNT(*) FROM temp.next_graph_node_properties",
4109                [],
4110                |row| row.get(0),
4111            )
4112            .unwrap();
4113        let staged_edge_properties: usize = store
4114            .conn
4115            .query_row(
4116                "SELECT COUNT(*) FROM temp.next_graph_edge_properties",
4117                [],
4118                |row| row.get(0),
4119            )
4120            .unwrap();
4121        assert_eq!(staged_node_properties, 0);
4122        assert_eq!(staged_edge_properties, 0);
4123        assert!(
4124            refresh
4125                .phase_timings
4126                .iter()
4127                .any(|phase| phase.name == "sqlite_property_row_staging"
4128                    && phase.detail.contains("new/changed node rows")),
4129            "{:?}",
4130            refresh.phase_timings
4131        );
4132        assert!(
4133            refresh
4134                .phase_timings
4135                .iter()
4136                .any(|phase| phase.name == "sqlite_edge_property_row_staging"
4137                    && phase.detail.contains("new/changed edge rows")),
4138            "{:?}",
4139            refresh.phase_timings
4140        );
4141        assert_eq!(
4142            store
4143                .projection_version("root")
4144                .unwrap()
4145                .unwrap()
4146                .source_watermark
4147                .as_deref(),
4148            Some("commit-b")
4149        );
4150    }
4151
4152    #[test]
4153    fn sqlite_reentrant_temp_table_guard_panics() {
4154        let store = SqliteGraphStore::in_memory().unwrap();
4155        store.temp_table_active.set(true);
4156        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
4157            store.assert_not_in_temp_table_section();
4158        }));
4159        assert!(result.is_err());
4160    }
4161
4162    #[test]
4163    fn sqlite_temp_table_guard_clears_after_method() {
4164        let mut store = SqliteGraphStore::in_memory().unwrap();
4165        let projection = GraphProjection {
4166            nodes: vec![],
4167            edges: vec![],
4168        };
4169        store.replace_projection(&projection).unwrap();
4170        assert!(!store.temp_table_active.get());
4171    }
4172}