// fathomdb_engine/coordinator.rs
1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{CompiledGroupedQuery, CompiledQuery, DrivingTable, ExpansionSlot, ShapeHash};
8use fathomdb_schema::SchemaManager;
9use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
10
11use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
12use crate::{EngineError, sqlite};
13
/// Maximum number of cached shape-hash to SQL mappings before the cache is
/// cleared entirely.  A clear-all strategy is simpler than partial eviction
/// and the cost of re-compiling on a miss is negligible.
/// Checked (and cleared) on every cache insert in `execute_compiled_read`.
const MAX_SHAPE_CACHE_SIZE: usize = 4096;

/// Maximum number of root IDs per batched expansion query.  Kept well below
/// `SQLITE_MAX_VARIABLE_NUMBER` (default 999) because each batch also binds
/// the edge-kind parameter.  Larger root sets are chunked into multiple
/// batches of this size rather than falling back to per-root queries.
/// Used by `read_expansion_nodes_chunked`.
const BATCH_CHUNK_SIZE: usize = 200;
24
/// A pool of read-only `SQLite` connections for concurrent read access.
///
/// Each connection is wrapped in its own [`Mutex`] so multiple readers can
/// proceed in parallel when they happen to grab different slots.
struct ReadPool {
    /// One mutex-guarded connection per pool slot.
    connections: Vec<Mutex<Connection>>,
}
32
33impl fmt::Debug for ReadPool {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        f.debug_struct("ReadPool")
36            .field("size", &self.connections.len())
37            .finish()
38    }
39}
40
41impl ReadPool {
42    /// Open `pool_size` read-only connections to the database at `path`.
43    ///
44    /// Each connection has PRAGMAs initialized via
45    /// [`SchemaManager::initialize_connection`] and, when the `sqlite-vec`
46    /// feature is enabled and `vector_enabled` is true, the vec extension
47    /// auto-loaded.
48    ///
49    /// # Errors
50    ///
51    /// Returns [`EngineError`] if any connection fails to open or initialize.
52    fn new(
53        db_path: &Path,
54        pool_size: usize,
55        schema_manager: &SchemaManager,
56        vector_enabled: bool,
57    ) -> Result<Self, EngineError> {
58        let mut connections = Vec::with_capacity(pool_size);
59        for _ in 0..pool_size {
60            let conn = if vector_enabled {
61                #[cfg(feature = "sqlite-vec")]
62                {
63                    sqlite::open_readonly_connection_with_vec(db_path)?
64                }
65                #[cfg(not(feature = "sqlite-vec"))]
66                {
67                    sqlite::open_readonly_connection(db_path)?
68                }
69            } else {
70                sqlite::open_readonly_connection(db_path)?
71            };
72            schema_manager
73                .initialize_reader_connection(&conn)
74                .map_err(EngineError::Schema)?;
75            connections.push(Mutex::new(conn));
76        }
77        Ok(Self { connections })
78    }
79
80    /// Acquire a connection from the pool.
81    ///
82    /// Tries [`Mutex::try_lock`] on each slot first (fast non-blocking path).
83    /// If every slot is held, falls back to a blocking lock on the first slot.
84    ///
85    /// # Errors
86    ///
87    /// Returns [`EngineError::Bridge`] if the underlying mutex is poisoned.
88    fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
89        // Fast path: try each connection without blocking.
90        for conn in &self.connections {
91            if let Ok(guard) = conn.try_lock() {
92                return Ok(guard);
93            }
94        }
95        // Fallback: block on the first connection.
96        self.connections[0].lock().map_err(|_| {
97            trace_error!("read pool: connection mutex poisoned");
98            EngineError::Bridge("connection mutex poisoned".to_owned())
99        })
100    }
101
102    /// Return the number of connections in the pool.
103    #[cfg(test)]
104    fn size(&self) -> usize {
105        self.connections.len()
106    }
107}
108
/// Execution plan returned by [`ExecutionCoordinator::explain_compiled_read`].
///
/// This is a read-only introspection struct. It does not execute SQL.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// Final SQL text, including the node-row projection wrapper.
    pub sql: String,
    /// Number of bind values the query expects.
    pub bind_count: usize,
    /// Driving table reported by the compiled query.
    pub driving_table: DrivingTable,
    /// Structural hash of the query shape (the shape-SQL cache key).
    pub shape_hash: ShapeHash,
    /// `true` when `shape_hash` was already present in the shape-SQL cache.
    pub cache_hit: bool,
}
120
/// A single node row returned from a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row ID.
    pub row_id: String,
    /// Logical ID of the node.
    pub logical_id: String,
    /// Node kind.
    pub kind: String,
    /// JSON-encoded node properties.
    pub properties: String,
    /// Unix timestamp of last access, if tracked.  Populated via a LEFT JOIN
    /// against the `node_access_metadata` table, so it is `None` when no
    /// access record exists.
    pub last_accessed_at: Option<i64>,
}
135
/// A single run row returned from a query.
///
/// Mirrors the `id`, `kind`, `status`, and `properties` columns of the
/// `runs` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Unique run ID.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded run properties.
    pub properties: String,
}
148
/// A single step row returned from a query.
///
/// Mirrors the `id`, `run_id`, `kind`, `status`, and `properties` columns of
/// the `steps` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Unique step ID.
    pub id: String,
    /// ID of the parent run.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded step properties.
    pub properties: String,
}
163
/// A single action row returned from a query.
///
/// Mirrors the `id`, `step_id`, `kind`, `status`, and `properties` columns of
/// the `actions` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Unique action ID.
    pub id: String,
    /// ID of the parent step.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded action properties.
    pub properties: String,
}
178
/// A single row from the `provenance_events` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Unique event ID.
    pub id: String,
    /// Event type discriminator.
    pub event_type: String,
    /// Logical node ID (retire/upsert events) or `source_ref` value
    /// (excise events) the event refers to.
    pub subject: String,
    /// Optional reference to the originating source.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation time as stored in the `created_at` column
    /// (presumably Unix seconds — confirm against the writer side).
    pub created_at: i64,
}
189
/// Result set from executing a flat (non-grouped) compiled query.
///
/// NOTE(review): `execute_compiled_read` currently populates only `nodes`;
/// `runs`, `steps`, and `actions` are always returned empty there.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Matched node rows.
    pub nodes: Vec<NodeRow>,
    /// Runs associated with the matched nodes.
    pub runs: Vec<RunRow>,
    /// Steps associated with the matched runs.
    pub steps: Vec<StepRow>,
    /// Actions associated with the matched steps.
    pub actions: Vec<ActionRow>,
    /// `true` when a capability miss (e.g. missing sqlite-vec) caused the query
    /// to degrade to an empty result instead of propagating an error.
    pub was_degraded: bool,
}
205
/// Expansion results for a single root node within a grouped query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical ID of the root node that seeded this expansion.
    pub root_logical_id: String,
    /// Nodes reached by traversing from the root.  Empty when no expansion
    /// rows matched this root.
    pub nodes: Vec<NodeRow>,
}
214
/// All expansion results for a single named slot across all roots.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot.
    pub slot: String,
    /// Per-root expansion results, one entry per root in the same order as
    /// the base query's roots.
    pub roots: Vec<ExpansionRootRows>,
}
223
/// Result set from executing a grouped compiled query.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// Root node rows matched by the base query.
    pub roots: Vec<NodeRow>,
    /// Per-slot expansion results, one entry per compiled expansion slot.
    pub expansions: Vec<ExpansionSlotRows>,
    /// `true` when a capability miss caused the query to degrade to an empty result.
    pub was_degraded: bool,
}
234
/// Manages a pool of read-only `SQLite` connections and executes compiled queries.
pub struct ExecutionCoordinator {
    /// Filesystem path of the backing `SQLite` database.
    database_path: PathBuf,
    /// Shared schema manager used for bootstrap and reader initialization.
    schema_manager: Arc<SchemaManager>,
    /// Pool of read-only connections.
    pool: ReadPool,
    /// Cache of shape-hash → generated SQL (cleared wholesale when full).
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// `true` when sqlite-vec is loaded and a vector profile is active.
    vector_enabled: bool,
    /// Ensures the vec-degradation warning is logged at most once.
    vec_degradation_warned: AtomicBool,
    /// Shared query/error counters.
    telemetry: Arc<TelemetryCounters>,
}
245
246impl fmt::Debug for ExecutionCoordinator {
247    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248        f.debug_struct("ExecutionCoordinator")
249            .field("database_path", &self.database_path)
250            .finish_non_exhaustive()
251    }
252}
253
impl ExecutionCoordinator {
    /// Open the database at `path`, bootstrap the schema, optionally ensure a
    /// vector profile, and build the read-only connection pool.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database connection cannot be opened or schema bootstrap fails.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        #[cfg(feature = "sqlite-vec")]
        let conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            // Touch `report` so the non-vec build has no unused-variable warning.
            let _ = &report;
            false
        };

        if let Some(dim) = vector_dimension {
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", dim)
                .map_err(EngineError::Schema)?;
            // Profile was just created or updated — mark as enabled.
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        // Drop the bootstrap connection — pool connections are used for reads.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
        })
    }

    /// Returns the filesystem path to the `SQLite` database.
    pub fn database_path(&self) -> &Path {
        &self.database_path
    }

    /// Returns `true` when sqlite-vec was loaded and a vector profile is active.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }

    /// Acquire a pooled read connection (delegates to [`ReadPool::acquire`]).
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }

    /// Aggregate `SQLite` page-cache counters across all pool connections.
    ///
    /// Uses `try_lock` to avoid blocking reads for telemetry reporting.
    /// Connections that are currently locked by a query are skipped — this
    /// is acceptable for statistical counters.
    #[must_use]
    pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
        let mut total = SqliteCacheStatus::default();
        for conn_mutex in &self.pool.connections {
            if let Ok(conn) = conn_mutex.try_lock() {
                total.add(&read_db_cache_status(&conn));
            }
        }
        total
    }

    /// Execute a flat (non-grouped) compiled query and return the matching
    /// node rows.  Degrades to an empty, flagged result when the vec table
    /// is absent (see [`is_vec_table_absent`]).
    ///
    /// # Errors
    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
    // NOTE(review): this allow appears stale — no `.expect()` remains in the
    // body after the poisoning fix below.
    #[allow(clippy::expect_used)]
    pub fn execute_compiled_read(
        &self,
        compiled: &CompiledQuery,
    ) -> Result<QueryRows, EngineError> {
        let row_sql = wrap_node_row_projection_sql(&compiled.sql);
        // FIX(review): was .expect() — panics on mutex poisoning, cascading failure.
        // Options: (A) into_inner() for all, (B) EngineError for all, (C) mixed.
        // Chose (C): shape_sql_map is a pure cache — into_inner() is safe to recover.
        // conn wraps a SQLite connection whose state may be corrupt after a thread panic,
        // so we propagate EngineError there instead.
        {
            let mut cache = self
                .shape_sql_map
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
                cache.clear();
            }
            cache.insert(compiled.shape_hash, row_sql.clone());
        }

        let bind_values = compiled
            .binds
            .iter()
            .map(bind_value_to_sql)
            .collect::<Vec<_>>();

        // FIX(review) + Security fix M-8: was .expect() — panics on mutex poisoning.
        // shape_sql_map uses into_inner() (pure cache, safe to recover).
        // conn uses map_err → EngineError (connection state may be corrupt after panic;
        // into_inner() would risk using a connection with partial transaction state).
        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&row_sql) {
            Ok(stmt) => stmt,
            Err(e) if is_vec_table_absent(&e) => {
                // Warn only on the first degradation; later ones stay silent.
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading to non-vector query");
                }
                return Ok(QueryRows {
                    was_degraded: true,
                    ..Default::default()
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        let nodes = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    last_accessed_at: row.get(4)?,
                })
            })
            .and_then(Iterator::collect)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        self.telemetry.increment_queries();
        Ok(QueryRows {
            nodes,
            runs: Vec::new(),
            steps: Vec::new(),
            actions: Vec::new(),
            was_degraded: false,
        })
    }

    /// Execute a grouped compiled query: run the root query first, then one
    /// batched expansion pass per slot.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the root query or any bounded expansion
    /// query cannot be prepared or executed.
    pub fn execute_compiled_grouped_read(
        &self,
        compiled: &CompiledGroupedQuery,
    ) -> Result<GroupedQueryRows, EngineError> {
        let root_rows = self.execute_compiled_read(&compiled.root)?;
        if root_rows.was_degraded {
            return Ok(GroupedQueryRows {
                roots: Vec::new(),
                expansions: Vec::new(),
                was_degraded: true,
            });
        }

        let roots = root_rows.nodes;
        let mut expansions = Vec::with_capacity(compiled.expansions.len());
        for expansion in &compiled.expansions {
            let slot_rows = if roots.is_empty() {
                Vec::new()
            } else {
                self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
            };
            expansions.push(ExpansionSlotRows {
                slot: expansion.slot.clone(),
                roots: slot_rows,
            });
        }

        Ok(GroupedQueryRows {
            roots,
            expansions,
            was_degraded: false,
        })
    }

    /// Chunked batched expansion: splits roots into chunks of
    /// `BATCH_CHUNK_SIZE` and runs one batched query per chunk, then merges
    /// results while preserving root ordering.  This keeps bind-parameter
    /// counts within `SQLite` limits while avoiding the N+1 per-root pattern
    /// for large result sets.
    fn read_expansion_nodes_chunked(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        // Common case: everything fits in one batch — no merge pass needed.
        if roots.len() <= BATCH_CHUNK_SIZE {
            return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
        }

        // Merge chunk results keyed by root logical_id, then reassemble in
        // root order.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
            for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
                per_root
                    .entry(group.root_logical_id)
                    .or_default()
                    .extend(group.nodes);
            }
        }

        Ok(roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect())
    }

    /// Batched expansion: one recursive CTE query per expansion slot that
    /// processes all root IDs at once. Uses `ROW_NUMBER() OVER (PARTITION BY
    /// source_logical_id ...)` to enforce the per-root hard limit inside the
    /// database rather than in Rust.
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // Build a UNION ALL of SELECT literals for the root seed rows.
        // SQLite does not support `VALUES ... AS alias(col)` in older versions,
        // so we use `SELECT ?1 UNION ALL SELECT ?2 ...` instead.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        // The `root_id` column tracks which root each traversal path
        // originated from. The `ROW_NUMBER()` window in the outer query
        // enforces the per-root hard limit.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted) AS (
                SELECT rid, rid, 0, printf(',%s,', rid), 0
                FROM root_ids
                UNION ALL
                SELECT
                    t.root_id,
                    {next_logical_id},
                    t.depth + 1,
                    t.visited || {next_logical_id} || ',',
                    t.emitted + 1
                FROM traversed t
                JOIN edges e ON {join_condition}
                    AND e.kind = ?{edge_kind_param}
                    AND e.superseded_at IS NULL
                WHERE t.depth < {max_depth}
                  AND t.emitted < {hard_limit}
                  AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
                     , am.last_accessed_at
                     , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
                FROM traversed t
                JOIN nodes n ON n.logical_id = t.logical_id
                    AND n.superseded_at IS NULL
                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
                WHERE t.depth > 0
            )
            SELECT root_id, row_id, logical_id, kind, properties, last_accessed_at
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, logical_id",
            edge_kind_param = root_ids.len() + 1,
            max_depth = expansion.max_depth,
        );

        // NOTE(review): the SQL text varies with the chunk's root count, so
        // `prepare_cached` may hold one cached statement per distinct chunk
        // length — bounded in practice by BATCH_CHUNK_SIZE; confirm the
        // statement-cache capacity is sized accordingly.
        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind root IDs (1..=N) and edge kind (N+1).
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, // root_id
                    NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Partition results back into per-root groups, preserving root order.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }

    /// Read a single run by id.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the query fails or if the connection mutex
    /// has been poisoned.
    pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
        let conn = self.lock_connection()?;
        conn.query_row(
            "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
            rusqlite::params![id],
            |row| {
                Ok(RunRow {
                    id: row.get(0)?,
                    kind: row.get(1)?,
                    status: row.get(2)?,
                    properties: row.get(3)?,
                })
            },
        )
        .optional()
        .map_err(EngineError::Sqlite)
    }

    /// Read a single step by id.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the query fails or if the connection mutex
    /// has been poisoned.
    pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
        let conn = self.lock_connection()?;
        conn.query_row(
            "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
            rusqlite::params![id],
            |row| {
                Ok(StepRow {
                    id: row.get(0)?,
                    run_id: row.get(1)?,
                    kind: row.get(2)?,
                    status: row.get(3)?,
                    properties: row.get(4)?,
                })
            },
        )
        .optional()
        .map_err(EngineError::Sqlite)
    }

    /// Read a single action by id.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the query fails or if the connection mutex
    /// has been poisoned.
    pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
        let conn = self.lock_connection()?;
        conn.query_row(
            "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
            rusqlite::params![id],
            |row| {
                Ok(ActionRow {
                    id: row.get(0)?,
                    step_id: row.get(1)?,
                    kind: row.get(2)?,
                    status: row.get(3)?,
                    properties: row.get(4)?,
                })
            },
        )
        .optional()
        .map_err(EngineError::Sqlite)
    }

    /// Read all active (non-superseded) runs.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the query fails or if the connection mutex
    /// has been poisoned.
    pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
        let conn = self.lock_connection()?;
        let mut stmt = conn
            .prepare_cached(
                "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
            )
            .map_err(EngineError::Sqlite)?;
        let rows = stmt
            .query_map([], |row| {
                Ok(RunRow {
                    id: row.get(0)?,
                    kind: row.get(1)?,
                    status: row.get(2)?,
                    properties: row.get(3)?,
                })
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;
        Ok(rows)
    }

    /// Returns the number of shape→SQL entries currently indexed.
    ///
    /// Each distinct query shape (structural hash of kind + steps + limits)
    /// maps to exactly one SQL string.  This is a test-oriented introspection
    /// helper; it does not reflect rusqlite's internal prepared-statement
    /// cache, which is keyed by SQL text.
    ///
    /// Poisoning of the shape-SQL-map mutex is recovered via
    /// [`PoisonError::into_inner`]; this method does not panic.
    // NOTE(review): this allow appears stale — the body no longer uses
    // `.expect()`; the previous `# Panics` doc section was also stale.
    #[must_use]
    #[allow(clippy::expect_used)]
    pub fn shape_sql_count(&self) -> usize {
        self.shape_sql_map
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
            .len()
    }

    /// Returns a cloned `Arc` to the schema manager.
    #[must_use]
    pub fn schema_manager(&self) -> Arc<SchemaManager> {
        Arc::clone(&self.schema_manager)
    }

    /// Return the execution plan for a compiled query without executing it.
    ///
    /// Useful for debugging, testing shape-hash caching, and operator
    /// diagnostics. Does not open a transaction or touch the database beyond
    /// checking the statement cache.
    ///
    /// Poisoning of the shape-SQL-map mutex is recovered via
    /// [`PoisonError::into_inner`]; this method does not panic.
    #[must_use]
    pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
        let cache_hit = self
            .shape_sql_map
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
            .contains_key(&compiled.shape_hash);
        QueryPlan {
            sql: wrap_node_row_projection_sql(&compiled.sql),
            bind_count: compiled.binds.len(),
            driving_table: compiled.driving_table,
            shape_hash: compiled.shape_hash,
            cache_hit,
        }
    }

    /// Execute a named PRAGMA and return the result as a String.
    /// Used by Layer 1 tests to verify startup pragma initialization.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the PRAGMA query fails or if the connection
    /// mutex has been poisoned.
    // NOTE(review): `name` is interpolated directly into SQL — callers must
    // pass trusted pragma names only (acceptable for this test-only helper).
    #[doc(hidden)]
    pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
        let conn = self.lock_connection()?;
        let result = conn
            .query_row(&format!("PRAGMA {name}"), [], |row| {
                // PRAGMAs may return TEXT or INTEGER; normalise to String.
                row.get::<_, rusqlite::types::Value>(0)
            })
            .map_err(EngineError::Sqlite)?;
        let s = match result {
            rusqlite::types::Value::Text(t) => t,
            rusqlite::types::Value::Integer(i) => i.to_string(),
            rusqlite::types::Value::Real(f) => f.to_string(),
            rusqlite::types::Value::Blob(_) => {
                return Err(EngineError::InvalidWrite(format!(
                    "PRAGMA {name} returned an unexpected BLOB value"
                )));
            }
            rusqlite::types::Value::Null => String::new(),
        };
        Ok(s)
    }

    /// Return all provenance events whose `subject` matches the given value.
    ///
    /// Subjects are logical node IDs (for retire/upsert events) or `source_ref`
    /// values (for excise events).
    ///
    /// # Errors
    /// Returns [`EngineError`] if the query fails or if the connection mutex
    /// has been poisoned.
    pub fn query_provenance_events(
        &self,
        subject: &str,
    ) -> Result<Vec<ProvenanceEvent>, EngineError> {
        let conn = self.lock_connection()?;
        let mut stmt = conn
            .prepare_cached(
                "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
                 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
            )
            .map_err(EngineError::Sqlite)?;
        let events = stmt
            .query_map(rusqlite::params![subject], |row| {
                Ok(ProvenanceEvent {
                    id: row.get(0)?,
                    event_type: row.get(1)?,
                    subject: row.get(2)?,
                    source_ref: row.get(3)?,
                    metadata_json: row.get(4)?,
                    created_at: row.get(5)?,
                })
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;
        Ok(events)
    }
}
828
/// Wrap a node-producing subquery so that the standard node-row columns are
/// projected and the per-node access metadata is attached.  The LEFT JOIN
/// yields NULL `last_accessed_at` for nodes that have never been read.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut sql = String::with_capacity(base_sql.len() + 160);
    sql.push_str("SELECT q.row_id, q.logical_id, q.kind, q.properties, am.last_accessed_at FROM (");
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
836
837/// Returns `true` when `err` indicates the vec virtual table is absent
838/// (sqlite-vec feature enabled but `vec_nodes_active` not yet created).
839pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
840    match err {
841        rusqlite::Error::SqliteFailure(_, Some(msg)) => {
842            msg.contains("vec_nodes_active") || msg.contains("no such module: vec0")
843        }
844        _ => false,
845    }
846}
847
848fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
849    match value {
850        fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
851        fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
852        fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
853    }
854}
855
856#[cfg(test)]
857#[allow(clippy::expect_used)]
858mod tests {
859    use std::panic::{AssertUnwindSafe, catch_unwind};
860    use std::sync::Arc;
861
862    use fathomdb_query::{BindValue, QueryBuilder};
863    use fathomdb_schema::SchemaManager;
864    use rusqlite::types::Value;
865    use tempfile::NamedTempFile;
866
867    use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
868
869    use super::{bind_value_to_sql, is_vec_table_absent, wrap_node_row_projection_sql};
870
871    #[test]
872    fn is_vec_table_absent_matches_known_error_messages() {
873        use rusqlite::ffi;
874        fn make_err(msg: &str) -> rusqlite::Error {
875            rusqlite::Error::SqliteFailure(
876                ffi::Error {
877                    code: ffi::ErrorCode::Unknown,
878                    extended_code: 1,
879                },
880                Some(msg.to_owned()),
881            )
882        }
883        assert!(is_vec_table_absent(&make_err(
884            "no such table: vec_nodes_active"
885        )));
886        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
887        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
888        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
889        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
890    }
891
892    #[test]
893    fn bind_value_text_maps_to_sql_text() {
894        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
895        assert_eq!(val, Value::Text("hello".to_owned()));
896    }
897
898    #[test]
899    fn bind_value_integer_maps_to_sql_integer() {
900        let val = bind_value_to_sql(&BindValue::Integer(42));
901        assert_eq!(val, Value::Integer(42));
902    }
903
904    #[test]
905    fn bind_value_bool_true_maps_to_integer_one() {
906        let val = bind_value_to_sql(&BindValue::Bool(true));
907        assert_eq!(val, Value::Integer(1));
908    }
909
910    #[test]
911    fn bind_value_bool_false_maps_to_integer_zero() {
912        let val = bind_value_to_sql(&BindValue::Bool(false));
913        assert_eq!(val, Value::Integer(0));
914    }
915
    #[test]
    fn same_shape_queries_share_one_cache_entry() {
        // Two queries that differ only in bind values (the search text) must
        // produce the same shape hash and therefore share one cached SQL entry.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        // Identical structure; only the FTS search term differs.
        let compiled_a = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(10)
            .compile()
            .expect("compiled a");
        let compiled_b = QueryBuilder::nodes("Meeting")
            .text_search("standup", 5)
            .limit(10)
            .compile()
            .expect("compiled b");

        coordinator
            .execute_compiled_read(&compiled_a)
            .expect("read a");
        coordinator
            .execute_compiled_read(&compiled_b)
            .expect("read b");

        assert_eq!(
            compiled_a.shape_hash, compiled_b.shape_hash,
            "different bind values, same structural shape → same hash"
        );
        // Both executions were served by a single shape→SQL cache entry.
        assert_eq!(coordinator.shape_sql_count(), 1);
    }
952
    #[test]
    fn vector_read_degrades_gracefully_when_vec_table_absent() {
        // Opened with no vector dimension, so `vec_nodes_active` is never
        // created; a vector read must degrade (empty, flagged) rather than fail.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .vector_search("budget embeddings", 5)
            .compile()
            .expect("vector query compiles");

        let result = coordinator.execute_compiled_read(&compiled);
        let rows = result.expect("degraded read must succeed, not error");
        assert!(
            rows.was_degraded,
            "result must be flagged as degraded when vec_nodes_active is absent"
        );
        assert!(
            rows.nodes.is_empty(),
            "degraded result must return empty nodes"
        );
    }
981
    #[test]
    fn coordinator_caches_by_shape_hash() {
        // A single execution should insert exactly one entry into the
        // shape-hash → SQL cache.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute compiled read");
        assert_eq!(coordinator.shape_sql_count(), 1);
    }
1004
1005    // --- Item 6: explain_compiled_read tests ---
1006
    #[test]
    fn explain_returns_correct_sql() {
        // The plan's SQL must be the compiled SQL wrapped in the standard
        // node-row projection (same wrapping execute_compiled_read applies).
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
    }
1028
    #[test]
    fn explain_returns_correct_driving_table() {
        use fathomdb_query::DrivingTable;

        // A text-search query should report the FTS table as its driver.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
    }
1052
    #[test]
    fn explain_reports_cache_miss_then_hit() {
        // The shape cache is only populated by execute_compiled_read, so
        // explain must observe a miss first and a hit afterwards.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        // Before execution: cache miss
        let plan_before = coordinator.explain_compiled_read(&compiled);
        assert!(
            !plan_before.cache_hit,
            "cache miss expected before first execute"
        );

        // Execute to populate cache
        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // After execution: cache hit
        let plan_after = coordinator.explain_compiled_read(&compiled);
        assert!(
            plan_after.cache_hit,
            "cache hit expected after first execute"
        );
    }
1089
    #[test]
    fn explain_does_not_execute_query() {
        // Call explain_compiled_read on an empty database. If explain were
        // actually executing SQL, it would return Ok with 0 rows. But the
        // key assertion is that it returns a QueryPlan (not an error) even
        // without touching the database.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("anything", 5)
            .compile()
            .expect("compiled query");

        // This must not error, even though the database is empty
        let plan = coordinator.explain_compiled_read(&compiled);

        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
        // The plan reports how many parameters the compiled query binds.
        assert_eq!(plan.bind_count, compiled.binds.len());
    }
1117
    #[test]
    fn coordinator_executes_compiled_read() {
        // End-to-end read: seed one node + chunk + FTS row directly, then
        // confirm a compiled text-search query finds exactly that node.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        // Seed via a separate raw connection; the schema was created by open().
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
1156
1157    // --- Item 1: capability gate tests ---
1158
    #[test]
    fn capability_gate_reports_false_without_feature() {
        let db = NamedTempFile::new().expect("temporary db");
        // Open without vector_dimension: regardless of feature flag, vector_enabled must be false
        // when no dimension is requested (the vector profile is never bootstrapped).
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None, // no vector dimension requested
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        assert!(
            !coordinator.vector_enabled(),
            "vector_enabled must be false when no dimension is requested"
        );
    }
1177
    // Only compiled when the sqlite-vec feature is on: a requested dimension
    // plus the feature should flip the capability gate to true.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn capability_gate_reports_true_when_feature_enabled() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            Some(128), // request a 128-dimension vector profile
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        assert!(
            coordinator.vector_enabled(),
            "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
        );
    }
1195
1196    // --- Item 4: runtime table read tests ---
1197
    #[test]
    fn read_run_returns_inserted_run() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        // Persist one run through the writer, then read it back through a
        // fresh coordinator on the same database file.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        // read_run returns Result<Option<_>, _>: unwrap both layers.
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
1251
    #[test]
    fn read_step_returns_inserted_step() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        // Steps belong to runs, so a parent run is written in the same request.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
1314
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        // Full run → step → action chain written in one request; the action
        // is then read back by ID.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
1389
    #[test]
    fn read_active_runs_excludes_superseded() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        // Write run-v1, then upsert run-v2 with supersedes_id = run-v1;
        // read_active_runs must then return only run-v2.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert original run
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Supersede original run with v2
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
1468
    // Deliberately poison the first read-pool connection mutex by panicking
    // while its guard is held.  catch_unwind contains the panic so the test
    // itself keeps running; subsequent lock() calls will see PoisonError.
    #[allow(clippy::panic)]
    fn poison_connection(coordinator: &ExecutionCoordinator) {
        let result = catch_unwind(AssertUnwindSafe(|| {
            let _guard = coordinator.pool.connections[0]
                .lock()
                .expect("poison test lock");
            panic!("poison coordinator connection mutex");
        }));
        // Sanity check: the closure must actually have unwound.
        assert!(
            result.is_err(),
            "poison test must unwind while holding the connection mutex"
        );
    }
1482
1483    #[allow(clippy::panic)]
1484    fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
1485    where
1486        F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
1487    {
1488        match op(coordinator) {
1489            Err(EngineError::Bridge(message)) => {
1490                assert_eq!(message, "connection mutex poisoned");
1491            }
1492            Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
1493            Err(error) => panic!("expected poisoned connection error, got {error:?}"),
1494        }
1495    }
1496
    #[test]
    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
        // Every read helper that locks the pool must surface the poisoned
        // mutex as EngineError::Bridge instead of panicking.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1, // single connection so the poisoned slot is always the one used
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        poison_connection(&coordinator);

        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
        assert_poisoned_connection_error(
            &coordinator,
            super::ExecutionCoordinator::read_active_runs,
        );
        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
    }
1521
1522    // --- M-2: Bounded shape cache ---
1523
    #[test]
    fn shape_cache_stays_bounded() {
        use fathomdb_query::ShapeHash;

        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        // Directly populate the cache with MAX_SHAPE_CACHE_SIZE + 1 entries.
        {
            let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
            for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
                cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
            }
        }
        // The cache is now over the limit but hasn't been pruned yet (pruning
        // happens on the insert path in execute_compiled_read).

        // Execute a compiled read to trigger the bounded-cache check.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(10)
            .compile()
            .expect("compiled query");

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // Per the constant's docs the bound is enforced by clearing the whole
        // cache, so count may be far below the max — ≤ is all we require.
        assert!(
            coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
            "shape cache must stay bounded: got {} entries, max {}",
            coordinator.shape_sql_count(),
            super::MAX_SHAPE_CACHE_SIZE
        );
    }
1566
1567    // --- M-1: Read pool size ---
1568
    #[test]
    fn read_pool_size_configurable() {
        // The pool_size argument to open() controls how many read connections
        // are created.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            2, // pool_size
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator with pool_size=2");

        assert_eq!(coordinator.pool.size(), 2);

        // Basic read should succeed through the pool.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(10)
            .compile()
            .expect("compiled query");

        let result = coordinator.execute_compiled_read(&compiled);
        assert!(result.is_ok(), "read through pool must succeed");
    }
1593
1594    // --- M-4: Grouped read batching ---
1595
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        let db = NamedTempFile::new().expect("temporary db");

        // Bootstrap the database via coordinator (creates schema).
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        // Seed data: 10 root nodes (Meeting-0..9) with 2 outbound edges each
        // to expansion nodes (Task-0-a, Task-0-b, etc.).
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                // Raw-string braces are doubled ({{ }}) because this goes
                // through format!; only {i} is interpolated.
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Each root should have exactly 2 expansion nodes (task-X-a, task-X-b).
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
1672}