//! Read-path execution coordinator for `fathomdb_engine`
//! (`fathomdb_engine/coordinator.rs`): read-only connection pooling and
//! execution of compiled flat and grouped queries.
1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{CompiledGroupedQuery, CompiledQuery, DrivingTable, ExpansionSlot, ShapeHash};
8use fathomdb_schema::SchemaManager;
9use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
10
11use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
12use crate::{EngineError, sqlite};
13
/// Maximum number of cached shape-hash to SQL mappings before the cache is
/// cleared entirely.  A clear-all strategy is simpler than partial eviction
/// and the cost of re-compiling on a miss is negligible.
/// Enforced in `ExecutionCoordinator::execute_compiled_read`.
const MAX_SHAPE_CACHE_SIZE: usize = 4096;

/// Maximum number of root IDs per batched expansion query.  Kept well below
/// `SQLITE_MAX_VARIABLE_NUMBER` (default 999) because each batch also binds
/// the edge-kind parameter.  Larger root sets are chunked into multiple
/// batches of this size rather than falling back to per-root queries.
/// Enforced in `ExecutionCoordinator::read_expansion_nodes_chunked`.
const BATCH_CHUNK_SIZE: usize = 200;
24
/// A pool of read-only `SQLite` connections for concurrent read access.
///
/// Each connection is wrapped in its own [`Mutex`] so multiple readers can
/// proceed in parallel when they happen to grab different slots.
struct ReadPool {
    /// One independently-lockable slot per pooled connection; slot 0 also
    /// serves as the blocking fallback in [`ReadPool::acquire`].
    connections: Vec<Mutex<Connection>>,
}
32
33impl fmt::Debug for ReadPool {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        f.debug_struct("ReadPool")
36            .field("size", &self.connections.len())
37            .finish()
38    }
39}
40
41impl ReadPool {
42    /// Open `pool_size` read-only connections to the database at `path`.
43    ///
44    /// Each connection has PRAGMAs initialized via
45    /// [`SchemaManager::initialize_connection`] and, when the `sqlite-vec`
46    /// feature is enabled and `vector_enabled` is true, the vec extension
47    /// auto-loaded.
48    ///
49    /// # Errors
50    ///
51    /// Returns [`EngineError`] if any connection fails to open or initialize.
52    fn new(
53        db_path: &Path,
54        pool_size: usize,
55        schema_manager: &SchemaManager,
56        vector_enabled: bool,
57    ) -> Result<Self, EngineError> {
58        let mut connections = Vec::with_capacity(pool_size);
59        for _ in 0..pool_size {
60            let conn = if vector_enabled {
61                #[cfg(feature = "sqlite-vec")]
62                {
63                    sqlite::open_readonly_connection_with_vec(db_path)?
64                }
65                #[cfg(not(feature = "sqlite-vec"))]
66                {
67                    sqlite::open_readonly_connection(db_path)?
68                }
69            } else {
70                sqlite::open_readonly_connection(db_path)?
71            };
72            schema_manager
73                .initialize_reader_connection(&conn)
74                .map_err(EngineError::Schema)?;
75            connections.push(Mutex::new(conn));
76        }
77        Ok(Self { connections })
78    }
79
80    /// Acquire a connection from the pool.
81    ///
82    /// Tries [`Mutex::try_lock`] on each slot first (fast non-blocking path).
83    /// If every slot is held, falls back to a blocking lock on the first slot.
84    ///
85    /// # Errors
86    ///
87    /// Returns [`EngineError::Bridge`] if the underlying mutex is poisoned.
88    fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
89        // Fast path: try each connection without blocking.
90        for conn in &self.connections {
91            if let Ok(guard) = conn.try_lock() {
92                return Ok(guard);
93            }
94        }
95        // Fallback: block on the first connection.
96        self.connections[0].lock().map_err(|_| {
97            trace_error!("read pool: connection mutex poisoned");
98            EngineError::Bridge("connection mutex poisoned".to_owned())
99        })
100    }
101
102    /// Return the number of connections in the pool.
103    #[cfg(test)]
104    fn size(&self) -> usize {
105        self.connections.len()
106    }
107}
108
/// Execution plan returned by [`ExecutionCoordinator::explain_compiled_read`].
///
/// This is a read-only introspection struct. It does not execute SQL.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// Final SQL text, including the node-row projection wrapper.
    pub sql: String,
    /// Number of bind parameters carried by the compiled query.
    pub bind_count: usize,
    /// Table the compiled query is driven from.
    pub driving_table: DrivingTable,
    /// Structural hash of the query shape; key into the shape→SQL cache.
    pub shape_hash: ShapeHash,
    /// `true` when the shape hash was already present in the shape→SQL cache.
    pub cache_hit: bool,
}
120
/// A single node row returned from a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row ID.
    pub row_id: String,
    /// Logical ID of the node.
    pub logical_id: String,
    /// Node kind.
    pub kind: String,
    /// JSON-encoded node properties.
    pub properties: String,
    /// Optional URI referencing external content.
    pub content_ref: Option<String>,
    /// Unix timestamp of last access, if tracked.
    pub last_accessed_at: Option<i64>,
}

/// A single run row returned from a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Unique run ID.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded run properties.
    pub properties: String,
}

/// A single step row returned from a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Unique step ID.
    pub id: String,
    /// ID of the parent run.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded step properties.
    pub properties: String,
}

/// A single action row returned from a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Unique action ID.
    pub id: String,
    /// ID of the parent step.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded action properties.
    pub properties: String,
}

/// A single row from the `provenance_events` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Unique event ID.
    pub id: String,
    /// Event type discriminator (e.g. retire/upsert/excise — see
    /// [`ExecutionCoordinator::query_provenance_events`]).
    pub event_type: String,
    /// Event subject: a logical node ID or a `source_ref` value.
    pub subject: String,
    /// Optional source reference associated with the event.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation timestamp; ordering key for provenance queries.
    pub created_at: i64,
}

/// Result set from executing a flat (non-grouped) compiled query.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Matched node rows.
    pub nodes: Vec<NodeRow>,
    /// Runs associated with the matched nodes.
    pub runs: Vec<RunRow>,
    /// Steps associated with the matched runs.
    pub steps: Vec<StepRow>,
    /// Actions associated with the matched steps.
    pub actions: Vec<ActionRow>,
    /// `true` when a capability miss (e.g. missing sqlite-vec) caused the query
    /// to degrade to an empty result instead of propagating an error.
    pub was_degraded: bool,
}

/// Expansion results for a single root node within a grouped query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical ID of the root node that seeded this expansion.
    pub root_logical_id: String,
    /// Nodes reached by traversing from the root.
    pub nodes: Vec<NodeRow>,
}

/// All expansion results for a single named slot across all roots.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot.
    pub slot: String,
    /// Per-root expansion results.
    pub roots: Vec<ExpansionRootRows>,
}

/// Result set from executing a grouped compiled query.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// Root node rows matched by the base query.
    pub roots: Vec<NodeRow>,
    /// Per-slot expansion results.
    pub expansions: Vec<ExpansionSlotRows>,
    /// `true` when a capability miss caused the query to degrade to an empty result.
    pub was_degraded: bool,
}
236
/// Manages a pool of read-only `SQLite` connections and executes compiled queries.
pub struct ExecutionCoordinator {
    /// Filesystem location of the SQLite database file.
    database_path: PathBuf,
    /// Shared schema manager used to bootstrap and initialize connections.
    schema_manager: Arc<SchemaManager>,
    /// Read-only connection pool backing all query execution.
    pool: ReadPool,
    /// Cache mapping query shape hashes to their generated (wrapped) SQL text.
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// Whether sqlite-vec is loaded and a vector profile is active.
    vector_enabled: bool,
    /// Latch so the vector-degradation warning is logged at most once.
    vec_degradation_warned: AtomicBool,
    /// Shared query/error counters.
    telemetry: Arc<TelemetryCounters>,
}
247
248impl fmt::Debug for ExecutionCoordinator {
249    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250        f.debug_struct("ExecutionCoordinator")
251            .field("database_path", &self.database_path)
252            .finish_non_exhaustive()
253    }
254}
255
256impl ExecutionCoordinator {
    /// Open the coordinator: bootstrap the schema on a temporary writable
    /// connection, optionally ensure a vector profile, then build the
    /// read-only connection pool.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database connection cannot be opened or schema bootstrap fails.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap connection: writable, with the vec extension loaded only
        // when the feature is compiled in and a vector dimension is requested.
        #[cfg(feature = "sqlite-vec")]
        let conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        // Without the sqlite-vec feature, vector support is always off
        // regardless of what the bootstrap report says.
        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            let _ = &report;
            false
        };

        if let Some(dim) = vector_dimension {
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", dim)
                .map_err(EngineError::Schema)?;
            // Profile was just created or updated — mark as enabled.
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        // Drop the bootstrap connection — pool connections are used for reads.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
        })
    }
312
313    /// Returns the filesystem path to the `SQLite` database.
314    pub fn database_path(&self) -> &Path {
315        &self.database_path
316    }
317
    /// Returns `true` when sqlite-vec was loaded and a vector profile is active.
    ///
    /// The flag is fixed at [`ExecutionCoordinator::open`] time and never
    /// changes afterwards.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
323
    /// Borrow a read-only connection from the pool (see [`ReadPool::acquire`]).
    ///
    /// # Errors
    /// Returns [`EngineError::Bridge`] if the pool's fallback mutex is poisoned.
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
327
328    /// Aggregate `SQLite` page-cache counters across all pool connections.
329    ///
330    /// Uses `try_lock` to avoid blocking reads for telemetry reporting.
331    /// Connections that are currently locked by a query are skipped — this
332    /// is acceptable for statistical counters.
333    #[must_use]
334    pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
335        let mut total = SqliteCacheStatus::default();
336        for conn_mutex in &self.pool.connections {
337            if let Ok(conn) = conn_mutex.try_lock() {
338                total.add(&read_db_cache_status(&conn));
339            }
340        }
341        total
342    }
343
344    /// # Errors
345    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
346    #[allow(clippy::expect_used)]
347    pub fn execute_compiled_read(
348        &self,
349        compiled: &CompiledQuery,
350    ) -> Result<QueryRows, EngineError> {
351        let row_sql = wrap_node_row_projection_sql(&compiled.sql);
352        // FIX(review): was .expect() — panics on mutex poisoning, cascading failure.
353        // Options: (A) into_inner() for all, (B) EngineError for all, (C) mixed.
354        // Chose (C): shape_sql_map is a pure cache — into_inner() is safe to recover.
355        // conn wraps a SQLite connection whose state may be corrupt after a thread panic,
356        // so we propagate EngineError there instead.
357        {
358            let mut cache = self
359                .shape_sql_map
360                .lock()
361                .unwrap_or_else(PoisonError::into_inner);
362            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
363                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
364                cache.clear();
365            }
366            cache.insert(compiled.shape_hash, row_sql.clone());
367        }
368
369        let bind_values = compiled
370            .binds
371            .iter()
372            .map(bind_value_to_sql)
373            .collect::<Vec<_>>();
374
375        // FIX(review) + Security fix M-8: was .expect() — panics on mutex poisoning.
376        // shape_sql_map uses into_inner() (pure cache, safe to recover).
377        // conn uses map_err → EngineError (connection state may be corrupt after panic;
378        // into_inner() would risk using a connection with partial transaction state).
379        let conn_guard = match self.lock_connection() {
380            Ok(g) => g,
381            Err(e) => {
382                self.telemetry.increment_errors();
383                return Err(e);
384            }
385        };
386        let mut statement = match conn_guard.prepare_cached(&row_sql) {
387            Ok(stmt) => stmt,
388            Err(e) if is_vec_table_absent(&e) => {
389                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
390                    trace_warn!("vector table absent, degrading to non-vector query");
391                }
392                return Ok(QueryRows {
393                    was_degraded: true,
394                    ..Default::default()
395                });
396            }
397            Err(e) => {
398                self.telemetry.increment_errors();
399                return Err(EngineError::Sqlite(e));
400            }
401        };
402        let nodes = match statement
403            .query_map(params_from_iter(bind_values.iter()), |row| {
404                Ok(NodeRow {
405                    row_id: row.get(0)?,
406                    logical_id: row.get(1)?,
407                    kind: row.get(2)?,
408                    properties: row.get(3)?,
409                    content_ref: row.get(4)?,
410                    last_accessed_at: row.get(5)?,
411                })
412            })
413            .and_then(Iterator::collect)
414        {
415            Ok(rows) => rows,
416            Err(e) => {
417                self.telemetry.increment_errors();
418                return Err(EngineError::Sqlite(e));
419            }
420        };
421
422        self.telemetry.increment_queries();
423        Ok(QueryRows {
424            nodes,
425            runs: Vec::new(),
426            steps: Vec::new(),
427            actions: Vec::new(),
428            was_degraded: false,
429        })
430    }
431
432    /// # Errors
433    /// Returns [`EngineError`] if the root query or any bounded expansion
434    /// query cannot be prepared or executed.
435    pub fn execute_compiled_grouped_read(
436        &self,
437        compiled: &CompiledGroupedQuery,
438    ) -> Result<GroupedQueryRows, EngineError> {
439        let root_rows = self.execute_compiled_read(&compiled.root)?;
440        if root_rows.was_degraded {
441            return Ok(GroupedQueryRows {
442                roots: Vec::new(),
443                expansions: Vec::new(),
444                was_degraded: true,
445            });
446        }
447
448        let roots = root_rows.nodes;
449        let mut expansions = Vec::with_capacity(compiled.expansions.len());
450        for expansion in &compiled.expansions {
451            let slot_rows = if roots.is_empty() {
452                Vec::new()
453            } else {
454                self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
455            };
456            expansions.push(ExpansionSlotRows {
457                slot: expansion.slot.clone(),
458                roots: slot_rows,
459            });
460        }
461
462        Ok(GroupedQueryRows {
463            roots,
464            expansions,
465            was_degraded: false,
466        })
467    }
468
469    /// Chunked batched expansion: splits roots into chunks of
470    /// `BATCH_CHUNK_SIZE` and runs one batched query per chunk, then merges
471    /// results while preserving root ordering.  This keeps bind-parameter
472    /// counts within `SQLite` limits while avoiding the N+1 per-root pattern
473    /// for large result sets.
474    fn read_expansion_nodes_chunked(
475        &self,
476        roots: &[NodeRow],
477        expansion: &ExpansionSlot,
478        hard_limit: usize,
479    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
480        if roots.len() <= BATCH_CHUNK_SIZE {
481            return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
482        }
483
484        // Merge chunk results keyed by root logical_id, then reassemble in
485        // root order.
486        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
487        for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
488            for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
489                per_root
490                    .entry(group.root_logical_id)
491                    .or_default()
492                    .extend(group.nodes);
493            }
494        }
495
496        Ok(roots
497            .iter()
498            .map(|root| ExpansionRootRows {
499                root_logical_id: root.logical_id.clone(),
500                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
501            })
502            .collect())
503    }
504
    /// Batched expansion: one recursive CTE query per expansion slot that
    /// processes all root IDs at once. Uses `ROW_NUMBER() OVER (PARTITION BY
    /// source_logical_id ...)` to enforce the per-root hard limit inside the
    /// database rather than in Rust.
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Pick the edge join and the "next hop" column based on traversal
        // direction (Out follows source→target, In follows target→source).
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // Build a UNION ALL of SELECT literals for the root seed rows.
        // SQLite does not support `VALUES ... AS alias(col)` in older versions,
        // so we use `SELECT ?1 UNION ALL SELECT ?2 ...` instead.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        // The `root_id` column tracks which root each traversal path
        // originated from. `visited` is a comma-delimited ID list used for
        // cycle detection via `instr`. The `ROW_NUMBER()` window in the
        // outer query enforces the per-root hard limit.
        // NOTE(review): `emitted` counts hops along one path, not total rows
        // per root — confirm that is the intended CTE growth bound.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted) AS (
                SELECT rid, rid, 0, printf(',%s,', rid), 0
                FROM root_ids
                UNION ALL
                SELECT
                    t.root_id,
                    {next_logical_id},
                    t.depth + 1,
                    t.visited || {next_logical_id} || ',',
                    t.emitted + 1
                FROM traversed t
                JOIN edges e ON {join_condition}
                    AND e.kind = ?{edge_kind_param}
                    AND e.superseded_at IS NULL
                WHERE t.depth < {max_depth}
                  AND t.emitted < {hard_limit}
                  AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
                     , n.content_ref, am.last_accessed_at
                     , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
                FROM traversed t
                JOIN nodes n ON n.logical_id = t.logical_id
                    AND n.superseded_at IS NULL
                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
                WHERE t.depth > 0
            )
            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, logical_id",
            edge_kind_param = root_ids.len() + 1,
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind root IDs (1..=N) and edge kind (N+1).
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, // root_id
                    NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Partition results back into per-root groups, preserving root order.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
620
621    /// Read a single run by id.
622    ///
623    /// # Errors
624    /// Returns [`EngineError`] if the query fails or if the connection mutex
625    /// has been poisoned.
626    pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
627        let conn = self.lock_connection()?;
628        conn.query_row(
629            "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
630            rusqlite::params![id],
631            |row| {
632                Ok(RunRow {
633                    id: row.get(0)?,
634                    kind: row.get(1)?,
635                    status: row.get(2)?,
636                    properties: row.get(3)?,
637                })
638            },
639        )
640        .optional()
641        .map_err(EngineError::Sqlite)
642    }
643
644    /// Read a single step by id.
645    ///
646    /// # Errors
647    /// Returns [`EngineError`] if the query fails or if the connection mutex
648    /// has been poisoned.
649    pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
650        let conn = self.lock_connection()?;
651        conn.query_row(
652            "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
653            rusqlite::params![id],
654            |row| {
655                Ok(StepRow {
656                    id: row.get(0)?,
657                    run_id: row.get(1)?,
658                    kind: row.get(2)?,
659                    status: row.get(3)?,
660                    properties: row.get(4)?,
661                })
662            },
663        )
664        .optional()
665        .map_err(EngineError::Sqlite)
666    }
667
668    /// Read a single action by id.
669    ///
670    /// # Errors
671    /// Returns [`EngineError`] if the query fails or if the connection mutex
672    /// has been poisoned.
673    pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
674        let conn = self.lock_connection()?;
675        conn.query_row(
676            "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
677            rusqlite::params![id],
678            |row| {
679                Ok(ActionRow {
680                    id: row.get(0)?,
681                    step_id: row.get(1)?,
682                    kind: row.get(2)?,
683                    status: row.get(3)?,
684                    properties: row.get(4)?,
685                })
686            },
687        )
688        .optional()
689        .map_err(EngineError::Sqlite)
690    }
691
692    /// Read all active (non-superseded) runs.
693    ///
694    /// # Errors
695    /// Returns [`EngineError`] if the query fails or if the connection mutex
696    /// has been poisoned.
697    pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
698        let conn = self.lock_connection()?;
699        let mut stmt = conn
700            .prepare_cached(
701                "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
702            )
703            .map_err(EngineError::Sqlite)?;
704        let rows = stmt
705            .query_map([], |row| {
706                Ok(RunRow {
707                    id: row.get(0)?,
708                    kind: row.get(1)?,
709                    status: row.get(2)?,
710                    properties: row.get(3)?,
711                })
712            })
713            .map_err(EngineError::Sqlite)?
714            .collect::<Result<Vec<_>, _>>()
715            .map_err(EngineError::Sqlite)?;
716        Ok(rows)
717    }
718
719    /// Returns the number of shape→SQL entries currently indexed.
720    ///
721    /// Each distinct query shape (structural hash of kind + steps + limits)
722    /// maps to exactly one SQL string.  This is a test-oriented introspection
723    /// helper; it does not reflect rusqlite's internal prepared-statement
724    /// cache, which is keyed by SQL text.
725    ///
726    /// # Panics
727    /// Panics if the internal shape-SQL-map mutex is poisoned.
728    #[must_use]
729    #[allow(clippy::expect_used)]
730    pub fn shape_sql_count(&self) -> usize {
731        self.shape_sql_map
732            .lock()
733            .unwrap_or_else(PoisonError::into_inner)
734            .len()
735    }
736
    /// Returns a cloned `Arc` to the schema manager.
    ///
    /// Cheap: only the reference count is bumped, not the manager itself.
    #[must_use]
    pub fn schema_manager(&self) -> Arc<SchemaManager> {
        Arc::clone(&self.schema_manager)
    }
742
    /// Return the execution plan for a compiled query without executing it.
    ///
    /// Useful for debugging, testing shape-hash caching, and operator
    /// diagnostics. Does not open a transaction or touch the database beyond
    /// checking the statement cache.
    ///
    /// Never panics: a poisoned shape-cache mutex is recovered via
    /// `PoisonError::into_inner`, which is safe because the map is a pure
    /// shape→SQL cache. (The previous "# Panics" note was stale.)
    #[must_use]
    pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
        // A cache hit means this shape's SQL was recorded by an earlier
        // `execute_compiled_read` call.
        let cache_hit = self
            .shape_sql_map
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
            .contains_key(&compiled.shape_hash);
        QueryPlan {
            sql: wrap_node_row_projection_sql(&compiled.sql),
            bind_count: compiled.binds.len(),
            driving_table: compiled.driving_table,
            shape_hash: compiled.shape_hash,
            cache_hit,
        }
    }
766
767    /// Execute a named PRAGMA and return the result as a String.
768    /// Used by Layer 1 tests to verify startup pragma initialization.
769    ///
770    /// # Errors
771    /// Returns [`EngineError`] if the PRAGMA query fails or if the connection
772    /// mutex has been poisoned.
773    #[doc(hidden)]
774    pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
775        let conn = self.lock_connection()?;
776        let result = conn
777            .query_row(&format!("PRAGMA {name}"), [], |row| {
778                // PRAGMAs may return TEXT or INTEGER; normalise to String.
779                row.get::<_, rusqlite::types::Value>(0)
780            })
781            .map_err(EngineError::Sqlite)?;
782        let s = match result {
783            rusqlite::types::Value::Text(t) => t,
784            rusqlite::types::Value::Integer(i) => i.to_string(),
785            rusqlite::types::Value::Real(f) => f.to_string(),
786            rusqlite::types::Value::Blob(_) => {
787                return Err(EngineError::InvalidWrite(format!(
788                    "PRAGMA {name} returned an unexpected BLOB value"
789                )));
790            }
791            rusqlite::types::Value::Null => String::new(),
792        };
793        Ok(s)
794    }
795
796    /// Return all provenance events whose `subject` matches the given value.
797    ///
798    /// Subjects are logical node IDs (for retire/upsert events) or `source_ref`
799    /// values (for excise events).
800    ///
801    /// # Errors
802    /// Returns [`EngineError`] if the query fails or if the connection mutex
803    /// has been poisoned.
804    pub fn query_provenance_events(
805        &self,
806        subject: &str,
807    ) -> Result<Vec<ProvenanceEvent>, EngineError> {
808        let conn = self.lock_connection()?;
809        let mut stmt = conn
810            .prepare_cached(
811                "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
812                 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
813            )
814            .map_err(EngineError::Sqlite)?;
815        let events = stmt
816            .query_map(rusqlite::params![subject], |row| {
817                Ok(ProvenanceEvent {
818                    id: row.get(0)?,
819                    event_type: row.get(1)?,
820                    subject: row.get(2)?,
821                    source_ref: row.get(3)?,
822                    metadata_json: row.get(4)?,
823                    created_at: row.get(5)?,
824                })
825            })
826            .map_err(EngineError::Sqlite)?
827            .collect::<Result<Vec<_>, _>>()
828            .map_err(EngineError::Sqlite)?;
829        Ok(events)
830    }
831}
832
/// Wrap the caller-provided node query so each row is projected together with
/// its (possibly absent) access metadata via a LEFT JOIN.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    // Build the wrapper incrementally; capacity covers the fixed SQL skeleton
    // plus the embedded sub-query.
    let mut sql = String::with_capacity(base_sql.len() + 160);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, am.last_accessed_at FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
840
841/// Returns `true` when `err` indicates the vec virtual table is absent
842/// (sqlite-vec feature enabled but `vec_nodes_active` not yet created).
843pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
844    match err {
845        rusqlite::Error::SqliteFailure(_, Some(msg)) => {
846            msg.contains("vec_nodes_active") || msg.contains("no such module: vec0")
847        }
848        _ => false,
849    }
850}
851
852fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
853    match value {
854        fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
855        fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
856        fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
857    }
858}
859
860#[cfg(test)]
861#[allow(clippy::expect_used)]
862mod tests {
863    use std::panic::{AssertUnwindSafe, catch_unwind};
864    use std::sync::Arc;
865
866    use fathomdb_query::{BindValue, QueryBuilder};
867    use fathomdb_schema::SchemaManager;
868    use rusqlite::types::Value;
869    use tempfile::NamedTempFile;
870
871    use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
872
873    use super::{bind_value_to_sql, is_vec_table_absent, wrap_node_row_projection_sql};
874
875    #[test]
876    fn is_vec_table_absent_matches_known_error_messages() {
877        use rusqlite::ffi;
878        fn make_err(msg: &str) -> rusqlite::Error {
879            rusqlite::Error::SqliteFailure(
880                ffi::Error {
881                    code: ffi::ErrorCode::Unknown,
882                    extended_code: 1,
883                },
884                Some(msg.to_owned()),
885            )
886        }
887        assert!(is_vec_table_absent(&make_err(
888            "no such table: vec_nodes_active"
889        )));
890        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
891        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
892        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
893        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
894    }
895
896    #[test]
897    fn bind_value_text_maps_to_sql_text() {
898        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
899        assert_eq!(val, Value::Text("hello".to_owned()));
900    }
901
902    #[test]
903    fn bind_value_integer_maps_to_sql_integer() {
904        let val = bind_value_to_sql(&BindValue::Integer(42));
905        assert_eq!(val, Value::Integer(42));
906    }
907
908    #[test]
909    fn bind_value_bool_true_maps_to_integer_one() {
910        let val = bind_value_to_sql(&BindValue::Bool(true));
911        assert_eq!(val, Value::Integer(1));
912    }
913
914    #[test]
915    fn bind_value_bool_false_maps_to_integer_zero() {
916        let val = bind_value_to_sql(&BindValue::Bool(false));
917        assert_eq!(val, Value::Integer(0));
918    }
919
920    #[test]
921    fn same_shape_queries_share_one_cache_entry() {
922        let db = NamedTempFile::new().expect("temporary db");
923        let coordinator = ExecutionCoordinator::open(
924            db.path(),
925            Arc::new(SchemaManager::new()),
926            None,
927            1,
928            Arc::new(TelemetryCounters::default()),
929        )
930        .expect("coordinator");
931
932        let compiled_a = QueryBuilder::nodes("Meeting")
933            .text_search("budget", 5)
934            .limit(10)
935            .compile()
936            .expect("compiled a");
937        let compiled_b = QueryBuilder::nodes("Meeting")
938            .text_search("standup", 5)
939            .limit(10)
940            .compile()
941            .expect("compiled b");
942
943        coordinator
944            .execute_compiled_read(&compiled_a)
945            .expect("read a");
946        coordinator
947            .execute_compiled_read(&compiled_b)
948            .expect("read b");
949
950        assert_eq!(
951            compiled_a.shape_hash, compiled_b.shape_hash,
952            "different bind values, same structural shape → same hash"
953        );
954        assert_eq!(coordinator.shape_sql_count(), 1);
955    }
956
957    #[test]
958    fn vector_read_degrades_gracefully_when_vec_table_absent() {
959        let db = NamedTempFile::new().expect("temporary db");
960        let coordinator = ExecutionCoordinator::open(
961            db.path(),
962            Arc::new(SchemaManager::new()),
963            None,
964            1,
965            Arc::new(TelemetryCounters::default()),
966        )
967        .expect("coordinator");
968
969        let compiled = QueryBuilder::nodes("Meeting")
970            .vector_search("budget embeddings", 5)
971            .compile()
972            .expect("vector query compiles");
973
974        let result = coordinator.execute_compiled_read(&compiled);
975        let rows = result.expect("degraded read must succeed, not error");
976        assert!(
977            rows.was_degraded,
978            "result must be flagged as degraded when vec_nodes_active is absent"
979        );
980        assert!(
981            rows.nodes.is_empty(),
982            "degraded result must return empty nodes"
983        );
984    }
985
986    #[test]
987    fn coordinator_caches_by_shape_hash() {
988        let db = NamedTempFile::new().expect("temporary db");
989        let coordinator = ExecutionCoordinator::open(
990            db.path(),
991            Arc::new(SchemaManager::new()),
992            None,
993            1,
994            Arc::new(TelemetryCounters::default()),
995        )
996        .expect("coordinator");
997
998        let compiled = QueryBuilder::nodes("Meeting")
999            .text_search("budget", 5)
1000            .compile()
1001            .expect("compiled query");
1002
1003        coordinator
1004            .execute_compiled_read(&compiled)
1005            .expect("execute compiled read");
1006        assert_eq!(coordinator.shape_sql_count(), 1);
1007    }
1008
1009    // --- Item 6: explain_compiled_read tests ---
1010
1011    #[test]
1012    fn explain_returns_correct_sql() {
1013        let db = NamedTempFile::new().expect("temporary db");
1014        let coordinator = ExecutionCoordinator::open(
1015            db.path(),
1016            Arc::new(SchemaManager::new()),
1017            None,
1018            1,
1019            Arc::new(TelemetryCounters::default()),
1020        )
1021        .expect("coordinator");
1022
1023        let compiled = QueryBuilder::nodes("Meeting")
1024            .text_search("budget", 5)
1025            .compile()
1026            .expect("compiled query");
1027
1028        let plan = coordinator.explain_compiled_read(&compiled);
1029
1030        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
1031    }
1032
1033    #[test]
1034    fn explain_returns_correct_driving_table() {
1035        use fathomdb_query::DrivingTable;
1036
1037        let db = NamedTempFile::new().expect("temporary db");
1038        let coordinator = ExecutionCoordinator::open(
1039            db.path(),
1040            Arc::new(SchemaManager::new()),
1041            None,
1042            1,
1043            Arc::new(TelemetryCounters::default()),
1044        )
1045        .expect("coordinator");
1046
1047        let compiled = QueryBuilder::nodes("Meeting")
1048            .text_search("budget", 5)
1049            .compile()
1050            .expect("compiled query");
1051
1052        let plan = coordinator.explain_compiled_read(&compiled);
1053
1054        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
1055    }
1056
1057    #[test]
1058    fn explain_reports_cache_miss_then_hit() {
1059        let db = NamedTempFile::new().expect("temporary db");
1060        let coordinator = ExecutionCoordinator::open(
1061            db.path(),
1062            Arc::new(SchemaManager::new()),
1063            None,
1064            1,
1065            Arc::new(TelemetryCounters::default()),
1066        )
1067        .expect("coordinator");
1068
1069        let compiled = QueryBuilder::nodes("Meeting")
1070            .text_search("budget", 5)
1071            .compile()
1072            .expect("compiled query");
1073
1074        // Before execution: cache miss
1075        let plan_before = coordinator.explain_compiled_read(&compiled);
1076        assert!(
1077            !plan_before.cache_hit,
1078            "cache miss expected before first execute"
1079        );
1080
1081        // Execute to populate cache
1082        coordinator
1083            .execute_compiled_read(&compiled)
1084            .expect("execute read");
1085
1086        // After execution: cache hit
1087        let plan_after = coordinator.explain_compiled_read(&compiled);
1088        assert!(
1089            plan_after.cache_hit,
1090            "cache hit expected after first execute"
1091        );
1092    }
1093
1094    #[test]
1095    fn explain_does_not_execute_query() {
1096        // Call explain_compiled_read on an empty database. If explain were
1097        // actually executing SQL, it would return Ok with 0 rows. But the
1098        // key assertion is that it returns a QueryPlan (not an error) even
1099        // without touching the database.
1100        let db = NamedTempFile::new().expect("temporary db");
1101        let coordinator = ExecutionCoordinator::open(
1102            db.path(),
1103            Arc::new(SchemaManager::new()),
1104            None,
1105            1,
1106            Arc::new(TelemetryCounters::default()),
1107        )
1108        .expect("coordinator");
1109
1110        let compiled = QueryBuilder::nodes("Meeting")
1111            .text_search("anything", 5)
1112            .compile()
1113            .expect("compiled query");
1114
1115        // This must not error, even though the database is empty
1116        let plan = coordinator.explain_compiled_read(&compiled);
1117
1118        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
1119        assert_eq!(plan.bind_count, compiled.binds.len());
1120    }
1121
    #[test]
    fn coordinator_executes_compiled_read() {
        let db = NamedTempFile::new().expect("temporary db");
        // Opening the coordinator bootstraps the schema in the temp database.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Seed one Meeting node plus its chunk and FTS index row so a text
        // search on "budget" has exactly one match.
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // Exactly the seeded node should come back.
        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
1160
    #[test]
    fn text_search_finds_structured_only_node_via_property_fts() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Insert a structured-only node (no chunks) with a property FTS row.
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
            VALUES ('goal-1', 'Goal', 'Ship v2');
            "#,
        )
        .expect("seed data");

        // "Ship" only appears in the property text — no chunk exists at all.
        let compiled = QueryBuilder::nodes("Goal")
            .text_search("Ship", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // The property-FTS path alone must surface the node.
        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "goal-1");
    }
1198
    #[test]
    fn text_search_returns_both_chunk_and_property_backed_hits() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Chunk-backed hit: a Meeting with a chunk containing "quarterly".
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
            ",
        )
        .expect("seed chunk-backed node");

        // Property-backed hit: a Meeting with property FTS containing "quarterly".
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
            VALUES ('meeting-2', 'Meeting', 'quarterly sync');
            "#,
        )
        .expect("seed property-backed node");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("quarterly", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // Sort for a deterministic comparison: both backing paths must hit.
        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
        ids.sort_unstable();
        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
    }
1250
1251    // --- Item 1: capability gate tests ---
1252
1253    #[test]
1254    fn capability_gate_reports_false_without_feature() {
1255        let db = NamedTempFile::new().expect("temporary db");
1256        // Open without vector_dimension: regardless of feature flag, vector_enabled must be false
1257        // when no dimension is requested (the vector profile is never bootstrapped).
1258        let coordinator = ExecutionCoordinator::open(
1259            db.path(),
1260            Arc::new(SchemaManager::new()),
1261            None,
1262            1,
1263            Arc::new(TelemetryCounters::default()),
1264        )
1265        .expect("coordinator");
1266        assert!(
1267            !coordinator.vector_enabled(),
1268            "vector_enabled must be false when no dimension is requested"
1269        );
1270    }
1271
1272    #[cfg(feature = "sqlite-vec")]
1273    #[test]
1274    fn capability_gate_reports_true_when_feature_enabled() {
1275        let db = NamedTempFile::new().expect("temporary db");
1276        let coordinator = ExecutionCoordinator::open(
1277            db.path(),
1278            Arc::new(SchemaManager::new()),
1279            Some(128),
1280            1,
1281            Arc::new(TelemetryCounters::default()),
1282        )
1283        .expect("coordinator");
1284        assert!(
1285            coordinator.vector_enabled(),
1286            "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
1287        );
1288    }
1289
1290    // --- Item 4: runtime table read tests ---
1291
    #[test]
    fn read_run_returns_inserted_run() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // Insert a single run through the writer; every other write category
        // in the request is left empty.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        // A fresh coordinator over the same database must see the run.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
1345
    #[test]
    fn read_step_returns_inserted_step() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // A step requires a parent run, so both are inserted in one request.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        // A fresh coordinator must read the step back by its id.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
1408
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // An action needs the full run → step → action chain, so all three
        // are inserted in one request.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        // A fresh coordinator must read the action back by its id.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
1483
    #[test]
    fn read_active_runs_excludes_superseded() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert original run
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Supersede original run with v2 (upsert + supersedes_id pointing at
        // run-v1, which should drop run-v1 from the active set).
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
1562
1563    #[allow(clippy::panic)]
1564    fn poison_connection(coordinator: &ExecutionCoordinator) {
1565        let result = catch_unwind(AssertUnwindSafe(|| {
1566            let _guard = coordinator.pool.connections[0]
1567                .lock()
1568                .expect("poison test lock");
1569            panic!("poison coordinator connection mutex");
1570        }));
1571        assert!(
1572            result.is_err(),
1573            "poison test must unwind while holding the connection mutex"
1574        );
1575    }
1576
1577    #[allow(clippy::panic)]
1578    fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
1579    where
1580        F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
1581    {
1582        match op(coordinator) {
1583            Err(EngineError::Bridge(message)) => {
1584                assert_eq!(message, "connection mutex poisoned");
1585            }
1586            Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
1587            Err(error) => panic!("expected poisoned connection error, got {error:?}"),
1588        }
1589    }
1590
1591    #[test]
1592    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
1593        let db = NamedTempFile::new().expect("temporary db");
1594        let coordinator = ExecutionCoordinator::open(
1595            db.path(),
1596            Arc::new(SchemaManager::new()),
1597            None,
1598            1,
1599            Arc::new(TelemetryCounters::default()),
1600        )
1601        .expect("coordinator");
1602
1603        poison_connection(&coordinator);
1604
1605        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
1606        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
1607        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
1608        assert_poisoned_connection_error(
1609            &coordinator,
1610            super::ExecutionCoordinator::read_active_runs,
1611        );
1612        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
1613        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
1614    }
1615
1616    // --- M-2: Bounded shape cache ---
1617
1618    #[test]
1619    fn shape_cache_stays_bounded() {
1620        use fathomdb_query::ShapeHash;
1621
1622        let db = NamedTempFile::new().expect("temporary db");
1623        let coordinator = ExecutionCoordinator::open(
1624            db.path(),
1625            Arc::new(SchemaManager::new()),
1626            None,
1627            1,
1628            Arc::new(TelemetryCounters::default()),
1629        )
1630        .expect("coordinator");
1631
1632        // Directly populate the cache with MAX_SHAPE_CACHE_SIZE + 1 entries.
1633        {
1634            let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
1635            for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
1636                cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
1637            }
1638        }
1639        // The cache is now over the limit but hasn't been pruned yet (pruning
1640        // happens on the insert path in execute_compiled_read).
1641
1642        // Execute a compiled read to trigger the bounded-cache check.
1643        let compiled = QueryBuilder::nodes("Meeting")
1644            .text_search("budget", 5)
1645            .limit(10)
1646            .compile()
1647            .expect("compiled query");
1648
1649        coordinator
1650            .execute_compiled_read(&compiled)
1651            .expect("execute read");
1652
1653        assert!(
1654            coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
1655            "shape cache must stay bounded: got {} entries, max {}",
1656            coordinator.shape_sql_count(),
1657            super::MAX_SHAPE_CACHE_SIZE
1658        );
1659    }
1660
1661    // --- M-1: Read pool size ---
1662
1663    #[test]
1664    fn read_pool_size_configurable() {
1665        let db = NamedTempFile::new().expect("temporary db");
1666        let coordinator = ExecutionCoordinator::open(
1667            db.path(),
1668            Arc::new(SchemaManager::new()),
1669            None,
1670            2,
1671            Arc::new(TelemetryCounters::default()),
1672        )
1673        .expect("coordinator with pool_size=2");
1674
1675        assert_eq!(coordinator.pool.size(), 2);
1676
1677        // Basic read should succeed through the pool.
1678        let compiled = QueryBuilder::nodes("Meeting")
1679            .text_search("budget", 5)
1680            .limit(10)
1681            .compile()
1682            .expect("compiled query");
1683
1684        let result = coordinator.execute_compiled_read(&compiled);
1685        assert!(result.is_ok(), "read through pool must succeed");
1686    }
1687
1688    // --- M-4: Grouped read batching ---
1689
1690    #[test]
1691    fn grouped_read_results_match_baseline() {
1692        use fathomdb_query::TraverseDirection;
1693
1694        let db = NamedTempFile::new().expect("temporary db");
1695
1696        // Bootstrap the database via coordinator (creates schema).
1697        let coordinator = ExecutionCoordinator::open(
1698            db.path(),
1699            Arc::new(SchemaManager::new()),
1700            None,
1701            1,
1702            Arc::new(TelemetryCounters::default()),
1703        )
1704        .expect("coordinator");
1705
1706        // Seed data: 10 root nodes (Meeting-0..9) with 2 outbound edges each
1707        // to expansion nodes (Task-0-a, Task-0-b, etc.).
1708        {
1709            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
1710            for i in 0..10 {
1711                conn.execute_batch(&format!(
1712                    r#"
1713                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
1714                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
1715                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
1716                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
1717                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
1718                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');
1719
1720                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
1721                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
1722                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
1723                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());
1724
1725                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
1726                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
1727                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
1728                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
1729                    "#,
1730                )).expect("seed data");
1731            }
1732        }
1733
1734        let compiled = QueryBuilder::nodes("Meeting")
1735            .text_search("meeting", 10)
1736            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1)
1737            .limit(10)
1738            .compile_grouped()
1739            .expect("compiled grouped query");
1740
1741        let result = coordinator
1742            .execute_compiled_grouped_read(&compiled)
1743            .expect("grouped read");
1744
1745        assert!(!result.was_degraded, "grouped read should not be degraded");
1746        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
1747        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
1748        assert_eq!(result.expansions[0].slot, "tasks");
1749        assert_eq!(
1750            result.expansions[0].roots.len(),
1751            10,
1752            "each expansion slot should have entries for all 10 roots"
1753        );
1754
1755        // Each root should have exactly 2 expansion nodes (task-X-a, task-X-b).
1756        for root_expansion in &result.expansions[0].roots {
1757            assert_eq!(
1758                root_expansion.nodes.len(),
1759                2,
1760                "root {} should have 2 expansion nodes, got {}",
1761                root_expansion.root_logical_id,
1762                root_expansion.nodes.len()
1763            );
1764        }
1765    }
1766}