// fathomdb_engine/coordinator.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8    BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9    CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, ExpansionSlot,
10    FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch,
11    SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
12};
13use fathomdb_schema::SchemaManager;
14use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
15
16use crate::embedder::QueryEmbedder;
17use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
18use crate::{EngineError, sqlite};
19
/// Maximum number of cached shape-hash to SQL mappings before the cache is
/// cleared entirely.  A clear-all strategy is simpler than partial eviction
/// and the cost of re-compiling on a miss is negligible.
const MAX_SHAPE_CACHE_SIZE: usize = 4096;

/// Maximum number of root IDs per batched expansion query.  Kept well below
/// `SQLITE_MAX_VARIABLE_NUMBER` (default 999) because each batch also binds
/// the edge-kind parameter.  Larger root sets are chunked into multiple
/// batches of this size rather than falling back to per-root queries.
const BATCH_CHUNK_SIZE: usize = 200;
30
31/// Compile an optional expansion-slot target-side filter predicate into a SQL
32/// fragment and bind values for injection into the `numbered` CTE's WHERE clause.
33///
34/// Returns `("", vec![])` when `filter` is `None` — preserving byte-for-byte
35/// identical SQL to pre-Pack-3 behavior. When `Some(predicate)`, returns an
36/// `AND …` fragment and the corresponding bind values starting at `first_param`.
37///
38/// Only `JsonPathEq`, `JsonPathCompare`, `JsonPathFusedEq`, and
39/// `JsonPathFusedTimestampCmp` are supported here; each variant targets the
40/// `n.properties` column already present in the `numbered` CTE join.
41/// Column-direct predicates (`KindEq`, `LogicalIdEq`, etc.) reference `n.kind`
42/// and similar columns that are also available in the `numbered` CTE.
43fn compile_expansion_filter(
44    filter: Option<&Predicate>,
45    first_param: usize,
46) -> (String, Vec<Value>) {
47    let Some(predicate) = filter else {
48        return (String::new(), vec![]);
49    };
50    let p = first_param;
51    match predicate {
52        Predicate::JsonPathEq { path, value } => {
53            let val = match value {
54                ScalarValue::Text(t) => Value::Text(t.clone()),
55                ScalarValue::Integer(i) => Value::Integer(*i),
56                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
57            };
58            (
59                format!(
60                    "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
61                    p + 1
62                ),
63                vec![Value::Text(path.clone()), val],
64            )
65        }
66        Predicate::JsonPathCompare { path, op, value } => {
67            let val = match value {
68                ScalarValue::Text(t) => Value::Text(t.clone()),
69                ScalarValue::Integer(i) => Value::Integer(*i),
70                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
71            };
72            let operator = match op {
73                ComparisonOp::Gt => ">",
74                ComparisonOp::Gte => ">=",
75                ComparisonOp::Lt => "<",
76                ComparisonOp::Lte => "<=",
77            };
78            (
79                format!(
80                    "\n                  AND json_extract(n.properties, ?{p}) {operator} ?{}",
81                    p + 1
82                ),
83                vec![Value::Text(path.clone()), val],
84            )
85        }
86        Predicate::JsonPathFusedEq { path, value } => (
87            format!(
88                "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
89                p + 1
90            ),
91            vec![Value::Text(path.clone()), Value::Text(value.clone())],
92        ),
93        Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
94            let operator = match op {
95                ComparisonOp::Gt => ">",
96                ComparisonOp::Gte => ">=",
97                ComparisonOp::Lt => "<",
98                ComparisonOp::Lte => "<=",
99            };
100            (
101                format!(
102                    "\n                  AND json_extract(n.properties, ?{p}) {operator} ?{}",
103                    p + 1
104                ),
105                vec![Value::Text(path.clone()), Value::Integer(*value)],
106            )
107        }
108        Predicate::KindEq(kind) => (
109            format!("\n                  AND n.kind = ?{p}"),
110            vec![Value::Text(kind.clone())],
111        ),
112        Predicate::LogicalIdEq(logical_id) => (
113            format!("\n                  AND n.logical_id = ?{p}"),
114            vec![Value::Text(logical_id.clone())],
115        ),
116        Predicate::SourceRefEq(source_ref) => (
117            format!("\n                  AND n.source_ref = ?{p}"),
118            vec![Value::Text(source_ref.clone())],
119        ),
120        Predicate::ContentRefEq(uri) => (
121            format!("\n                  AND n.content_ref = ?{p}"),
122            vec![Value::Text(uri.clone())],
123        ),
124        Predicate::ContentRefNotNull => (
125            "\n                  AND n.content_ref IS NOT NULL".to_owned(),
126            vec![],
127        ),
128    }
129}
130
/// A pool of read-only `SQLite` connections for concurrent read access.
///
/// Each connection is wrapped in its own [`Mutex`] so multiple readers can
/// proceed in parallel when they happen to grab different slots.
struct ReadPool {
    /// One independently lockable slot per pooled connection.
    connections: Vec<Mutex<Connection>>,
}
138
139impl fmt::Debug for ReadPool {
140    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
141        f.debug_struct("ReadPool")
142            .field("size", &self.connections.len())
143            .finish()
144    }
145}
146
147impl ReadPool {
148    /// Open `pool_size` read-only connections to the database at `path`.
149    ///
150    /// Each connection has PRAGMAs initialized via
151    /// [`SchemaManager::initialize_connection`] and, when the `sqlite-vec`
152    /// feature is enabled and `vector_enabled` is true, the vec extension
153    /// auto-loaded.
154    ///
155    /// # Errors
156    ///
157    /// Returns [`EngineError`] if any connection fails to open or initialize.
158    fn new(
159        db_path: &Path,
160        pool_size: usize,
161        schema_manager: &SchemaManager,
162        vector_enabled: bool,
163    ) -> Result<Self, EngineError> {
164        let mut connections = Vec::with_capacity(pool_size);
165        for _ in 0..pool_size {
166            let conn = if vector_enabled {
167                #[cfg(feature = "sqlite-vec")]
168                {
169                    sqlite::open_readonly_connection_with_vec(db_path)?
170                }
171                #[cfg(not(feature = "sqlite-vec"))]
172                {
173                    sqlite::open_readonly_connection(db_path)?
174                }
175            } else {
176                sqlite::open_readonly_connection(db_path)?
177            };
178            schema_manager
179                .initialize_reader_connection(&conn)
180                .map_err(EngineError::Schema)?;
181            connections.push(Mutex::new(conn));
182        }
183        Ok(Self { connections })
184    }
185
186    /// Acquire a connection from the pool.
187    ///
188    /// Tries [`Mutex::try_lock`] on each slot first (fast non-blocking path).
189    /// If every slot is held, falls back to a blocking lock on the first slot.
190    ///
191    /// # Errors
192    ///
193    /// Returns [`EngineError::Bridge`] if the underlying mutex is poisoned.
194    fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
195        // Fast path: try each connection without blocking.
196        for conn in &self.connections {
197            if let Ok(guard) = conn.try_lock() {
198                return Ok(guard);
199            }
200        }
201        // Fallback: block on the first connection.
202        self.connections[0].lock().map_err(|_| {
203            trace_error!("read pool: connection mutex poisoned");
204            EngineError::Bridge("connection mutex poisoned".to_owned())
205        })
206    }
207
208    /// Return the number of connections in the pool.
209    #[cfg(test)]
210    fn size(&self) -> usize {
211        self.connections.len()
212    }
213}
214
/// Execution plan returned by [`ExecutionCoordinator::explain_compiled_read`].
///
/// This is a read-only introspection struct. It does not execute SQL.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// SQL text the compiled query would execute.
    pub sql: String,
    /// Number of bind parameters the SQL expects.
    pub bind_count: usize,
    /// Table that drives the query (e.g. nodes vs. an FTS index).
    pub driving_table: DrivingTable,
    /// Shape hash used as the key into the coordinator's shape→SQL cache.
    pub shape_hash: ShapeHash,
    /// `true` when `shape_hash` was already present in the shape→SQL cache.
    pub cache_hit: bool,
}
226
/// A single node row returned from a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row ID.
    pub row_id: String,
    /// Logical ID of the node (stable across supersession).
    pub logical_id: String,
    /// Node kind.
    pub kind: String,
    /// JSON-encoded node properties.
    pub properties: String,
    /// Optional URI referencing external content.
    pub content_ref: Option<String>,
    /// Unix timestamp of last access, if tracked.
    pub last_accessed_at: Option<i64>,
}
243
/// A single run row returned from a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Unique run ID.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Current status (free-form string as stored in the database).
    pub status: String,
    /// JSON-encoded run properties.
    pub properties: String,
}
256
/// A single step row returned from a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Unique step ID.
    pub id: String,
    /// ID of the parent run.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Current status (free-form string as stored in the database).
    pub status: String,
    /// JSON-encoded step properties.
    pub properties: String,
}
271
/// A single action row returned from a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Unique action ID.
    pub id: String,
    /// ID of the parent step.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Current status (free-form string as stored in the database).
    pub status: String,
    /// JSON-encoded action properties.
    pub properties: String,
}
286
/// A single row from the `provenance_events` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Unique event ID.
    pub id: String,
    /// Event type discriminator.
    pub event_type: String,
    /// Subject the event is about.
    pub subject: String,
    /// Optional reference to the originating source.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation time as stored in `provenance_events.created_at`
    /// (presumably Unix seconds — confirm against the writer).
    pub created_at: i64,
}
297
/// Result set from executing a flat (non-grouped) compiled query.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Matched node rows.
    pub nodes: Vec<NodeRow>,
    /// Runs associated with the matched nodes.
    pub runs: Vec<RunRow>,
    /// Steps associated with the matched runs.
    pub steps: Vec<StepRow>,
    /// Actions associated with the matched steps.
    pub actions: Vec<ActionRow>,
    /// `true` when a capability miss (e.g. missing sqlite-vec) caused the query
    /// to degrade to an empty result instead of propagating an error.
    pub was_degraded: bool,
}
313
/// Expansion results for a single root node within a grouped query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical ID of the root node that seeded this expansion.
    pub root_logical_id: String,
    /// Nodes reached by traversing from the root.
    pub nodes: Vec<NodeRow>,
}
322
/// All expansion results for a single named slot across all roots.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot.
    pub slot: String,
    /// Per-root expansion results, one entry per seeding root.
    pub roots: Vec<ExpansionRootRows>,
}
331
/// Result set from executing a grouped compiled query.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// Root node rows matched by the base query.
    pub roots: Vec<NodeRow>,
    /// Per-slot expansion results.
    pub expansions: Vec<ExpansionSlotRows>,
    /// `true` when a capability miss caused the query to degrade to an empty result.
    pub was_degraded: bool,
}
342
/// Manages a pool of read-only `SQLite` connections and executes compiled queries.
pub struct ExecutionCoordinator {
    /// Filesystem path of the backing database.
    database_path: PathBuf,
    /// Shared schema manager used for bootstrap and reader-connection setup.
    schema_manager: Arc<SchemaManager>,
    /// Pool of read-only connections serving all query execution.
    pool: ReadPool,
    /// Shape-hash → wrapped-SQL cache; cleared wholesale once it reaches
    /// `MAX_SHAPE_CACHE_SIZE` (see `execute_compiled_read`).
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// `true` when sqlite-vec is loaded and a vector profile is active.
    vector_enabled: bool,
    /// Latched after the first "vector table absent" degradation so the
    /// warning is logged at most once per coordinator.
    vec_degradation_warned: AtomicBool,
    /// Shared query/error counters.
    telemetry: Arc<TelemetryCounters>,
    /// Phase 12.5a: optional read-time query embedder. When present,
    /// [`Self::execute_retrieval_plan`] invokes it via
    /// [`Self::fill_vector_branch`] after compile to populate
    /// `plan.vector`. When `None`, the Phase 12 v1 vector-dormancy
    /// invariant on `search()` is preserved: the vector slot stays empty
    /// and the coordinator's stage-gating check skips the vector branch.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
}
360
361impl fmt::Debug for ExecutionCoordinator {
362    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
363        f.debug_struct("ExecutionCoordinator")
364            .field("database_path", &self.database_path)
365            .finish_non_exhaustive()
366    }
367}
368
369impl ExecutionCoordinator {
    /// Open the database at `path`, bootstrap the schema, repair derived FTS
    /// state if needed, then build the read-only connection pool.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database connection cannot be opened or schema bootstrap fails.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap connection: vec-enabled only when a vector dimension was
        // requested AND the feature is compiled in.
        #[cfg(feature = "sqlite-vec")]
        let conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        // ----- Open-time rebuild guards for derived FTS state -----
        //
        // `fts_node_properties` and `fts_node_property_positions` are both
        // derived from the canonical `nodes.properties` blobs and the set
        // of registered `fts_property_schemas`. They are NOT source of
        // truth, so whenever a schema migration or crash leaves either
        // table out of sync with the canonical state we repopulate them
        // from scratch at open() time.
        //
        // The derived-state invariant is: if `fts_property_schemas` is
        // non-empty, then `fts_node_properties` must also be non-empty,
        // and if any schema is recursive-mode then
        // `fts_node_property_positions` must also be non-empty. If either
        // guard trips, both tables are cleared and rebuilt together — it
        // is never correct to rebuild only one half.
        //
        // Guard 1 (P2-1, FX pack, v15→v16 migration): detects an empty
        // `fts_node_properties` while schemas exist. This covers the
        // Phase 2 tokenizer swap (migration 16 drops the FTS5 virtual
        // table before recreating it under `porter unicode61`) and the
        // crash-recovery case where a prior open applied migration 16 but
        // crashed before the subsequent property-FTS rebuild committed.
        //
        // Guard 2 (P4-P2-3, FX2 pack, v16→v17 migration, extended to
        // v17→v18 by P4-P2-4): detects an empty
        // `fts_node_property_positions` while any recursive-mode schema
        // exists. This covers migration 17 (which added the sidecar table
        // but left it empty) and migration 18 (which drops and recreates
        // the sidecar to add the UNIQUE constraint, also leaving it
        // empty). Both migrations land on the same guard because both
        // leave the positions table empty while recursive schemas remain
        // registered.
        //
        // Both guards are no-ops on an already-consistent database, so
        // running them on every open is safe and cheap.
        let needs_property_fts_rebuild = {
            let schema_count: i64 =
                conn.query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
                    row.get(0)
                })?;
            if schema_count == 0 {
                false
            } else {
                let fts_count: i64 =
                    conn.query_row("SELECT COUNT(*) FROM fts_node_properties", [], |row| {
                        row.get(0)
                    })?;
                fts_count == 0
            }
        };
        // Guard 2 (see block comment above): recursive schemas registered
        // but `fts_node_property_positions` empty. Rebuild regenerates
        // both the FTS blob and the position map from canonical state, so
        // it is safe to trigger even when `fts_node_properties` is
        // already populated.
        let needs_position_backfill = {
            // NOTE: This LIKE pattern assumes `property_paths_json` is
            // serialized with compact formatting (no whitespace around
            // `:`). All current writers go through `serde_json`'s compact
            // output so this holds. If a future writer emits pretty-
            // printed JSON (`"mode": "recursive"` with a space), this
            // guard would silently fail. A more robust check would use
            // `json_extract(property_paths_json, '$[*].mode')` or a
            // parsed scan, at the cost of a per-row JSON walk.
            let recursive_schema_count: i64 = conn.query_row(
                "SELECT COUNT(*) FROM fts_property_schemas \
                 WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
                [],
                |row| row.get(0),
            )?;
            if recursive_schema_count == 0 {
                false
            } else {
                let position_count: i64 = conn.query_row(
                    "SELECT COUNT(*) FROM fts_node_property_positions",
                    [],
                    |row| row.get(0),
                )?;
                position_count == 0
            }
        };
        if needs_property_fts_rebuild || needs_position_backfill {
            let tx = conn.unchecked_transaction()?;
            tx.execute("DELETE FROM fts_node_properties", [])?;
            tx.execute("DELETE FROM fts_node_property_positions", [])?;
            // NOTE(review): the `?1` kind placeholder in this template is
            // presumably bound per registered schema inside
            // `insert_property_fts_rows` — confirm against that helper.
            crate::projection::insert_property_fts_rows(
                &tx,
                "SELECT logical_id, properties FROM nodes \
                 WHERE kind = ?1 AND superseded_at IS NULL",
            )?;
            tx.commit()?;
        }

        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            // `report` is otherwise unused in this configuration.
            let _ = &report;
            false
        };

        if let Some(dim) = vector_dimension {
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", dim)
                .map_err(EngineError::Schema)?;
            // Profile was just created or updated — mark as enabled.
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        // Drop the bootstrap connection — pool connections are used for reads.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
        })
    }
520
521    /// Returns the filesystem path to the `SQLite` database.
522    pub fn database_path(&self) -> &Path {
523        &self.database_path
524    }
525
    /// Returns `true` when sqlite-vec was loaded and a vector profile is active.
    ///
    /// Fixed at [`Self::open`] time; never changes afterwards.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
531
    /// Returns the configured read-time query embedder, if any.
    ///
    /// The 0.4.0 write-time parity work reuses this same embedder for
    /// vector regeneration via [`Engine::regenerate_vector_embeddings`],
    /// so there is always exactly one source of truth for vector
    /// identity per [`Engine`] instance.
    #[must_use]
    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
        self.query_embedder.as_ref()
    }
542
    /// Borrow a pooled read-only connection (see [`ReadPool::acquire`]).
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
546
547    /// Aggregate `SQLite` page-cache counters across all pool connections.
548    ///
549    /// Uses `try_lock` to avoid blocking reads for telemetry reporting.
550    /// Connections that are currently locked by a query are skipped — this
551    /// is acceptable for statistical counters.
552    #[must_use]
553    pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
554        let mut total = SqliteCacheStatus::default();
555        for conn_mutex in &self.pool.connections {
556            if let Ok(conn) = conn_mutex.try_lock() {
557                total.add(&read_db_cache_status(&conn));
558            }
559        }
560        total
561    }
562
563    /// # Errors
564    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
565    #[allow(clippy::expect_used)]
566    pub fn execute_compiled_read(
567        &self,
568        compiled: &CompiledQuery,
569    ) -> Result<QueryRows, EngineError> {
570        // Scan fallback for first-registration async rebuild: if the query uses the
571        // FtsNodes driving table and the root kind has is_first_registration=1 with
572        // state IN ('PENDING','BUILDING'), the fts_node_properties table has no rows
573        // yet. Route to a full-kind node scan so callers get results instead of empty.
574        if compiled.driving_table == DrivingTable::FtsNodes
575            && let Some(BindValue::Text(root_kind)) = compiled.binds.get(1)
576            && let Some(nodes) = self.scan_fallback_if_first_registration(root_kind)?
577        {
578            self.telemetry.increment_queries();
579            return Ok(QueryRows {
580                nodes,
581                runs: Vec::new(),
582                steps: Vec::new(),
583                actions: Vec::new(),
584                was_degraded: false,
585            });
586        }
587
588        let row_sql = wrap_node_row_projection_sql(&compiled.sql);
589        // FIX(review): was .expect() — panics on mutex poisoning, cascading failure.
590        // Options: (A) into_inner() for all, (B) EngineError for all, (C) mixed.
591        // Chose (C): shape_sql_map is a pure cache — into_inner() is safe to recover.
592        // conn wraps a SQLite connection whose state may be corrupt after a thread panic,
593        // so we propagate EngineError there instead.
594        {
595            let mut cache = self
596                .shape_sql_map
597                .lock()
598                .unwrap_or_else(PoisonError::into_inner);
599            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
600                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
601                cache.clear();
602            }
603            cache.insert(compiled.shape_hash, row_sql.clone());
604        }
605
606        let bind_values = compiled
607            .binds
608            .iter()
609            .map(bind_value_to_sql)
610            .collect::<Vec<_>>();
611
612        // FIX(review) + Security fix M-8: was .expect() — panics on mutex poisoning.
613        // shape_sql_map uses into_inner() (pure cache, safe to recover).
614        // conn uses map_err → EngineError (connection state may be corrupt after panic;
615        // into_inner() would risk using a connection with partial transaction state).
616        let conn_guard = match self.lock_connection() {
617            Ok(g) => g,
618            Err(e) => {
619                self.telemetry.increment_errors();
620                return Err(e);
621            }
622        };
623        let mut statement = match conn_guard.prepare_cached(&row_sql) {
624            Ok(stmt) => stmt,
625            Err(e) if is_vec_table_absent(&e) => {
626                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
627                    trace_warn!("vector table absent, degrading to non-vector query");
628                }
629                return Ok(QueryRows {
630                    was_degraded: true,
631                    ..Default::default()
632                });
633            }
634            Err(e) => {
635                self.telemetry.increment_errors();
636                return Err(EngineError::Sqlite(e));
637            }
638        };
639        let nodes = match statement
640            .query_map(params_from_iter(bind_values.iter()), |row| {
641                Ok(NodeRow {
642                    row_id: row.get(0)?,
643                    logical_id: row.get(1)?,
644                    kind: row.get(2)?,
645                    properties: row.get(3)?,
646                    content_ref: row.get(4)?,
647                    last_accessed_at: row.get(5)?,
648                })
649            })
650            .and_then(Iterator::collect)
651        {
652            Ok(rows) => rows,
653            Err(e) => {
654                self.telemetry.increment_errors();
655                return Err(EngineError::Sqlite(e));
656            }
657        };
658
659        self.telemetry.increment_queries();
660        Ok(QueryRows {
661            nodes,
662            runs: Vec::new(),
663            steps: Vec::new(),
664            actions: Vec::new(),
665            was_degraded: false,
666        })
667    }
668
669    /// Execute a compiled adaptive search and return matching hits.
670    ///
671    /// Phase 2 splits filters: fusable predicates (`KindEq`, `LogicalIdEq`,
672    /// `SourceRefEq`, `ContentRefEq`, `ContentRefNotNull`) are injected into
673    /// the `search_hits` CTE so the CTE `LIMIT` applies after filtering,
674    /// while residual predicates (JSON path filters) stay in the outer
675    /// `WHERE`. The chunk and property FTS
676    /// indexes are `UNION ALL`-ed, BM25-scored (flipped so larger values mean
677    /// better matches), ordered, and limited. All hits return
678    /// `match_mode = Strict` — the relaxed branch and fallback arrive in
679    /// later phases.
680    ///
681    /// # Errors
682    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
683    pub fn execute_compiled_search(
684        &self,
685        compiled: &CompiledSearch,
686    ) -> Result<SearchRows, EngineError> {
687        // Build the two-branch plan from the strict text query and delegate
688        // to the shared plan-execution routine. The relaxed branch is derived
689        // via `derive_relaxed` and only fires when strict returned fewer than
690        // `min(FALLBACK_TRIGGER_K, limit)` hits. With K = 1 this collapses to
691        // "relaxed iff strict is empty," but the routine spells the rule out
692        // explicitly so raising K later is a one-line constant bump.
693        let (relaxed_query, was_degraded_at_plan_time) =
694            fathomdb_query::derive_relaxed(&compiled.text_query);
695        let relaxed = relaxed_query.map(|q| CompiledSearch {
696            root_kind: compiled.root_kind.clone(),
697            text_query: q,
698            limit: compiled.limit,
699            fusable_filters: compiled.fusable_filters.clone(),
700            residual_filters: compiled.residual_filters.clone(),
701            attribution_requested: compiled.attribution_requested,
702        });
703        let plan = CompiledSearchPlan {
704            strict: compiled.clone(),
705            relaxed,
706            was_degraded_at_plan_time,
707        };
708        self.execute_compiled_search_plan(&plan)
709    }
710
711    /// Execute a two-branch [`CompiledSearchPlan`] and return the merged,
712    /// deduped result rows.
713    ///
714    /// This is the shared retrieval/merge routine that both
715    /// [`Self::execute_compiled_search`] (adaptive path) and
716    /// `Engine::fallback_search` (narrow two-shape path) call into. Strict
717    /// runs first; the relaxed branch only fires when it is present AND the
718    /// strict branch returned fewer than `min(FALLBACK_TRIGGER_K, limit)`
719    /// hits. Merge and dedup semantics are identical to the adaptive path
720    /// regardless of how the plan was constructed.
721    ///
722    /// Error contract: if the relaxed branch errors, the error propagates;
723    /// strict hits are not returned. This matches the rest of the engine's
724    /// fail-hard posture.
725    ///
726    /// # Errors
727    /// Returns [`EngineError`] if either branch's SQL cannot be prepared or
728    /// executed.
729    pub fn execute_compiled_search_plan(
730        &self,
731        plan: &CompiledSearchPlan,
732    ) -> Result<SearchRows, EngineError> {
733        let strict = &plan.strict;
734        let limit = strict.limit;
735        let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;
736
737        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
738        let strict_underfilled = strict_hits.len() < fallback_threshold;
739
740        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
741        let mut fallback_used = false;
742        let mut was_degraded = false;
743        if let Some(relaxed) = plan.relaxed.as_ref()
744            && strict_underfilled
745        {
746            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
747            fallback_used = true;
748            was_degraded = plan.was_degraded_at_plan_time;
749        }
750
751        let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
752        // Attribution runs AFTER dedup so that duplicate hits dropped by
753        // `merge_search_branches` do not waste a highlight+position-map
754        // lookup.
755        if strict.attribution_requested {
756            let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
757            self.populate_attribution_for_hits(
758                &mut merged,
759                &strict.text_query,
760                relaxed_text_query,
761            )?;
762        }
763        let strict_hit_count = merged
764            .iter()
765            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
766            .count();
767        let relaxed_hit_count = merged
768            .iter()
769            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
770            .count();
771        // Phase 10: no vector execution path yet, so vector_hit_count is
772        // always zero. Future phases that wire a vector branch will
773        // contribute here.
774        let vector_hit_count = 0;
775
776        Ok(SearchRows {
777            hits: merged,
778            strict_hit_count,
779            relaxed_hit_count,
780            vector_hit_count,
781            fallback_used,
782            was_degraded,
783        })
784    }
785
786    /// Execute a compiled vector-only search and return matching hits.
787    ///
788    /// Phase 11 delivers the standalone vector retrieval path. The emitted
789    /// SQL performs a vec0 KNN scan over `vec_nodes_active`, joins to
790    /// `chunks` and `nodes` (active rows only), and pushes fusable filters
791    /// into the candidate CTE. The outer `SELECT` applies residual JSON
792    /// predicates and orders by score descending, where `score = -distance`
793    /// (higher is better) per addendum 1 §Vector-Specific Behavior.
794    ///
795    /// ## Capability-miss handling
796    ///
797    /// If the `sqlite-vec` capability is absent (feature disabled or the
798    /// `vec_nodes_active` virtual table has not been created because the
799    /// engine was not opened with a `vector_dimension`), this method returns
800    /// an empty [`SearchRows`] with `was_degraded = true`. This is
801    /// **non-fatal** — the error does not propagate — matching the addendum's
802    /// §Vector-Specific Behavior / Degradation.
803    ///
804    /// ## Attribution
805    ///
806    /// When `compiled.attribution_requested == true`, every returned hit
807    /// carries `attribution: Some(HitAttribution { matched_paths: vec![] })`
808    /// per addendum 1 §Attribution on vector hits (Phase 5 chunk-hit rule
809    /// extended uniformly).
810    ///
811    /// # Errors
812    /// Returns [`EngineError`] if the SQL statement cannot be prepared or
813    /// executed for reasons other than a vec-table capability miss.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // Short-circuit zero-limit: callers that pass `limit == 0` expect an
        // empty result rather than a SQL error from `LIMIT 0` semantics in
        // the inner vec0 scan.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        // Bind layout invariant: SQLite's `?N` parameters are 1-based, and
        // `binds.len()` immediately after a push IS that 1-based position.
        // Every clause-emitting arm below relies on this. The query text is
        // always `?1`; the kind filter (when present) is always `?2`, which
        // the hardcoded `kind_clause` further down depends on.
        let filter_by_kind = !compiled.root_kind.is_empty();
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // Build fusable-filter clauses, aliased against `src` inside the
        // candidate CTE. Same predicate set the text path fuses.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    // No bind value: the NULL check is pure SQL.
                    fused_clauses
                        .push_str("\n                      AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
                    // JSON predicates are residual; compile_vector_search
                    // guarantees they never appear here, but stay defensive.
                }
            }
        }

        // Build residual JSON clauses, aliased against `h` in the outer SELECT.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. } => {
                    // Fusable predicates live in fused_clauses above.
                }
            }
        }

        // Bind the outer limit as a named parameter for prepare_cached
        // stability across calls that vary only by limit value.
        // NOTE(review): the inner vec0 scan below interpolates `base_limit`
        // directly into the SQL text, so calls with different limits still
        // produce distinct cached statements — the stability goal only holds
        // for the outer LIMIT. Confirm whether sqlite-vec accepts a bound
        // `LIMIT ?` on the KNN scan before tightening this.
        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        // sqlite-vec requires the LIMIT/k constraint to be visible directly
        // on the vec0 KNN scan, so we isolate it in a sub-select. The vec0
        // LIMIT overfetches `base_limit` = limit (Phase 11 keeps it simple;
        // Phase 12's planner may raise this to compensate for fusion
        // narrowing the candidate pool).
        let base_limit = limit;
        // `?2` is valid here only because the kind bind (when present) was
        // pushed second, right after the query text — see the bind layout
        // invariant at the top of this function.
        let kind_clause = if filter_by_kind {
            "\n                      AND src.kind = ?2"
        } else {
            ""
        };

        let sql = format!(
            "WITH vector_hits AS (
                SELECT
                    src.row_id AS row_id,
                    src.logical_id AS logical_id,
                    src.kind AS kind,
                    src.properties AS properties,
                    src.source_ref AS source_ref,
                    src.content_ref AS content_ref,
                    src.created_at AS created_at,
                    vc.distance AS distance,
                    vc.chunk_id AS chunk_id
                FROM (
                    SELECT chunk_id, distance
                    FROM vec_nodes_active
                    WHERE embedding MATCH ?1
                    LIMIT {base_limit}
                ) vc
                JOIN chunks c ON c.id = vc.chunk_id
                JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
                WHERE 1 = 1{kind_clause}{fused_clauses}
            )
            SELECT
                h.row_id,
                h.logical_id,
                h.kind,
                h.properties,
                h.content_ref,
                am.last_accessed_at,
                h.created_at,
                h.distance,
                h.chunk_id
            FROM vector_hits h
            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
            WHERE 1 = 1{filter_clauses}
            ORDER BY h.distance ASC
            LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            Err(e) if is_vec_table_absent(&e) => {
                // Capability miss: non-fatal — surface as was_degraded.
                // The swap ensures the warning is logged at most once per
                // coordinator lifetime.
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let attribution_requested = compiled.attribution_requested;
        // Column indices mirror the outer SELECT above:
        // 0 row_id, 1 logical_id, 2 kind, 3 properties, 4 content_ref,
        // 5 last_accessed_at, 6 created_at, 7 distance, 8 chunk_id.
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Score is the negated distance per addendum 1
                // §Vector-Specific Behavior / Score and distance. For
                // distance metrics (sqlite-vec's default) lower distance =
                // better match, so negating yields the higher-is-better
                // convention that dedup_branch_hits and the unified result
                // surface rely on.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    // Vector hits have no strict/relaxed notion.
                    match_mode: None,
                    // Vector hits have no snippet.
                    snippet: None,
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // Some SQLite errors surface during row iteration (e.g. when
                // the vec0 extension is not loaded but the table exists as a
                // stub). Classify as capability-miss when the shape matches.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    // Explicit drops: release the statement before the
                    // connection guard it borrows from.
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
1121
1122    /// Execute a unified [`CompiledRetrievalPlan`] (Phase 12 `search()`
1123    /// entry point) and return deterministically ranked, block-ordered
1124    /// [`SearchRows`].
1125    ///
1126    /// Stages, per addendum 1 §Retrieval Planner Model:
1127    ///
1128    /// 1. **Text strict.** Always runs (empty query short-circuits to an
1129    ///    empty branch result inside `run_search_branch`).
1130    /// 2. **Text relaxed.** Runs iff the plan carries a relaxed branch AND
1131    ///    the strict branch returned fewer than `min(FALLBACK_TRIGGER_K,
1132    ///    limit)` hits — same v1 (`K = 1`) zero-hits-only trigger as the
1133    ///    Phase 6 text-only path.
1134    /// 3. **Vector.** Runs iff text retrieval (strict + relaxed combined)
1135    ///    returned zero hits AND `plan.vector` is `Some`. **In v1 the
1136    ///    planner never wires a vector branch through `search()`, so this
1137    ///    code path is structurally present but dormant.** A future phase
1138    ///    that wires read-time embedding into `compile_retrieval_plan` will
1139    ///    immediately light it up.
1140    /// 4. **Fusion.** All collected hits are merged via
1141    ///    [`merge_search_branches_three`], which produces strict ->
1142    ///    relaxed -> vector block ordering with cross-branch dedup
1143    ///    resolved by branch precedence.
1144    ///
1145    /// `was_degraded` covers only the relaxed-branch cap miss in v1. The
1146    /// addendum's "vector capability miss => `was_degraded`" semantics
1147    /// applies to `search()` only when the unified planner actually fires
1148    /// the vector branch, which v1 never does.
1149    ///
1150    /// # Errors
1151    /// Returns [`EngineError`] if any stage's SQL cannot be prepared or
1152    /// executed for a non-capability-miss reason.
1153    pub fn execute_retrieval_plan(
1154        &self,
1155        plan: &CompiledRetrievalPlan,
1156        raw_query: &str,
1157    ) -> Result<SearchRows, EngineError> {
1158        // Phase 12.5a: we work against a local owned copy so
1159        // `fill_vector_branch` can mutate `plan.vector` and
1160        // `plan.was_degraded_at_plan_time`. Cloning is cheap (the plan is
1161        // a bounded set of predicates + text AST nodes) and avoids
1162        // forcing callers to hand us `&mut` access.
1163        let mut plan = plan.clone();
1164        let limit = plan.text.strict.limit;
1165
1166        // Stage 1: text strict.
1167        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;
1168
1169        // Stage 2: text relaxed. Same K=1 zero-hits-only trigger the Phase 6
1170        // path uses.
1171        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
1172        let strict_underfilled = strict_hits.len() < fallback_threshold;
1173        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
1174        let mut fallback_used = false;
1175        let mut was_degraded = false;
1176        if let Some(relaxed) = plan.text.relaxed.as_ref()
1177            && strict_underfilled
1178        {
1179            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
1180            fallback_used = true;
1181            was_degraded = plan.was_degraded_at_plan_time;
1182        }
1183
1184        // Phase 12.5a: fill the vector branch from the configured
1185        // read-time query embedder, if any. Option (b) from the spec:
1186        // only pay the embedding cost when the text branches returned
1187        // nothing, because the three-branch stage gate below only runs
1188        // the vector stage under exactly that condition. This keeps the
1189        // hot path (strict text matched) embedder-free.
1190        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
1191        if text_branches_empty && self.query_embedder.is_some() {
1192            self.fill_vector_branch(&mut plan, raw_query);
1193        }
1194
1195        // Stage 3: vector. Runs only when text retrieval is empty AND a
1196        // vector branch is present. When no embedder is configured (Phase
1197        // 12.5a default) `plan.vector` stays `None` and this stage is a
1198        // no-op, preserving the Phase 12 v1 dormancy invariant.
1199        let mut vector_hits: Vec<SearchHit> = Vec::new();
1200        if let Some(vector) = plan.vector.as_ref()
1201            && strict_hits.is_empty()
1202            && relaxed_hits.is_empty()
1203        {
1204            let vector_rows = self.execute_compiled_vector_search(vector)?;
1205            // `execute_compiled_vector_search` returns a fully populated
1206            // `SearchRows`. Promote its hits into the merge stage and lift
1207            // its capability-miss `was_degraded` flag onto the unified
1208            // result, per addendum §Vector-Specific Behavior.
1209            vector_hits = vector_rows.hits;
1210            if vector_rows.was_degraded {
1211                was_degraded = true;
1212            }
1213        }
1214        // Phase 12.5a: an embedder-reported capability miss surfaces as
1215        // `plan.was_degraded_at_plan_time = true` set inside
1216        // `fill_vector_branch`. Lift it onto the response so callers see
1217        // the graceful degradation even when the vector stage-gate never
1218        // had the chance to fire (the embedder call itself failed and
1219        // the vector slot stayed `None`).
1220        if text_branches_empty
1221            && plan.was_degraded_at_plan_time
1222            && plan.vector.is_none()
1223            && self.query_embedder.is_some()
1224        {
1225            was_degraded = true;
1226        }
1227
1228        // Stage 4: fusion.
1229        let strict = &plan.text.strict;
1230        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
1231        if strict.attribution_requested {
1232            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
1233            self.populate_attribution_for_hits(
1234                &mut merged,
1235                &strict.text_query,
1236                relaxed_text_query,
1237            )?;
1238        }
1239
1240        let strict_hit_count = merged
1241            .iter()
1242            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
1243            .count();
1244        let relaxed_hit_count = merged
1245            .iter()
1246            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
1247            .count();
1248        let vector_hit_count = merged
1249            .iter()
1250            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
1251            .count();
1252
1253        Ok(SearchRows {
1254            hits: merged,
1255            strict_hit_count,
1256            relaxed_hit_count,
1257            vector_hit_count,
1258            fallback_used,
1259            was_degraded,
1260        })
1261    }
1262
1263    /// Phase 12.5a: populate `plan.vector` from the configured read-time
1264    /// query embedder, if any.
1265    ///
1266    /// Preconditions (enforced by the caller in `execute_retrieval_plan`):
1267    /// - `self.query_embedder.is_some()` — no point calling otherwise.
1268    /// - Both the strict and relaxed text branches already ran and
1269    ///   returned zero hits, so the existing three-branch stage gate
1270    ///   will actually fire the vector stage once the slot is populated.
1271    ///   This is option (b) from the Phase 12.5a spec: skip the embedding
1272    ///   cost entirely when text retrieval already won.
1273    ///
1274    /// Contract: never panics, never returns an error. On embedder error
1275    /// it sets `plan.was_degraded_at_plan_time = true` and leaves
1276    /// `plan.vector` as `None`; the coordinator's normal error-free
1277    /// degradation path then reports `was_degraded` on the result.
1278    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
1279        let Some(embedder) = self.query_embedder.as_ref() else {
1280            return;
1281        };
1282        match embedder.embed_query(raw_query) {
1283            Ok(vec) => {
1284                // `CompiledVectorSearch::query_text` is a JSON float-array
1285                // literal at the time the coordinator binds it (see the
1286                // `CompiledVectorSearch` docs). `serde_json::to_string`
1287                // on a `Vec<f32>` produces exactly that shape — no
1288                // wire-format change required.
1289                let literal = match serde_json::to_string(&vec) {
1290                    Ok(s) => s,
1291                    Err(err) => {
1292                        trace_warn!(
1293                            error = %err,
1294                            "query embedder vector serialization failed; skipping vector branch"
1295                        );
1296                        let _ = err; // Used by trace_warn! when tracing feature is active
1297                        plan.was_degraded_at_plan_time = true;
1298                        return;
1299                    }
1300                };
1301                let strict = &plan.text.strict;
1302                plan.vector = Some(CompiledVectorSearch {
1303                    root_kind: strict.root_kind.clone(),
1304                    query_text: literal,
1305                    limit: strict.limit,
1306                    fusable_filters: strict.fusable_filters.clone(),
1307                    residual_filters: strict.residual_filters.clone(),
1308                    attribution_requested: strict.attribution_requested,
1309                });
1310            }
1311            Err(err) => {
1312                trace_warn!(
1313                    error = %err,
1314                    "query embedder unavailable, skipping vector branch"
1315                );
1316                let _ = err; // Used by trace_warn! when tracing feature is active
1317                plan.was_degraded_at_plan_time = true;
1318            }
1319        }
1320    }
1321
1322    /// Execute a single search branch against the underlying FTS surfaces.
1323    ///
1324    /// This is the shared SQL emission path used by
1325    /// [`Self::execute_compiled_search_plan`] to run strict and (when
1326    /// present) relaxed branches of a [`CompiledSearchPlan`] in sequence.
1327    /// The returned hits are tagged with `branch`'s corresponding
1328    /// [`SearchMatchMode`] and are **not** yet deduped or truncated — the
1329    /// caller is responsible for merging multiple branches.
1330    #[allow(clippy::too_many_lines)]
1331    fn run_search_branch(
1332        &self,
1333        compiled: &CompiledSearch,
1334        branch: SearchBranch,
1335    ) -> Result<Vec<SearchHit>, EngineError> {
1336        use std::fmt::Write as _;
1337        // Short-circuit an empty/whitespace-only query: rendering it would
1338        // yield `MATCH ""`, which FTS5 rejects as a syntax error. Callers
1339        // (including the adaptive path when strict is Empty and derive_relaxed
1340        // returns None) must see an empty result, not an error. Each branch
1341        // is short-circuited independently so a strict-Empty + relaxed-Some
1342        // plan still exercises the relaxed branch.
1343        // A top-level `TextQuery::Not` renders to an FTS5 expression that
1344        // matches "every row not containing X" — a complement-of-corpus scan
1345        // that no caller would intentionally want. Short-circuit to empty at
1346        // the root only; a `Not` nested inside an `And` is a legitimate
1347        // exclusion and must still run.
1348        if matches!(
1349            compiled.text_query,
1350            fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
1351        ) {
1352            return Ok(Vec::new());
1353        }
1354        let rendered = render_text_query_fts5(&compiled.text_query);
1355        // An empty `root_kind` means "unkind-filtered" — the fallback_search
1356        // helper uses this when the caller did not add `.filter_kind_eq(...)`.
1357        // The adaptive `text_search()` path never produces an empty root_kind
1358        // because `QueryBuilder::nodes(kind)` requires a non-empty string at
1359        // the entry point.
1360        let filter_by_kind = !compiled.root_kind.is_empty();
1361        let mut binds: Vec<BindValue> = if filter_by_kind {
1362            vec![
1363                BindValue::Text(rendered.clone()),
1364                BindValue::Text(compiled.root_kind.clone()),
1365                BindValue::Text(rendered),
1366                BindValue::Text(compiled.root_kind.clone()),
1367            ]
1368        } else {
1369            vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
1370        };
1371
1372        // P2-5: both fusable and residual predicates now match against the
1373        // CTE's projected columns (`u.kind`, `u.logical_id`, `u.source_ref`,
1374        // `u.content_ref`, `u.properties`) because the inner UNION arms
1375        // project the full active-row column set through the
1376        // `JOIN nodes src` already present in each arm. The previous
1377        // implementation re-joined `nodes hn` at the CTE level and
1378        // `nodes n` again at the outer SELECT, which was triple work on
1379        // the hot search path.
1380        let mut fused_clauses = String::new();
1381        for predicate in &compiled.fusable_filters {
1382            match predicate {
1383                Predicate::KindEq(kind) => {
1384                    binds.push(BindValue::Text(kind.clone()));
1385                    let idx = binds.len();
1386                    let _ = write!(fused_clauses, "\n                  AND u.kind = ?{idx}");
1387                }
1388                Predicate::LogicalIdEq(logical_id) => {
1389                    binds.push(BindValue::Text(logical_id.clone()));
1390                    let idx = binds.len();
1391                    let _ = write!(
1392                        fused_clauses,
1393                        "\n                  AND u.logical_id = ?{idx}"
1394                    );
1395                }
1396                Predicate::SourceRefEq(source_ref) => {
1397                    binds.push(BindValue::Text(source_ref.clone()));
1398                    let idx = binds.len();
1399                    let _ = write!(
1400                        fused_clauses,
1401                        "\n                  AND u.source_ref = ?{idx}"
1402                    );
1403                }
1404                Predicate::ContentRefEq(uri) => {
1405                    binds.push(BindValue::Text(uri.clone()));
1406                    let idx = binds.len();
1407                    let _ = write!(
1408                        fused_clauses,
1409                        "\n                  AND u.content_ref = ?{idx}"
1410                    );
1411                }
1412                Predicate::ContentRefNotNull => {
1413                    fused_clauses.push_str("\n                  AND u.content_ref IS NOT NULL");
1414                }
1415                Predicate::JsonPathFusedEq { path, value } => {
1416                    binds.push(BindValue::Text(path.clone()));
1417                    let path_idx = binds.len();
1418                    binds.push(BindValue::Text(value.clone()));
1419                    let value_idx = binds.len();
1420                    let _ = write!(
1421                        fused_clauses,
1422                        "\n                  AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
1423                    );
1424                }
1425                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
1426                    binds.push(BindValue::Text(path.clone()));
1427                    let path_idx = binds.len();
1428                    binds.push(BindValue::Integer(*value));
1429                    let value_idx = binds.len();
1430                    let operator = match op {
1431                        ComparisonOp::Gt => ">",
1432                        ComparisonOp::Gte => ">=",
1433                        ComparisonOp::Lt => "<",
1434                        ComparisonOp::Lte => "<=",
1435                    };
1436                    let _ = write!(
1437                        fused_clauses,
1438                        "\n                  AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
1439                    );
1440                }
1441                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
1442                    // Should be in residual_filters; compile_search guarantees
1443                    // this, but stay defensive.
1444                }
1445            }
1446        }
1447
1448        let mut filter_clauses = String::new();
1449        for predicate in &compiled.residual_filters {
1450            match predicate {
1451                Predicate::JsonPathEq { path, value } => {
1452                    binds.push(BindValue::Text(path.clone()));
1453                    let path_idx = binds.len();
1454                    binds.push(scalar_to_bind(value));
1455                    let value_idx = binds.len();
1456                    let _ = write!(
1457                        filter_clauses,
1458                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
1459                    );
1460                }
1461                Predicate::JsonPathCompare { path, op, value } => {
1462                    binds.push(BindValue::Text(path.clone()));
1463                    let path_idx = binds.len();
1464                    binds.push(scalar_to_bind(value));
1465                    let value_idx = binds.len();
1466                    let operator = match op {
1467                        ComparisonOp::Gt => ">",
1468                        ComparisonOp::Gte => ">=",
1469                        ComparisonOp::Lt => "<",
1470                        ComparisonOp::Lte => "<=",
1471                    };
1472                    let _ = write!(
1473                        filter_clauses,
1474                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
1475                    );
1476                }
1477                Predicate::KindEq(_)
1478                | Predicate::LogicalIdEq(_)
1479                | Predicate::SourceRefEq(_)
1480                | Predicate::ContentRefEq(_)
1481                | Predicate::ContentRefNotNull
1482                | Predicate::JsonPathFusedEq { .. }
1483                | Predicate::JsonPathFusedTimestampCmp { .. } => {
1484                    // Fusable predicates live in fused_clauses; compile_search
1485                    // partitions them out of residual_filters.
1486                }
1487            }
1488        }
1489
1490        // Bind `limit` as an integer parameter rather than formatting it into
1491        // the SQL string. Interpolating the limit made the prepared-statement
1492        // SQL vary by limit value, so rusqlite's default 16-slot
1493        // `prepare_cached` cache thrashed for paginated callers that varied
1494        // limits per call. With the bind the SQL is structurally stable for
1495        // a given filter shape regardless of `limit` value.
1496        let limit = compiled.limit;
1497        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
1498        let limit_idx = binds.len();
1499        // P2-5: the inner UNION arms project the full active-row column
1500        // set through `JOIN nodes src` (kind, row_id, source_ref,
1501        // content_ref, content_hash, created_at, properties). Both the
1502        // CTE's outer WHERE and the final SELECT consume those columns
1503        // directly, which eliminates the previous `JOIN nodes hn` at the
1504        // CTE level and `JOIN nodes n` at the outer SELECT — saving two
1505        // redundant joins on the hot search path. `src.superseded_at IS
1506        // NULL` in each arm already filters retired rows, which is what
1507        // the dropped outer joins used to do.
1508        let (chunk_fts_bind, chunk_kind_clause, prop_fts_bind, prop_kind_clause) = if filter_by_kind
1509        {
1510            (
1511                "?1",
1512                "\n                      AND src.kind = ?2",
1513                "?3",
1514                "\n                      AND fp.kind = ?4",
1515            )
1516        } else {
1517            ("?1", "", "?2", "")
1518        };
1519        let sql = format!(
1520            "WITH search_hits AS (
1521                SELECT
1522                    u.row_id AS row_id,
1523                    u.logical_id AS logical_id,
1524                    u.kind AS kind,
1525                    u.properties AS properties,
1526                    u.source_ref AS source_ref,
1527                    u.content_ref AS content_ref,
1528                    u.created_at AS created_at,
1529                    u.score AS score,
1530                    u.source AS source,
1531                    u.snippet AS snippet,
1532                    u.projection_row_id AS projection_row_id
1533                FROM (
1534                    SELECT
1535                        src.row_id AS row_id,
1536                        c.node_logical_id AS logical_id,
1537                        src.kind AS kind,
1538                        src.properties AS properties,
1539                        src.source_ref AS source_ref,
1540                        src.content_ref AS content_ref,
1541                        src.created_at AS created_at,
1542                        -bm25(fts_nodes) AS score,
1543                        'chunk' AS source,
1544                        snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
1545                        f.chunk_id AS projection_row_id
1546                    FROM fts_nodes f
1547                    JOIN chunks c ON c.id = f.chunk_id
1548                    JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
1549                    WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}
1550                    UNION ALL
1551                    SELECT
1552                        src.row_id AS row_id,
1553                        fp.node_logical_id AS logical_id,
1554                        src.kind AS kind,
1555                        src.properties AS properties,
1556                        src.source_ref AS source_ref,
1557                        src.content_ref AS content_ref,
1558                        src.created_at AS created_at,
1559                        -bm25(fts_node_properties) AS score,
1560                        'property' AS source,
1561                        substr(fp.text_content, 1, 200) AS snippet,
1562                        CAST(fp.rowid AS TEXT) AS projection_row_id
1563                    FROM fts_node_properties fp
1564                    JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
1565                    WHERE fts_node_properties MATCH {prop_fts_bind}{prop_kind_clause}
1566                ) u
1567                WHERE 1 = 1{fused_clauses}
1568                ORDER BY u.score DESC
1569                LIMIT ?{limit_idx}
1570            )
1571            SELECT
1572                h.row_id,
1573                h.logical_id,
1574                h.kind,
1575                h.properties,
1576                h.content_ref,
1577                am.last_accessed_at,
1578                h.created_at,
1579                h.score,
1580                h.source,
1581                h.snippet,
1582                h.projection_row_id
1583            FROM search_hits h
1584            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
1585            WHERE 1 = 1{filter_clauses}
1586            ORDER BY h.score DESC"
1587        );
1588
1589        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
1590
1591        let conn_guard = match self.lock_connection() {
1592            Ok(g) => g,
1593            Err(e) => {
1594                self.telemetry.increment_errors();
1595                return Err(e);
1596            }
1597        };
1598        let mut statement = match conn_guard.prepare_cached(&sql) {
1599            Ok(stmt) => stmt,
1600            Err(e) => {
1601                self.telemetry.increment_errors();
1602                return Err(EngineError::Sqlite(e));
1603            }
1604        };
1605
1606        let hits = match statement
1607            .query_map(params_from_iter(bind_values.iter()), |row| {
1608                let source_str: String = row.get(8)?;
1609                // The CTE emits only two literal values here: `'chunk'` and
1610                // `'property'`. Default to `Chunk` on anything unexpected so a
1611                // schema drift surfaces as a mislabelled hit rather than a
1612                // row-level error.
1613                let source = if source_str == "property" {
1614                    SearchHitSource::Property
1615                } else {
1616                    SearchHitSource::Chunk
1617                };
1618                let match_mode = match branch {
1619                    SearchBranch::Strict => SearchMatchMode::Strict,
1620                    SearchBranch::Relaxed => SearchMatchMode::Relaxed,
1621                };
1622                Ok(SearchHit {
1623                    node: fathomdb_query::NodeRowLite {
1624                        row_id: row.get(0)?,
1625                        logical_id: row.get(1)?,
1626                        kind: row.get(2)?,
1627                        properties: row.get(3)?,
1628                        content_ref: row.get(4)?,
1629                        last_accessed_at: row.get(5)?,
1630                    },
1631                    written_at: row.get(6)?,
1632                    score: row.get(7)?,
1633                    // Phase 10: every branch currently emits text hits.
1634                    modality: RetrievalModality::Text,
1635                    source,
1636                    match_mode: Some(match_mode),
1637                    snippet: row.get(9)?,
1638                    projection_row_id: row.get(10)?,
1639                    vector_distance: None,
1640                    attribution: None,
1641                })
1642            })
1643            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
1644        {
1645            Ok(rows) => rows,
1646            Err(e) => {
1647                self.telemetry.increment_errors();
1648                return Err(EngineError::Sqlite(e));
1649            }
1650        };
1651
1652        // Drop the statement so `conn_guard` is free (attribution is
1653        // resolved after dedup in `execute_compiled_search_plan` to avoid
1654        // spending highlight lookups on hits that will be discarded).
1655        drop(statement);
1656        drop(conn_guard);
1657
1658        self.telemetry.increment_queries();
1659        Ok(hits)
1660    }
1661
1662    /// Populate per-hit attribution for the given deduped merged hits.
1663    /// Runs after [`merge_search_branches`] so dropped duplicates do not
1664    /// incur the highlight+position-map lookup cost.
1665    fn populate_attribution_for_hits(
1666        &self,
1667        hits: &mut [SearchHit],
1668        strict_text_query: &fathomdb_query::TextQuery,
1669        relaxed_text_query: Option<&fathomdb_query::TextQuery>,
1670    ) -> Result<(), EngineError> {
1671        let conn_guard = match self.lock_connection() {
1672            Ok(g) => g,
1673            Err(e) => {
1674                self.telemetry.increment_errors();
1675                return Err(e);
1676            }
1677        };
1678        let strict_expr = render_text_query_fts5(strict_text_query);
1679        let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
1680        for hit in hits.iter_mut() {
1681            // Phase 10: text hits always carry `Some(match_mode)`. Vector
1682            // hits (when a future phase adds them) have `None` here and
1683            // are skipped by the attribution resolver because attribution
1684            // is meaningless for vector matches.
1685            let match_expr = match hit.match_mode {
1686                Some(SearchMatchMode::Strict) => strict_expr.as_str(),
1687                Some(SearchMatchMode::Relaxed) => {
1688                    relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
1689                }
1690                None => continue,
1691            };
1692            match resolve_hit_attribution(&conn_guard, hit, match_expr) {
1693                Ok(att) => hit.attribution = Some(att),
1694                Err(e) => {
1695                    self.telemetry.increment_errors();
1696                    return Err(e);
1697                }
1698            }
1699        }
1700        Ok(())
1701    }
1702
1703    /// # Errors
1704    /// Returns [`EngineError`] if the root query or any bounded expansion
1705    /// query cannot be prepared or executed.
1706    pub fn execute_compiled_grouped_read(
1707        &self,
1708        compiled: &CompiledGroupedQuery,
1709    ) -> Result<GroupedQueryRows, EngineError> {
1710        let root_rows = self.execute_compiled_read(&compiled.root)?;
1711        if root_rows.was_degraded {
1712            return Ok(GroupedQueryRows {
1713                roots: Vec::new(),
1714                expansions: Vec::new(),
1715                was_degraded: true,
1716            });
1717        }
1718
1719        let roots = root_rows.nodes;
1720        let mut expansions = Vec::with_capacity(compiled.expansions.len());
1721        for expansion in &compiled.expansions {
1722            let slot_rows = if roots.is_empty() {
1723                Vec::new()
1724            } else {
1725                self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
1726            };
1727            expansions.push(ExpansionSlotRows {
1728                slot: expansion.slot.clone(),
1729                roots: slot_rows,
1730            });
1731        }
1732
1733        Ok(GroupedQueryRows {
1734            roots,
1735            expansions,
1736            was_degraded: false,
1737        })
1738    }
1739
1740    /// Chunked batched expansion: splits roots into chunks of
1741    /// `BATCH_CHUNK_SIZE` and runs one batched query per chunk, then merges
1742    /// results while preserving root ordering.  This keeps bind-parameter
1743    /// counts within `SQLite` limits while avoiding the N+1 per-root pattern
1744    /// for large result sets.
1745    fn read_expansion_nodes_chunked(
1746        &self,
1747        roots: &[NodeRow],
1748        expansion: &ExpansionSlot,
1749        hard_limit: usize,
1750    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
1751        if roots.len() <= BATCH_CHUNK_SIZE {
1752            return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
1753        }
1754
1755        // Merge chunk results keyed by root logical_id, then reassemble in
1756        // root order.
1757        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
1758        for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
1759            for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
1760                per_root
1761                    .entry(group.root_logical_id)
1762                    .or_default()
1763                    .extend(group.nodes);
1764            }
1765        }
1766
1767        Ok(roots
1768            .iter()
1769            .map(|root| ExpansionRootRows {
1770                root_logical_id: root.logical_id.clone(),
1771                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
1772            })
1773            .collect())
1774    }
1775
    /// Batched expansion: one recursive CTE query per expansion slot that
    /// processes all root IDs at once. Uses `ROW_NUMBER() OVER (PARTITION BY
    /// source_logical_id ...)` to enforce the per-root hard limit inside the
    /// database rather than in Rust.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the query cannot be prepared or executed,
    /// if the connection mutex is poisoned, or if a fused filter targets a
    /// kind without a registered property-FTS schema.
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Direction flips which edge endpoint joins the frontier and which
        // endpoint becomes the next traversal step.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // EXECUTE-TIME VALIDATION: fused filter against kinds without an FTS schema.
        // This check runs at execute time (not builder time) because the target kind
        // set for an expand slot is edge-label-scoped, not kind-scoped, and multiple
        // target kinds may be reachable via one edge label. See Pack 12 docs.
        if expansion.filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. } | Predicate::JsonPathFusedTimestampCmp { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }

        // Build a UNION ALL of SELECT literals for the root seed rows.
        // SQLite does not support `VALUES ... AS alias(col)` in older versions,
        // so we use `SELECT ?1 UNION ALL SELECT ?2 ...` instead.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        // Bind params: root IDs occupy ?1..=?N, edge kind is ?(N+1).
        // Filter params (if any) follow starting at ?(N+2).
        let edge_kind_param = root_ids.len() + 1;
        let filter_param_start = root_ids.len() + 2;

        // Compile the optional target-side filter to a SQL fragment + bind values.
        // The fragment is injected into the `numbered` CTE's WHERE clause BEFORE
        // the ROW_NUMBER() window so the per-originator limit counts only matching rows.
        let (filter_sql, filter_binds) =
            compile_expansion_filter(expansion.filter.as_ref(), filter_param_start);

        // The `root_id` column tracks which root each traversal path
        // originated from. The `ROW_NUMBER()` window in the outer query
        // enforces the per-root hard limit.
        //
        // Cycle prevention: `visited` is a `,id,`-delimited path string per
        // traversal row; the `instr(...) = 0` guard refuses to revisit a
        // node already on the current path. `emitted` counts hops along a
        // single path and bounds recursion together with `max_depth`.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted) AS (
                SELECT rid, rid, 0, printf(',%s,', rid), 0
                FROM root_ids
                UNION ALL
                SELECT
                    t.root_id,
                    {next_logical_id},
                    t.depth + 1,
                    t.visited || {next_logical_id} || ',',
                    t.emitted + 1
                FROM traversed t
                JOIN edges e ON {join_condition}
                    AND e.kind = ?{edge_kind_param}
                    AND e.superseded_at IS NULL
                WHERE t.depth < {max_depth}
                  AND t.emitted < {hard_limit}
                  AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
                     , n.content_ref, am.last_accessed_at
                     , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
                FROM traversed t
                JOIN nodes n ON n.logical_id = t.logical_id
                    AND n.superseded_at IS NULL
                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
                WHERE t.depth > 0{filter_sql}
            )
            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, logical_id",
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind root IDs (1..=N) and edge kind (N+1), then filter params (N+2...).
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(filter_binds);

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, // root_id
                    NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Partition results back into per-root groups, preserving root order.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        // Roots with no expansion hits still get an (empty) entry so the
        // output aligns one-to-one with the input `roots` slice.
        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
1915
1916    /// Validate that all target node kinds reachable via `edge_label` have a
1917    /// registered property-FTS schema. Called at execute time when an expansion
1918    /// slot carries a fused filter predicate.
1919    ///
1920    /// EXECUTE-TIME VALIDATION: this check runs at execute time (not builder
1921    /// time) for expand slots because the target kind set is edge-label-scoped
1922    /// rather than kind-scoped, and is not statically knowable at builder time
1923    /// when multiple target kinds may be reachable via the same label.
1924    /// See Pack 12 docs.
1925    ///
1926    /// # Errors
1927    /// Returns `EngineError::InvalidConfig` if any reachable target kind lacks
1928    /// a registered property-FTS schema.
1929    fn validate_fused_filter_for_edge_label(&self, edge_label: &str) -> Result<(), EngineError> {
1930        let conn = self.lock_connection()?;
1931        // Collect the distinct node kinds reachable as targets of this edge label.
1932        let mut stmt = conn
1933            .prepare_cached(
1934                "SELECT DISTINCT n.kind \
1935                 FROM edges e \
1936                 JOIN nodes n ON n.logical_id = e.target_logical_id \
1937                 WHERE e.kind = ?1 AND e.superseded_at IS NULL",
1938            )
1939            .map_err(EngineError::Sqlite)?;
1940        let target_kinds: Vec<String> = stmt
1941            .query_map(rusqlite::params![edge_label], |row| row.get(0))
1942            .map_err(EngineError::Sqlite)?
1943            .collect::<Result<Vec<_>, _>>()
1944            .map_err(EngineError::Sqlite)?;
1945
1946        for kind in &target_kinds {
1947            let has_schema: bool = conn
1948                .query_row(
1949                    "SELECT COUNT(*) > 0 FROM fts_property_schemas WHERE kind = ?1",
1950                    rusqlite::params![kind],
1951                    |row| row.get(0),
1952                )
1953                .map_err(EngineError::Sqlite)?;
1954            if !has_schema {
1955                return Err(EngineError::InvalidConfig(format!(
1956                    "kind {kind:?} has no registered property-FTS schema; register one with \
1957                     admin.register_fts_property_schema(..) before using fused filters on \
1958                     expansion slots, or use JsonPathEq for non-fused semantics \
1959                     (expand slot uses edge label {edge_label:?})"
1960                )));
1961            }
1962        }
1963        Ok(())
1964    }
1965
1966    /// Read a single run by id.
1967    ///
1968    /// # Errors
1969    /// Returns [`EngineError`] if the query fails or if the connection mutex
1970    /// has been poisoned.
1971    pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
1972        let conn = self.lock_connection()?;
1973        conn.query_row(
1974            "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
1975            rusqlite::params![id],
1976            |row| {
1977                Ok(RunRow {
1978                    id: row.get(0)?,
1979                    kind: row.get(1)?,
1980                    status: row.get(2)?,
1981                    properties: row.get(3)?,
1982                })
1983            },
1984        )
1985        .optional()
1986        .map_err(EngineError::Sqlite)
1987    }
1988
1989    /// Read a single step by id.
1990    ///
1991    /// # Errors
1992    /// Returns [`EngineError`] if the query fails or if the connection mutex
1993    /// has been poisoned.
1994    pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
1995        let conn = self.lock_connection()?;
1996        conn.query_row(
1997            "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
1998            rusqlite::params![id],
1999            |row| {
2000                Ok(StepRow {
2001                    id: row.get(0)?,
2002                    run_id: row.get(1)?,
2003                    kind: row.get(2)?,
2004                    status: row.get(3)?,
2005                    properties: row.get(4)?,
2006                })
2007            },
2008        )
2009        .optional()
2010        .map_err(EngineError::Sqlite)
2011    }
2012
2013    /// Read a single action by id.
2014    ///
2015    /// # Errors
2016    /// Returns [`EngineError`] if the query fails or if the connection mutex
2017    /// has been poisoned.
2018    pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
2019        let conn = self.lock_connection()?;
2020        conn.query_row(
2021            "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
2022            rusqlite::params![id],
2023            |row| {
2024                Ok(ActionRow {
2025                    id: row.get(0)?,
2026                    step_id: row.get(1)?,
2027                    kind: row.get(2)?,
2028                    status: row.get(3)?,
2029                    properties: row.get(4)?,
2030                })
2031            },
2032        )
2033        .optional()
2034        .map_err(EngineError::Sqlite)
2035    }
2036
2037    /// Read all active (non-superseded) runs.
2038    ///
2039    /// # Errors
2040    /// Returns [`EngineError`] if the query fails or if the connection mutex
2041    /// has been poisoned.
2042    pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
2043        let conn = self.lock_connection()?;
2044        let mut stmt = conn
2045            .prepare_cached(
2046                "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
2047            )
2048            .map_err(EngineError::Sqlite)?;
2049        let rows = stmt
2050            .query_map([], |row| {
2051                Ok(RunRow {
2052                    id: row.get(0)?,
2053                    kind: row.get(1)?,
2054                    status: row.get(2)?,
2055                    properties: row.get(3)?,
2056                })
2057            })
2058            .map_err(EngineError::Sqlite)?
2059            .collect::<Result<Vec<_>, _>>()
2060            .map_err(EngineError::Sqlite)?;
2061        Ok(rows)
2062    }
2063
2064    /// Returns the number of shape→SQL entries currently indexed.
2065    ///
2066    /// Each distinct query shape (structural hash of kind + steps + limits)
2067    /// maps to exactly one SQL string.  This is a test-oriented introspection
2068    /// helper; it does not reflect rusqlite's internal prepared-statement
2069    /// cache, which is keyed by SQL text.
2070    ///
2071    /// # Panics
2072    /// Panics if the internal shape-SQL-map mutex is poisoned.
2073    #[must_use]
2074    #[allow(clippy::expect_used)]
2075    pub fn shape_sql_count(&self) -> usize {
2076        self.shape_sql_map
2077            .lock()
2078            .unwrap_or_else(PoisonError::into_inner)
2079            .len()
2080    }
2081
2082    /// Returns a cloned `Arc` to the schema manager.
2083    #[must_use]
2084    pub fn schema_manager(&self) -> Arc<SchemaManager> {
2085        Arc::clone(&self.schema_manager)
2086    }
2087
2088    /// Return the execution plan for a compiled query without executing it.
2089    ///
2090    /// Useful for debugging, testing shape-hash caching, and operator
2091    /// diagnostics. Does not open a transaction or touch the database beyond
2092    /// checking the statement cache.
2093    ///
2094    /// # Panics
2095    /// Panics if the internal shape-SQL-map mutex is poisoned.
2096    #[must_use]
2097    pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
2098        let cache_hit = self
2099            .shape_sql_map
2100            .lock()
2101            .unwrap_or_else(PoisonError::into_inner)
2102            .contains_key(&compiled.shape_hash);
2103        QueryPlan {
2104            sql: wrap_node_row_projection_sql(&compiled.sql),
2105            bind_count: compiled.binds.len(),
2106            driving_table: compiled.driving_table,
2107            shape_hash: compiled.shape_hash,
2108            cache_hit,
2109        }
2110    }
2111
2112    /// Execute a named PRAGMA and return the result as a String.
2113    /// Used by Layer 1 tests to verify startup pragma initialization.
2114    ///
2115    /// # Errors
2116    /// Returns [`EngineError`] if the PRAGMA query fails or if the connection
2117    /// mutex has been poisoned.
2118    #[doc(hidden)]
2119    pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
2120        let conn = self.lock_connection()?;
2121        let result = conn
2122            .query_row(&format!("PRAGMA {name}"), [], |row| {
2123                // PRAGMAs may return TEXT or INTEGER; normalise to String.
2124                row.get::<_, rusqlite::types::Value>(0)
2125            })
2126            .map_err(EngineError::Sqlite)?;
2127        let s = match result {
2128            rusqlite::types::Value::Text(t) => t,
2129            rusqlite::types::Value::Integer(i) => i.to_string(),
2130            rusqlite::types::Value::Real(f) => f.to_string(),
2131            rusqlite::types::Value::Blob(_) => {
2132                return Err(EngineError::InvalidWrite(format!(
2133                    "PRAGMA {name} returned an unexpected BLOB value"
2134                )));
2135            }
2136            rusqlite::types::Value::Null => String::new(),
2137        };
2138        Ok(s)
2139    }
2140
2141    /// Return all provenance events whose `subject` matches the given value.
2142    ///
2143    /// Subjects are logical node IDs (for retire/upsert events) or `source_ref`
2144    /// values (for excise events).
2145    ///
2146    /// # Errors
2147    /// Returns [`EngineError`] if the query fails or if the connection mutex
2148    /// has been poisoned.
2149    pub fn query_provenance_events(
2150        &self,
2151        subject: &str,
2152    ) -> Result<Vec<ProvenanceEvent>, EngineError> {
2153        let conn = self.lock_connection()?;
2154        let mut stmt = conn
2155            .prepare_cached(
2156                "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
2157                 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
2158            )
2159            .map_err(EngineError::Sqlite)?;
2160        let events = stmt
2161            .query_map(rusqlite::params![subject], |row| {
2162                Ok(ProvenanceEvent {
2163                    id: row.get(0)?,
2164                    event_type: row.get(1)?,
2165                    subject: row.get(2)?,
2166                    source_ref: row.get(3)?,
2167                    metadata_json: row.get(4)?,
2168                    created_at: row.get(5)?,
2169                })
2170            })
2171            .map_err(EngineError::Sqlite)?
2172            .collect::<Result<Vec<_>, _>>()
2173            .map_err(EngineError::Sqlite)?;
2174        Ok(events)
2175    }
2176
2177    /// Check if `kind` has a first-registration async rebuild in progress
2178    /// (`is_first_registration=1` with state PENDING/BUILDING/SWAPPING and no
2179    /// rows yet in `fts_node_properties`). If so, execute a full-kind scan and
2180    /// return the nodes. Returns `None` when the normal FTS5 path should run.
2181    fn scan_fallback_if_first_registration(
2182        &self,
2183        kind: &str,
2184    ) -> Result<Option<Vec<NodeRow>>, EngineError> {
2185        let conn = self.lock_connection()?;
2186
2187        // Quick point-lookup: kind has a first-registration rebuild in a
2188        // pre-complete state AND fts_node_properties has no rows for it yet.
2189        let needs_scan: bool = conn
2190            .query_row(
2191                "SELECT 1 FROM fts_property_rebuild_state \
2192                 WHERE kind = ?1 AND is_first_registration = 1 \
2193                 AND state IN ('PENDING','BUILDING','SWAPPING') \
2194                 AND NOT EXISTS (SELECT 1 FROM fts_node_properties WHERE kind = ?1) \
2195                 LIMIT 1",
2196                rusqlite::params![kind],
2197                |_| Ok(true),
2198            )
2199            .optional()?
2200            .unwrap_or(false);
2201
2202        if !needs_scan {
2203            return Ok(None);
2204        }
2205
2206        // Scan fallback: return all active nodes of this kind.
2207        // Intentionally unindexed — acceptable for first-registration window.
2208        let mut stmt = conn
2209            .prepare_cached(
2210                "SELECT n.row_id, n.logical_id, n.kind, n.properties, n.content_ref, \
2211                 am.last_accessed_at \
2212                 FROM nodes n \
2213                 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id \
2214                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
2215            )
2216            .map_err(EngineError::Sqlite)?;
2217
2218        let nodes = stmt
2219            .query_map(rusqlite::params![kind], |row| {
2220                Ok(NodeRow {
2221                    row_id: row.get(0)?,
2222                    logical_id: row.get(1)?,
2223                    kind: row.get(2)?,
2224                    properties: row.get(3)?,
2225                    content_ref: row.get(4)?,
2226                    last_accessed_at: row.get(5)?,
2227                })
2228            })
2229            .map_err(EngineError::Sqlite)?
2230            .collect::<Result<Vec<_>, _>>()
2231            .map_err(EngineError::Sqlite)?;
2232
2233        Ok(Some(nodes))
2234    }
2235
2236    /// Return the current rebuild progress for a kind, or `None` if no rebuild
2237    /// has been registered for that kind.
2238    ///
2239    /// # Errors
2240    /// Returns [`EngineError`] if the database query fails.
2241    pub fn get_property_fts_rebuild_progress(
2242        &self,
2243        kind: &str,
2244    ) -> Result<Option<crate::rebuild_actor::RebuildProgress>, EngineError> {
2245        let conn = self.lock_connection()?;
2246        let row = conn
2247            .query_row(
2248                "SELECT state, rows_total, rows_done, started_at, last_progress_at, error_message \
2249                 FROM fts_property_rebuild_state WHERE kind = ?1",
2250                rusqlite::params![kind],
2251                |r| {
2252                    Ok(crate::rebuild_actor::RebuildProgress {
2253                        state: r.get(0)?,
2254                        rows_total: r.get(1)?,
2255                        rows_done: r.get(2)?,
2256                        started_at: r.get(3)?,
2257                        last_progress_at: r.get(4)?,
2258                        error_message: r.get(5)?,
2259                    })
2260                },
2261            )
2262            .optional()?;
2263        Ok(row)
2264    }
2265}
2266
/// Wrap `base_sql` so each returned row is projected into the canonical
/// node-row shape (`row_id, logical_id, kind, properties, content_ref`)
/// with `last_accessed_at` joined in from `node_access_metadata`.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    // Built by concatenation; the surrounding text must stay byte-identical
    // to the historical wrapper shape.
    let mut sql = String::with_capacity(base_sql.len() + 160);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, am.last_accessed_at \
         FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
2274
2275/// Returns `true` when `err` indicates the vec virtual table is absent
2276/// (sqlite-vec feature enabled but `vec_nodes_active` not yet created).
2277pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
2278    match err {
2279        rusqlite::Error::SqliteFailure(_, Some(msg)) => {
2280            msg.contains("vec_nodes_active") || msg.contains("no such module: vec0")
2281        }
2282        _ => false,
2283    }
2284}
2285
2286fn scalar_to_bind(value: &ScalarValue) -> BindValue {
2287    match value {
2288        ScalarValue::Text(text) => BindValue::Text(text.clone()),
2289        ScalarValue::Integer(integer) => BindValue::Integer(*integer),
2290        ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
2291    }
2292}
2293
2294/// Merge strict and relaxed search branches into a single block-ordered,
2295/// deduplicated, limit-truncated hit list.
2296///
2297/// Phase 3 rules, in order:
2298///
2299/// 1. Each branch is sorted internally by score descending with `logical_id`
2300///    ascending as the deterministic tiebreak.
2301/// 2. Within a single branch, if the same `logical_id` appears twice (e.g.
2302///    once from the chunk surface and once from the property surface) the
2303///    higher-score row wins, then chunk > property > vector, then declaration
2304///    order (chunk first).
2305/// 3. Strict hits form one block and relaxed hits form the next. Strict
2306///    always precedes relaxed in the merged output regardless of per-hit
2307///    score.
2308/// 4. Cross-branch dedup is strict-wins: any relaxed hit whose `logical_id`
2309///    already appears in the strict block is dropped.
2310/// 5. The merged output is truncated to `limit`.
2311fn merge_search_branches(
2312    strict: Vec<SearchHit>,
2313    relaxed: Vec<SearchHit>,
2314    limit: usize,
2315) -> Vec<SearchHit> {
2316    merge_search_branches_three(strict, relaxed, Vec::new(), limit)
2317}
2318
2319/// Three-branch generalization of [`merge_search_branches`]: orders hits as
2320/// (strict block, relaxed block, vector block) per addendum 1 §Fusion
2321/// Semantics, with cross-branch dedup resolved by branch precedence
2322/// (strict > relaxed > vector). Within each block the existing
2323/// [`dedup_branch_hits`] rule applies (score desc, `logical_id` asc, source
2324/// priority chunk > property > vector).
2325///
2326/// Phase 12 (the unified `search()` entry point) calls this directly. The
2327/// two-branch [`merge_search_branches`] wrapper is preserved as a
2328/// convenience for the text-only `execute_compiled_search_plan` path; both
2329/// reduce to the same code.
2330fn merge_search_branches_three(
2331    strict: Vec<SearchHit>,
2332    relaxed: Vec<SearchHit>,
2333    vector: Vec<SearchHit>,
2334    limit: usize,
2335) -> Vec<SearchHit> {
2336    let strict_block = dedup_branch_hits(strict);
2337    let relaxed_block = dedup_branch_hits(relaxed);
2338    let vector_block = dedup_branch_hits(vector);
2339
2340    let mut seen: std::collections::HashSet<String> = strict_block
2341        .iter()
2342        .map(|h| h.node.logical_id.clone())
2343        .collect();
2344
2345    let mut merged = strict_block;
2346    for hit in relaxed_block {
2347        if seen.insert(hit.node.logical_id.clone()) {
2348            merged.push(hit);
2349        }
2350    }
2351    for hit in vector_block {
2352        if seen.insert(hit.node.logical_id.clone()) {
2353            merged.push(hit);
2354        }
2355    }
2356
2357    if merged.len() > limit {
2358        merged.truncate(limit);
2359    }
2360    merged
2361}
2362
2363/// Sort a branch's hits by score descending + `logical_id` ascending, then
2364/// dedup duplicate `logical_id`s within the branch using source priority
2365/// (chunk > property > vector) and declaration order.
2366fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
2367    hits.sort_by(|a, b| {
2368        b.score
2369            .partial_cmp(&a.score)
2370            .unwrap_or(std::cmp::Ordering::Equal)
2371            .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
2372            .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
2373    });
2374
2375    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2376    hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
2377    hits
2378}
2379
2380fn source_priority(source: SearchHitSource) -> u8 {
2381    // Lower is better. Chunk is declared before property in the CTE; vector
2382    // is reserved for future wiring but comes last among the known variants.
2383    match source {
2384        SearchHitSource::Chunk => 0,
2385        SearchHitSource::Property => 1,
2386        SearchHitSource::Vector => 2,
2387    }
2388}
2389
/// Sentinel markers used to wrap FTS5-matched terms in the `highlight()`
/// output so the coordinator can recover per-term byte offsets in the
/// original `text_content` column.
///
/// Each sentinel is a single byte: `U+0001` ("start of heading") opens a
/// match and `U+0002` ("start of text") closes it. These bytes are safe
/// for all valid JSON text *except* deliberately escape-injected
/// `\u0001` / `\u0002` sequences: a payload like `{"x":"\u0001"}` decodes
/// to a literal 0x01 byte in the extracted blob, making the sentinel
/// ambiguous for that row. Such input is treated as out of scope for
/// attribution correctness — hits on those rows may have misattributed
/// `matched_paths`, but no panic or query failure occurs. A future
/// hardening step could strip bytes < 0x20 at blob-emission time in
/// `RecursiveWalker::emit_leaf` to close this gap.
///
/// Using one-byte markers keeps the original-to-highlighted offset
/// accounting trivial: every sentinel adds exactly one byte to the
/// highlighted string, so original offsets are recovered by subtracting
/// the running count of marker bytes (see `parse_highlight_offsets`).
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
2409
2410/// Load the `fts_node_property_positions` sidecar rows for a given
2411/// `(logical_id, kind)` ordered by `start_offset`. Returns a vector of
2412/// `(start_offset, end_offset, leaf_path)` tuples ready for binary search.
2413fn load_position_map(
2414    conn: &Connection,
2415    logical_id: &str,
2416    kind: &str,
2417) -> Result<Vec<(usize, usize, String)>, EngineError> {
2418    let mut stmt = conn
2419        .prepare_cached(
2420            "SELECT start_offset, end_offset, leaf_path \
2421             FROM fts_node_property_positions \
2422             WHERE node_logical_id = ?1 AND kind = ?2 \
2423             ORDER BY start_offset ASC",
2424        )
2425        .map_err(EngineError::Sqlite)?;
2426    let rows = stmt
2427        .query_map(rusqlite::params![logical_id, kind], |row| {
2428            let start: i64 = row.get(0)?;
2429            let end: i64 = row.get(1)?;
2430            let path: String = row.get(2)?;
2431            // Offsets are non-negative and within blob byte limits; on the
2432            // off chance a corrupt row is encountered, fall back to 0 so
2433            // lookups silently skip it rather than panicking.
2434            let start = usize::try_from(start).unwrap_or(0);
2435            let end = usize::try_from(end).unwrap_or(0);
2436            Ok((start, end, path))
2437        })
2438        .map_err(EngineError::Sqlite)?;
2439    let mut out = Vec::new();
2440    for row in rows {
2441        out.push(row.map_err(EngineError::Sqlite)?);
2442    }
2443    Ok(out)
2444}
2445
/// Parse a `highlight()`-wrapped string, returning the byte offsets — in
/// the *original*, undecorated text — at which matched terms begin.
///
/// `wrapped` is the highlight-decorated form of a text column; `open` /
/// `close` are the sentinel markers passed to `highlight()`. Every marker
/// byte seen so far is subtracted from the wrapped-string index to recover
/// the original offset.
///
/// Returns an empty vector when either marker is empty: a zero-length
/// marker is never produced by our `highlight()` calls, and without this
/// guard `starts_with("")` would match at every position while advancing
/// the cursor by zero bytes — an infinite loop.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    // Guard against zero-length sentinels, which would never advance `i`.
    if open.is_empty() || close.is_empty() {
        return Vec::new();
    }

    let bytes = wrapped.as_bytes();
    let open_bytes = open.as_bytes();
    let close_bytes = close.as_bytes();
    let mut offsets = Vec::new();
    let mut i = 0usize;
    // Number of sentinel bytes consumed so far.
    let mut marker_bytes_seen = 0usize;
    while i < bytes.len() {
        if bytes[i..].starts_with(open_bytes) {
            // The term following an open marker starts here in the
            // original text.
            offsets.push(i - marker_bytes_seen);
            i += open_bytes.len();
            marker_bytes_seen += open_bytes.len();
        } else if bytes[i..].starts_with(close_bytes) {
            i += close_bytes.len();
            marker_bytes_seen += close_bytes.len();
        } else {
            i += 1;
        }
    }
    offsets
}
2478
/// Locate the leaf whose `[start, end)` byte range contains `offset`.
///
/// `positions` must be sorted ascending by start offset (guaranteed by
/// `load_position_map`'s `ORDER BY`). Returns `None` when no leaf range
/// covers the offset.
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    // Index of the last entry whose start_offset <= offset; None when every
    // entry starts past the offset (or the map is empty).
    let idx = positions
        .partition_point(|&(start, _, _)| start <= offset)
        .checked_sub(1)?;
    let (start, end, path) = &positions[idx];
    (*start..*end).contains(&offset).then(|| path.as_str())
}
2495
2496/// Resolve per-hit match attribution by introspecting the FTS5 match state
2497/// for the hit's row via `highlight()` and mapping the resulting original-
2498/// text offsets back to recursive-leaf paths via the Phase 4 position map.
2499///
2500/// Chunk-backed hits carry no leaf structure and always return an empty
2501/// `matched_paths` vector. Property-backed hits without a `projection_row_id`
2502/// (which should not happen — the search CTE always populates it) also
2503/// return empty attribution rather than erroring.
2504fn resolve_hit_attribution(
2505    conn: &Connection,
2506    hit: &SearchHit,
2507    match_expr: &str,
2508) -> Result<HitAttribution, EngineError> {
2509    if !matches!(hit.source, SearchHitSource::Property) {
2510        return Ok(HitAttribution {
2511            matched_paths: Vec::new(),
2512        });
2513    }
2514    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
2515        return Ok(HitAttribution {
2516            matched_paths: Vec::new(),
2517        });
2518    };
2519    let rowid: i64 = match rowid_str.parse() {
2520        Ok(v) => v,
2521        Err(_) => {
2522            return Ok(HitAttribution {
2523                matched_paths: Vec::new(),
2524            });
2525        }
2526    };
2527
2528    // Fetch the highlight-wrapped text_content for this hit's FTS row. The
2529    // FTS5 MATCH in the WHERE clause re-establishes the match state that
2530    // `highlight()` needs to decorate the returned text.
2531    let mut stmt = conn
2532        .prepare_cached(
2533            "SELECT highlight(fts_node_properties, 2, ?1, ?2) \
2534             FROM fts_node_properties \
2535             WHERE rowid = ?3 AND fts_node_properties MATCH ?4",
2536        )
2537        .map_err(EngineError::Sqlite)?;
2538    let wrapped: Option<String> = stmt
2539        .query_row(
2540            rusqlite::params![
2541                ATTRIBUTION_HIGHLIGHT_OPEN,
2542                ATTRIBUTION_HIGHLIGHT_CLOSE,
2543                rowid,
2544                match_expr,
2545            ],
2546            |row| row.get(0),
2547        )
2548        .optional()
2549        .map_err(EngineError::Sqlite)?;
2550    let Some(wrapped) = wrapped else {
2551        return Ok(HitAttribution {
2552            matched_paths: Vec::new(),
2553        });
2554    };
2555
2556    let offsets = parse_highlight_offsets(
2557        &wrapped,
2558        ATTRIBUTION_HIGHLIGHT_OPEN,
2559        ATTRIBUTION_HIGHLIGHT_CLOSE,
2560    );
2561    if offsets.is_empty() {
2562        return Ok(HitAttribution {
2563            matched_paths: Vec::new(),
2564        });
2565    }
2566
2567    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
2568    if positions.is_empty() {
2569        // Scalar-only schemas have no position-map entries; attribution
2570        // degrades to an empty vector rather than erroring.
2571        return Ok(HitAttribution {
2572            matched_paths: Vec::new(),
2573        });
2574    }
2575
2576    let mut matched_paths: Vec<String> = Vec::new();
2577    for offset in offsets {
2578        if let Some(path) = find_leaf_for_offset(&positions, offset)
2579            && !matched_paths.iter().any(|p| p == path)
2580        {
2581            matched_paths.push(path.to_owned());
2582        }
2583    }
2584    Ok(HitAttribution { matched_paths })
2585}
2586
2587fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
2588    match value {
2589        fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
2590        fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
2591        fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
2592    }
2593}
2594
2595#[cfg(test)]
2596#[allow(clippy::expect_used)]
2597mod tests {
2598    use std::panic::{AssertUnwindSafe, catch_unwind};
2599    use std::sync::Arc;
2600
2601    use fathomdb_query::{BindValue, QueryBuilder};
2602    use fathomdb_schema::SchemaManager;
2603    use rusqlite::types::Value;
2604    use tempfile::NamedTempFile;
2605
2606    use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
2607
2608    use fathomdb_query::{
2609        NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
2610    };
2611
2612    use super::{
2613        bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
2614        wrap_node_row_projection_sql,
2615    };
2616
    /// Build a synthetic `SearchHit` for the merge-semantics tests: a
    /// minimal `Goal` node with empty properties, the given
    /// score/mode/source, and every optional field left unset.
    fn mk_hit(
        logical_id: &str,
        score: f64,
        match_mode: SearchMatchMode,
        source: SearchHitSource,
    ) -> SearchHit {
        SearchHit {
            node: NodeRowLite {
                row_id: format!("{logical_id}-row"),
                logical_id: logical_id.to_owned(),
                kind: "Goal".to_owned(),
                properties: "{}".to_owned(),
                content_ref: None,
                last_accessed_at: None,
            },
            score,
            modality: RetrievalModality::Text,
            source,
            match_mode: Some(match_mode),
            snippet: None,
            written_at: 0,
            projection_row_id: None,
            vector_distance: None,
            attribution: None,
        }
    }
2643
    /// Phase 3 rule 3: strict hits form the first output block even when a
    /// relaxed hit carries a strictly higher score.
    #[test]
    fn merge_places_strict_block_before_relaxed_regardless_of_score() {
        let strict = vec![mk_hit(
            "a",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        // Relaxed has a higher score but must still come second.
        let relaxed = vec![mk_hit(
            "b",
            9.9,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "b");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
2672
    /// Phase 3 rule 4: cross-branch dedup is strict-wins — a relaxed hit
    /// sharing a `logical_id` with a strict hit is dropped even with a
    /// higher score, while unrelated relaxed hits survive.
    #[test]
    fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
        let strict = vec![mk_hit(
            "shared",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![
            mk_hit(
                "shared",
                9.9,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
            mk_hit(
                "other",
                2.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "shared");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "other");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
2708
    /// Phase 3 rule 1: within a block, sort by score descending; equal
    /// scores ("a" and "c" both at 2.0) tiebreak by `logical_id` ascending.
    #[test]
    fn merge_sorts_within_block_by_score_desc_then_logical_id() {
        let strict = vec![
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(
            merged
                .iter()
                .map(|h| &h.node.logical_id)
                .collect::<Vec<_>>(),
            vec!["a", "c", "b"]
        );
    }
2725
    /// Phase 3 rule 2: a `logical_id` appearing twice within one branch at
    /// equal score resolves by source priority — chunk beats property.
    #[test]
    fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
        let strict = vec![
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Property,
            ),
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(merged.len(), 1);
        assert!(matches!(merged[0].source, SearchHitSource::Chunk));
    }
2746
    /// Phase 3 rule 5: the limit is applied after block ordering, so a
    /// higher-scoring relaxed hit is cut when strict hits fill the limit.
    #[test]
    fn merge_truncates_to_limit_after_block_merge() {
        let strict = vec![
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let relaxed = vec![mk_hit(
            "c",
            9.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 2);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert_eq!(merged[1].node.logical_id, "b");
    }
2764
    /// P12 architectural pin: the generalized three-branch merger must
    /// produce strict -> relaxed -> vector block ordering with cross-branch
    /// dedup resolved by branch precedence (strict > relaxed > vector).
    /// v1 `search()` policy never fires the vector branch through the
    /// unified planner because read-time embedding is deferred, but the
    /// merge helper itself must be ready for the day the planner does so —
    /// otherwise wiring the future phase requires touching the core merge
    /// code as well as the planner.
    #[test]
    fn search_architecturally_supports_three_branch_fusion() {
        let strict = vec![mk_hit(
            "alpha",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![mk_hit(
            "bravo",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        // Synthetic vector hit with the highest score. Three-block ordering
        // must still place it last.
        let mut vector_hit = mk_hit(
            "charlie",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        // Vector hits actually carry match_mode=None per the addendum, but
        // the merge helper's ordering is mode-agnostic; we override here to
        // pin the modality field for the test.
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;
        let vector = vec![vector_hit];

        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
        assert_eq!(merged.len(), 3);
        assert_eq!(merged[0].node.logical_id, "alpha");
        assert_eq!(merged[1].node.logical_id, "bravo");
        assert_eq!(merged[2].node.logical_id, "charlie");
        // Vector block comes last regardless of its higher score.
        assert!(matches!(merged[2].source, SearchHitSource::Vector));

        // Cross-branch dedup: a logical_id that appears in multiple branches
        // is attributed to its highest-priority originating branch only.
        let strict2 = vec![mk_hit(
            "shared",
            0.5,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed2 = vec![mk_hit(
            "shared",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        // mk_hit defaults to Text/Some(mode); normalise to the real
        // vector-hit shape before merging.
        let mut vshared = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared.match_mode = None;
        vshared.modality = RetrievalModality::Vector;
        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
        assert!(matches!(
            merged2[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));

        // Relaxed wins over vector when strict is absent.
        let mut vshared2 = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared2.match_mode = None;
        vshared2.modality = RetrievalModality::Vector;
        let merged3 = merge_search_branches_three(
            vec![],
            vec![mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            )],
            vec![vshared2],
            10,
        );
        assert_eq!(merged3.len(), 1);
        assert!(matches!(
            merged3[0].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
2866
    /// P12-N-3: production-realistic vector-only fusion. The v1 planner
    /// never fires this shape today (read-time embedding is deferred), but
    /// when it does the merger will see empty strict + empty relaxed + a
    /// non-empty vector block. The three-branch merger must pass that
    /// block through unchanged, preserving `RetrievalModality::Vector`,
    /// `SearchHitSource::Vector`, and `match_mode == None` semantics.
    ///
    /// Note: the review spec asked for `vector_hit_count == 1` /
    /// `strict_hit_count == 0` assertions. Those are fields on
    /// `SearchRows`, which is assembled one layer up in
    /// `execute_compiled_retrieval_plan`. The merger returns a bare
    /// `Vec<SearchHit>`, so this test asserts the corresponding invariants
    /// directly on the returned vec (block shape + per-hit fields).
    #[test]
    fn merge_search_branches_three_vector_only_preserves_vector_block() {
        // mk_hit defaults to Text/Some(mode); normalise to the real
        // vector-hit shape before merging.
        let mut vector_hit = mk_hit(
            "solo",
            0.75,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;

        let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);

        assert_eq!(merged.len(), 1);
        assert_eq!(merged[0].node.logical_id, "solo");
        assert!(matches!(merged[0].source, SearchHitSource::Vector));
        assert!(matches!(merged[0].modality, RetrievalModality::Vector));
        assert!(
            merged[0].match_mode.is_none(),
            "vector hits carry match_mode=None per addendum 1"
        );
    }
2902
2903    /// P12-N-3: limit truncation must preserve block precedence — when the
2904    /// strict block alone already exceeds the limit, relaxed and vector
2905    /// hits must be dropped entirely even if they have higher raw scores.
2906    ///
2907    /// Note: the review spec asked for `strict_hit_count == 2` /
2908    /// `relaxed_hit_count == 0` / `vector_hit_count == 0` assertions, which
2909    /// are `SearchRows` fields assembled one layer up. Since
2910    /// `merge_search_branches_three` only returns a `Vec<SearchHit>`, this
2911    /// test asserts the corresponding invariants directly: the returned
2912    /// vec contains exactly the top two strict hits, with no relaxed or
2913    /// vector hits leaking past the limit.
2914    #[test]
2915    fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
2916        let strict = vec![
2917            mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2918            mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2919            mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2920        ];
2921        let relaxed = vec![mk_hit(
2922            "d",
2923            9.0,
2924            SearchMatchMode::Relaxed,
2925            SearchHitSource::Chunk,
2926        )];
2927        let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
2928        vector_hit.match_mode = None;
2929        vector_hit.modality = RetrievalModality::Vector;
2930        let vector = vec![vector_hit];
2931
2932        let merged = merge_search_branches_three(strict, relaxed, vector, 2);
2933
2934        assert_eq!(merged.len(), 2);
2935        assert_eq!(merged[0].node.logical_id, "a");
2936        assert_eq!(merged[1].node.logical_id, "b");
2937        // Neither relaxed nor vector hits made it past the limit.
2938        assert!(
2939            merged
2940                .iter()
2941                .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
2942            "strict block must win limit contention against higher-scored relaxed/vector hits"
2943        );
2944        assert!(
2945            merged
2946                .iter()
2947                .all(|h| matches!(h.source, SearchHitSource::Chunk)),
2948            "no vector source hits should leak past the limit"
2949        );
2950    }
2951
2952    #[test]
2953    fn is_vec_table_absent_matches_known_error_messages() {
2954        use rusqlite::ffi;
2955        fn make_err(msg: &str) -> rusqlite::Error {
2956            rusqlite::Error::SqliteFailure(
2957                ffi::Error {
2958                    code: ffi::ErrorCode::Unknown,
2959                    extended_code: 1,
2960                },
2961                Some(msg.to_owned()),
2962            )
2963        }
2964        assert!(is_vec_table_absent(&make_err(
2965            "no such table: vec_nodes_active"
2966        )));
2967        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
2968        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
2969        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
2970        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
2971    }
2972
2973    #[test]
2974    fn bind_value_text_maps_to_sql_text() {
2975        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
2976        assert_eq!(val, Value::Text("hello".to_owned()));
2977    }
2978
2979    #[test]
2980    fn bind_value_integer_maps_to_sql_integer() {
2981        let val = bind_value_to_sql(&BindValue::Integer(42));
2982        assert_eq!(val, Value::Integer(42));
2983    }
2984
2985    #[test]
2986    fn bind_value_bool_true_maps_to_integer_one() {
2987        let val = bind_value_to_sql(&BindValue::Bool(true));
2988        assert_eq!(val, Value::Integer(1));
2989    }
2990
2991    #[test]
2992    fn bind_value_bool_false_maps_to_integer_zero() {
2993        let val = bind_value_to_sql(&BindValue::Bool(false));
2994        assert_eq!(val, Value::Integer(0));
2995    }
2996
2997    #[test]
2998    fn same_shape_queries_share_one_cache_entry() {
2999        let db = NamedTempFile::new().expect("temporary db");
3000        let coordinator = ExecutionCoordinator::open(
3001            db.path(),
3002            Arc::new(SchemaManager::new()),
3003            None,
3004            1,
3005            Arc::new(TelemetryCounters::default()),
3006            None,
3007        )
3008        .expect("coordinator");
3009
3010        let compiled_a = QueryBuilder::nodes("Meeting")
3011            .text_search("budget", 5)
3012            .limit(10)
3013            .compile()
3014            .expect("compiled a");
3015        let compiled_b = QueryBuilder::nodes("Meeting")
3016            .text_search("standup", 5)
3017            .limit(10)
3018            .compile()
3019            .expect("compiled b");
3020
3021        coordinator
3022            .execute_compiled_read(&compiled_a)
3023            .expect("read a");
3024        coordinator
3025            .execute_compiled_read(&compiled_b)
3026            .expect("read b");
3027
3028        assert_eq!(
3029            compiled_a.shape_hash, compiled_b.shape_hash,
3030            "different bind values, same structural shape → same hash"
3031        );
3032        assert_eq!(coordinator.shape_sql_count(), 1);
3033    }
3034
3035    #[test]
3036    fn vector_read_degrades_gracefully_when_vec_table_absent() {
3037        let db = NamedTempFile::new().expect("temporary db");
3038        let coordinator = ExecutionCoordinator::open(
3039            db.path(),
3040            Arc::new(SchemaManager::new()),
3041            None,
3042            1,
3043            Arc::new(TelemetryCounters::default()),
3044            None,
3045        )
3046        .expect("coordinator");
3047
3048        let compiled = QueryBuilder::nodes("Meeting")
3049            .vector_search("budget embeddings", 5)
3050            .compile()
3051            .expect("vector query compiles");
3052
3053        let result = coordinator.execute_compiled_read(&compiled);
3054        let rows = result.expect("degraded read must succeed, not error");
3055        assert!(
3056            rows.was_degraded,
3057            "result must be flagged as degraded when vec_nodes_active is absent"
3058        );
3059        assert!(
3060            rows.nodes.is_empty(),
3061            "degraded result must return empty nodes"
3062        );
3063    }
3064
3065    #[test]
3066    fn coordinator_caches_by_shape_hash() {
3067        let db = NamedTempFile::new().expect("temporary db");
3068        let coordinator = ExecutionCoordinator::open(
3069            db.path(),
3070            Arc::new(SchemaManager::new()),
3071            None,
3072            1,
3073            Arc::new(TelemetryCounters::default()),
3074            None,
3075        )
3076        .expect("coordinator");
3077
3078        let compiled = QueryBuilder::nodes("Meeting")
3079            .text_search("budget", 5)
3080            .compile()
3081            .expect("compiled query");
3082
3083        coordinator
3084            .execute_compiled_read(&compiled)
3085            .expect("execute compiled read");
3086        assert_eq!(coordinator.shape_sql_count(), 1);
3087    }
3088
3089    // --- Item 6: explain_compiled_read tests ---
3090
3091    #[test]
3092    fn explain_returns_correct_sql() {
3093        let db = NamedTempFile::new().expect("temporary db");
3094        let coordinator = ExecutionCoordinator::open(
3095            db.path(),
3096            Arc::new(SchemaManager::new()),
3097            None,
3098            1,
3099            Arc::new(TelemetryCounters::default()),
3100            None,
3101        )
3102        .expect("coordinator");
3103
3104        let compiled = QueryBuilder::nodes("Meeting")
3105            .text_search("budget", 5)
3106            .compile()
3107            .expect("compiled query");
3108
3109        let plan = coordinator.explain_compiled_read(&compiled);
3110
3111        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
3112    }
3113
3114    #[test]
3115    fn explain_returns_correct_driving_table() {
3116        use fathomdb_query::DrivingTable;
3117
3118        let db = NamedTempFile::new().expect("temporary db");
3119        let coordinator = ExecutionCoordinator::open(
3120            db.path(),
3121            Arc::new(SchemaManager::new()),
3122            None,
3123            1,
3124            Arc::new(TelemetryCounters::default()),
3125            None,
3126        )
3127        .expect("coordinator");
3128
3129        let compiled = QueryBuilder::nodes("Meeting")
3130            .text_search("budget", 5)
3131            .compile()
3132            .expect("compiled query");
3133
3134        let plan = coordinator.explain_compiled_read(&compiled);
3135
3136        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
3137    }
3138
3139    #[test]
3140    fn explain_reports_cache_miss_then_hit() {
3141        let db = NamedTempFile::new().expect("temporary db");
3142        let coordinator = ExecutionCoordinator::open(
3143            db.path(),
3144            Arc::new(SchemaManager::new()),
3145            None,
3146            1,
3147            Arc::new(TelemetryCounters::default()),
3148            None,
3149        )
3150        .expect("coordinator");
3151
3152        let compiled = QueryBuilder::nodes("Meeting")
3153            .text_search("budget", 5)
3154            .compile()
3155            .expect("compiled query");
3156
3157        // Before execution: cache miss
3158        let plan_before = coordinator.explain_compiled_read(&compiled);
3159        assert!(
3160            !plan_before.cache_hit,
3161            "cache miss expected before first execute"
3162        );
3163
3164        // Execute to populate cache
3165        coordinator
3166            .execute_compiled_read(&compiled)
3167            .expect("execute read");
3168
3169        // After execution: cache hit
3170        let plan_after = coordinator.explain_compiled_read(&compiled);
3171        assert!(
3172            plan_after.cache_hit,
3173            "cache hit expected after first execute"
3174        );
3175    }
3176
3177    #[test]
3178    fn explain_does_not_execute_query() {
3179        // Call explain_compiled_read on an empty database. If explain were
3180        // actually executing SQL, it would return Ok with 0 rows. But the
3181        // key assertion is that it returns a QueryPlan (not an error) even
3182        // without touching the database.
3183        let db = NamedTempFile::new().expect("temporary db");
3184        let coordinator = ExecutionCoordinator::open(
3185            db.path(),
3186            Arc::new(SchemaManager::new()),
3187            None,
3188            1,
3189            Arc::new(TelemetryCounters::default()),
3190            None,
3191        )
3192        .expect("coordinator");
3193
3194        let compiled = QueryBuilder::nodes("Meeting")
3195            .text_search("anything", 5)
3196            .compile()
3197            .expect("compiled query");
3198
3199        // This must not error, even though the database is empty
3200        let plan = coordinator.explain_compiled_read(&compiled);
3201
3202        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
3203        assert_eq!(plan.bind_count, compiled.binds.len());
3204    }
3205
3206    #[test]
3207    fn coordinator_executes_compiled_read() {
3208        let db = NamedTempFile::new().expect("temporary db");
3209        let coordinator = ExecutionCoordinator::open(
3210            db.path(),
3211            Arc::new(SchemaManager::new()),
3212            None,
3213            1,
3214            Arc::new(TelemetryCounters::default()),
3215            None,
3216        )
3217        .expect("coordinator");
3218        let conn = rusqlite::Connection::open(db.path()).expect("open db");
3219
3220        conn.execute_batch(
3221            r#"
3222            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
3223            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
3224            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
3225            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
3226            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
3227            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
3228            "#,
3229        )
3230        .expect("seed data");
3231
3232        let compiled = QueryBuilder::nodes("Meeting")
3233            .text_search("budget", 5)
3234            .limit(5)
3235            .compile()
3236            .expect("compiled query");
3237
3238        let rows = coordinator
3239            .execute_compiled_read(&compiled)
3240            .expect("execute read");
3241
3242        assert_eq!(rows.nodes.len(), 1);
3243        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
3244    }
3245
    /// A node that has no chunks at all must still be findable through its
    /// `fts_node_properties` row — text search covers property FTS, not only
    /// chunk FTS.
    #[test]
    fn text_search_finds_structured_only_node_via_property_fts() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Out-of-band connection used only to seed the database file.
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Insert a structured-only node (no chunks) with a property FTS row.
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
            VALUES ('goal-1', 'Goal', 'Ship v2');
            "#,
        )
        .expect("seed data");

        // "Ship" only appears in the property text, never in any chunk.
        let compiled = QueryBuilder::nodes("Goal")
            .text_search("Ship", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "goal-1");
    }
3284
    /// One text search must surface hits from both FTS sources at once: a
    /// node matched via its chunk text AND a node matched only via its
    /// property FTS row.
    #[test]
    fn text_search_returns_both_chunk_and_property_backed_hits() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Out-of-band connection used only to seed the database file.
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Chunk-backed hit: a Meeting with a chunk containing "quarterly".
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
            ",
        )
        .expect("seed chunk-backed node");

        // Property-backed hit: a Meeting with property FTS containing "quarterly".
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
            VALUES ('meeting-2', 'Meeting', 'quarterly sync');
            "#,
        )
        .expect("seed property-backed node");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("quarterly", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // Sort so the assertion is independent of result ordering/ranking.
        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
        ids.sort_unstable();
        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
    }
3337
3338    #[test]
3339    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
3340        let db = NamedTempFile::new().expect("temporary db");
3341        let coordinator = ExecutionCoordinator::open(
3342            db.path(),
3343            Arc::new(SchemaManager::new()),
3344            None,
3345            1,
3346            Arc::new(TelemetryCounters::default()),
3347            None,
3348        )
3349        .expect("coordinator");
3350        let conn = rusqlite::Connection::open(db.path()).expect("open db");
3351
3352        conn.execute_batch(
3353            r"
3354            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
3355            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
3356            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
3357            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
3358            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
3359            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
3360            ",
3361        )
3362        .expect("seed chunk-backed node");
3363
3364        let compiled = QueryBuilder::nodes("Meeting")
3365            .text_search("not a ship", 10)
3366            .limit(10)
3367            .compile()
3368            .expect("compiled query");
3369
3370        let rows = coordinator
3371            .execute_compiled_read(&compiled)
3372            .expect("execute read");
3373
3374        assert_eq!(rows.nodes.len(), 1);
3375        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
3376    }
3377
3378    // --- Item 1: capability gate tests ---
3379
3380    #[test]
3381    fn capability_gate_reports_false_without_feature() {
3382        let db = NamedTempFile::new().expect("temporary db");
3383        // Open without vector_dimension: regardless of feature flag, vector_enabled must be false
3384        // when no dimension is requested (the vector profile is never bootstrapped).
3385        let coordinator = ExecutionCoordinator::open(
3386            db.path(),
3387            Arc::new(SchemaManager::new()),
3388            None,
3389            1,
3390            Arc::new(TelemetryCounters::default()),
3391            None,
3392        )
3393        .expect("coordinator");
3394        assert!(
3395            !coordinator.vector_enabled(),
3396            "vector_enabled must be false when no dimension is requested"
3397        );
3398    }
3399
3400    #[cfg(feature = "sqlite-vec")]
3401    #[test]
3402    fn capability_gate_reports_true_when_feature_enabled() {
3403        let db = NamedTempFile::new().expect("temporary db");
3404        let coordinator = ExecutionCoordinator::open(
3405            db.path(),
3406            Arc::new(SchemaManager::new()),
3407            Some(128),
3408            1,
3409            Arc::new(TelemetryCounters::default()),
3410            None,
3411        )
3412        .expect("coordinator");
3413        assert!(
3414            coordinator.vector_enabled(),
3415            "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
3416        );
3417    }
3418
3419    // --- Item 4: runtime table read tests ---
3420
    /// Round trip: a run inserted through the writer actor must be readable
    /// back via `read_run` on a fresh coordinator over the same file.
    #[test]
    fn read_run_returns_inserted_run() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // Submit a batch containing only a single run row; all graph-side
        // collections are intentionally empty.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        // Read the run back through a separate coordinator on the same db.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Outer expect: the read itself; inner expect: the Option is Some.
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
3475
    /// Round trip: a step inserted through the writer actor must be readable
    /// back via `read_step`. The parent run is inserted in the same batch.
    #[test]
    fn read_step_returns_inserted_step() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // One batch carrying both the run and the step that references it
        // through run_id.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        // Read the step back through a separate coordinator on the same db.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Outer expect: the read itself; inner expect: the Option is Some.
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
3539
    /// Round trip: an action inserted through the writer actor must be
    /// readable back via `read_action`. The full run → step → action chain
    /// is inserted in a single batch.
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // One batch carrying the run, the step referencing it, and the
        // action referencing the step.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        // Read the action back through a separate coordinator on the same db.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Outer expect: the read itself; inner expect: the Option is Some.
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
3615
    /// `read_active_runs` must return only runs that have not been
    /// superseded: after run-v2 supersedes run-v1 (via `upsert: true` +
    /// `supersedes_id`), only run-v2 appears.
    #[test]
    fn read_active_runs_excludes_superseded() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert original run
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Supersede original run with v2
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        // Query active runs through a fresh coordinator on the same db.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
3695
3696    #[allow(clippy::panic)]
3697    fn poison_connection(coordinator: &ExecutionCoordinator) {
3698        let result = catch_unwind(AssertUnwindSafe(|| {
3699            let _guard = coordinator.pool.connections[0]
3700                .lock()
3701                .expect("poison test lock");
3702            panic!("poison coordinator connection mutex");
3703        }));
3704        assert!(
3705            result.is_err(),
3706            "poison test must unwind while holding the connection mutex"
3707        );
3708    }
3709
3710    #[allow(clippy::panic)]
3711    fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
3712    where
3713        F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
3714    {
3715        match op(coordinator) {
3716            Err(EngineError::Bridge(message)) => {
3717                assert_eq!(message, "connection mutex poisoned");
3718            }
3719            Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
3720            Err(error) => panic!("expected poisoned connection error, got {error:?}"),
3721        }
3722    }
3723
3724    #[test]
3725    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
3726        let db = NamedTempFile::new().expect("temporary db");
3727        let coordinator = ExecutionCoordinator::open(
3728            db.path(),
3729            Arc::new(SchemaManager::new()),
3730            None,
3731            1,
3732            Arc::new(TelemetryCounters::default()),
3733            None,
3734        )
3735        .expect("coordinator");
3736
3737        poison_connection(&coordinator);
3738
3739        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
3740        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
3741        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
3742        assert_poisoned_connection_error(
3743            &coordinator,
3744            super::ExecutionCoordinator::read_active_runs,
3745        );
3746        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
3747        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
3748    }
3749
3750    // --- M-2: Bounded shape cache ---
3751
3752    #[test]
3753    fn shape_cache_stays_bounded() {
3754        use fathomdb_query::ShapeHash;
3755
3756        let db = NamedTempFile::new().expect("temporary db");
3757        let coordinator = ExecutionCoordinator::open(
3758            db.path(),
3759            Arc::new(SchemaManager::new()),
3760            None,
3761            1,
3762            Arc::new(TelemetryCounters::default()),
3763            None,
3764        )
3765        .expect("coordinator");
3766
3767        // Directly populate the cache with MAX_SHAPE_CACHE_SIZE + 1 entries.
3768        {
3769            let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
3770            for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
3771                cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
3772            }
3773        }
3774        // The cache is now over the limit but hasn't been pruned yet (pruning
3775        // happens on the insert path in execute_compiled_read).
3776
3777        // Execute a compiled read to trigger the bounded-cache check.
3778        let compiled = QueryBuilder::nodes("Meeting")
3779            .text_search("budget", 5)
3780            .limit(10)
3781            .compile()
3782            .expect("compiled query");
3783
3784        coordinator
3785            .execute_compiled_read(&compiled)
3786            .expect("execute read");
3787
3788        assert!(
3789            coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
3790            "shape cache must stay bounded: got {} entries, max {}",
3791            coordinator.shape_sql_count(),
3792            super::MAX_SHAPE_CACHE_SIZE
3793        );
3794    }
3795
3796    // --- M-1: Read pool size ---
3797
3798    #[test]
3799    fn read_pool_size_configurable() {
3800        let db = NamedTempFile::new().expect("temporary db");
3801        let coordinator = ExecutionCoordinator::open(
3802            db.path(),
3803            Arc::new(SchemaManager::new()),
3804            None,
3805            2,
3806            Arc::new(TelemetryCounters::default()),
3807            None,
3808        )
3809        .expect("coordinator with pool_size=2");
3810
3811        assert_eq!(coordinator.pool.size(), 2);
3812
3813        // Basic read should succeed through the pool.
3814        let compiled = QueryBuilder::nodes("Meeting")
3815            .text_search("budget", 5)
3816            .limit(10)
3817            .compile()
3818            .expect("compiled query");
3819
3820        let result = coordinator.execute_compiled_read(&compiled);
3821        assert!(result.is_ok(), "read through pool must succeed");
3822    }
3823
3824    // --- M-4: Grouped read batching ---
3825
    // End-to-end check of the batched grouped read path (M-4): a text search
    // over Meeting roots with one expansion slot must return every root and
    // every expansion node, without degrading to the per-root fallback.
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        let db = NamedTempFile::new().expect("temporary db");

        // Bootstrap the database via coordinator (creates schema).
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed data: 10 root nodes (Meeting-0..9) with 2 outbound edges each
        // to expansion nodes (Task-0-a, Task-0-b, etc.).
        // Seeding goes through a raw rusqlite connection (bypassing the
        // coordinator write path) so the test controls row contents exactly.
        // Each meeting also gets a chunk plus an fts_nodes row so the
        // "meeting" text search below matches all 10 roots.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        // One expansion slot ("tasks") traversing outbound HAS_TASK edges,
        // depth 1, no target-side filter.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1, None)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Each root should have exactly 2 expansion nodes (task-X-a, task-X-b).
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
3903}