Skip to main content

fathomdb_engine/
coordinator.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8    BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRawVectorSearch,
9    CompiledRetrievalPlan, CompiledSearch, CompiledSearchPlan, CompiledSemanticSearch,
10    CompiledVectorSearch, DrivingTable, EdgeExpansionSlot, ExpansionSlot, FALLBACK_TRIGGER_K,
11    HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch, SearchHit,
12    SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
13};
14use fathomdb_schema::SchemaManager;
15use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
16
17use crate::embedder::QueryEmbedder;
18use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
19use crate::{EngineError, sqlite};
20
/// Upper bound on the number of cached shape-hash → SQL mappings.
///
/// Once the cache reaches this size it is cleared entirely.  A clear-all
/// strategy is simpler than partial eviction, and the cost of re-compiling
/// on a miss is negligible.
const MAX_SHAPE_CACHE_SIZE: usize = 4096;
25
/// Maximum number of root IDs per batched expansion query.  Kept well below
/// `SQLITE_MAX_VARIABLE_NUMBER` (default 999 on SQLite builds older than
/// 3.32.0; newer builds default to a larger limit) because each batch also
/// binds the edge-kind parameter.  Larger root sets are chunked into multiple
/// batches of this size rather than falling back to per-root queries.
const BATCH_CHUNK_SIZE: usize = 200;
31
32/// Build an `IN (...)` SQL fragment and bind list for an expansion-slot filter.
33///
34/// Returns the SQL fragment `AND json_extract(n.properties, ?{p}) IN (?{p+1}, ...)`
35/// and the corresponding bind values `[path, val1, val2, ...]`.
36fn compile_expansion_in_filter(
37    p: usize,
38    path: &str,
39    value_binds: Vec<Value>,
40) -> (String, Vec<Value>) {
41    let first_val = p + 1;
42    let placeholders = (0..value_binds.len())
43        .map(|i| format!("?{}", first_val + i))
44        .collect::<Vec<_>>()
45        .join(", ");
46    let mut binds = vec![Value::Text(path.to_owned())];
47    binds.extend(value_binds);
48    (
49        format!("\n                  AND json_extract(n.properties, ?{p}) IN ({placeholders})"),
50        binds,
51    )
52}
53
54/// Compile an optional expansion-slot target-side filter predicate into a SQL
55/// fragment and bind values for injection into the `numbered` CTE's WHERE clause.
56///
57/// Returns `("", vec![])` when `filter` is `None` — preserving byte-for-byte
58/// identical SQL to pre-Pack-3 behavior. When `Some(predicate)`, returns an
59/// `AND …` fragment and the corresponding bind values starting at `first_param`.
60///
61/// Supported predicates and their targets in the `numbered` CTE:
62/// - `JsonPathEq`, `JsonPathCompare`, `JsonPathFusedEq`, `JsonPathFusedTimestampCmp`,
63///   `JsonPathFusedBoolEq`, `JsonPathFusedIn`, `JsonPathIn`: target `n.properties`.
64/// - `KindEq`, `LogicalIdEq`, `SourceRefEq`, `ContentRefEq`, `ContentRefNotNull`:
65///   column-direct predicates on `n.kind`, `n.logical_id`, `n.source_ref`, `n.content_ref`.
66/// - `EdgePropertyEq`, `EdgePropertyCompare`: rejected with `unreachable!` — these
67///   target edge properties and must be compiled via `compile_edge_filter` instead.
68#[allow(clippy::too_many_lines)]
69fn compile_expansion_filter(
70    filter: Option<&Predicate>,
71    first_param: usize,
72) -> (String, Vec<Value>) {
73    let Some(predicate) = filter else {
74        return (String::new(), vec![]);
75    };
76    let p = first_param;
77    match predicate {
78        Predicate::JsonPathEq { path, value } => {
79            let val = match value {
80                ScalarValue::Text(t) => Value::Text(t.clone()),
81                ScalarValue::Integer(i) => Value::Integer(*i),
82                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
83            };
84            (
85                format!(
86                    "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
87                    p + 1
88                ),
89                vec![Value::Text(path.clone()), val],
90            )
91        }
92        Predicate::JsonPathCompare { path, op, value } => {
93            let val = match value {
94                ScalarValue::Text(t) => Value::Text(t.clone()),
95                ScalarValue::Integer(i) => Value::Integer(*i),
96                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
97            };
98            let operator = match op {
99                ComparisonOp::Gt => ">",
100                ComparisonOp::Gte => ">=",
101                ComparisonOp::Lt => "<",
102                ComparisonOp::Lte => "<=",
103            };
104            (
105                format!(
106                    "\n                  AND json_extract(n.properties, ?{p}) {operator} ?{}",
107                    p + 1
108                ),
109                vec![Value::Text(path.clone()), val],
110            )
111        }
112        Predicate::JsonPathFusedEq { path, value } => (
113            format!(
114                "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
115                p + 1
116            ),
117            vec![Value::Text(path.clone()), Value::Text(value.clone())],
118        ),
119        Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
120            let operator = match op {
121                ComparisonOp::Gt => ">",
122                ComparisonOp::Gte => ">=",
123                ComparisonOp::Lt => "<",
124                ComparisonOp::Lte => "<=",
125            };
126            (
127                format!(
128                    "\n                  AND json_extract(n.properties, ?{p}) {operator} ?{}",
129                    p + 1
130                ),
131                vec![Value::Text(path.clone()), Value::Integer(*value)],
132            )
133        }
134        Predicate::JsonPathFusedBoolEq { path, value } => (
135            format!(
136                "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
137                p + 1
138            ),
139            vec![Value::Text(path.clone()), Value::Integer(i64::from(*value))],
140        ),
141        Predicate::KindEq(kind) => (
142            format!("\n                  AND n.kind = ?{p}"),
143            vec![Value::Text(kind.clone())],
144        ),
145        Predicate::LogicalIdEq(logical_id) => (
146            format!("\n                  AND n.logical_id = ?{p}"),
147            vec![Value::Text(logical_id.clone())],
148        ),
149        Predicate::SourceRefEq(source_ref) => (
150            format!("\n                  AND n.source_ref = ?{p}"),
151            vec![Value::Text(source_ref.clone())],
152        ),
153        Predicate::ContentRefEq(uri) => (
154            format!("\n                  AND n.content_ref = ?{p}"),
155            vec![Value::Text(uri.clone())],
156        ),
157        Predicate::ContentRefNotNull => (
158            "\n                  AND n.content_ref IS NOT NULL".to_owned(),
159            vec![],
160        ),
161        Predicate::EdgePropertyEq { .. } | Predicate::EdgePropertyCompare { .. } => {
162            unreachable!(
163                "compile_expansion_filter: EdgeProperty* variants must use compile_edge_filter"
164            );
165        }
166        Predicate::JsonPathFusedIn { path, values } => compile_expansion_in_filter(
167            p,
168            path,
169            values.iter().map(|v| Value::Text(v.clone())).collect(),
170        ),
171        Predicate::JsonPathIn { path, values } => compile_expansion_in_filter(
172            p,
173            path,
174            values
175                .iter()
176                .map(|v| match v {
177                    ScalarValue::Text(t) => Value::Text(t.clone()),
178                    ScalarValue::Integer(i) => Value::Integer(*i),
179                    ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
180                })
181                .collect(),
182        ),
183    }
184}
185
186/// Compile an optional edge filter predicate into a SQL fragment and bind values
187/// for injection into the `JOIN edges e ON ...` condition.
188///
189/// Returns `("", vec![])` when `filter` is `None`. When `Some(predicate)`,
190/// returns an `AND …` fragment targeting `e.properties` and the corresponding
191/// bind values starting at `first_param`.
192///
193/// Only `EdgePropertyEq` and `EdgePropertyCompare` are valid here.
194/// Non-edge predicates: `panic!("compile_edge_filter: non-edge predicate")`.
195fn compile_edge_filter(filter: Option<&Predicate>, first_param: usize) -> (String, Vec<Value>) {
196    let Some(predicate) = filter else {
197        return (String::new(), vec![]);
198    };
199    let p = first_param;
200    match predicate {
201        Predicate::EdgePropertyEq { path, value } => {
202            let val = match value {
203                ScalarValue::Text(t) => Value::Text(t.clone()),
204                ScalarValue::Integer(i) => Value::Integer(*i),
205                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
206            };
207            (
208                format!(
209                    "\n                    AND json_extract(e.properties, ?{p}) = ?{}",
210                    p + 1
211                ),
212                vec![Value::Text(path.clone()), val],
213            )
214        }
215        Predicate::EdgePropertyCompare { path, op, value } => {
216            let val = match value {
217                ScalarValue::Text(t) => Value::Text(t.clone()),
218                ScalarValue::Integer(i) => Value::Integer(*i),
219                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
220            };
221            let operator = match op {
222                ComparisonOp::Gt => ">",
223                ComparisonOp::Gte => ">=",
224                ComparisonOp::Lt => "<",
225                ComparisonOp::Lte => "<=",
226            };
227            (
228                format!(
229                    "\n                    AND json_extract(e.properties, ?{p}) {operator} ?{}",
230                    p + 1
231                ),
232                vec![Value::Text(path.clone()), val],
233            )
234        }
235        _ => {
236            unreachable!("compile_edge_filter: non-edge predicate {predicate:?}");
237        }
238    }
239}
240
/// FTS tokenizer strategy for a given node kind.
///
/// Loaded at coordinator open time from `projection_profiles` and used
/// during query dispatch to apply per-kind query adaptations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TokenizerStrategy {
    /// Porter stemming + unicode61, optimized for English recall.
    RecallOptimizedEnglish,
    /// Unicode61 without stemming, optimized for precision.
    PrecisionOptimized,
    /// Trigram tokenizer — queries shorter than 3 chars are skipped.
    SubstringTrigram,
    /// ICU tokenizer for global / CJK text.
    GlobalCjk,
    /// Unicode61 with custom token chars — query special chars are escaped.
    SourceCode,
    /// Any other tokenizer string, kept verbatim.
    Custom(String),
}

impl TokenizerStrategy {
    /// Classify a raw tokenizer config string (as stored in
    /// `projection_profiles`) into its strategy variant.
    ///
    /// The four well-known configurations are matched exactly; any string
    /// beginning with `unicode61 tokenchars` maps to [`Self::SourceCode`];
    /// everything else is preserved in [`Self::Custom`].
    pub fn from_str(s: &str) -> Self {
        // Exact, whole-string matches take priority.
        let exact = match s {
            "porter unicode61 remove_diacritics 2" => Some(Self::RecallOptimizedEnglish),
            "unicode61 remove_diacritics 2" => Some(Self::PrecisionOptimized),
            "trigram" => Some(Self::SubstringTrigram),
            "icu" => Some(Self::GlobalCjk),
            _ => None,
        };
        if let Some(strategy) = exact {
            return strategy;
        }

        // Prefix match for unicode61 with a custom token-character set.
        if s.starts_with("unicode61 tokenchars") {
            Self::SourceCode
        } else {
            Self::Custom(s.to_owned())
        }
    }
}
275
/// A pool of read-only `SQLite` connections for concurrent read access.
///
/// Each connection is wrapped in its own [`Mutex`] so multiple readers can
/// proceed in parallel when they happen to grab different slots.
struct ReadPool {
    /// One independently lockable slot per pooled connection.
    connections: Vec<Mutex<Connection>>,
}
283
284impl fmt::Debug for ReadPool {
285    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286        f.debug_struct("ReadPool")
287            .field("size", &self.connections.len())
288            .finish()
289    }
290}
291
292impl ReadPool {
293    /// Open `pool_size` read-only connections to the database at `path`.
294    ///
295    /// Each connection has PRAGMAs initialized via
296    /// [`SchemaManager::initialize_connection`] and, when the `sqlite-vec`
297    /// feature is enabled and `vector_enabled` is true, the vec extension
298    /// auto-loaded.
299    ///
300    /// # Errors
301    ///
302    /// Returns [`EngineError`] if any connection fails to open or initialize.
303    fn new(
304        db_path: &Path,
305        pool_size: usize,
306        schema_manager: &SchemaManager,
307        vector_enabled: bool,
308    ) -> Result<Self, EngineError> {
309        let mut connections = Vec::with_capacity(pool_size);
310        for _ in 0..pool_size {
311            let conn = if vector_enabled {
312                #[cfg(feature = "sqlite-vec")]
313                {
314                    sqlite::open_readonly_connection_with_vec(db_path)?
315                }
316                #[cfg(not(feature = "sqlite-vec"))]
317                {
318                    sqlite::open_readonly_connection(db_path)?
319                }
320            } else {
321                sqlite::open_readonly_connection(db_path)?
322            };
323            schema_manager
324                .initialize_reader_connection(&conn)
325                .map_err(EngineError::Schema)?;
326            connections.push(Mutex::new(conn));
327        }
328        Ok(Self { connections })
329    }
330
331    /// Acquire a connection from the pool.
332    ///
333    /// Tries [`Mutex::try_lock`] on each slot first (fast non-blocking path).
334    /// If every slot is held, falls back to a blocking lock on the first slot.
335    ///
336    /// # Errors
337    ///
338    /// Returns [`EngineError::Bridge`] if the underlying mutex is poisoned.
339    fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
340        // Fast path: try each connection without blocking.
341        for conn in &self.connections {
342            if let Ok(guard) = conn.try_lock() {
343                return Ok(guard);
344            }
345        }
346        // Fallback: block on the first connection.
347        self.connections[0].lock().map_err(|_| {
348            trace_error!("read pool: connection mutex poisoned");
349            EngineError::Bridge("connection mutex poisoned".to_owned())
350        })
351    }
352
353    /// Return the number of connections in the pool.
354    #[cfg(test)]
355    fn size(&self) -> usize {
356        self.connections.len()
357    }
358}
359
/// Execution plan returned by [`ExecutionCoordinator::explain_compiled_read`].
///
/// This is a read-only introspection struct. It does not execute SQL.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// SQL text reported for the plan.
    pub sql: String,
    /// Number of bind values — presumably those accompanying `sql`; confirm
    /// against `explain_compiled_read`.
    pub bind_count: usize,
    /// Driving table of the compiled query.
    pub driving_table: DrivingTable,
    /// Shape hash of the compiled query.
    pub shape_hash: ShapeHash,
    /// NOTE(review): presumably `true` when the shape-hash → SQL cache already
    /// held an entry for `shape_hash` — confirm against the producer.
    pub cache_hit: bool,
}
371
/// A single node row returned from a query.
///
/// Plain owned data; all fields are public.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row ID.
    pub row_id: String,
    /// Logical ID of the node.
    pub logical_id: String,
    /// Node kind.
    pub kind: String,
    /// JSON-encoded node properties.
    pub properties: String,
    /// Optional URI referencing external content.
    pub content_ref: Option<String>,
    /// Unix timestamp of last access, if tracked.
    pub last_accessed_at: Option<i64>,
}
388
/// A single edge row surfaced during edge-projecting traversal.
///
/// Columns are sourced directly from the `edges` table; identity
/// fields (`source_logical_id`, `target_logical_id`) are absolute
/// (tail/head as stored), not re-oriented to traversal direction.
///
/// Multi-hop semantics: for `max_depth > 1`, each emitted tuple
/// reflects the final-hop edge leading to the emitted endpoint node.
/// Full path enumeration is out of scope for 0.5.3.
///
/// Derives `PartialEq` but not `Eq`, because `confidence` is an
/// `Option<f64>`.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeRow {
    /// Physical row ID from the `edges` table.
    pub row_id: String,
    /// Logical ID of the edge.
    pub logical_id: String,
    /// Logical ID of the edge source (tail).
    pub source_logical_id: String,
    /// Logical ID of the edge target (head).
    pub target_logical_id: String,
    /// Edge kind (label).
    pub kind: String,
    /// JSON-encoded edge properties.
    pub properties: String,
    /// Optional source reference for provenance tracking.
    pub source_ref: Option<String>,
    /// Optional confidence score attached to the edge.
    pub confidence: Option<f64>,
}
417
/// A single run row returned from a query.
///
/// Plain owned data; all fields are public.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Unique run ID.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded run properties.
    pub properties: String,
}
430
/// A single step row returned from a query.
///
/// Plain owned data; `run_id` links the step to its parent run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Unique step ID.
    pub id: String,
    /// ID of the parent run.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded step properties.
    pub properties: String,
}
445
/// A single action row returned from a query.
///
/// Plain owned data; `step_id` links the action to its parent step.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Unique action ID.
    pub id: String,
    /// ID of the parent step.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded action properties.
    pub properties: String,
}
460
/// A single row from the `provenance_events` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Event ID.
    pub id: String,
    /// Event type label.
    pub event_type: String,
    /// Subject of the event — NOTE(review): the identifier scheme is not
    /// visible here; confirm against the writer.
    pub subject: String,
    /// Optional source reference.
    pub source_ref: Option<String>,
    /// Event metadata — presumably a JSON-encoded string, going by the field
    /// name; confirm against the writer.
    pub metadata_json: String,
    /// Creation time — presumably a Unix timestamp; TODO confirm the unit.
    pub created_at: i64,
}
471
/// Result set from executing a flat (non-grouped) compiled query.
///
/// The `Default` value has empty row vectors and `was_degraded == false`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Matched node rows.
    pub nodes: Vec<NodeRow>,
    /// Runs associated with the matched nodes.
    pub runs: Vec<RunRow>,
    /// Steps associated with the matched runs.
    pub steps: Vec<StepRow>,
    /// Actions associated with the matched steps.
    pub actions: Vec<ActionRow>,
    /// `true` when a capability miss (e.g. missing sqlite-vec) caused the query
    /// to degrade to an empty result instead of propagating an error.
    pub was_degraded: bool,
}
487
/// Expansion results for a single root node within a grouped query.
///
/// Pairs one root's logical ID with the node rows reached from it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical ID of the root node that seeded this expansion.
    pub root_logical_id: String,
    /// Nodes reached by traversing from the root.
    pub nodes: Vec<NodeRow>,
}
496
/// All expansion results for a single named slot across all roots.
///
/// Holds one [`ExpansionRootRows`] entry per root.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot.
    pub slot: String,
    /// Per-root expansion results.
    pub roots: Vec<ExpansionRootRows>,
}
505
/// Edge-expansion results for a single root node within a grouped query.
///
/// Derives `PartialEq` but not `Eq`: the contained [`EdgeRow`] carries an
/// `Option<f64>` confidence score, and `f64` does not implement `Eq`.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeExpansionRootRows {
    /// Logical ID of the root node that seeded this expansion.
    pub root_logical_id: String,
    /// `(EdgeRow, NodeRow)` tuples reached by traversing from the root. For
    /// `max_depth > 1`, the edge is the final-hop edge leading to the
    /// emitted endpoint node.
    pub pairs: Vec<(EdgeRow, NodeRow)>,
}
519
/// All edge-expansion results for a single named slot across all roots.
///
/// Derives `PartialEq` but not `Eq` — transitive through [`EdgeRow`], which
/// holds an `Option<f64>`.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeExpansionSlotRows {
    /// Name of the edge-expansion slot.
    pub slot: String,
    /// Per-root edge-expansion results.
    pub roots: Vec<EdgeExpansionRootRows>,
}
530
/// Result set from executing a grouped compiled query.
///
/// Derives `PartialEq` but not `Eq`: `edge_expansions` transitively contains
/// [`EdgeRow`], whose `confidence` field is an `Option<f64>`, and `f64` does
/// not implement `Eq`. The derive set is fixed at compile time and does not
/// depend on whether `edge_expansions` happens to be empty at runtime.
///
/// The `Default` value has empty vectors and `was_degraded == false`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GroupedQueryRows {
    /// Root node rows matched by the base query.
    pub roots: Vec<NodeRow>,
    /// Per-slot expansion results.
    pub expansions: Vec<ExpansionSlotRows>,
    /// Per-slot edge-expansion results.
    pub edge_expansions: Vec<EdgeExpansionSlotRows>,
    /// `true` when a capability miss caused the query to degrade to an empty result.
    pub was_degraded: bool,
}
548
/// Manages a pool of read-only `SQLite` connections and executes compiled queries.
pub struct ExecutionCoordinator {
    /// Filesystem path of the database, as passed to `open()`.
    database_path: PathBuf,
    /// Shared schema manager; used at open time for bootstrap and to
    /// initialize the pool's reader connections.
    schema_manager: Arc<SchemaManager>,
    /// Read-only connection pool that serves queries.
    pool: ReadPool,
    /// Cache of shape-hash → SQL text. NOTE(review): presumably bounded by
    /// `MAX_SHAPE_CACHE_SIZE`; the eviction code is not visible here.
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// Vector-capability flag computed once in `open()`.
    vector_enabled: bool,
    /// Starts `false`. NOTE(review): presumably a one-shot latch so the
    /// vector-degradation warning is emitted only once — confirm at use sites.
    vec_degradation_warned: AtomicBool,
    /// Shared telemetry counters (queries, errors, …).
    telemetry: Arc<TelemetryCounters>,
    /// Phase 12.5a: optional read-time query embedder. When present,
    /// [`Self::execute_retrieval_plan`] invokes it via
    /// [`Self::fill_vector_branch`] after compile to populate
    /// `plan.vector`. When `None`, the Phase 12 v1 vector-dormancy
    /// invariant on `search()` is preserved: the vector slot stays empty
    /// and the coordinator's stage-gating check skips the vector branch.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
    /// Per-kind FTS tokenizer strategies loaded from `projection_profiles`
    /// at open time. Used during query dispatch for query-side adaptations
    /// (e.g. short-query skip for trigram, token escaping for source-code).
    ///
    /// **Stale-state note**: This map is populated once at `open()` and is
    /// never refreshed during the lifetime of the coordinator.  If
    /// `AdminService::set_fts_profile` is called on a running engine, the
    /// in-memory strategy map will not reflect the change until the engine is
    /// reopened.  The DB-side profile row is always up-to-date; only the
    /// query-side adaptation (e.g. the trigram short-query guard) will be stale.
    fts_strategies: HashMap<String, TokenizerStrategy>,
}
577
578impl fmt::Debug for ExecutionCoordinator {
579    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
580        f.debug_struct("ExecutionCoordinator")
581            .field("database_path", &self.database_path)
582            .finish_non_exhaustive()
583    }
584}
585
586impl ExecutionCoordinator {
    /// Open a coordinator over the database at `path`.
    ///
    /// Steps, in order: open a bootstrap connection, run schema bootstrap,
    /// run the open-time FTS rebuild guards, compute the vector-capability
    /// flag, check the stored vector identity against `query_embedder` (when
    /// one is supplied), load per-kind FTS tokenizer strategies from
    /// `projection_profiles`, drop the bootstrap connection, and finally
    /// build the read-only connection pool with `pool_size` connections.
    ///
    /// `vector_dimension` only influences which opener is used for the
    /// bootstrap connection and whether the vector flag is forced on (both
    /// only when the `sqlite-vec` feature is compiled in).
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database connection cannot be opened or schema bootstrap fails.
    /// Errors from the FTS guards, the vector identity check, the
    /// `projection_profiles` query, and read-pool creation are propagated too.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap connection: opened with the vec extension only when the
        // feature is compiled in and a vector dimension was requested.
        #[cfg(feature = "sqlite-vec")]
        let mut conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let mut conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        // ----- Open-time rebuild guards for derived FTS state -----
        //
        // Property FTS data is derived state. Per-kind `fts_props_<kind>`
        // virtual tables are NOT source of truth — they must be rebuildable
        // from canonical `nodes.properties` + `fts_property_schemas` at any
        // time. After migration 23 the global `fts_node_properties` table no
        // longer exists; each registered kind has its own `fts_props_<kind>`
        // table created at first write or first rebuild.
        //
        // Guard 1: if any registered kind's per-kind table is missing or empty
        // while live nodes of that kind exist, do a synchronous full rebuild of
        // all per-kind FTS tables and position map rows.
        //
        // Guard 2: if any recursive schema has a populated per-kind table but
        // `fts_node_property_positions` is empty, do a synchronous full rebuild
        // to regenerate the position map from canonical state.
        //
        // Both guards are no-ops on a consistent database.
        run_open_time_fts_guards(&mut conn)?;

        // With the feature: start from the bootstrap report (may be forced on
        // below). Without the feature: always false; `report` is only touched
        // to avoid an unused-variable warning.
        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            let _ = &report;
            false
        };

        // 0.5.0: per-kind vec tables replace the global vec_nodes_active.
        // The vector_dimension parameter now only sets the vector_enabled flag —
        // no global table is created. Per-kind vec tables are created by
        // `regenerate_vector_embeddings` at regen time.
        if vector_dimension.is_some() {
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        // Vec identity guard: warn (never error) if the stored profile's
        // model_identity or dimensions differ from the configured embedder.
        // NOTE(review): the `?` below still propagates any `Err` the check
        // itself returns — confirm the helper only fails on real I/O/SQL
        // errors and not on identity mismatches.
        if let Some(ref emb) = query_embedder {
            check_vec_identity_at_open(&conn, emb.as_ref())?;
        }

        // Load FTS tokenizer strategies from projection_profiles
        let fts_strategies: HashMap<String, TokenizerStrategy> = {
            let mut map = HashMap::new();
            let mut stmt = conn
                .prepare("SELECT kind, config_json FROM projection_profiles WHERE facet='fts'")?;
            let rows = stmt.query_map([], |row| {
                let kind: String = row.get(0)?;
                let config_json: String = row.get(1)?;
                Ok((kind, config_json))
            })?;
            // NOTE(review): `flatten()` silently skips rows that failed to
            // decode; rows whose JSON is invalid or lacks a string
            // `tokenizer` key are skipped as well. Looks like deliberate
            // best-effort loading — confirm.
            for row in rows.flatten() {
                let (kind, config_json) = row;
                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&config_json)
                    && let Some(tok) = v["tokenizer"].as_str()
                {
                    map.insert(kind, TokenizerStrategy::from_str(tok));
                }
            }
            map
        };

        // Drop the bootstrap connection — pool connections are used for reads.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
            fts_strategies,
        })
    }
692
693    /// Returns the filesystem path to the `SQLite` database.
694    pub fn database_path(&self) -> &Path {
695        &self.database_path
696    }
697
698    /// Returns `true` when sqlite-vec was loaded and a vector profile is active.
699    #[must_use]
700    pub fn vector_enabled(&self) -> bool {
701        self.vector_enabled
702    }
703
704    /// Returns the configured read-time query embedder, if any.
705    ///
706    /// The 0.4.0 write-time parity work reuses this same embedder for
707    /// vector regeneration via [`Engine::regenerate_vector_embeddings`],
708    /// so there is always exactly one source of truth for vector
709    /// identity per [`Engine`] instance.
710    #[must_use]
711    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
712        self.query_embedder.as_ref()
713    }
714
715    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
716        self.pool.acquire()
717    }
718
719    /// Aggregate `SQLite` page-cache counters across all pool connections.
720    ///
721    /// Uses `try_lock` to avoid blocking reads for telemetry reporting.
722    /// Connections that are currently locked by a query are skipped — this
723    /// is acceptable for statistical counters.
724    #[must_use]
725    pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
726        let mut total = SqliteCacheStatus::default();
727        for conn_mutex in &self.pool.connections {
728            if let Ok(conn) = conn_mutex.try_lock() {
729                total.add(&read_db_cache_status(&conn));
730            }
731        }
732        total
733    }
734
735    /// # Errors
736    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
737    #[allow(clippy::expect_used, clippy::too_many_lines)]
738    pub fn execute_compiled_read(
739        &self,
740        compiled: &CompiledQuery,
741    ) -> Result<QueryRows, EngineError> {
742        // Pack F1.75: dispatch semantic / raw-vector sidecar carriers to the
743        // dedicated executors. These paths own their own pre-KNN validation
744        // (active profile, per-kind schema, dimension match) and their own
745        // degrade semantics; v1 does not compose them with filter fusion.
746        if let Some(semantic) = compiled.semantic_search.as_ref() {
747            let search_rows = self.execute_compiled_semantic_search(semantic)?;
748            return Ok(search_rows_to_query_rows(search_rows));
749        }
750        if let Some(raw) = compiled.raw_vector_search.as_ref() {
751            let search_rows = self.execute_compiled_raw_vector_search(raw)?;
752            return Ok(search_rows_to_query_rows(search_rows));
753        }
754        // Scan fallback for first-registration async rebuild: if the query uses the
755        // FtsNodes driving table and the root kind has is_first_registration=1 with
756        // state IN ('PENDING','BUILDING'), the per-kind table has no rows yet.
757        // Route to a full-kind node scan so callers get results instead of empty.
758        if compiled.driving_table == DrivingTable::FtsNodes
759            && let Some(BindValue::Text(root_kind)) = compiled.binds.get(1)
760            && let Some(nodes) = self.scan_fallback_if_first_registration(root_kind)?
761        {
762            self.telemetry.increment_queries();
763            return Ok(QueryRows {
764                nodes,
765                runs: Vec::new(),
766                steps: Vec::new(),
767                actions: Vec::new(),
768                was_degraded: false,
769            });
770        }
771
772        // For FtsNodes queries the fathomdb-query compile path generates SQL that
773        // references the old global `fts_node_properties` table.  Since migration 23
774        // dropped that table, we rewrite the SQL and binds here to use the per-kind
775        // `fts_props_<kind>` table (or omit the property UNION arm entirely when the
776        // per-kind table does not yet exist).
777        //
778        // For VecNodes queries the fathomdb-query compile path generates SQL that
779        // hardcodes `vec_nodes_active`.  Since 0.5.0 uses per-kind tables, we rewrite
780        // `vec_nodes_active` to `vec_<root_kind>` here.
781        let (adapted_sql, adapted_binds) = if compiled.driving_table == DrivingTable::FtsNodes {
782            let conn_check = match self.lock_connection() {
783                Ok(g) => g,
784                Err(e) => {
785                    self.telemetry.increment_errors();
786                    return Err(e);
787                }
788            };
789            let result = adapt_fts_nodes_sql_for_per_kind_tables(compiled, &conn_check);
790            drop(conn_check);
791            result?
792        } else if compiled.driving_table == DrivingTable::VecNodes {
793            let root_kind = compiled
794                .binds
795                .get(1)
796                .and_then(|b| {
797                    if let BindValue::Text(k) = b {
798                        Some(k.as_str())
799                    } else {
800                        None
801                    }
802                })
803                .unwrap_or("");
804            let vec_table = if root_kind.is_empty() {
805                "vec__unknown".to_owned()
806            } else {
807                fathomdb_schema::vec_kind_table_name(root_kind)
808            };
809            let new_sql = compiled.sql.replace("vec_nodes_active", &vec_table);
810            (new_sql, compiled.binds.clone())
811        } else {
812            (compiled.sql.clone(), compiled.binds.clone())
813        };
814
815        let row_sql = wrap_node_row_projection_sql(&adapted_sql);
816        // FIX(review): was .expect() — panics on mutex poisoning, cascading failure.
817        // Options: (A) into_inner() for all, (B) EngineError for all, (C) mixed.
818        // Chose (C): shape_sql_map is a pure cache — into_inner() is safe to recover.
819        // conn wraps a SQLite connection whose state may be corrupt after a thread panic,
820        // so we propagate EngineError there instead.
821        {
822            let mut cache = self
823                .shape_sql_map
824                .lock()
825                .unwrap_or_else(PoisonError::into_inner);
826            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
827                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
828                cache.clear();
829            }
830            cache.insert(compiled.shape_hash, row_sql.clone());
831        }
832
833        let bind_values = adapted_binds
834            .iter()
835            .map(bind_value_to_sql)
836            .collect::<Vec<_>>();
837
838        // FIX(review) + Security fix M-8: was .expect() — panics on mutex poisoning.
839        // shape_sql_map uses into_inner() (pure cache, safe to recover).
840        // conn uses map_err → EngineError (connection state may be corrupt after panic;
841        // into_inner() would risk using a connection with partial transaction state).
842        let conn_guard = match self.lock_connection() {
843            Ok(g) => g,
844            Err(e) => {
845                self.telemetry.increment_errors();
846                return Err(e);
847            }
848        };
849        let mut statement = match conn_guard.prepare_cached(&row_sql) {
850            Ok(stmt) => stmt,
851            Err(e) if is_vec_table_absent(&e) => {
852                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
853                    trace_warn!("vector table absent, degrading to non-vector query");
854                }
855                return Ok(QueryRows {
856                    was_degraded: true,
857                    ..Default::default()
858                });
859            }
860            Err(e) => {
861                self.telemetry.increment_errors();
862                return Err(EngineError::Sqlite(e));
863            }
864        };
865        let nodes = match statement
866            .query_map(params_from_iter(bind_values.iter()), |row| {
867                Ok(NodeRow {
868                    row_id: row.get(0)?,
869                    logical_id: row.get(1)?,
870                    kind: row.get(2)?,
871                    properties: row.get(3)?,
872                    content_ref: row.get(4)?,
873                    last_accessed_at: row.get(5)?,
874                })
875            })
876            .and_then(Iterator::collect)
877        {
878            Ok(rows) => rows,
879            Err(e) => {
880                self.telemetry.increment_errors();
881                return Err(EngineError::Sqlite(e));
882            }
883        };
884
885        self.telemetry.increment_queries();
886        Ok(QueryRows {
887            nodes,
888            runs: Vec::new(),
889            steps: Vec::new(),
890            actions: Vec::new(),
891            was_degraded: false,
892        })
893    }
894
895    /// Execute a compiled adaptive search and return matching hits.
896    ///
897    /// Phase 2 splits filters: fusable predicates (`KindEq`, `LogicalIdEq`,
898    /// `SourceRefEq`, `ContentRefEq`, `ContentRefNotNull`) are injected into
899    /// the `search_hits` CTE so the CTE `LIMIT` applies after filtering,
900    /// while residual predicates (JSON path filters) stay in the outer
901    /// `WHERE`. The chunk and property FTS
902    /// indexes are `UNION ALL`-ed, BM25-scored (flipped so larger values mean
903    /// better matches), ordered, and limited. All hits return
904    /// `match_mode = Strict` — the relaxed branch and fallback arrive in
905    /// later phases.
906    ///
907    /// # Errors
908    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
909    pub fn execute_compiled_search(
910        &self,
911        compiled: &CompiledSearch,
912    ) -> Result<SearchRows, EngineError> {
913        // Build the two-branch plan from the strict text query and delegate
914        // to the shared plan-execution routine. The relaxed branch is derived
915        // via `derive_relaxed` and only fires when strict returned fewer than
916        // `min(FALLBACK_TRIGGER_K, limit)` hits. With K = 1 this collapses to
917        // "relaxed iff strict is empty," but the routine spells the rule out
918        // explicitly so raising K later is a one-line constant bump.
919        let (relaxed_query, was_degraded_at_plan_time) =
920            fathomdb_query::derive_relaxed(&compiled.text_query);
921        let relaxed = relaxed_query.map(|q| CompiledSearch {
922            root_kind: compiled.root_kind.clone(),
923            text_query: q,
924            limit: compiled.limit,
925            fusable_filters: compiled.fusable_filters.clone(),
926            residual_filters: compiled.residual_filters.clone(),
927            attribution_requested: compiled.attribution_requested,
928        });
929        let plan = CompiledSearchPlan {
930            strict: compiled.clone(),
931            relaxed,
932            was_degraded_at_plan_time,
933        };
934        self.execute_compiled_search_plan(&plan)
935    }
936
937    /// Execute a two-branch [`CompiledSearchPlan`] and return the merged,
938    /// deduped result rows.
939    ///
940    /// This is the shared retrieval/merge routine that both
941    /// [`Self::execute_compiled_search`] (adaptive path) and
942    /// `Engine::fallback_search` (narrow two-shape path) call into. Strict
943    /// runs first; the relaxed branch only fires when it is present AND the
944    /// strict branch returned fewer than `min(FALLBACK_TRIGGER_K, limit)`
945    /// hits. Merge and dedup semantics are identical to the adaptive path
946    /// regardless of how the plan was constructed.
947    ///
948    /// Error contract: if the relaxed branch errors, the error propagates;
949    /// strict hits are not returned. This matches the rest of the engine's
950    /// fail-hard posture.
951    ///
952    /// # Errors
953    /// Returns [`EngineError`] if either branch's SQL cannot be prepared or
954    /// executed.
955    pub fn execute_compiled_search_plan(
956        &self,
957        plan: &CompiledSearchPlan,
958    ) -> Result<SearchRows, EngineError> {
959        let strict = &plan.strict;
960        let limit = strict.limit;
961        let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;
962
963        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
964        let strict_underfilled = strict_hits.len() < fallback_threshold;
965
966        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
967        let mut fallback_used = false;
968        let mut was_degraded = false;
969        if let Some(relaxed) = plan.relaxed.as_ref()
970            && strict_underfilled
971        {
972            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
973            fallback_used = true;
974            was_degraded = plan.was_degraded_at_plan_time;
975        }
976
977        let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
978        // Attribution runs AFTER dedup so that duplicate hits dropped by
979        // `merge_search_branches` do not waste a highlight+position-map
980        // lookup.
981        if strict.attribution_requested {
982            let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
983            self.populate_attribution_for_hits(
984                &mut merged,
985                &strict.text_query,
986                relaxed_text_query,
987            )?;
988        }
989        let strict_hit_count = merged
990            .iter()
991            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
992            .count();
993        let relaxed_hit_count = merged
994            .iter()
995            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
996            .count();
997        // Phase 10: no vector execution path yet, so vector_hit_count is
998        // always zero. Future phases that wire a vector branch will
999        // contribute here.
1000        let vector_hit_count = 0;
1001
1002        Ok(SearchRows {
1003            hits: merged,
1004            strict_hit_count,
1005            relaxed_hit_count,
1006            vector_hit_count,
1007            fallback_used,
1008            was_degraded,
1009        })
1010    }
1011
    /// Execute a compiled vector-only search and return matching hits.
    ///
    /// Phase 11 delivers the standalone vector retrieval path. The emitted
    /// SQL performs a KNN scan (`embedding MATCH ?1`) over the per-kind vec
    /// table named by `fathomdb_schema::vec_kind_table_name(root_kind)`, joins
    /// to `chunks` and `nodes` (rows with `superseded_at IS NULL` only), and
    /// pushes fusable filters into the candidate CTE. The outer `SELECT`
    /// applies residual JSON predicates and orders by distance ascending; each
    /// hit's `score` is `-distance` (higher is better) per addendum 1
    /// §Vector-Specific Behavior.
    ///
    /// A `limit` of zero short-circuits to `SearchRows::default()` without
    /// touching the connection.
    ///
    /// ## Capability-miss handling
    ///
    /// If preparing the statement, or reading its rows, fails with an error
    /// that `is_vec_table_absent` recognises, this method returns an empty
    /// [`SearchRows`] with `was_degraded = true`. An empty `root_kind` is
    /// mapped to the sentinel table name `vec__unknown`, which is intended to
    /// take this same path. This is **non-fatal** — the error does not
    /// propagate — matching the addendum's §Vector-Specific Behavior /
    /// Degradation. A warning is logged only the first time, gated by
    /// `vec_degradation_warned`.
    ///
    /// ## Attribution
    ///
    /// When `compiled.attribution_requested == true`, every returned hit
    /// carries `attribution: Some(HitAttribution { matched_paths: vec![] })`
    /// per addendum 1 §Attribution on vector hits (Phase 5 chunk-hit rule
    /// extended uniformly).
    ///
    /// # Errors
    /// Returns [`EngineError`] if the connection lock cannot be acquired, or
    /// if the SQL statement cannot be prepared or executed for reasons other
    /// than a vec-table capability miss.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // Short-circuit zero-limit: callers that pass `limit == 0` expect an
        // empty result rather than a SQL error from `LIMIT 0` semantics in
        // the inner vec0 scan.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        // Bind layout is positional: ?1 is always the query vector text and,
        // when a root kind is present, ?2 is that kind (see `kind_clause`
        // below). Every later bind takes its index from `binds.len()` right
        // after the push, so push order and SQL placeholder numbers must stay
        // in lock-step.
        let filter_by_kind = !compiled.root_kind.is_empty();
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // Build fusable-filter clauses, aliased against `src` inside the
        // candidate CTE. Same predicate set the text path fuses.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses
                        .push_str("\n                      AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedBoolEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    // The boolean is bound as an integer (0 / 1).
                    binds.push(BindValue::Integer(i64::from(*value)));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    // `first_param` is the path's index; the list values
                    // occupy the indices immediately after it.
                    let first_param = binds.len();
                    for v in values {
                        binds.push(BindValue::Text(v.clone()));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                Predicate::JsonPathEq { .. }
                | Predicate::JsonPathCompare { .. }
                | Predicate::JsonPathIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                    // JSON predicates are residual; compile_vector_search
                    // guarantees they never appear here, but stay defensive.
                    // Edge property predicates are not valid in the search path.
                }
            }
        }

        // Build residual JSON clauses, aliased against `h` in the outer SELECT.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(scalar_to_bind(v));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. }
                | Predicate::JsonPathFusedBoolEq { .. }
                | Predicate::JsonPathFusedIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                    // Fusable predicates live in fused_clauses above.
                    // Edge property predicates are not valid in the search path.
                }
            }
        }

        // Bind the outer limit as a numbered parameter (`?N`) rather than
        // interpolating its value. A limit that does not fit in `i64` is
        // clamped to `i64::MAX`.
        // NOTE(review): the inner `LIMIT {base_limit}` below IS interpolated,
        // so the SQL text (and hence the `prepare_cached` key) still varies
        // with the limit value — confirm whether that is intended.
        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        // sqlite-vec requires the LIMIT/k constraint to be visible directly
        // on the vec0 KNN scan, so we isolate it in a sub-select. The vec0
        // LIMIT overfetches `base_limit` = limit (Phase 11 keeps it simple;
        // Phase 12's planner may raise this to compensate for fusion
        // narrowing the candidate pool).
        let base_limit = limit;
        // `?2` is safe to hard-code: the root kind is pushed second, directly
        // after the query vector, whenever `filter_by_kind` is true.
        let kind_clause = if filter_by_kind {
            "\n                      AND src.kind = ?2"
        } else {
            ""
        };

        // Derive the per-kind vec table name from the query's root kind.
        // An empty root_kind defaults to a sentinel that will not exist, causing
        // graceful degradation (was_degraded=true).
        let vec_table = if compiled.root_kind.is_empty() {
            "vec__unknown".to_owned()
        } else {
            fathomdb_schema::vec_kind_table_name(&compiled.root_kind)
        };

        // Column order of the outer SELECT is load-bearing: the row mapper
        // below reads columns by position (0..=8).
        let sql = format!(
            "WITH vector_hits AS (
                SELECT
                    src.row_id AS row_id,
                    src.logical_id AS logical_id,
                    src.kind AS kind,
                    src.properties AS properties,
                    src.source_ref AS source_ref,
                    src.content_ref AS content_ref,
                    src.created_at AS created_at,
                    vc.distance AS distance,
                    vc.chunk_id AS chunk_id
                FROM (
                    SELECT chunk_id, distance
                    FROM {vec_table}
                    WHERE embedding MATCH ?1
                    LIMIT {base_limit}
                ) vc
                JOIN chunks c ON c.id = vc.chunk_id
                JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
                WHERE 1 = 1{kind_clause}{fused_clauses}
            )
            SELECT
                h.row_id,
                h.logical_id,
                h.kind,
                h.properties,
                h.content_ref,
                am.last_accessed_at,
                h.created_at,
                h.distance,
                h.chunk_id
            FROM vector_hits h
            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
            WHERE 1 = 1{filter_clauses}
            ORDER BY h.distance ASC
            LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        // A failed connection lock is a hard error and is counted in telemetry.
        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            Err(e) if is_vec_table_absent(&e) => {
                // Capability miss: non-fatal — surface as was_degraded.
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Score is the negated distance per addendum 1
                // §Vector-Specific Behavior / Score and distance. For
                // distance metrics (sqlite-vec's default) lower distance =
                // better match, so negating yields the higher-is-better
                // convention that dedup_branch_hits and the unified result
                // surface rely on.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    // Column 6 is the node's `created_at`.
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    // Vector hits have no strict/relaxed notion.
                    match_mode: None,
                    // Vector hits have no snippet.
                    snippet: None,
                    // Column 8 is the matched chunk id.
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // Some SQLite errors surface during row iteration (e.g. when
                // the vec0 extension is not loaded but the table exists as a
                // stub). Classify as capability-miss when the shape matches.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        // Release the statement, then the connection lock, before updating
        // telemetry and building the result.
        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
1406
1407    /// Execute a Pack F1 [`CompiledSemanticSearch`].
1408    ///
1409    /// Pre-KNN checks follow the design doc §Failure And Degradation
1410    /// Semantics:
1411    ///   - No active `vector_embedding_profiles` row → hard error
1412    ///     [`EngineError::EmbedderNotConfigured`].
1413    ///   - No enabled `vector_index_schemas` row for `compiled.root_kind` →
1414    ///     hard error [`EngineError::KindNotVectorIndexed`].
1415    ///   - `vector_index_schemas.state = 'stale'` → empty result with
1416    ///     `was_degraded = true`.
1417    ///   - Embedder missing or returns [`crate::EmbedderError::Unavailable`]
1418    ///     / `Failed` → empty result with `was_degraded = true`.
1419    ///   - Embedder output length ≠ profile `dimensions` → hard error
1420    ///     [`EngineError::DimensionMismatch`].
1421    ///
1422    /// On success the call reuses [`Self::execute_compiled_vector_search`]
1423    /// for the actual KNN / SQL execution by constructing an internal
1424    /// [`CompiledVectorSearch`].
1425    ///
1426    /// # Errors
1427    /// See the variants above. SQL errors propagate as [`EngineError::Sqlite`].
1428    pub fn execute_compiled_semantic_search(
1429        &self,
1430        compiled: &CompiledSemanticSearch,
1431    ) -> Result<SearchRows, EngineError> {
1432        // 1. Active profile must exist.
1433        let profile_dim = self
1434            .active_profile_dimensions()?
1435            .ok_or(EngineError::EmbedderNotConfigured)?;
1436
1437        // 2. Per-kind vector schema must exist and be enabled.
1438        let schema_state = self.enabled_vec_kind_state(&compiled.root_kind)?;
1439        let Some(state) = schema_state else {
1440            return Err(EngineError::KindNotVectorIndexed {
1441                kind: compiled.root_kind.clone(),
1442            });
1443        };
1444
1445        // 3. Stale schema → degrade.
1446        if state == "stale" {
1447            return Ok(SearchRows {
1448                was_degraded: true,
1449                ..SearchRows::default()
1450            });
1451        }
1452
1453        // 4. Embedder must be available.
1454        let Some(embedder) = self.query_embedder.as_ref() else {
1455            return Ok(SearchRows {
1456                was_degraded: true,
1457                ..SearchRows::default()
1458            });
1459        };
1460        let Ok(embedding) = embedder.embed_query(&compiled.text) else {
1461            return Ok(SearchRows {
1462                was_degraded: true,
1463                ..SearchRows::default()
1464            });
1465        };
1466
1467        // 5. Validate embedding dimensions.
1468        if embedding.len() != profile_dim {
1469            return Err(EngineError::DimensionMismatch {
1470                expected: profile_dim,
1471                actual: embedding.len(),
1472            });
1473        }
1474
1475        // 6. Delegate to the existing KNN executor via a CompiledVectorSearch.
1476        let literal = serde_json::to_string(&embedding)
1477            .map_err(|e| EngineError::Bridge(format!("serialize query embedding: {e}")))?;
1478        let inner = CompiledVectorSearch {
1479            root_kind: compiled.root_kind.clone(),
1480            query_text: literal,
1481            limit: compiled.limit,
1482            fusable_filters: Vec::new(),
1483            residual_filters: Vec::new(),
1484            attribution_requested: false,
1485        };
1486        self.execute_compiled_vector_search(&inner)
1487    }
1488
1489    /// Execute a Pack F1 [`CompiledRawVectorSearch`].
1490    ///
1491    /// Pre-KNN checks:
1492    ///   - No active `vector_embedding_profiles` row → hard error
1493    ///     [`EngineError::EmbedderNotConfigured`] (the profile's
1494    ///     `dimensions` is the source of truth for the expected vector
1495    ///     length, so a missing profile is a configuration error even
1496    ///     though the raw path bypasses the embedder itself).
1497    ///   - No enabled `vector_index_schemas` row for `compiled.root_kind` →
1498    ///     hard error [`EngineError::KindNotVectorIndexed`].
1499    ///   - `compiled.vec.len() != profile.dimensions` → hard error
1500    ///     [`EngineError::DimensionMismatch`].
1501    ///   - `vector_index_schemas.state = 'stale'` → empty result with
1502    ///     `was_degraded = true`.
1503    ///
1504    /// # Errors
1505    /// See the variants above. SQL errors propagate as [`EngineError::Sqlite`].
1506    pub fn execute_compiled_raw_vector_search(
1507        &self,
1508        compiled: &CompiledRawVectorSearch,
1509    ) -> Result<SearchRows, EngineError> {
1510        let profile_dim = self
1511            .active_profile_dimensions()?
1512            .ok_or(EngineError::EmbedderNotConfigured)?;
1513
1514        let schema_state = self.enabled_vec_kind_state(&compiled.root_kind)?;
1515        let Some(state) = schema_state else {
1516            return Err(EngineError::KindNotVectorIndexed {
1517                kind: compiled.root_kind.clone(),
1518            });
1519        };
1520
1521        if compiled.vec.len() != profile_dim {
1522            return Err(EngineError::DimensionMismatch {
1523                expected: profile_dim,
1524                actual: compiled.vec.len(),
1525            });
1526        }
1527
1528        if state == "stale" {
1529            return Ok(SearchRows {
1530                was_degraded: true,
1531                ..SearchRows::default()
1532            });
1533        }
1534
1535        let literal = serde_json::to_string(&compiled.vec)
1536            .map_err(|e| EngineError::Bridge(format!("serialize raw vector: {e}")))?;
1537        let inner = CompiledVectorSearch {
1538            root_kind: compiled.root_kind.clone(),
1539            query_text: literal,
1540            limit: compiled.limit,
1541            fusable_filters: Vec::new(),
1542            residual_filters: Vec::new(),
1543            attribution_requested: false,
1544        };
1545        self.execute_compiled_vector_search(&inner)
1546    }
1547
1548    /// Return the active embedding profile's `dimensions` column, or `None`
1549    /// if no active profile is configured.
1550    fn active_profile_dimensions(&self) -> Result<Option<usize>, EngineError> {
1551        let conn = self.lock_connection()?;
1552        let dim: Option<i64> = conn
1553            .query_row(
1554                "SELECT dimensions FROM vector_embedding_profiles WHERE active = 1",
1555                [],
1556                |row| row.get::<_, i64>(0),
1557            )
1558            .optional()?;
1559        Ok(dim.and_then(|d| usize::try_from(d).ok()))
1560    }
1561
1562    /// Return the `state` of the `vector_index_schemas` row for `kind` if
1563    /// one exists and is enabled, else `None`.
1564    fn enabled_vec_kind_state(&self, kind: &str) -> Result<Option<String>, EngineError> {
1565        let conn = self.lock_connection()?;
1566        let row: Option<(i64, String)> = conn
1567            .query_row(
1568                "SELECT enabled, state FROM vector_index_schemas WHERE kind = ?1",
1569                rusqlite::params![kind],
1570                |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
1571            )
1572            .optional()?;
1573        Ok(row.and_then(|(enabled, state)| if enabled == 1 { Some(state) } else { None }))
1574    }
1575
1576    /// Execute a unified [`CompiledRetrievalPlan`] (Phase 12 `search()`
1577    /// entry point) and return deterministically ranked, block-ordered
1578    /// [`SearchRows`].
1579    ///
1580    /// Stages, per addendum 1 §Retrieval Planner Model:
1581    ///
1582    /// 1. **Text strict.** Always runs (empty query short-circuits to an
1583    ///    empty branch result inside `run_search_branch`).
1584    /// 2. **Text relaxed.** Runs iff the plan carries a relaxed branch AND
1585    ///    the strict branch returned fewer than `min(FALLBACK_TRIGGER_K,
1586    ///    limit)` hits — same v1 (`K = 1`) zero-hits-only trigger as the
1587    ///    Phase 6 text-only path.
1588    /// 3. **Vector.** Runs iff text retrieval (strict + relaxed combined)
1589    ///    returned zero hits AND `plan.vector` is `Some`. **In v1 the
1590    ///    planner never wires a vector branch through `search()`, so this
1591    ///    code path is structurally present but dormant.** A future phase
1592    ///    that wires read-time embedding into `compile_retrieval_plan` will
1593    ///    immediately light it up.
1594    /// 4. **Fusion.** All collected hits are merged via
1595    ///    [`merge_search_branches_three`], which produces strict ->
1596    ///    relaxed -> vector block ordering with cross-branch dedup
1597    ///    resolved by branch precedence.
1598    ///
1599    /// `was_degraded` covers only the relaxed-branch cap miss in v1. The
1600    /// addendum's "vector capability miss => `was_degraded`" semantics
1601    /// applies to `search()` only when the unified planner actually fires
1602    /// the vector branch, which v1 never does.
1603    ///
1604    /// # Errors
1605    /// Returns [`EngineError`] if any stage's SQL cannot be prepared or
1606    /// executed for a non-capability-miss reason.
1607    pub fn execute_retrieval_plan(
1608        &self,
1609        plan: &CompiledRetrievalPlan,
1610        raw_query: &str,
1611    ) -> Result<SearchRows, EngineError> {
1612        // Phase 12.5a: we work against a local owned copy so
1613        // `fill_vector_branch` can mutate `plan.vector` and
1614        // `plan.was_degraded_at_plan_time`. Cloning is cheap (the plan is
1615        // a bounded set of predicates + text AST nodes) and avoids
1616        // forcing callers to hand us `&mut` access.
1617        let mut plan = plan.clone();
1618        let limit = plan.text.strict.limit;
1619
1620        // Stage 1: text strict.
1621        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;
1622
1623        // Stage 2: text relaxed. Same K=1 zero-hits-only trigger the Phase 6
1624        // path uses.
1625        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
1626        let strict_underfilled = strict_hits.len() < fallback_threshold;
1627        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
1628        let mut fallback_used = false;
1629        let mut was_degraded = false;
1630        if let Some(relaxed) = plan.text.relaxed.as_ref()
1631            && strict_underfilled
1632        {
1633            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
1634            fallback_used = true;
1635            was_degraded = plan.was_degraded_at_plan_time;
1636        }
1637
1638        // Phase 12.5a: fill the vector branch from the configured
1639        // read-time query embedder, if any. Option (b) from the spec:
1640        // only pay the embedding cost when the text branches returned
1641        // nothing, because the three-branch stage gate below only runs
1642        // the vector stage under exactly that condition. This keeps the
1643        // hot path (strict text matched) embedder-free.
1644        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
1645        if text_branches_empty && self.query_embedder.is_some() {
1646            self.fill_vector_branch(&mut plan, raw_query);
1647        }
1648
1649        // Stage 3: vector. Runs only when text retrieval is empty AND a
1650        // vector branch is present. When no embedder is configured (Phase
1651        // 12.5a default) `plan.vector` stays `None` and this stage is a
1652        // no-op, preserving the Phase 12 v1 dormancy invariant.
1653        let mut vector_hits: Vec<SearchHit> = Vec::new();
1654        if let Some(vector) = plan.vector.as_ref()
1655            && strict_hits.is_empty()
1656            && relaxed_hits.is_empty()
1657        {
1658            let vector_rows = self.execute_compiled_vector_search(vector)?;
1659            // `execute_compiled_vector_search` returns a fully populated
1660            // `SearchRows`. Promote its hits into the merge stage and lift
1661            // its capability-miss `was_degraded` flag onto the unified
1662            // result, per addendum §Vector-Specific Behavior.
1663            vector_hits = vector_rows.hits;
1664            if vector_rows.was_degraded {
1665                was_degraded = true;
1666            }
1667        }
1668        // Phase 12.5a: an embedder-reported capability miss surfaces as
1669        // `plan.was_degraded_at_plan_time = true` set inside
1670        // `fill_vector_branch`. Lift it onto the response so callers see
1671        // the graceful degradation even when the vector stage-gate never
1672        // had the chance to fire (the embedder call itself failed and
1673        // the vector slot stayed `None`).
1674        if text_branches_empty
1675            && plan.was_degraded_at_plan_time
1676            && plan.vector.is_none()
1677            && self.query_embedder.is_some()
1678        {
1679            was_degraded = true;
1680        }
1681
1682        // Stage 4: fusion.
1683        let strict = &plan.text.strict;
1684        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
1685        if strict.attribution_requested {
1686            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
1687            self.populate_attribution_for_hits(
1688                &mut merged,
1689                &strict.text_query,
1690                relaxed_text_query,
1691            )?;
1692        }
1693
1694        let strict_hit_count = merged
1695            .iter()
1696            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
1697            .count();
1698        let relaxed_hit_count = merged
1699            .iter()
1700            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
1701            .count();
1702        let vector_hit_count = merged
1703            .iter()
1704            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
1705            .count();
1706
1707        Ok(SearchRows {
1708            hits: merged,
1709            strict_hit_count,
1710            relaxed_hit_count,
1711            vector_hit_count,
1712            fallback_used,
1713            was_degraded,
1714        })
1715    }
1716
1717    /// Phase 12.5a: populate `plan.vector` from the configured read-time
1718    /// query embedder, if any.
1719    ///
1720    /// Preconditions (enforced by the caller in `execute_retrieval_plan`):
1721    /// - `self.query_embedder.is_some()` — no point calling otherwise.
1722    /// - Both the strict and relaxed text branches already ran and
1723    ///   returned zero hits, so the existing three-branch stage gate
1724    ///   will actually fire the vector stage once the slot is populated.
1725    ///   This is option (b) from the Phase 12.5a spec: skip the embedding
1726    ///   cost entirely when text retrieval already won.
1727    ///
1728    /// Contract: never panics, never returns an error. On embedder error
1729    /// it sets `plan.was_degraded_at_plan_time = true` and leaves
1730    /// `plan.vector` as `None`; the coordinator's normal error-free
1731    /// degradation path then reports `was_degraded` on the result.
1732    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
1733        let Some(embedder) = self.query_embedder.as_ref() else {
1734            return;
1735        };
1736        match embedder.embed_query(raw_query) {
1737            Ok(vec) => {
1738                // `CompiledVectorSearch::query_text` is a JSON float-array
1739                // literal at the time the coordinator binds it (see the
1740                // `CompiledVectorSearch` docs). `serde_json::to_string`
1741                // on a `Vec<f32>` produces exactly that shape — no
1742                // wire-format change required.
1743                let literal = match serde_json::to_string(&vec) {
1744                    Ok(s) => s,
1745                    Err(err) => {
1746                        trace_warn!(
1747                            error = %err,
1748                            "query embedder vector serialization failed; skipping vector branch"
1749                        );
1750                        let _ = err; // Used by trace_warn! when tracing feature is active
1751                        plan.was_degraded_at_plan_time = true;
1752                        return;
1753                    }
1754                };
1755                let strict = &plan.text.strict;
1756                plan.vector = Some(CompiledVectorSearch {
1757                    root_kind: strict.root_kind.clone(),
1758                    query_text: literal,
1759                    limit: strict.limit,
1760                    fusable_filters: strict.fusable_filters.clone(),
1761                    residual_filters: strict.residual_filters.clone(),
1762                    attribution_requested: strict.attribution_requested,
1763                });
1764            }
1765            Err(err) => {
1766                trace_warn!(
1767                    error = %err,
1768                    "query embedder unavailable, skipping vector branch"
1769                );
1770                let _ = err; // Used by trace_warn! when tracing feature is active
1771                plan.was_degraded_at_plan_time = true;
1772            }
1773        }
1774    }
1775
1776    /// Execute a single search branch against the underlying FTS surfaces.
1777    ///
1778    /// This is the shared SQL emission path used by
1779    /// [`Self::execute_compiled_search_plan`] to run strict and (when
1780    /// present) relaxed branches of a [`CompiledSearchPlan`] in sequence.
1781    /// The returned hits are tagged with `branch`'s corresponding
1782    /// [`SearchMatchMode`] and are **not** yet deduped or truncated — the
1783    /// caller is responsible for merging multiple branches.
1784    #[allow(clippy::too_many_lines)]
1785    fn run_search_branch(
1786        &self,
1787        compiled: &CompiledSearch,
1788        branch: SearchBranch,
1789    ) -> Result<Vec<SearchHit>, EngineError> {
1790        use std::fmt::Write as _;
1791        // Short-circuit an empty/whitespace-only query: rendering it would
1792        // yield `MATCH ""`, which FTS5 rejects as a syntax error. Callers
1793        // (including the adaptive path when strict is Empty and derive_relaxed
1794        // returns None) must see an empty result, not an error. Each branch
1795        // is short-circuited independently so a strict-Empty + relaxed-Some
1796        // plan still exercises the relaxed branch.
1797        // A top-level `TextQuery::Not` renders to an FTS5 expression that
1798        // matches "every row not containing X" — a complement-of-corpus scan
1799        // that no caller would intentionally want. Short-circuit to empty at
1800        // the root only; a `Not` nested inside an `And` is a legitimate
1801        // exclusion and must still run.
1802        if matches!(
1803            compiled.text_query,
1804            fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
1805        ) {
1806            return Ok(Vec::new());
1807        }
1808        let rendered_base = render_text_query_fts5(&compiled.text_query);
1809        // Apply per-kind query-side tokenizer adaptations.
1810        // SubstringTrigram: queries shorter than 3 chars produce no results
1811        // (trigram index cannot match them). Return empty instead of an FTS5
1812        // error or a full-table scan.
1813        // SourceCode: render_text_query_fts5 already wraps every term in FTS5
1814        // double-quote delimiters (e.g. "std.io"). Applying escape_source_code_query
1815        // on that already-rendered output corrupts the expression — the escape
1816        // function sees the surrounding '"' characters and '.' separator and
1817        // produces malformed syntax like '"std"."io""'. The phrase quoting from
1818        // render_text_query_fts5 is sufficient: FTS5 applies the tokenizer
1819        // (including custom tokenchars) inside double-quoted phrase expressions,
1820        // so "std.io" correctly matches the single token 'std.io'.
1821        let strategy = self.fts_strategies.get(compiled.root_kind.as_str());
1822        if matches!(strategy, Some(TokenizerStrategy::SubstringTrigram))
1823            && rendered_base
1824                .chars()
1825                .filter(|c| c.is_alphanumeric())
1826                .count()
1827                < 3
1828        {
1829            return Ok(Vec::new());
1830        }
1831        let rendered = rendered_base;
1832        // An empty `root_kind` means "unkind-filtered" — the fallback_search
1833        // helper uses this when the caller did not add `.filter_kind_eq(...)`.
1834        // The adaptive `text_search()` path never produces an empty root_kind
1835        // because `QueryBuilder::nodes(kind)` requires a non-empty string at
1836        // the entry point.
1837        let filter_by_kind = !compiled.root_kind.is_empty();
1838
1839        // Acquire the connection early so we can check per-kind table existence
1840        // before building the bind array and SQL. The bind numbering depends on
1841        // whether the property FTS UNION arm is included.
1842        let conn_guard = match self.lock_connection() {
1843            Ok(g) => g,
1844            Err(e) => {
1845                self.telemetry.increment_errors();
1846                return Err(e);
1847            }
1848        };
1849
1850        // Determine which per-kind property FTS tables to include in the UNION arm.
1851        //
1852        // filter_by_kind = true (root_kind set): include the single per-kind table
1853        //   for root_kind if it exists. Bind order: ?1=chunk_text, ?2=kind, ?3=prop_text.
1854        //
1855        // filter_by_kind = false (fallback search, root_kind empty): include per-kind tables
1856        //   based on fusable KindEq predicates or all registered tables from sqlite_master.
1857        //   A single KindEq in fusable_filters lets us use one specific table. With no KindEq
1858        //   we UNION all per-kind tables so kind-less fallback searches still find property
1859        //   hits (matching the behaviour of the former global fts_node_properties table).
1860        //   Bind order: ?1=chunk_text, ?2=prop_text (shared across all prop UNION arms).
1861        //
1862        // In both cases, per-kind tables are already kind-specific so no `fp.kind = ?` filter
1863        // is needed inside the inner arm; the outer `search_hits` CTE WHERE clause handles
1864        // any further kind narrowing via fused KindEq predicates.
1865        // prop_fts_tables is Vec<(kind, table_name)> so we can later look up per-kind
1866        // BM25 weights from fts_property_schemas when building the scoring expression.
1867        let prop_fts_tables: Vec<(String, String)> = if filter_by_kind {
1868            let kind = compiled.root_kind.clone();
1869            let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1870            let exists: bool = conn_guard
1871                .query_row(
1872                    "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1873                    rusqlite::params![prop_table],
1874                    |_| Ok(true),
1875                )
1876                .optional()
1877                .map_err(EngineError::Sqlite)?
1878                .unwrap_or(false);
1879            if exists {
1880                vec![(kind, prop_table)]
1881            } else {
1882                vec![]
1883            }
1884        } else {
1885            // Fallback / kind-less search: find the right per-kind tables.
1886            // If there is exactly one KindEq in fusable_filters, use that kind's table.
1887            // Otherwise, include all registered per-kind tables from sqlite_master so
1888            // that kind-less fallback searches can still return property FTS hits.
1889            let kind_eq_values: Vec<String> = compiled
1890                .fusable_filters
1891                .iter()
1892                .filter_map(|p| match p {
1893                    Predicate::KindEq(k) => Some(k.clone()),
1894                    _ => None,
1895                })
1896                .collect();
1897            if kind_eq_values.len() == 1 {
1898                let kind = kind_eq_values[0].clone();
1899                let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1900                let exists: bool = conn_guard
1901                    .query_row(
1902                        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1903                        rusqlite::params![prop_table],
1904                        |_| Ok(true),
1905                    )
1906                    .optional()
1907                    .map_err(EngineError::Sqlite)?
1908                    .unwrap_or(false);
1909                if exists {
1910                    vec![(kind, prop_table)]
1911                } else {
1912                    vec![]
1913                }
1914            } else {
1915                // No single KindEq: UNION all per-kind tables so kind-less fallback
1916                // searches behave like the former global fts_node_properties table.
1917                // Fetch registered kinds and compute/verify their per-kind table names.
1918                let mut stmt = conn_guard
1919                    .prepare("SELECT kind FROM fts_property_schemas")
1920                    .map_err(EngineError::Sqlite)?;
1921                let all_kinds: Vec<String> = stmt
1922                    .query_map([], |r| r.get::<_, String>(0))
1923                    .map_err(EngineError::Sqlite)?
1924                    .collect::<Result<Vec<_>, _>>()
1925                    .map_err(EngineError::Sqlite)?;
1926                drop(stmt);
1927                let mut result = Vec::new();
1928                for kind in all_kinds {
1929                    let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1930                    let exists: bool = conn_guard
1931                        .query_row(
1932                            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1933                            rusqlite::params![prop_table],
1934                            |_| Ok(true),
1935                        )
1936                        .optional()
1937                        .map_err(EngineError::Sqlite)?
1938                        .unwrap_or(false);
1939                    if exists {
1940                        result.push((kind, prop_table));
1941                    }
1942                }
1943                result
1944            }
1945        };
1946        let use_prop_fts = !prop_fts_tables.is_empty();
1947
1948        // Bind layout (before fused/residual predicates and limit):
1949        //   filter_by_kind = true,  use_prop_fts = true:  ?1=chunk_text, ?2=kind, ?3=prop_text
1950        //   filter_by_kind = true,  use_prop_fts = false: ?1=chunk_text, ?2=kind
1951        //   filter_by_kind = false, use_prop_fts = true:  ?1=chunk_text, ?2=prop_text
1952        //   filter_by_kind = false, use_prop_fts = false: ?1=chunk_text
1953        let mut binds: Vec<BindValue> = if filter_by_kind {
1954            if use_prop_fts {
1955                vec![
1956                    BindValue::Text(rendered.clone()),
1957                    BindValue::Text(compiled.root_kind.clone()),
1958                    BindValue::Text(rendered),
1959                ]
1960            } else {
1961                vec![
1962                    BindValue::Text(rendered.clone()),
1963                    BindValue::Text(compiled.root_kind.clone()),
1964                ]
1965            }
1966        } else if use_prop_fts {
1967            // fallback search with property FTS: ?1=chunk, ?2=prop (same query value)
1968            vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
1969        } else {
1970            vec![BindValue::Text(rendered)]
1971        };
1972
1973        // P2-5: both fusable and residual predicates now match against the
1974        // CTE's projected columns (`u.kind`, `u.logical_id`, `u.source_ref`,
1975        // `u.content_ref`, `u.properties`) because the inner UNION arms
1976        // project the full active-row column set through the
1977        // `JOIN nodes src` already present in each arm. The previous
1978        // implementation re-joined `nodes hn` at the CTE level and
1979        // `nodes n` again at the outer SELECT, which was triple work on
1980        // the hot search path.
1981        let mut fused_clauses = String::new();
1982        for predicate in &compiled.fusable_filters {
1983            match predicate {
1984                Predicate::KindEq(kind) => {
1985                    binds.push(BindValue::Text(kind.clone()));
1986                    let idx = binds.len();
1987                    let _ = write!(fused_clauses, "\n                  AND u.kind = ?{idx}");
1988                }
1989                Predicate::LogicalIdEq(logical_id) => {
1990                    binds.push(BindValue::Text(logical_id.clone()));
1991                    let idx = binds.len();
1992                    let _ = write!(
1993                        fused_clauses,
1994                        "\n                  AND u.logical_id = ?{idx}"
1995                    );
1996                }
1997                Predicate::SourceRefEq(source_ref) => {
1998                    binds.push(BindValue::Text(source_ref.clone()));
1999                    let idx = binds.len();
2000                    let _ = write!(
2001                        fused_clauses,
2002                        "\n                  AND u.source_ref = ?{idx}"
2003                    );
2004                }
2005                Predicate::ContentRefEq(uri) => {
2006                    binds.push(BindValue::Text(uri.clone()));
2007                    let idx = binds.len();
2008                    let _ = write!(
2009                        fused_clauses,
2010                        "\n                  AND u.content_ref = ?{idx}"
2011                    );
2012                }
2013                Predicate::ContentRefNotNull => {
2014                    fused_clauses.push_str("\n                  AND u.content_ref IS NOT NULL");
2015                }
2016                Predicate::JsonPathFusedEq { path, value } => {
2017                    binds.push(BindValue::Text(path.clone()));
2018                    let path_idx = binds.len();
2019                    binds.push(BindValue::Text(value.clone()));
2020                    let value_idx = binds.len();
2021                    let _ = write!(
2022                        fused_clauses,
2023                        "\n                  AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
2024                    );
2025                }
2026                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
2027                    binds.push(BindValue::Text(path.clone()));
2028                    let path_idx = binds.len();
2029                    binds.push(BindValue::Integer(*value));
2030                    let value_idx = binds.len();
2031                    let operator = match op {
2032                        ComparisonOp::Gt => ">",
2033                        ComparisonOp::Gte => ">=",
2034                        ComparisonOp::Lt => "<",
2035                        ComparisonOp::Lte => "<=",
2036                    };
2037                    let _ = write!(
2038                        fused_clauses,
2039                        "\n                  AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
2040                    );
2041                }
2042                Predicate::JsonPathFusedBoolEq { path, value } => {
2043                    binds.push(BindValue::Text(path.clone()));
2044                    let path_idx = binds.len();
2045                    binds.push(BindValue::Integer(i64::from(*value)));
2046                    let value_idx = binds.len();
2047                    let _ = write!(
2048                        fused_clauses,
2049                        "\n                  AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
2050                    );
2051                }
2052                Predicate::JsonPathFusedIn { path, values } => {
2053                    binds.push(BindValue::Text(path.clone()));
2054                    let first_param = binds.len();
2055                    for v in values {
2056                        binds.push(BindValue::Text(v.clone()));
2057                    }
2058                    let placeholders = (1..=values.len())
2059                        .map(|i| format!("?{}", first_param + i))
2060                        .collect::<Vec<_>>()
2061                        .join(", ");
2062                    let _ = write!(
2063                        fused_clauses,
2064                        "\n                  AND json_extract(u.properties, ?{first_param}) IN ({placeholders})"
2065                    );
2066                }
2067                Predicate::JsonPathEq { .. }
2068                | Predicate::JsonPathCompare { .. }
2069                | Predicate::JsonPathIn { .. }
2070                | Predicate::EdgePropertyEq { .. }
2071                | Predicate::EdgePropertyCompare { .. } => {
2072                    // Should be in residual_filters; compile_search guarantees
2073                    // this, but stay defensive.
2074                    // Edge property predicates are not valid in the search path.
2075                }
2076            }
2077        }
2078
2079        let mut filter_clauses = String::new();
2080        for predicate in &compiled.residual_filters {
2081            match predicate {
2082                Predicate::JsonPathEq { path, value } => {
2083                    binds.push(BindValue::Text(path.clone()));
2084                    let path_idx = binds.len();
2085                    binds.push(scalar_to_bind(value));
2086                    let value_idx = binds.len();
2087                    let _ = write!(
2088                        filter_clauses,
2089                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
2090                    );
2091                }
2092                Predicate::JsonPathCompare { path, op, value } => {
2093                    binds.push(BindValue::Text(path.clone()));
2094                    let path_idx = binds.len();
2095                    binds.push(scalar_to_bind(value));
2096                    let value_idx = binds.len();
2097                    let operator = match op {
2098                        ComparisonOp::Gt => ">",
2099                        ComparisonOp::Gte => ">=",
2100                        ComparisonOp::Lt => "<",
2101                        ComparisonOp::Lte => "<=",
2102                    };
2103                    let _ = write!(
2104                        filter_clauses,
2105                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
2106                    );
2107                }
2108                Predicate::JsonPathIn { path, values } => {
2109                    binds.push(BindValue::Text(path.clone()));
2110                    let first_param = binds.len();
2111                    for v in values {
2112                        binds.push(scalar_to_bind(v));
2113                    }
2114                    let placeholders = (1..=values.len())
2115                        .map(|i| format!("?{}", first_param + i))
2116                        .collect::<Vec<_>>()
2117                        .join(", ");
2118                    let _ = write!(
2119                        filter_clauses,
2120                        "\n  AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
2121                    );
2122                }
2123                Predicate::KindEq(_)
2124                | Predicate::LogicalIdEq(_)
2125                | Predicate::SourceRefEq(_)
2126                | Predicate::ContentRefEq(_)
2127                | Predicate::ContentRefNotNull
2128                | Predicate::JsonPathFusedEq { .. }
2129                | Predicate::JsonPathFusedTimestampCmp { .. }
2130                | Predicate::JsonPathFusedBoolEq { .. }
2131                | Predicate::JsonPathFusedIn { .. }
2132                | Predicate::EdgePropertyEq { .. }
2133                | Predicate::EdgePropertyCompare { .. } => {
2134                    // Fusable predicates live in fused_clauses; compile_search
2135                    // partitions them out of residual_filters.
2136                    // Edge property predicates are not valid in the search path.
2137                }
2138            }
2139        }
2140
2141        // Bind `limit` as an integer parameter rather than formatting it into
2142        // the SQL string. Interpolating the limit made the prepared-statement
2143        // SQL vary by limit value, so rusqlite's default 16-slot
2144        // `prepare_cached` cache thrashed for paginated callers that varied
2145        // limits per call. With the bind the SQL is structurally stable for
2146        // a given filter shape regardless of `limit` value.
2147        let limit = compiled.limit;
2148        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
2149        let limit_idx = binds.len();
2150        // P2-5: the inner UNION arms project the full active-row column
2151        // set through `JOIN nodes src` (kind, row_id, source_ref,
2152        // content_ref, content_hash, created_at, properties). Both the
2153        // CTE's outer WHERE and the final SELECT consume those columns
2154        // directly, which eliminates the previous `JOIN nodes hn` at the
2155        // CTE level and `JOIN nodes n` at the outer SELECT — saving two
2156        // redundant joins on the hot search path. `src.superseded_at IS
2157        // NULL` in each arm already filters retired rows, which is what
2158        // the dropped outer joins used to do.
2159        //
2160        // Property FTS uses per-kind tables (fts_props_<kind>). One UNION arm
2161        // is generated per table in prop_fts_tables. The prop text bind index
2162        // is ?3 when filter_by_kind=true (after chunk_text and kind binds) or
2163        // ?2 when filter_by_kind=false (after chunk_text only). The same bind
2164        // position is reused by every prop arm, which is valid in SQLite.
2165        let prop_bind_idx: usize = if filter_by_kind { 3 } else { 2 };
2166        let prop_arm_sql: String = if use_prop_fts {
2167            prop_fts_tables.iter().fold(String::new(), |mut acc, (kind, prop_table)| {
2168                // Load schema for this kind to compute BM25 weights.
2169                let bm25_expr = conn_guard
2170                    .query_row(
2171                        "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
2172                        rusqlite::params![kind],
2173                        |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
2174                    )
2175                    .ok()
2176                    .map_or_else(
2177                        || format!("bm25({prop_table})"),
2178                        |(json, sep)| build_bm25_expr(prop_table, &json, &sep),
2179                    );
2180                // For weighted (per-column) schemas text_content does not exist;
2181                // use an empty snippet rather than a column reference that would fail.
2182                let is_weighted = bm25_expr != format!("bm25({prop_table})");
2183                let snippet_expr = if is_weighted {
2184                    "'' AS snippet".to_owned()
2185                } else {
2186                    "substr(fp.text_content, 1, 200) AS snippet".to_owned()
2187                };
2188                let _ = write!(
2189                    acc,
2190                    "
2191                    UNION ALL
2192                    SELECT
2193                        src.row_id AS row_id,
2194                        fp.node_logical_id AS logical_id,
2195                        src.kind AS kind,
2196                        src.properties AS properties,
2197                        src.source_ref AS source_ref,
2198                        src.content_ref AS content_ref,
2199                        src.created_at AS created_at,
2200                        -{bm25_expr} AS score,
2201                        'property' AS source,
2202                        {snippet_expr},
2203                        CAST(fp.rowid AS TEXT) AS projection_row_id
2204                    FROM {prop_table} fp
2205                    JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
2206                    WHERE {prop_table} MATCH ?{prop_bind_idx}"
2207                );
2208                acc
2209            })
2210        } else {
2211            String::new()
2212        };
2213        let (chunk_fts_bind, chunk_kind_clause) = if filter_by_kind {
2214            ("?1", "\n                      AND src.kind = ?2")
2215        } else {
2216            ("?1", "")
2217        };
2218        let sql = format!(
2219            "WITH search_hits AS (
2220                SELECT
2221                    u.row_id AS row_id,
2222                    u.logical_id AS logical_id,
2223                    u.kind AS kind,
2224                    u.properties AS properties,
2225                    u.source_ref AS source_ref,
2226                    u.content_ref AS content_ref,
2227                    u.created_at AS created_at,
2228                    u.score AS score,
2229                    u.source AS source,
2230                    u.snippet AS snippet,
2231                    u.projection_row_id AS projection_row_id
2232                FROM (
2233                    SELECT
2234                        src.row_id AS row_id,
2235                        c.node_logical_id AS logical_id,
2236                        src.kind AS kind,
2237                        src.properties AS properties,
2238                        src.source_ref AS source_ref,
2239                        src.content_ref AS content_ref,
2240                        src.created_at AS created_at,
2241                        -bm25(fts_nodes) AS score,
2242                        'chunk' AS source,
2243                        snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
2244                        f.chunk_id AS projection_row_id
2245                    FROM fts_nodes f
2246                    JOIN chunks c ON c.id = f.chunk_id
2247                    JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
2248                    WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}{prop_arm_sql}
2249                ) u
2250                WHERE 1 = 1{fused_clauses}
2251                ORDER BY u.score DESC
2252                LIMIT ?{limit_idx}
2253            )
2254            SELECT
2255                h.row_id,
2256                h.logical_id,
2257                h.kind,
2258                h.properties,
2259                h.content_ref,
2260                am.last_accessed_at,
2261                h.created_at,
2262                h.score,
2263                h.source,
2264                h.snippet,
2265                h.projection_row_id
2266            FROM search_hits h
2267            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
2268            WHERE 1 = 1{filter_clauses}
2269            ORDER BY h.score DESC"
2270        );
2271
2272        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
2273
2274        let mut statement = match conn_guard.prepare_cached(&sql) {
2275            Ok(stmt) => stmt,
2276            Err(e) => {
2277                self.telemetry.increment_errors();
2278                return Err(EngineError::Sqlite(e));
2279            }
2280        };
2281
2282        let hits = match statement
2283            .query_map(params_from_iter(bind_values.iter()), |row| {
2284                let source_str: String = row.get(8)?;
2285                // The CTE emits only two literal values here: `'chunk'` and
2286                // `'property'`. Default to `Chunk` on anything unexpected so a
2287                // schema drift surfaces as a mislabelled hit rather than a
2288                // row-level error.
2289                let source = if source_str == "property" {
2290                    SearchHitSource::Property
2291                } else {
2292                    SearchHitSource::Chunk
2293                };
2294                let match_mode = match branch {
2295                    SearchBranch::Strict => SearchMatchMode::Strict,
2296                    SearchBranch::Relaxed => SearchMatchMode::Relaxed,
2297                };
2298                Ok(SearchHit {
2299                    node: fathomdb_query::NodeRowLite {
2300                        row_id: row.get(0)?,
2301                        logical_id: row.get(1)?,
2302                        kind: row.get(2)?,
2303                        properties: row.get(3)?,
2304                        content_ref: row.get(4)?,
2305                        last_accessed_at: row.get(5)?,
2306                    },
2307                    written_at: row.get(6)?,
2308                    score: row.get(7)?,
2309                    // Phase 10: every branch currently emits text hits.
2310                    modality: RetrievalModality::Text,
2311                    source,
2312                    match_mode: Some(match_mode),
2313                    snippet: row.get(9)?,
2314                    projection_row_id: row.get(10)?,
2315                    vector_distance: None,
2316                    attribution: None,
2317                })
2318            })
2319            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
2320        {
2321            Ok(rows) => rows,
2322            Err(e) => {
2323                self.telemetry.increment_errors();
2324                return Err(EngineError::Sqlite(e));
2325            }
2326        };
2327
2328        // Drop the statement so `conn_guard` is free (attribution is
2329        // resolved after dedup in `execute_compiled_search_plan` to avoid
2330        // spending highlight lookups on hits that will be discarded).
2331        drop(statement);
2332        drop(conn_guard);
2333
2334        self.telemetry.increment_queries();
2335        Ok(hits)
2336    }
2337
2338    /// Populate per-hit attribution for the given deduped merged hits.
2339    /// Runs after [`merge_search_branches`] so dropped duplicates do not
2340    /// incur the highlight+position-map lookup cost.
2341    fn populate_attribution_for_hits(
2342        &self,
2343        hits: &mut [SearchHit],
2344        strict_text_query: &fathomdb_query::TextQuery,
2345        relaxed_text_query: Option<&fathomdb_query::TextQuery>,
2346    ) -> Result<(), EngineError> {
2347        let conn_guard = match self.lock_connection() {
2348            Ok(g) => g,
2349            Err(e) => {
2350                self.telemetry.increment_errors();
2351                return Err(e);
2352            }
2353        };
2354        let strict_expr = render_text_query_fts5(strict_text_query);
2355        let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
2356        for hit in hits.iter_mut() {
2357            // Phase 10: text hits always carry `Some(match_mode)`. Vector
2358            // hits (when a future phase adds them) have `None` here and
2359            // are skipped by the attribution resolver because attribution
2360            // is meaningless for vector matches.
2361            let match_expr = match hit.match_mode {
2362                Some(SearchMatchMode::Strict) => strict_expr.as_str(),
2363                Some(SearchMatchMode::Relaxed) => {
2364                    relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
2365                }
2366                None => continue,
2367            };
2368            match resolve_hit_attribution(&conn_guard, hit, match_expr) {
2369                Ok(att) => hit.attribution = Some(att),
2370                Err(e) => {
2371                    self.telemetry.increment_errors();
2372                    return Err(e);
2373                }
2374            }
2375        }
2376        Ok(())
2377    }
2378
2379    /// # Errors
2380    /// Returns [`EngineError`] if the root query or any bounded expansion
2381    /// query cannot be prepared or executed.
2382    pub fn execute_compiled_grouped_read(
2383        &self,
2384        compiled: &CompiledGroupedQuery,
2385    ) -> Result<GroupedQueryRows, EngineError> {
2386        let root_rows = self.execute_compiled_read(&compiled.root)?;
2387        if root_rows.was_degraded {
2388            return Ok(GroupedQueryRows {
2389                roots: Vec::new(),
2390                expansions: Vec::new(),
2391                edge_expansions: Vec::new(),
2392                was_degraded: true,
2393            });
2394        }
2395
2396        let roots = root_rows.nodes;
2397        let mut expansions = Vec::with_capacity(compiled.expansions.len());
2398        for expansion in &compiled.expansions {
2399            let slot_rows = if roots.is_empty() {
2400                Vec::new()
2401            } else {
2402                self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
2403            };
2404            expansions.push(ExpansionSlotRows {
2405                slot: expansion.slot.clone(),
2406                roots: slot_rows,
2407            });
2408        }
2409
2410        let mut edge_expansions = Vec::with_capacity(compiled.edge_expansions.len());
2411        for edge_expansion in &compiled.edge_expansions {
2412            let slot_rows = if roots.is_empty() {
2413                Vec::new()
2414            } else {
2415                self.read_edge_expansion_chunked(&roots, edge_expansion, compiled.hints.hard_limit)?
2416            };
2417            edge_expansions.push(EdgeExpansionSlotRows {
2418                slot: edge_expansion.slot.clone(),
2419                roots: slot_rows,
2420            });
2421        }
2422
2423        Ok(GroupedQueryRows {
2424            roots,
2425            expansions,
2426            edge_expansions,
2427            was_degraded: false,
2428        })
2429    }
2430
2431    /// Chunked batched expansion: splits roots into chunks of
2432    /// `BATCH_CHUNK_SIZE` and runs one batched query per chunk, then merges
2433    /// results while preserving root ordering.  This keeps bind-parameter
2434    /// counts within `SQLite` limits while avoiding the N+1 per-root pattern
2435    /// for large result sets.
2436    fn read_expansion_nodes_chunked(
2437        &self,
2438        roots: &[NodeRow],
2439        expansion: &ExpansionSlot,
2440        hard_limit: usize,
2441    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
2442        if roots.len() <= BATCH_CHUNK_SIZE {
2443            return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
2444        }
2445
2446        // Merge chunk results keyed by root logical_id, then reassemble in
2447        // root order.
2448        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
2449        for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
2450            for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
2451                per_root
2452                    .entry(group.root_logical_id)
2453                    .or_default()
2454                    .extend(group.nodes);
2455            }
2456        }
2457
2458        Ok(roots
2459            .iter()
2460            .map(|root| ExpansionRootRows {
2461                root_logical_id: root.logical_id.clone(),
2462                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
2463            })
2464            .collect())
2465    }
2466
2467    /// Batched expansion: one recursive CTE query per expansion slot that
2468    /// processes all root IDs at once. Uses `ROW_NUMBER() OVER (PARTITION BY
2469    /// source_logical_id ...)` to enforce the per-root hard limit inside the
2470    /// database rather than in Rust.
2471    #[allow(clippy::too_many_lines)]
2472    fn read_expansion_nodes_batched(
2473        &self,
2474        roots: &[NodeRow],
2475        expansion: &ExpansionSlot,
2476        hard_limit: usize,
2477    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
2478        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
2479        let (join_condition, next_logical_id) = match expansion.direction {
2480            fathomdb_query::TraverseDirection::Out => {
2481                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
2482            }
2483            fathomdb_query::TraverseDirection::In => {
2484                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
2485            }
2486        };
2487
2488        // EXECUTE-TIME VALIDATION: fused filter against kinds without an FTS schema.
2489        // This check runs at execute time (not builder time) because the target kind
2490        // set for an expand slot is edge-label-scoped, not kind-scoped, and multiple
2491        // target kinds may be reachable via one edge label. See Pack 12 docs.
2492        if expansion.filter.as_ref().is_some_and(|f| {
2493            matches!(
2494                f,
2495                Predicate::JsonPathFusedEq { .. }
2496                    | Predicate::JsonPathFusedTimestampCmp { .. }
2497                    | Predicate::JsonPathFusedIn { .. }
2498            )
2499        }) {
2500            self.validate_fused_filter_for_edge_label(&expansion.label)?;
2501        }
2502
2503        // Build a UNION ALL of SELECT literals for the root seed rows.
2504        // SQLite does not support `VALUES ... AS alias(col)` in older versions,
2505        // so we use `SELECT ?1 UNION ALL SELECT ?2 ...` instead.
2506        let root_seed_union: String = (1..=root_ids.len())
2507            .map(|i| format!("SELECT ?{i}"))
2508            .collect::<Vec<_>>()
2509            .join(" UNION ALL ");
2510
2511        // Bind params: root IDs occupy ?1..=?N, edge kind is ?(N+1).
2512        // Edge filter params (if any) follow at ?(N+2)..=?M.
2513        // Node filter params (if any) follow at ?(M+1)....
2514        let edge_kind_param = root_ids.len() + 1;
2515        let edge_filter_param_start = root_ids.len() + 2;
2516
2517        // Compile the optional edge filter to a SQL fragment + bind values.
2518        // Injected into the JOIN condition so only matching edges are traversed.
2519        let (edge_filter_sql, edge_filter_binds) =
2520            compile_edge_filter(expansion.edge_filter.as_ref(), edge_filter_param_start);
2521
2522        let filter_param_start = edge_filter_param_start + edge_filter_binds.len();
2523
2524        // Compile the optional target-side filter to a SQL fragment + bind values.
2525        // The fragment is injected into the `numbered` CTE's WHERE clause BEFORE
2526        // the ROW_NUMBER() window so the per-originator limit counts only matching rows.
2527        let (filter_sql, filter_binds) =
2528            compile_expansion_filter(expansion.filter.as_ref(), filter_param_start);
2529
2530        // The `root_id` column tracks which root each traversal path
2531        // originated from. The `ROW_NUMBER()` window in the outer query
2532        // enforces the per-root hard limit.
2533        let sql = format!(
2534            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
2535            traversed(root_id, logical_id, depth, visited, emitted) AS (
2536                SELECT rid, rid, 0, printf(',%s,', rid), 0
2537                FROM root_ids
2538                UNION ALL
2539                SELECT
2540                    t.root_id,
2541                    {next_logical_id},
2542                    t.depth + 1,
2543                    t.visited || {next_logical_id} || ',',
2544                    t.emitted + 1
2545                FROM traversed t
2546                JOIN edges e ON {join_condition}
2547                    AND e.kind = ?{edge_kind_param}
2548                    AND e.superseded_at IS NULL{edge_filter_sql}
2549                WHERE t.depth < {max_depth}
2550                  AND t.emitted < {hard_limit}
2551                  AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
2552            ),
2553            numbered AS (
2554                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
2555                     , n.content_ref, am.last_accessed_at
2556                     , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
2557                FROM traversed t
2558                JOIN nodes n ON n.logical_id = t.logical_id
2559                    AND n.superseded_at IS NULL
2560                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
2561                WHERE t.depth > 0{filter_sql}
2562            )
2563            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
2564            FROM numbered
2565            WHERE rn <= {hard_limit}
2566            ORDER BY root_id, logical_id",
2567            max_depth = expansion.max_depth,
2568        );
2569
2570        let conn_guard = self.lock_connection()?;
2571        let mut statement = conn_guard
2572            .prepare_cached(&sql)
2573            .map_err(EngineError::Sqlite)?;
2574
2575        // Bind root IDs (1..=N), edge kind (N+1), edge filter params (N+2..M),
2576        // then node filter params (M+1...).
2577        let mut bind_values: Vec<Value> = root_ids
2578            .iter()
2579            .map(|id| Value::Text((*id).to_owned()))
2580            .collect();
2581        bind_values.push(Value::Text(expansion.label.clone()));
2582        bind_values.extend(edge_filter_binds);
2583        bind_values.extend(filter_binds);
2584
2585        let rows = statement
2586            .query_map(params_from_iter(bind_values.iter()), |row| {
2587                Ok((
2588                    row.get::<_, String>(0)?, // root_id
2589                    NodeRow {
2590                        row_id: row.get(1)?,
2591                        logical_id: row.get(2)?,
2592                        kind: row.get(3)?,
2593                        properties: row.get(4)?,
2594                        content_ref: row.get(5)?,
2595                        last_accessed_at: row.get(6)?,
2596                    },
2597                ))
2598            })
2599            .map_err(EngineError::Sqlite)?
2600            .collect::<Result<Vec<_>, _>>()
2601            .map_err(EngineError::Sqlite)?;
2602
2603        // Partition results back into per-root groups, preserving root order.
2604        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
2605        for (root_id, node) in rows {
2606            per_root.entry(root_id).or_default().push(node);
2607        }
2608
2609        let root_groups = roots
2610            .iter()
2611            .map(|root| ExpansionRootRows {
2612                root_logical_id: root.logical_id.clone(),
2613                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
2614            })
2615            .collect();
2616
2617        Ok(root_groups)
2618    }
2619
2620    /// Chunked batched edge-expansion: splits roots into chunks of
2621    /// `BATCH_CHUNK_SIZE` and runs one batched query per chunk, then merges
2622    /// results while preserving root ordering. Parallel to
2623    /// [`Self::read_expansion_nodes_chunked`] but emits `(EdgeRow, NodeRow)`
2624    /// pairs instead of plain nodes.
2625    fn read_edge_expansion_chunked(
2626        &self,
2627        roots: &[NodeRow],
2628        expansion: &EdgeExpansionSlot,
2629        hard_limit: usize,
2630    ) -> Result<Vec<EdgeExpansionRootRows>, EngineError> {
2631        if roots.len() <= BATCH_CHUNK_SIZE {
2632            return self.read_edge_expansion_batched(roots, expansion, hard_limit);
2633        }
2634
2635        let mut per_root: HashMap<String, Vec<(EdgeRow, NodeRow)>> = HashMap::new();
2636        for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
2637            for group in self.read_edge_expansion_batched(chunk, expansion, hard_limit)? {
2638                per_root
2639                    .entry(group.root_logical_id)
2640                    .or_default()
2641                    .extend(group.pairs);
2642            }
2643        }
2644
2645        Ok(roots
2646            .iter()
2647            .map(|root| EdgeExpansionRootRows {
2648                root_logical_id: root.logical_id.clone(),
2649                pairs: per_root.remove(&root.logical_id).unwrap_or_default(),
2650            })
2651            .collect())
2652    }
2653
2654    /// Batched edge-expansion: one recursive CTE per slot that joins the
2655    /// final-hop edge row back onto the endpoint node, producing
2656    /// `(EdgeRow, NodeRow)` pairs per root.
2657    ///
2658    /// Security: `edge_filter` and `endpoint_filter` are compiled to
2659    /// parameterized SQL fragments via [`compile_edge_filter`] and
2660    /// [`compile_expansion_filter`]; user-authored values always arrive as
2661    /// bind parameters, never as interpolated SQL text.
2662    #[allow(clippy::too_many_lines)]
2663    fn read_edge_expansion_batched(
2664        &self,
2665        roots: &[NodeRow],
2666        expansion: &EdgeExpansionSlot,
2667        hard_limit: usize,
2668    ) -> Result<Vec<EdgeExpansionRootRows>, EngineError> {
2669        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
2670        let (join_condition, next_logical_id) = match expansion.direction {
2671            fathomdb_query::TraverseDirection::Out => {
2672                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
2673            }
2674            fathomdb_query::TraverseDirection::In => {
2675                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
2676            }
2677        };
2678
2679        // Execute-time validation: fused endpoint filter requires a registered
2680        // property-FTS schema covering every reachable target kind.
2681        if expansion.endpoint_filter.as_ref().is_some_and(|f| {
2682            matches!(
2683                f,
2684                Predicate::JsonPathFusedEq { .. }
2685                    | Predicate::JsonPathFusedTimestampCmp { .. }
2686                    | Predicate::JsonPathFusedIn { .. }
2687            )
2688        }) {
2689            self.validate_fused_filter_for_edge_label(&expansion.label)?;
2690        }
2691
2692        let root_seed_union: String = (1..=root_ids.len())
2693            .map(|i| format!("SELECT ?{i}"))
2694            .collect::<Vec<_>>()
2695            .join(" UNION ALL ");
2696
2697        let edge_kind_param = root_ids.len() + 1;
2698        let edge_filter_param_start = root_ids.len() + 2;
2699
2700        let (edge_filter_sql, edge_filter_binds) =
2701            compile_edge_filter(expansion.edge_filter.as_ref(), edge_filter_param_start);
2702
2703        let endpoint_filter_param_start = edge_filter_param_start + edge_filter_binds.len();
2704        let (endpoint_filter_sql, endpoint_filter_binds) = compile_expansion_filter(
2705            expansion.endpoint_filter.as_ref(),
2706            endpoint_filter_param_start,
2707        );
2708
2709        // The `traversed` CTE carries the final-hop edge's row_id so we can
2710        // rejoin onto `edges` to project the full edge shape. Seeds use
2711        // `NULL` for edge_row_id; `WHERE t.depth > 0` in the `numbered` CTE
2712        // excludes the seeds so we never emit a NULL-edge row.
2713        let sql = format!(
2714            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
2715            traversed(root_id, logical_id, depth, visited, emitted, edge_row_id) AS (
2716                SELECT rid, rid, 0, printf(',%s,', rid), 0, NULL AS edge_row_id
2717                FROM root_ids
2718                UNION ALL
2719                SELECT
2720                    t.root_id,
2721                    {next_logical_id},
2722                    t.depth + 1,
2723                    t.visited || {next_logical_id} || ',',
2724                    t.emitted + 1,
2725                    e.row_id AS edge_row_id
2726                FROM traversed t
2727                JOIN edges e ON {join_condition}
2728                    AND e.kind = ?{edge_kind_param}
2729                    AND e.superseded_at IS NULL{edge_filter_sql}
2730                WHERE t.depth < {max_depth}
2731                  AND t.emitted < {hard_limit}
2732                  AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
2733            ),
2734            numbered AS (
2735                SELECT t.root_id,
2736                       e.row_id AS e_row_id,
2737                       e.logical_id AS e_logical_id,
2738                       e.source_logical_id AS e_source,
2739                       e.target_logical_id AS e_target,
2740                       e.kind AS e_kind,
2741                       e.properties AS e_properties,
2742                       e.source_ref AS e_source_ref,
2743                       e.confidence AS e_confidence,
2744                       n.row_id AS n_row_id,
2745                       n.logical_id AS n_logical_id,
2746                       n.kind AS n_kind,
2747                       n.properties AS n_properties,
2748                       n.content_ref AS n_content_ref,
2749                       am.last_accessed_at AS n_last_accessed_at,
2750                       ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id, e.row_id) AS rn
2751                FROM traversed t
2752                JOIN edges e ON e.row_id = t.edge_row_id
2753                JOIN nodes n ON n.logical_id = t.logical_id
2754                    AND n.superseded_at IS NULL
2755                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
2756                WHERE t.depth > 0{endpoint_filter_sql}
2757            )
2758            SELECT root_id,
2759                   e_row_id, e_logical_id, e_source, e_target, e_kind, e_properties,
2760                   e_source_ref, e_confidence,
2761                   n_row_id, n_logical_id, n_kind, n_properties, n_content_ref,
2762                   n_last_accessed_at
2763            FROM numbered
2764            WHERE rn <= {hard_limit}
2765            ORDER BY root_id, n_logical_id, e_row_id",
2766            max_depth = expansion.max_depth,
2767        );
2768
2769        let conn_guard = self.lock_connection()?;
2770        let mut statement = conn_guard
2771            .prepare_cached(&sql)
2772            .map_err(EngineError::Sqlite)?;
2773
2774        let mut bind_values: Vec<Value> = root_ids
2775            .iter()
2776            .map(|id| Value::Text((*id).to_owned()))
2777            .collect();
2778        bind_values.push(Value::Text(expansion.label.clone()));
2779        bind_values.extend(edge_filter_binds);
2780        bind_values.extend(endpoint_filter_binds);
2781
2782        let rows = statement
2783            .query_map(params_from_iter(bind_values.iter()), |row| {
2784                let root_id: String = row.get(0)?;
2785                let edge_row = EdgeRow {
2786                    row_id: row.get(1)?,
2787                    logical_id: row.get(2)?,
2788                    source_logical_id: row.get(3)?,
2789                    target_logical_id: row.get(4)?,
2790                    kind: row.get(5)?,
2791                    properties: row.get(6)?,
2792                    source_ref: row.get(7)?,
2793                    confidence: row.get(8)?,
2794                };
2795                let node_row = NodeRow {
2796                    row_id: row.get(9)?,
2797                    logical_id: row.get(10)?,
2798                    kind: row.get(11)?,
2799                    properties: row.get(12)?,
2800                    content_ref: row.get(13)?,
2801                    last_accessed_at: row.get(14)?,
2802                };
2803                Ok((root_id, edge_row, node_row))
2804            })
2805            .map_err(EngineError::Sqlite)?
2806            .collect::<Result<Vec<_>, _>>()
2807            .map_err(EngineError::Sqlite)?;
2808
2809        let mut per_root: HashMap<String, Vec<(EdgeRow, NodeRow)>> = HashMap::new();
2810        for (root_id, edge, node) in rows {
2811            per_root.entry(root_id).or_default().push((edge, node));
2812        }
2813
2814        let root_groups = roots
2815            .iter()
2816            .map(|root| EdgeExpansionRootRows {
2817                root_logical_id: root.logical_id.clone(),
2818                pairs: per_root.remove(&root.logical_id).unwrap_or_default(),
2819            })
2820            .collect();
2821
2822        Ok(root_groups)
2823    }
2824
2825    /// Validate that all target node kinds reachable via `edge_label` have a
2826    /// registered property-FTS schema. Called at execute time when an expansion
2827    /// slot carries a fused filter predicate.
2828    ///
2829    /// EXECUTE-TIME VALIDATION: this check runs at execute time (not builder
2830    /// time) for expand slots because the target kind set is edge-label-scoped
2831    /// rather than kind-scoped, and is not statically knowable at builder time
2832    /// when multiple target kinds may be reachable via the same label.
2833    /// See Pack 12 docs.
2834    ///
2835    /// # Errors
2836    /// Returns `EngineError::InvalidConfig` if any reachable target kind lacks
2837    /// a registered property-FTS schema.
2838    fn validate_fused_filter_for_edge_label(&self, edge_label: &str) -> Result<(), EngineError> {
2839        let conn = self.lock_connection()?;
2840        // Collect the distinct node kinds reachable as targets of this edge label.
2841        let mut stmt = conn
2842            .prepare_cached(
2843                "SELECT DISTINCT n.kind \
2844                 FROM edges e \
2845                 JOIN nodes n ON n.logical_id = e.target_logical_id \
2846                 WHERE e.kind = ?1 AND e.superseded_at IS NULL",
2847            )
2848            .map_err(EngineError::Sqlite)?;
2849        let target_kinds: Vec<String> = stmt
2850            .query_map(rusqlite::params![edge_label], |row| row.get(0))
2851            .map_err(EngineError::Sqlite)?
2852            .collect::<Result<Vec<_>, _>>()
2853            .map_err(EngineError::Sqlite)?;
2854
2855        for kind in &target_kinds {
2856            let has_schema: bool = conn
2857                .query_row(
2858                    "SELECT COUNT(*) > 0 FROM fts_property_schemas WHERE kind = ?1",
2859                    rusqlite::params![kind],
2860                    |row| row.get(0),
2861                )
2862                .map_err(EngineError::Sqlite)?;
2863            if !has_schema {
2864                return Err(EngineError::InvalidConfig(format!(
2865                    "kind {kind:?} has no registered property-FTS schema; register one with \
2866                     admin.register_fts_property_schema(..) before using fused filters on \
2867                     expansion slots, or use JsonPathEq for non-fused semantics \
2868                     (expand slot uses edge label {edge_label:?})"
2869                )));
2870            }
2871        }
2872        Ok(())
2873    }
2874
2875    /// Read a single run by id.
2876    ///
2877    /// # Errors
2878    /// Returns [`EngineError`] if the query fails or if the connection mutex
2879    /// has been poisoned.
2880    pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
2881        let conn = self.lock_connection()?;
2882        conn.query_row(
2883            "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
2884            rusqlite::params![id],
2885            |row| {
2886                Ok(RunRow {
2887                    id: row.get(0)?,
2888                    kind: row.get(1)?,
2889                    status: row.get(2)?,
2890                    properties: row.get(3)?,
2891                })
2892            },
2893        )
2894        .optional()
2895        .map_err(EngineError::Sqlite)
2896    }
2897
2898    /// Read a single step by id.
2899    ///
2900    /// # Errors
2901    /// Returns [`EngineError`] if the query fails or if the connection mutex
2902    /// has been poisoned.
2903    pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
2904        let conn = self.lock_connection()?;
2905        conn.query_row(
2906            "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
2907            rusqlite::params![id],
2908            |row| {
2909                Ok(StepRow {
2910                    id: row.get(0)?,
2911                    run_id: row.get(1)?,
2912                    kind: row.get(2)?,
2913                    status: row.get(3)?,
2914                    properties: row.get(4)?,
2915                })
2916            },
2917        )
2918        .optional()
2919        .map_err(EngineError::Sqlite)
2920    }
2921
2922    /// Read a single action by id.
2923    ///
2924    /// # Errors
2925    /// Returns [`EngineError`] if the query fails or if the connection mutex
2926    /// has been poisoned.
2927    pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
2928        let conn = self.lock_connection()?;
2929        conn.query_row(
2930            "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
2931            rusqlite::params![id],
2932            |row| {
2933                Ok(ActionRow {
2934                    id: row.get(0)?,
2935                    step_id: row.get(1)?,
2936                    kind: row.get(2)?,
2937                    status: row.get(3)?,
2938                    properties: row.get(4)?,
2939                })
2940            },
2941        )
2942        .optional()
2943        .map_err(EngineError::Sqlite)
2944    }
2945
2946    /// Read all active (non-superseded) runs.
2947    ///
2948    /// # Errors
2949    /// Returns [`EngineError`] if the query fails or if the connection mutex
2950    /// has been poisoned.
2951    pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
2952        let conn = self.lock_connection()?;
2953        let mut stmt = conn
2954            .prepare_cached(
2955                "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
2956            )
2957            .map_err(EngineError::Sqlite)?;
2958        let rows = stmt
2959            .query_map([], |row| {
2960                Ok(RunRow {
2961                    id: row.get(0)?,
2962                    kind: row.get(1)?,
2963                    status: row.get(2)?,
2964                    properties: row.get(3)?,
2965                })
2966            })
2967            .map_err(EngineError::Sqlite)?
2968            .collect::<Result<Vec<_>, _>>()
2969            .map_err(EngineError::Sqlite)?;
2970        Ok(rows)
2971    }
2972
2973    /// Returns the number of shape→SQL entries currently indexed.
2974    ///
2975    /// Each distinct query shape (structural hash of kind + steps + limits)
2976    /// maps to exactly one SQL string.  This is a test-oriented introspection
2977    /// helper; it does not reflect rusqlite's internal prepared-statement
2978    /// cache, which is keyed by SQL text.
2979    ///
2980    /// # Panics
2981    /// Panics if the internal shape-SQL-map mutex is poisoned.
2982    #[must_use]
2983    #[allow(clippy::expect_used)]
2984    pub fn shape_sql_count(&self) -> usize {
2985        self.shape_sql_map
2986            .lock()
2987            .unwrap_or_else(PoisonError::into_inner)
2988            .len()
2989    }
2990
2991    /// Returns a cloned `Arc` to the schema manager.
2992    #[must_use]
2993    pub fn schema_manager(&self) -> Arc<SchemaManager> {
2994        Arc::clone(&self.schema_manager)
2995    }
2996
2997    /// Return the execution plan for a compiled query without executing it.
2998    ///
2999    /// Useful for debugging, testing shape-hash caching, and operator
3000    /// diagnostics. Does not open a transaction or touch the database beyond
3001    /// checking the statement cache.
3002    ///
3003    /// # Panics
3004    /// Panics if the internal shape-SQL-map mutex is poisoned.
3005    #[must_use]
3006    pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
3007        let cache_hit = self
3008            .shape_sql_map
3009            .lock()
3010            .unwrap_or_else(PoisonError::into_inner)
3011            .contains_key(&compiled.shape_hash);
3012        QueryPlan {
3013            sql: wrap_node_row_projection_sql(&compiled.sql),
3014            bind_count: compiled.binds.len(),
3015            driving_table: compiled.driving_table,
3016            shape_hash: compiled.shape_hash,
3017            cache_hit,
3018        }
3019    }
3020
3021    /// Execute a named PRAGMA and return the result as a String.
3022    /// Used by Layer 1 tests to verify startup pragma initialization.
3023    ///
3024    /// # Errors
3025    /// Returns [`EngineError`] if the PRAGMA query fails or if the connection
3026    /// mutex has been poisoned.
3027    #[doc(hidden)]
3028    pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
3029        let conn = self.lock_connection()?;
3030        let result = conn
3031            .query_row(&format!("PRAGMA {name}"), [], |row| {
3032                // PRAGMAs may return TEXT or INTEGER; normalise to String.
3033                row.get::<_, rusqlite::types::Value>(0)
3034            })
3035            .map_err(EngineError::Sqlite)?;
3036        let s = match result {
3037            rusqlite::types::Value::Text(t) => t,
3038            rusqlite::types::Value::Integer(i) => i.to_string(),
3039            rusqlite::types::Value::Real(f) => f.to_string(),
3040            rusqlite::types::Value::Blob(_) => {
3041                return Err(EngineError::InvalidWrite(format!(
3042                    "PRAGMA {name} returned an unexpected BLOB value"
3043                )));
3044            }
3045            rusqlite::types::Value::Null => String::new(),
3046        };
3047        Ok(s)
3048    }
3049
3050    /// Return all provenance events whose `subject` matches the given value.
3051    ///
3052    /// Subjects are logical node IDs (for retire/upsert events) or `source_ref`
3053    /// values (for excise events).
3054    ///
3055    /// # Errors
3056    /// Returns [`EngineError`] if the query fails or if the connection mutex
3057    /// has been poisoned.
3058    pub fn query_provenance_events(
3059        &self,
3060        subject: &str,
3061    ) -> Result<Vec<ProvenanceEvent>, EngineError> {
3062        let conn = self.lock_connection()?;
3063        let mut stmt = conn
3064            .prepare_cached(
3065                "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
3066                 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
3067            )
3068            .map_err(EngineError::Sqlite)?;
3069        let events = stmt
3070            .query_map(rusqlite::params![subject], |row| {
3071                Ok(ProvenanceEvent {
3072                    id: row.get(0)?,
3073                    event_type: row.get(1)?,
3074                    subject: row.get(2)?,
3075                    source_ref: row.get(3)?,
3076                    metadata_json: row.get(4)?,
3077                    created_at: row.get(5)?,
3078                })
3079            })
3080            .map_err(EngineError::Sqlite)?
3081            .collect::<Result<Vec<_>, _>>()
3082            .map_err(EngineError::Sqlite)?;
3083        Ok(events)
3084    }
3085
3086    /// Check if `kind` has a first-registration async rebuild in progress
3087    /// (`is_first_registration=1` with state PENDING/BUILDING/SWAPPING and no
3088    /// rows yet in the per-kind `fts_props_<kind>` table). If so, execute a
3089    /// full-kind scan and return the nodes. Returns `None` when the normal
3090    /// FTS5 path should run.
3091    fn scan_fallback_if_first_registration(
3092        &self,
3093        kind: &str,
3094    ) -> Result<Option<Vec<NodeRow>>, EngineError> {
3095        let conn = self.lock_connection()?;
3096
3097        // Quick point-lookup: kind has a first-registration rebuild in a
3098        // pre-complete state AND the per-kind table has no rows yet.
3099        let prop_table = fathomdb_schema::fts_kind_table_name(kind);
3100        // Check whether the per-kind table exists before querying its row count.
3101        let table_exists: bool = conn
3102            .query_row(
3103                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
3104                rusqlite::params![prop_table],
3105                |_| Ok(true),
3106            )
3107            .optional()?
3108            .unwrap_or(false);
3109        let prop_empty = if table_exists {
3110            let cnt: i64 =
3111                conn.query_row(&format!("SELECT COUNT(*) FROM {prop_table}"), [], |r| {
3112                    r.get(0)
3113                })?;
3114            cnt == 0
3115        } else {
3116            true
3117        };
3118        let needs_scan: bool = if prop_empty {
3119            conn.query_row(
3120                "SELECT 1 FROM fts_property_rebuild_state \
3121                 WHERE kind = ?1 AND is_first_registration = 1 \
3122                 AND state IN ('PENDING','BUILDING','SWAPPING') \
3123                 LIMIT 1",
3124                rusqlite::params![kind],
3125                |_| Ok(true),
3126            )
3127            .optional()?
3128            .unwrap_or(false)
3129        } else {
3130            false
3131        };
3132
3133        if !needs_scan {
3134            return Ok(None);
3135        }
3136
3137        // Scan fallback: return all active nodes of this kind.
3138        // Intentionally unindexed — acceptable for first-registration window.
3139        let mut stmt = conn
3140            .prepare_cached(
3141                "SELECT n.row_id, n.logical_id, n.kind, n.properties, n.content_ref, \
3142                 am.last_accessed_at \
3143                 FROM nodes n \
3144                 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id \
3145                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
3146            )
3147            .map_err(EngineError::Sqlite)?;
3148
3149        let nodes = stmt
3150            .query_map(rusqlite::params![kind], |row| {
3151                Ok(NodeRow {
3152                    row_id: row.get(0)?,
3153                    logical_id: row.get(1)?,
3154                    kind: row.get(2)?,
3155                    properties: row.get(3)?,
3156                    content_ref: row.get(4)?,
3157                    last_accessed_at: row.get(5)?,
3158                })
3159            })
3160            .map_err(EngineError::Sqlite)?
3161            .collect::<Result<Vec<_>, _>>()
3162            .map_err(EngineError::Sqlite)?;
3163
3164        Ok(Some(nodes))
3165    }
3166
3167    /// Return the current rebuild progress for a kind, or `None` if no rebuild
3168    /// has been registered for that kind.
3169    ///
3170    /// # Errors
3171    /// Returns [`EngineError`] if the database query fails.
3172    pub fn get_property_fts_rebuild_progress(
3173        &self,
3174        kind: &str,
3175    ) -> Result<Option<crate::rebuild_actor::RebuildProgress>, EngineError> {
3176        let conn = self.lock_connection()?;
3177        let row = conn
3178            .query_row(
3179                "SELECT state, rows_total, rows_done, started_at, last_progress_at, error_message \
3180                 FROM fts_property_rebuild_state WHERE kind = ?1",
3181                rusqlite::params![kind],
3182                |r| {
3183                    Ok(crate::rebuild_actor::RebuildProgress {
3184                        state: r.get(0)?,
3185                        rows_total: r.get(1)?,
3186                        rows_done: r.get(2)?,
3187                        started_at: r.get(3)?,
3188                        last_progress_at: r.get(4)?,
3189                        error_message: r.get(5)?,
3190                    })
3191                },
3192            )
3193            .optional()?;
3194        Ok(row)
3195    }
3196}
3197
3198/// Rewrite a `CompiledQuery` whose SQL references the legacy `fts_node_properties`
3199/// table to use the per-kind `fts_props_<kind>` table, or strip the property FTS
3200/// arm entirely when the per-kind table does not exist in `sqlite_master`.
3201///
3202/// Returns `(adapted_sql, adapted_binds)`.
3203///
3204/// This wrapper owns the `sqlite_master` existence check (rusqlite stays out of
3205/// `fathomdb-query`) and delegates the SQL/bind rewriting to
3206/// [`CompiledQuery::adapt_fts_for_kind`].
3207fn adapt_fts_nodes_sql_for_per_kind_tables(
3208    compiled: &CompiledQuery,
3209    conn: &rusqlite::Connection,
3210) -> Result<(String, Vec<BindValue>), EngineError> {
3211    let root_kind = compiled
3212        .binds
3213        .get(1)
3214        .and_then(|b| {
3215            if let BindValue::Text(k) = b {
3216                Some(k.as_str())
3217            } else {
3218                None
3219            }
3220        })
3221        .unwrap_or("");
3222    let prop_table = fathomdb_schema::fts_kind_table_name(root_kind);
3223    let prop_table_exists: bool = conn
3224        .query_row(
3225            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
3226            rusqlite::params![prop_table],
3227            |_| Ok(true),
3228        )
3229        .optional()
3230        .map_err(EngineError::Sqlite)?
3231        .unwrap_or(false);
3232
3233    Ok(compiled.adapt_fts_for_kind(prop_table_exists, &prop_table))
3234}
3235
3236/// Check that the active vector profile's model identity and dimensions match
3237/// the supplied embedder. If either differs, emit a warning via `trace_warn!`
3238/// but always return `Ok(())`. This function NEVER returns `Err`.
3239///
3240/// Queries `projection_profiles` directly — no `AdminService` indirection.
3241#[allow(clippy::unnecessary_wraps)]
3242fn check_vec_identity_at_open(
3243    conn: &rusqlite::Connection,
3244    embedder: &dyn QueryEmbedder,
3245) -> Result<(), EngineError> {
3246    let row: Option<String> = conn
3247        .query_row(
3248            "SELECT config_json FROM projection_profiles WHERE kind='*' AND facet='vec'",
3249            [],
3250            |row| row.get(0),
3251        )
3252        .optional()
3253        .unwrap_or(None);
3254
3255    let Some(config_json) = row else {
3256        return Ok(());
3257    };
3258
3259    // Parse leniently — any error means we just skip the check.
3260    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&config_json) else {
3261        return Ok(());
3262    };
3263
3264    let identity = embedder.identity();
3265
3266    if let Some(stored_model) = parsed
3267        .get("model_identity")
3268        .and_then(serde_json::Value::as_str)
3269        && stored_model != identity.model_identity
3270    {
3271        trace_warn!(
3272            stored_model_identity = stored_model,
3273            embedder_model_identity = %identity.model_identity,
3274            "vec identity mismatch at open: model_identity differs"
3275        );
3276    }
3277
3278    if let Some(stored_dim) = parsed.get("dimensions").and_then(serde_json::Value::as_u64) {
3279        let stored_dim = usize::try_from(stored_dim).unwrap_or(usize::MAX);
3280        if stored_dim != identity.dimension {
3281            trace_warn!(
3282                stored_dimensions = stored_dim,
3283                embedder_dimensions = identity.dimension,
3284                "vec identity mismatch at open: dimensions differ"
3285            );
3286        }
3287    }
3288
3289    Ok(())
3290}
3291
3292/// Open-time FTS rebuild guards (Guard 1 + Guard 2).
3293///
3294/// Guard 1: if any registered kind's per-kind `fts_props_<kind>` table is
3295/// missing or empty while live nodes of that kind exist, do a synchronous
3296/// full rebuild.
3297///
3298/// Guard 2: if any recursive schema is registered but
3299/// `fts_node_property_positions` is empty, do a synchronous full rebuild to
3300/// regenerate the position map.
3301///
3302/// Both guards are no-ops on a consistent database.
3303fn run_open_time_fts_guards(conn: &mut rusqlite::Connection) -> Result<(), EngineError> {
3304    let schema_count: i64 = conn
3305        .query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
3306            row.get(0)
3307        })
3308        .map_err(EngineError::Sqlite)?;
3309    if schema_count == 0 {
3310        return Ok(());
3311    }
3312
3313    let needs_fts_rebuild = open_guard_check_fts_empty(conn)?;
3314    let needs_position_backfill = if needs_fts_rebuild {
3315        false
3316    } else {
3317        open_guard_check_positions_empty(conn)?
3318    };
3319
3320    if needs_fts_rebuild || needs_position_backfill {
3321        let per_kind_tables: Vec<String> = {
3322            let mut stmt = conn
3323                .prepare(
3324                    "SELECT name FROM sqlite_master \
3325                     WHERE type='table' AND name LIKE 'fts_props_%' \
3326                     AND sql LIKE 'CREATE VIRTUAL TABLE%'",
3327                )
3328                .map_err(EngineError::Sqlite)?;
3329            stmt.query_map([], |r| r.get::<_, String>(0))
3330                .map_err(EngineError::Sqlite)?
3331                .collect::<Result<Vec<_>, _>>()
3332                .map_err(EngineError::Sqlite)?
3333        };
3334        let tx = conn
3335            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
3336            .map_err(EngineError::Sqlite)?;
3337        for table in &per_kind_tables {
3338            tx.execute_batch(&format!("DELETE FROM {table}"))
3339                .map_err(EngineError::Sqlite)?;
3340        }
3341        tx.execute("DELETE FROM fts_node_property_positions", [])
3342            .map_err(EngineError::Sqlite)?;
3343        crate::projection::insert_property_fts_rows(
3344            &tx,
3345            "SELECT logical_id, properties FROM nodes \
3346             WHERE kind = ?1 AND superseded_at IS NULL",
3347        )
3348        .map_err(EngineError::Sqlite)?;
3349        tx.commit().map_err(EngineError::Sqlite)?;
3350    }
3351    Ok(())
3352}
3353
3354fn open_guard_check_fts_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
3355    let kinds: Vec<String> = {
3356        let mut stmt = conn
3357            .prepare("SELECT kind FROM fts_property_schemas")
3358            .map_err(EngineError::Sqlite)?;
3359        stmt.query_map([], |row| row.get::<_, String>(0))
3360            .map_err(EngineError::Sqlite)?
3361            .collect::<Result<Vec<_>, _>>()
3362            .map_err(EngineError::Sqlite)?
3363    };
3364    for kind in &kinds {
3365        let table = fathomdb_schema::fts_kind_table_name(kind);
3366        let table_exists: bool = conn
3367            .query_row(
3368                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
3369                rusqlite::params![table],
3370                |_| Ok(true),
3371            )
3372            .optional()
3373            .map_err(EngineError::Sqlite)?
3374            .unwrap_or(false);
3375        let fts_count: i64 = if table_exists {
3376            conn.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
3377                row.get(0)
3378            })
3379            .map_err(EngineError::Sqlite)?
3380        } else {
3381            0
3382        };
3383        if fts_count == 0 {
3384            let node_count: i64 = conn
3385                .query_row(
3386                    "SELECT COUNT(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
3387                    rusqlite::params![kind],
3388                    |row| row.get(0),
3389                )
3390                .map_err(EngineError::Sqlite)?;
3391            if node_count > 0 {
3392                return Ok(true);
3393            }
3394        }
3395    }
3396    Ok(false)
3397}
3398
3399fn open_guard_check_positions_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
3400    let recursive_count: i64 = conn
3401        .query_row(
3402            "SELECT COUNT(*) FROM fts_property_schemas \
3403             WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
3404            [],
3405            |row| row.get(0),
3406        )
3407        .map_err(EngineError::Sqlite)?;
3408    if recursive_count == 0 {
3409        return Ok(false);
3410    }
3411    let pos_count: i64 = conn
3412        .query_row(
3413            "SELECT COUNT(*) FROM fts_node_property_positions",
3414            [],
3415            |row| row.get(0),
3416        )
3417        .map_err(EngineError::Sqlite)?;
3418    Ok(pos_count == 0)
3419}
3420
/// Wrap `base_sql` as a subquery aliased `q` and project the node-row
/// columns from it, left-joining `node_access_metadata` on `logical_id` to
/// supply `last_accessed_at`.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    const PROJECTION_HEAD: &str = "SELECT q.row_id, q.logical_id, q.kind, q.properties, \
                                   q.content_ref, am.last_accessed_at FROM (";
    const PROJECTION_TAIL: &str =
        ") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id";

    [PROJECTION_HEAD, base_sql, PROJECTION_TAIL].concat()
}
3428
3429/// Returns `true` when `err` indicates the vec virtual table is absent.
3430///
3431/// Matches:
3432/// - "no such table: vec_<kind>" (per-kind table not yet created by regeneration)
3433/// - "no such module: vec0" (sqlite-vec extension not available)
3434/// - Legacy `"no such table: vec_nodes_active"` (for backward compatibility with
3435///   any remaining tests or DB files referencing the old global table)
3436pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
3437    match err {
3438        rusqlite::Error::SqliteFailure(_, Some(msg)) => {
3439            // Per-kind tables start with "vec_"
3440            (msg.contains("no such table: vec_") && !msg.contains("vec_embedding"))
3441                || msg.contains("no such module: vec0")
3442        }
3443        _ => false,
3444    }
3445}
3446
3447/// Pack F1.75: project a [`SearchRows`] (emitted by
3448/// `execute_compiled_semantic_search` / `execute_compiled_raw_vector_search`)
3449/// into a [`QueryRows`] so `execute_compiled_read` can return the flat-query
3450/// shape that FFI callers expect. The vector-search path emits hits with no
3451/// runs/steps/actions, so those collections stay empty.
3452fn search_rows_to_query_rows(rows: SearchRows) -> QueryRows {
3453    let nodes = rows
3454        .hits
3455        .into_iter()
3456        .map(|hit| NodeRow {
3457            row_id: hit.node.row_id,
3458            logical_id: hit.node.logical_id,
3459            kind: hit.node.kind,
3460            properties: hit.node.properties,
3461            content_ref: hit.node.content_ref,
3462            last_accessed_at: hit.node.last_accessed_at,
3463        })
3464        .collect();
3465    QueryRows {
3466        nodes,
3467        runs: Vec::new(),
3468        steps: Vec::new(),
3469        actions: Vec::new(),
3470        was_degraded: rows.was_degraded,
3471    }
3472}
3473
3474fn scalar_to_bind(value: &ScalarValue) -> BindValue {
3475    match value {
3476        ScalarValue::Text(text) => BindValue::Text(text.clone()),
3477        ScalarValue::Integer(integer) => BindValue::Integer(*integer),
3478        ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
3479    }
3480}
3481
3482/// Merge strict and relaxed search branches into a single block-ordered,
3483/// deduplicated, limit-truncated hit list.
3484///
3485/// Phase 3 rules, in order:
3486///
3487/// 1. Each branch is sorted internally by score descending with `logical_id`
3488///    ascending as the deterministic tiebreak.
3489/// 2. Within a single branch, if the same `logical_id` appears twice (e.g.
3490///    once from the chunk surface and once from the property surface) the
3491///    higher-score row wins, then chunk > property > vector, then declaration
3492///    order (chunk first).
3493/// 3. Strict hits form one block and relaxed hits form the next. Strict
3494///    always precedes relaxed in the merged output regardless of per-hit
3495///    score.
3496/// 4. Cross-branch dedup is strict-wins: any relaxed hit whose `logical_id`
3497///    already appears in the strict block is dropped.
3498/// 5. The merged output is truncated to `limit`.
3499fn merge_search_branches(
3500    strict: Vec<SearchHit>,
3501    relaxed: Vec<SearchHit>,
3502    limit: usize,
3503) -> Vec<SearchHit> {
3504    merge_search_branches_three(strict, relaxed, Vec::new(), limit)
3505}
3506
3507/// Three-branch generalization of [`merge_search_branches`]: orders hits as
3508/// (strict block, relaxed block, vector block) per addendum 1 §Fusion
3509/// Semantics, with cross-branch dedup resolved by branch precedence
3510/// (strict > relaxed > vector). Within each block the existing
3511/// [`dedup_branch_hits`] rule applies (score desc, `logical_id` asc, source
3512/// priority chunk > property > vector).
3513///
3514/// Phase 12 (the unified `search()` entry point) calls this directly. The
3515/// two-branch [`merge_search_branches`] wrapper is preserved as a
3516/// convenience for the text-only `execute_compiled_search_plan` path; both
3517/// reduce to the same code.
3518fn merge_search_branches_three(
3519    strict: Vec<SearchHit>,
3520    relaxed: Vec<SearchHit>,
3521    vector: Vec<SearchHit>,
3522    limit: usize,
3523) -> Vec<SearchHit> {
3524    let strict_block = dedup_branch_hits(strict);
3525    let relaxed_block = dedup_branch_hits(relaxed);
3526    let vector_block = dedup_branch_hits(vector);
3527
3528    let mut seen: std::collections::HashSet<String> = strict_block
3529        .iter()
3530        .map(|h| h.node.logical_id.clone())
3531        .collect();
3532
3533    let mut merged = strict_block;
3534    for hit in relaxed_block {
3535        if seen.insert(hit.node.logical_id.clone()) {
3536            merged.push(hit);
3537        }
3538    }
3539    for hit in vector_block {
3540        if seen.insert(hit.node.logical_id.clone()) {
3541            merged.push(hit);
3542        }
3543    }
3544
3545    if merged.len() > limit {
3546        merged.truncate(limit);
3547    }
3548    merged
3549}
3550
3551/// Sort a branch's hits by score descending + `logical_id` ascending, then
3552/// dedup duplicate `logical_id`s within the branch using source priority
3553/// (chunk > property > vector) and declaration order.
3554fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
3555    hits.sort_by(|a, b| {
3556        b.score
3557            .partial_cmp(&a.score)
3558            .unwrap_or(std::cmp::Ordering::Equal)
3559            .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
3560            .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
3561    });
3562
3563    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
3564    hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
3565    hits
3566}
3567
3568fn source_priority(source: SearchHitSource) -> u8 {
3569    // Lower is better. Chunk is declared before property in the CTE; vector
3570    // is reserved for future wiring but comes last among the known variants.
3571    match source {
3572        SearchHitSource::Chunk => 0,
3573        SearchHitSource::Property => 1,
3574        SearchHitSource::Vector => 2,
3575    }
3576}
3577
/// Opening sentinel handed to FTS5 `highlight()` during match attribution.
///
/// `highlight()` wraps every matched term in an open/close marker pair; the
/// coordinator scans for those markers to recover each term's byte offset
/// in the original `text_content` column.
///
/// The marker is the single byte `U+0001` ("start of heading"). It cannot
/// collide with valid JSON text *unless* the payload deliberately carries
/// an escaped `\u0001` / `\u0002`: a value such as `{"x":"\u0001"}` decodes
/// to a literal 0x01 byte in the extracted blob, which makes the sentinel
/// ambiguous for that row. Attribution correctness for such input is out of
/// scope — `matched_paths` may be misattributed for those rows, but nothing
/// panics and the query does not fail. Stripping bytes below 0x20 at
/// blob-emission time in `RecursiveWalker::emit_leaf` would be one way to
/// close the gap in the future.
///
/// Because each marker is exactly one byte long, translating a position in
/// the highlighted string back to the original text only requires
/// subtracting the number of markers seen so far.
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
/// Closing sentinel paired with [`ATTRIBUTION_HIGHLIGHT_OPEN`]: the single
/// byte `U+0002` ("start of text"). The same caveats apply.
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
3597
3598/// Load the `fts_node_property_positions` sidecar rows for a given
3599/// `(logical_id, kind)` ordered by `start_offset`. Returns a vector of
3600/// `(start_offset, end_offset, leaf_path)` tuples ready for binary search.
3601fn load_position_map(
3602    conn: &Connection,
3603    logical_id: &str,
3604    kind: &str,
3605) -> Result<Vec<(usize, usize, String)>, EngineError> {
3606    let mut stmt = conn
3607        .prepare_cached(
3608            "SELECT start_offset, end_offset, leaf_path \
3609             FROM fts_node_property_positions \
3610             WHERE node_logical_id = ?1 AND kind = ?2 \
3611             ORDER BY start_offset ASC",
3612        )
3613        .map_err(EngineError::Sqlite)?;
3614    let rows = stmt
3615        .query_map(rusqlite::params![logical_id, kind], |row| {
3616            let start: i64 = row.get(0)?;
3617            let end: i64 = row.get(1)?;
3618            let path: String = row.get(2)?;
3619            // Offsets are non-negative and within blob byte limits; on the
3620            // off chance a corrupt row is encountered, fall back to 0 so
3621            // lookups silently skip it rather than panicking.
3622            let start = usize::try_from(start).unwrap_or(0);
3623            let end = usize::try_from(end).unwrap_or(0);
3624            Ok((start, end, path))
3625        })
3626        .map_err(EngineError::Sqlite)?;
3627    let mut out = Vec::new();
3628    for row in rows {
3629        out.push(row.map_err(EngineError::Sqlite)?);
3630    }
3631    Ok(out)
3632}
3633
/// Parse a `highlight()`-wrapped string and return the original-text byte
/// offsets at which matched terms begin.
///
/// `wrapped` is the highlight-decorated form of a text column; `open` and
/// `close` are the sentinel markers that were passed to `highlight()`. The
/// returned offsets refer to positions in the *original* text, i.e. the
/// column as stored without any highlight decoration, and are reported in
/// the order the open markers appear.
///
/// Empty markers never match: an empty `open` yields no offsets at all,
/// and an empty `close` is simply ignored. (An empty marker is a prefix of
/// every string, so matching it would never advance the scan.)
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    let open_bytes = open.as_bytes();
    let close_bytes = close.as_bytes();
    let mut offsets = Vec::new();
    if open_bytes.is_empty() {
        // Without an open marker there is nothing to report.
        return offsets;
    }

    let mut rest = wrapped.as_bytes();
    // Number of non-marker bytes consumed so far, which is exactly the
    // position in the undecorated text.
    let mut original_offset = 0usize;
    while !rest.is_empty() {
        if let Some(after_open) = rest.strip_prefix(open_bytes) {
            // The matched term starts right after the open marker.
            offsets.push(original_offset);
            rest = after_open;
        } else if !close_bytes.is_empty() && rest.starts_with(close_bytes) {
            rest = &rest[close_bytes.len()..];
        } else {
            original_offset += 1;
            rest = &rest[1..];
        }
    }
    offsets
}
3666
/// Look up the leaf path whose half-open `[start, end)` range contains
/// `offset`.
///
/// `positions` must be sorted by start offset ascending. The candidate is
/// the entry with the greatest start that does not exceed `offset`; it is
/// returned only if `offset` also lies before its end. `None` means no leaf
/// covers the offset.
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    let candidate = match positions.binary_search_by_key(&offset, |entry| entry.0) {
        // Some leaf starts exactly at `offset`.
        Ok(exact) => exact,
        // Otherwise the leaf just before the insertion point is the only one
        // that can contain `offset`; there is none when the point is 0.
        Err(insert_at) => insert_at.checked_sub(1)?,
    };
    let (start, end, path) = positions.get(candidate)?;
    (*start..*end).contains(&offset).then(|| path.as_str())
}
3683
3684/// Resolve per-hit match attribution by introspecting the FTS5 match state
3685/// for the hit's row via `highlight()` and mapping the resulting original-
3686/// text offsets back to recursive-leaf paths via the Phase 4 position map.
3687///
3688/// Chunk-backed hits carry no leaf structure and always return
3689/// `matched_paths = ["text_content"]`. Property-backed hits without a `projection_row_id`
3690/// (which should not happen — the search CTE always populates it) also
3691/// return empty attribution rather than erroring.
3692fn resolve_hit_attribution(
3693    conn: &Connection,
3694    hit: &SearchHit,
3695    match_expr: &str,
3696) -> Result<HitAttribution, EngineError> {
3697    if matches!(hit.source, SearchHitSource::Chunk) {
3698        return Ok(HitAttribution {
3699            matched_paths: vec!["text_content".to_owned()],
3700        });
3701    }
3702    if !matches!(hit.source, SearchHitSource::Property) {
3703        return Ok(HitAttribution {
3704            matched_paths: Vec::new(),
3705        });
3706    }
3707    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
3708        return Ok(HitAttribution {
3709            matched_paths: Vec::new(),
3710        });
3711    };
3712    let rowid: i64 = match rowid_str.parse() {
3713        Ok(v) => v,
3714        Err(_) => {
3715            return Ok(HitAttribution {
3716                matched_paths: Vec::new(),
3717            });
3718        }
3719    };
3720
3721    // Fetch the highlight-wrapped text_content for this hit's FTS row. The
3722    // FTS5 MATCH in the WHERE clause re-establishes the match state that
3723    // `highlight()` needs to decorate the returned text.
3724    // Per-kind tables have schema (node_logical_id UNINDEXED, text_content),
3725    // so text_content is column index 1 (0-based).
3726    let prop_table = fathomdb_schema::fts_kind_table_name(&hit.node.kind);
3727    let highlight_sql = format!(
3728        "SELECT highlight({prop_table}, 1, ?1, ?2) \
3729         FROM {prop_table} \
3730         WHERE rowid = ?3 AND {prop_table} MATCH ?4"
3731    );
3732    let mut stmt = conn.prepare(&highlight_sql).map_err(EngineError::Sqlite)?;
3733    let wrapped: Option<String> = stmt
3734        .query_row(
3735            rusqlite::params![
3736                ATTRIBUTION_HIGHLIGHT_OPEN,
3737                ATTRIBUTION_HIGHLIGHT_CLOSE,
3738                rowid,
3739                match_expr,
3740            ],
3741            |row| row.get(0),
3742        )
3743        .optional()
3744        .map_err(EngineError::Sqlite)?;
3745    let Some(wrapped) = wrapped else {
3746        return Ok(HitAttribution {
3747            matched_paths: Vec::new(),
3748        });
3749    };
3750
3751    let offsets = parse_highlight_offsets(
3752        &wrapped,
3753        ATTRIBUTION_HIGHLIGHT_OPEN,
3754        ATTRIBUTION_HIGHLIGHT_CLOSE,
3755    );
3756    if offsets.is_empty() {
3757        return Ok(HitAttribution {
3758            matched_paths: Vec::new(),
3759        });
3760    }
3761
3762    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
3763    if positions.is_empty() {
3764        // Scalar-only schemas have no position-map entries; attribution
3765        // degrades to an empty vector rather than erroring.
3766        return Ok(HitAttribution {
3767            matched_paths: Vec::new(),
3768        });
3769    }
3770
3771    let mut matched_paths: Vec<String> = Vec::new();
3772    for offset in offsets {
3773        if let Some(path) = find_leaf_for_offset(&positions, offset)
3774            && !matched_paths.iter().any(|p| p == path)
3775        {
3776            matched_paths.push(path.to_owned());
3777        }
3778    }
3779    Ok(HitAttribution { matched_paths })
3780}
3781
3782/// Build a BM25 scoring expression for a per-kind FTS5 table.
3783///
3784/// If the schema has no weighted specs (all weights None), returns `bm25({table})`.
3785/// Otherwise returns `bm25({table}, 0.0, w1, w2, ...)` where the first weight
3786/// (0.0) is for the `node_logical_id UNINDEXED` column (which BM25 should ignore),
3787/// then one weight per spec in schema order.
3788fn build_bm25_expr(table: &str, schema_json: &str, sep: &str) -> String {
3789    let schema = crate::writer::parse_property_schema_json(schema_json, sep);
3790    let any_weighted = schema.paths.iter().any(|p| p.weight.is_some());
3791    if !any_weighted {
3792        return format!("bm25({table})");
3793    }
3794    // node_logical_id is UNINDEXED — weight 0.0 tells BM25 to ignore it.
3795    let weights: Vec<String> = std::iter::once("0.0".to_owned())
3796        .chain(
3797            schema
3798                .paths
3799                .iter()
3800                .map(|p| format!("{:.1}", p.weight.unwrap_or(1.0))),
3801        )
3802        .collect();
3803    format!("bm25({table}, {})", weights.join(", "))
3804}
3805
3806fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
3807    match value {
3808        fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
3809        fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
3810        fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
3811    }
3812}
3813
3814#[cfg(test)]
3815#[allow(clippy::expect_used, deprecated)]
3816mod tests {
3817    use std::panic::{AssertUnwindSafe, catch_unwind};
3818    use std::sync::Arc;
3819
3820    use fathomdb_query::{BindValue, QueryBuilder};
3821    use fathomdb_schema::SchemaManager;
3822    use rusqlite::types::Value;
3823    use tempfile::NamedTempFile;
3824
3825    use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
3826
3827    use fathomdb_query::{
3828        NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
3829    };
3830
3831    use super::{
3832        bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
3833        wrap_node_row_projection_sql,
3834    };
3835
3836    fn mk_hit(
3837        logical_id: &str,
3838        score: f64,
3839        match_mode: SearchMatchMode,
3840        source: SearchHitSource,
3841    ) -> SearchHit {
3842        SearchHit {
3843            node: NodeRowLite {
3844                row_id: format!("{logical_id}-row"),
3845                logical_id: logical_id.to_owned(),
3846                kind: "Goal".to_owned(),
3847                properties: "{}".to_owned(),
3848                content_ref: None,
3849                last_accessed_at: None,
3850            },
3851            score,
3852            modality: RetrievalModality::Text,
3853            source,
3854            match_mode: Some(match_mode),
3855            snippet: None,
3856            written_at: 0,
3857            projection_row_id: None,
3858            vector_distance: None,
3859            attribution: None,
3860        }
3861    }
3862
3863    #[test]
3864    fn merge_places_strict_block_before_relaxed_regardless_of_score() {
3865        let strict = vec![mk_hit(
3866            "a",
3867            1.0,
3868            SearchMatchMode::Strict,
3869            SearchHitSource::Chunk,
3870        )];
3871        // Relaxed has a higher score but must still come second.
3872        let relaxed = vec![mk_hit(
3873            "b",
3874            9.9,
3875            SearchMatchMode::Relaxed,
3876            SearchHitSource::Chunk,
3877        )];
3878        let merged = merge_search_branches(strict, relaxed, 10);
3879        assert_eq!(merged.len(), 2);
3880        assert_eq!(merged[0].node.logical_id, "a");
3881        assert!(matches!(
3882            merged[0].match_mode,
3883            Some(SearchMatchMode::Strict)
3884        ));
3885        assert_eq!(merged[1].node.logical_id, "b");
3886        assert!(matches!(
3887            merged[1].match_mode,
3888            Some(SearchMatchMode::Relaxed)
3889        ));
3890    }
3891
3892    #[test]
3893    fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
3894        let strict = vec![mk_hit(
3895            "shared",
3896            1.0,
3897            SearchMatchMode::Strict,
3898            SearchHitSource::Chunk,
3899        )];
3900        let relaxed = vec![
3901            mk_hit(
3902                "shared",
3903                9.9,
3904                SearchMatchMode::Relaxed,
3905                SearchHitSource::Chunk,
3906            ),
3907            mk_hit(
3908                "other",
3909                2.0,
3910                SearchMatchMode::Relaxed,
3911                SearchHitSource::Chunk,
3912            ),
3913        ];
3914        let merged = merge_search_branches(strict, relaxed, 10);
3915        assert_eq!(merged.len(), 2);
3916        assert_eq!(merged[0].node.logical_id, "shared");
3917        assert!(matches!(
3918            merged[0].match_mode,
3919            Some(SearchMatchMode::Strict)
3920        ));
3921        assert_eq!(merged[1].node.logical_id, "other");
3922        assert!(matches!(
3923            merged[1].match_mode,
3924            Some(SearchMatchMode::Relaxed)
3925        ));
3926    }
3927
3928    #[test]
3929    fn merge_sorts_within_block_by_score_desc_then_logical_id() {
3930        let strict = vec![
3931            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3932            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3933            mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3934        ];
3935        let merged = merge_search_branches(strict, vec![], 10);
3936        assert_eq!(
3937            merged
3938                .iter()
3939                .map(|h| &h.node.logical_id)
3940                .collect::<Vec<_>>(),
3941            vec!["a", "c", "b"]
3942        );
3943    }
3944
3945    #[test]
3946    fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
3947        let strict = vec![
3948            mk_hit(
3949                "shared",
3950                1.0,
3951                SearchMatchMode::Strict,
3952                SearchHitSource::Property,
3953            ),
3954            mk_hit(
3955                "shared",
3956                1.0,
3957                SearchMatchMode::Strict,
3958                SearchHitSource::Chunk,
3959            ),
3960        ];
3961        let merged = merge_search_branches(strict, vec![], 10);
3962        assert_eq!(merged.len(), 1);
3963        assert!(matches!(merged[0].source, SearchHitSource::Chunk));
3964    }
3965
3966    #[test]
3967    fn merge_truncates_to_limit_after_block_merge() {
3968        let strict = vec![
3969            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3970            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3971        ];
3972        let relaxed = vec![mk_hit(
3973            "c",
3974            9.0,
3975            SearchMatchMode::Relaxed,
3976            SearchHitSource::Chunk,
3977        )];
3978        let merged = merge_search_branches(strict, relaxed, 2);
3979        assert_eq!(merged.len(), 2);
3980        assert_eq!(merged[0].node.logical_id, "a");
3981        assert_eq!(merged[1].node.logical_id, "b");
3982    }
3983
3984    /// P12 architectural pin: the generalized three-branch merger must
3985    /// produce strict -> relaxed -> vector block ordering with cross-branch
3986    /// dedup resolved by branch precedence (strict > relaxed > vector).
3987    /// v1 `search()` policy never fires the vector branch through the
3988    /// unified planner because read-time embedding is deferred, but the
3989    /// merge helper itself must be ready for the day the planner does so —
3990    /// otherwise wiring the future phase requires touching the core merge
3991    /// code as well as the planner.
3992    #[test]
3993    fn search_architecturally_supports_three_branch_fusion() {
3994        let strict = vec![mk_hit(
3995            "alpha",
3996            1.0,
3997            SearchMatchMode::Strict,
3998            SearchHitSource::Chunk,
3999        )];
4000        let relaxed = vec![mk_hit(
4001            "bravo",
4002            5.0,
4003            SearchMatchMode::Relaxed,
4004            SearchHitSource::Chunk,
4005        )];
4006        // Synthetic vector hit with the highest score. Three-block ordering
4007        // must still place it last.
4008        let mut vector_hit = mk_hit(
4009            "charlie",
4010            9.9,
4011            SearchMatchMode::Strict,
4012            SearchHitSource::Vector,
4013        );
4014        // Vector hits actually carry match_mode=None per the addendum, but
4015        // the merge helper's ordering is mode-agnostic; we override here to
4016        // pin the modality field for the test.
4017        vector_hit.match_mode = None;
4018        vector_hit.modality = RetrievalModality::Vector;
4019        let vector = vec![vector_hit];
4020
4021        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
4022        assert_eq!(merged.len(), 3);
4023        assert_eq!(merged[0].node.logical_id, "alpha");
4024        assert_eq!(merged[1].node.logical_id, "bravo");
4025        assert_eq!(merged[2].node.logical_id, "charlie");
4026        // Vector block comes last regardless of its higher score.
4027        assert!(matches!(merged[2].source, SearchHitSource::Vector));
4028
4029        // Cross-branch dedup: a logical_id that appears in multiple branches
4030        // is attributed to its highest-priority originating branch only.
4031        let strict2 = vec![mk_hit(
4032            "shared",
4033            0.5,
4034            SearchMatchMode::Strict,
4035            SearchHitSource::Chunk,
4036        )];
4037        let relaxed2 = vec![mk_hit(
4038            "shared",
4039            5.0,
4040            SearchMatchMode::Relaxed,
4041            SearchHitSource::Chunk,
4042        )];
4043        let mut vshared = mk_hit(
4044            "shared",
4045            9.9,
4046            SearchMatchMode::Strict,
4047            SearchHitSource::Vector,
4048        );
4049        vshared.match_mode = None;
4050        vshared.modality = RetrievalModality::Vector;
4051        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
4052        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
4053        assert!(matches!(
4054            merged2[0].match_mode,
4055            Some(SearchMatchMode::Strict)
4056        ));
4057        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));
4058
4059        // Relaxed wins over vector when strict is absent.
4060        let mut vshared2 = mk_hit(
4061            "shared",
4062            9.9,
4063            SearchMatchMode::Strict,
4064            SearchHitSource::Vector,
4065        );
4066        vshared2.match_mode = None;
4067        vshared2.modality = RetrievalModality::Vector;
4068        let merged3 = merge_search_branches_three(
4069            vec![],
4070            vec![mk_hit(
4071                "shared",
4072                1.0,
4073                SearchMatchMode::Relaxed,
4074                SearchHitSource::Chunk,
4075            )],
4076            vec![vshared2],
4077            10,
4078        );
4079        assert_eq!(merged3.len(), 1);
4080        assert!(matches!(
4081            merged3[0].match_mode,
4082            Some(SearchMatchMode::Relaxed)
4083        ));
4084    }
4085
4086    /// P12-N-3: production-realistic vector-only fusion. The v1 planner
4087    /// never fires this shape today (read-time embedding is deferred), but
4088    /// when it does the merger will see empty strict + empty relaxed + a
4089    /// non-empty vector block. The three-branch merger must pass that
4090    /// block through unchanged, preserving `RetrievalModality::Vector`,
4091    /// `SearchHitSource::Vector`, and `match_mode == None` semantics.
4092    ///
4093    /// Note: the review spec asked for `vector_hit_count == 1` /
4094    /// `strict_hit_count == 0` assertions. Those are fields on
4095    /// `SearchRows`, which is assembled one layer up in
4096    /// `execute_compiled_retrieval_plan`. The merger returns a bare
4097    /// `Vec<SearchHit>`, so this test asserts the corresponding invariants
4098    /// directly on the returned vec (block shape + per-hit fields).
4099    #[test]
4100    fn merge_search_branches_three_vector_only_preserves_vector_block() {
4101        let mut vector_hit = mk_hit(
4102            "solo",
4103            0.75,
4104            SearchMatchMode::Strict,
4105            SearchHitSource::Vector,
4106        );
4107        vector_hit.match_mode = None;
4108        vector_hit.modality = RetrievalModality::Vector;
4109
4110        let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);
4111
4112        assert_eq!(merged.len(), 1);
4113        assert_eq!(merged[0].node.logical_id, "solo");
4114        assert!(matches!(merged[0].source, SearchHitSource::Vector));
4115        assert!(matches!(merged[0].modality, RetrievalModality::Vector));
4116        assert!(
4117            merged[0].match_mode.is_none(),
4118            "vector hits carry match_mode=None per addendum 1"
4119        );
4120    }
4121
4122    /// P12-N-3: limit truncation must preserve block precedence — when the
4123    /// strict block alone already exceeds the limit, relaxed and vector
4124    /// hits must be dropped entirely even if they have higher raw scores.
4125    ///
4126    /// Note: the review spec asked for `strict_hit_count == 2` /
4127    /// `relaxed_hit_count == 0` / `vector_hit_count == 0` assertions, which
4128    /// are `SearchRows` fields assembled one layer up. Since
4129    /// `merge_search_branches_three` only returns a `Vec<SearchHit>`, this
4130    /// test asserts the corresponding invariants directly: the returned
4131    /// vec contains exactly the top two strict hits, with no relaxed or
4132    /// vector hits leaking past the limit.
4133    #[test]
4134    fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
4135        let strict = vec![
4136            mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
4137            mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
4138            mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
4139        ];
4140        let relaxed = vec![mk_hit(
4141            "d",
4142            9.0,
4143            SearchMatchMode::Relaxed,
4144            SearchHitSource::Chunk,
4145        )];
4146        let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
4147        vector_hit.match_mode = None;
4148        vector_hit.modality = RetrievalModality::Vector;
4149        let vector = vec![vector_hit];
4150
4151        let merged = merge_search_branches_three(strict, relaxed, vector, 2);
4152
4153        assert_eq!(merged.len(), 2);
4154        assert_eq!(merged[0].node.logical_id, "a");
4155        assert_eq!(merged[1].node.logical_id, "b");
4156        // Neither relaxed nor vector hits made it past the limit.
4157        assert!(
4158            merged
4159                .iter()
4160                .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
4161            "strict block must win limit contention against higher-scored relaxed/vector hits"
4162        );
4163        assert!(
4164            merged
4165                .iter()
4166                .all(|h| matches!(h.source, SearchHitSource::Chunk)),
4167            "no vector source hits should leak past the limit"
4168        );
4169    }
4170
4171    #[test]
4172    fn is_vec_table_absent_matches_known_error_messages() {
4173        use rusqlite::ffi;
4174        fn make_err(msg: &str) -> rusqlite::Error {
4175            rusqlite::Error::SqliteFailure(
4176                ffi::Error {
4177                    code: ffi::ErrorCode::Unknown,
4178                    extended_code: 1,
4179                },
4180                Some(msg.to_owned()),
4181            )
4182        }
4183        assert!(is_vec_table_absent(&make_err(
4184            "no such table: vec_nodes_active"
4185        )));
4186        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
4187        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
4188        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
4189        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
4190    }
4191
4192    // --- 0.5.0 item 6: per-kind vec table in coordinator ---
4193
4194    #[test]
4195    fn vector_search_uses_per_kind_table_and_degrades_when_table_absent() {
4196        // Without sqlite-vec feature or without creating vec_mykind, querying
4197        // for kind "MyKind" should degrade gracefully (was_degraded=true).
4198        let db = NamedTempFile::new().expect("temporary db");
4199        let coordinator = ExecutionCoordinator::open(
4200            db.path(),
4201            Arc::new(SchemaManager::new()),
4202            None,
4203            1,
4204            Arc::new(TelemetryCounters::default()),
4205            None,
4206        )
4207        .expect("coordinator");
4208
4209        let compiled = QueryBuilder::nodes("MyKind")
4210            .vector_search("some query", 5)
4211            .compile()
4212            .expect("vector query compiles");
4213
4214        let rows = coordinator
4215            .execute_compiled_read(&compiled)
4216            .expect("degraded read must succeed");
4217        assert!(
4218            rows.was_degraded,
4219            "must degrade when vec_mykind table does not exist"
4220        );
4221        assert!(
4222            rows.nodes.is_empty(),
4223            "degraded result must return empty nodes"
4224        );
4225    }
4226
4227    #[test]
4228    fn bind_value_text_maps_to_sql_text() {
4229        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
4230        assert_eq!(val, Value::Text("hello".to_owned()));
4231    }
4232
4233    #[test]
4234    fn bind_value_integer_maps_to_sql_integer() {
4235        let val = bind_value_to_sql(&BindValue::Integer(42));
4236        assert_eq!(val, Value::Integer(42));
4237    }
4238
4239    #[test]
4240    fn bind_value_bool_true_maps_to_integer_one() {
4241        let val = bind_value_to_sql(&BindValue::Bool(true));
4242        assert_eq!(val, Value::Integer(1));
4243    }
4244
4245    #[test]
4246    fn bind_value_bool_false_maps_to_integer_zero() {
4247        let val = bind_value_to_sql(&BindValue::Bool(false));
4248        assert_eq!(val, Value::Integer(0));
4249    }
4250
4251    #[test]
4252    fn same_shape_queries_share_one_cache_entry() {
4253        let db = NamedTempFile::new().expect("temporary db");
4254        let coordinator = ExecutionCoordinator::open(
4255            db.path(),
4256            Arc::new(SchemaManager::new()),
4257            None,
4258            1,
4259            Arc::new(TelemetryCounters::default()),
4260            None,
4261        )
4262        .expect("coordinator");
4263
4264        let compiled_a = QueryBuilder::nodes("Meeting")
4265            .text_search("budget", 5)
4266            .limit(10)
4267            .compile()
4268            .expect("compiled a");
4269        let compiled_b = QueryBuilder::nodes("Meeting")
4270            .text_search("standup", 5)
4271            .limit(10)
4272            .compile()
4273            .expect("compiled b");
4274
4275        coordinator
4276            .execute_compiled_read(&compiled_a)
4277            .expect("read a");
4278        coordinator
4279            .execute_compiled_read(&compiled_b)
4280            .expect("read b");
4281
4282        assert_eq!(
4283            compiled_a.shape_hash, compiled_b.shape_hash,
4284            "different bind values, same structural shape → same hash"
4285        );
4286        assert_eq!(coordinator.shape_sql_count(), 1);
4287    }
4288
4289    #[test]
4290    fn vector_read_degrades_gracefully_when_vec_table_absent() {
4291        let db = NamedTempFile::new().expect("temporary db");
4292        let coordinator = ExecutionCoordinator::open(
4293            db.path(),
4294            Arc::new(SchemaManager::new()),
4295            None,
4296            1,
4297            Arc::new(TelemetryCounters::default()),
4298            None,
4299        )
4300        .expect("coordinator");
4301
4302        let compiled = QueryBuilder::nodes("Meeting")
4303            .vector_search("budget embeddings", 5)
4304            .compile()
4305            .expect("vector query compiles");
4306
4307        let result = coordinator.execute_compiled_read(&compiled);
4308        let rows = result.expect("degraded read must succeed, not error");
4309        assert!(
4310            rows.was_degraded,
4311            "result must be flagged as degraded when vec_nodes_active is absent"
4312        );
4313        assert!(
4314            rows.nodes.is_empty(),
4315            "degraded result must return empty nodes"
4316        );
4317    }
4318
4319    #[test]
4320    fn coordinator_caches_by_shape_hash() {
4321        let db = NamedTempFile::new().expect("temporary db");
4322        let coordinator = ExecutionCoordinator::open(
4323            db.path(),
4324            Arc::new(SchemaManager::new()),
4325            None,
4326            1,
4327            Arc::new(TelemetryCounters::default()),
4328            None,
4329        )
4330        .expect("coordinator");
4331
4332        let compiled = QueryBuilder::nodes("Meeting")
4333            .text_search("budget", 5)
4334            .compile()
4335            .expect("compiled query");
4336
4337        coordinator
4338            .execute_compiled_read(&compiled)
4339            .expect("execute compiled read");
4340        assert_eq!(coordinator.shape_sql_count(), 1);
4341    }
4342
4343    // --- Item 6: explain_compiled_read tests ---
4344
4345    #[test]
4346    fn explain_returns_correct_sql() {
4347        let db = NamedTempFile::new().expect("temporary db");
4348        let coordinator = ExecutionCoordinator::open(
4349            db.path(),
4350            Arc::new(SchemaManager::new()),
4351            None,
4352            1,
4353            Arc::new(TelemetryCounters::default()),
4354            None,
4355        )
4356        .expect("coordinator");
4357
4358        let compiled = QueryBuilder::nodes("Meeting")
4359            .text_search("budget", 5)
4360            .compile()
4361            .expect("compiled query");
4362
4363        let plan = coordinator.explain_compiled_read(&compiled);
4364
4365        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
4366    }
4367
4368    #[test]
4369    fn explain_returns_correct_driving_table() {
4370        use fathomdb_query::DrivingTable;
4371
4372        let db = NamedTempFile::new().expect("temporary db");
4373        let coordinator = ExecutionCoordinator::open(
4374            db.path(),
4375            Arc::new(SchemaManager::new()),
4376            None,
4377            1,
4378            Arc::new(TelemetryCounters::default()),
4379            None,
4380        )
4381        .expect("coordinator");
4382
4383        let compiled = QueryBuilder::nodes("Meeting")
4384            .text_search("budget", 5)
4385            .compile()
4386            .expect("compiled query");
4387
4388        let plan = coordinator.explain_compiled_read(&compiled);
4389
4390        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
4391    }
4392
4393    #[test]
4394    fn explain_reports_cache_miss_then_hit() {
4395        let db = NamedTempFile::new().expect("temporary db");
4396        let coordinator = ExecutionCoordinator::open(
4397            db.path(),
4398            Arc::new(SchemaManager::new()),
4399            None,
4400            1,
4401            Arc::new(TelemetryCounters::default()),
4402            None,
4403        )
4404        .expect("coordinator");
4405
4406        let compiled = QueryBuilder::nodes("Meeting")
4407            .text_search("budget", 5)
4408            .compile()
4409            .expect("compiled query");
4410
4411        // Before execution: cache miss
4412        let plan_before = coordinator.explain_compiled_read(&compiled);
4413        assert!(
4414            !plan_before.cache_hit,
4415            "cache miss expected before first execute"
4416        );
4417
4418        // Execute to populate cache
4419        coordinator
4420            .execute_compiled_read(&compiled)
4421            .expect("execute read");
4422
4423        // After execution: cache hit
4424        let plan_after = coordinator.explain_compiled_read(&compiled);
4425        assert!(
4426            plan_after.cache_hit,
4427            "cache hit expected after first execute"
4428        );
4429    }
4430
4431    #[test]
4432    fn explain_does_not_execute_query() {
4433        // Call explain_compiled_read on an empty database. If explain were
4434        // actually executing SQL, it would return Ok with 0 rows. But the
4435        // key assertion is that it returns a QueryPlan (not an error) even
4436        // without touching the database.
4437        let db = NamedTempFile::new().expect("temporary db");
4438        let coordinator = ExecutionCoordinator::open(
4439            db.path(),
4440            Arc::new(SchemaManager::new()),
4441            None,
4442            1,
4443            Arc::new(TelemetryCounters::default()),
4444            None,
4445        )
4446        .expect("coordinator");
4447
4448        let compiled = QueryBuilder::nodes("Meeting")
4449            .text_search("anything", 5)
4450            .compile()
4451            .expect("compiled query");
4452
4453        // This must not error, even though the database is empty
4454        let plan = coordinator.explain_compiled_read(&compiled);
4455
4456        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
4457        assert_eq!(plan.bind_count, compiled.binds.len());
4458    }
4459
4460    #[test]
4461    fn coordinator_executes_compiled_read() {
4462        let db = NamedTempFile::new().expect("temporary db");
4463        let coordinator = ExecutionCoordinator::open(
4464            db.path(),
4465            Arc::new(SchemaManager::new()),
4466            None,
4467            1,
4468            Arc::new(TelemetryCounters::default()),
4469            None,
4470        )
4471        .expect("coordinator");
4472        let conn = rusqlite::Connection::open(db.path()).expect("open db");
4473
4474        conn.execute_batch(
4475            r#"
4476            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
4477            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
4478            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
4479            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
4480            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
4481            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
4482            "#,
4483        )
4484        .expect("seed data");
4485
4486        let compiled = QueryBuilder::nodes("Meeting")
4487            .text_search("budget", 5)
4488            .limit(5)
4489            .compile()
4490            .expect("compiled query");
4491
4492        let rows = coordinator
4493            .execute_compiled_read(&compiled)
4494            .expect("execute read");
4495
4496        assert_eq!(rows.nodes.len(), 1);
4497        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
4498    }
4499
4500    #[test]
4501    fn text_search_finds_structured_only_node_via_property_fts() {
4502        let db = NamedTempFile::new().expect("temporary db");
4503        let coordinator = ExecutionCoordinator::open(
4504            db.path(),
4505            Arc::new(SchemaManager::new()),
4506            None,
4507            1,
4508            Arc::new(TelemetryCounters::default()),
4509            None,
4510        )
4511        .expect("coordinator");
4512        let conn = rusqlite::Connection::open(db.path()).expect("open db");
4513
4514        // Insert a structured-only node (no chunks) with a property FTS row.
4515        // Per-kind table must be created before inserting.
4516        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
4517        conn.execute_batch(&format!(
4518            "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} USING fts5(\
4519                node_logical_id UNINDEXED, text_content, \
4520                tokenize = 'porter unicode61 remove_diacritics 2'\
4521            )"
4522        ))
4523        .expect("create per-kind fts table");
4524        conn.execute_batch(&format!(
4525            r#"
4526            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
4527            VALUES ('row-1', 'goal-1', 'Goal', '{{"name":"Ship v2"}}', 100, 'seed');
4528            INSERT INTO {goal_table} (node_logical_id, text_content)
4529            VALUES ('goal-1', 'Ship v2');
4530            "#
4531        ))
4532        .expect("seed data");
4533
4534        let compiled = QueryBuilder::nodes("Goal")
4535            .text_search("Ship", 5)
4536            .limit(5)
4537            .compile()
4538            .expect("compiled query");
4539
4540        let rows = coordinator
4541            .execute_compiled_read(&compiled)
4542            .expect("execute read");
4543
4544        assert_eq!(rows.nodes.len(), 1);
4545        assert_eq!(rows.nodes[0].logical_id, "goal-1");
4546    }
4547
4548    #[test]
4549    fn text_search_returns_both_chunk_and_property_backed_hits() {
4550        let db = NamedTempFile::new().expect("temporary db");
4551        let coordinator = ExecutionCoordinator::open(
4552            db.path(),
4553            Arc::new(SchemaManager::new()),
4554            None,
4555            1,
4556            Arc::new(TelemetryCounters::default()),
4557            None,
4558        )
4559        .expect("coordinator");
4560        let conn = rusqlite::Connection::open(db.path()).expect("open db");
4561
4562        // Chunk-backed hit: a Meeting with a chunk containing "quarterly".
4563        conn.execute_batch(
4564            r"
4565            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
4566            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
4567            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
4568            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
4569            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
4570            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
4571            ",
4572        )
4573        .expect("seed chunk-backed node");
4574
4575        // Property-backed hit: a Meeting with property FTS containing "quarterly".
4576        // Per-kind table must be created before inserting.
4577        let meeting_table = fathomdb_schema::fts_kind_table_name("Meeting");
4578        conn.execute_batch(&format!(
4579            "CREATE VIRTUAL TABLE IF NOT EXISTS {meeting_table} USING fts5(\
4580                node_logical_id UNINDEXED, text_content, \
4581                tokenize = 'porter unicode61 remove_diacritics 2'\
4582            )"
4583        ))
4584        .expect("create per-kind fts table");
4585        conn.execute_batch(&format!(
4586            r#"
4587            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
4588            VALUES ('row-2', 'meeting-2', 'Meeting', '{{"title":"quarterly sync"}}', 100, 'seed');
4589            INSERT INTO {meeting_table} (node_logical_id, text_content)
4590            VALUES ('meeting-2', 'quarterly sync');
4591            "#
4592        ))
4593        .expect("seed property-backed node");
4594
4595        let compiled = QueryBuilder::nodes("Meeting")
4596            .text_search("quarterly", 10)
4597            .limit(10)
4598            .compile()
4599            .expect("compiled query");
4600
4601        let rows = coordinator
4602            .execute_compiled_read(&compiled)
4603            .expect("execute read");
4604
4605        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
4606        ids.sort_unstable();
4607        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
4608    }
4609
4610    #[test]
4611    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
4612        let db = NamedTempFile::new().expect("temporary db");
4613        let coordinator = ExecutionCoordinator::open(
4614            db.path(),
4615            Arc::new(SchemaManager::new()),
4616            None,
4617            1,
4618            Arc::new(TelemetryCounters::default()),
4619            None,
4620        )
4621        .expect("coordinator");
4622        let conn = rusqlite::Connection::open(db.path()).expect("open db");
4623
4624        conn.execute_batch(
4625            r"
4626            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
4627            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
4628            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
4629            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
4630            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
4631            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
4632            ",
4633        )
4634        .expect("seed chunk-backed node");
4635
4636        let compiled = QueryBuilder::nodes("Meeting")
4637            .text_search("not a ship", 10)
4638            .limit(10)
4639            .compile()
4640            .expect("compiled query");
4641
4642        let rows = coordinator
4643            .execute_compiled_read(&compiled)
4644            .expect("execute read");
4645
4646        assert_eq!(rows.nodes.len(), 1);
4647        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
4648    }
4649
4650    // --- Item 1: capability gate tests ---
4651
4652    #[test]
4653    fn capability_gate_reports_false_without_feature() {
4654        let db = NamedTempFile::new().expect("temporary db");
4655        // Open without vector_dimension: regardless of feature flag, vector_enabled must be false
4656        // when no dimension is requested (the vector profile is never bootstrapped).
4657        let coordinator = ExecutionCoordinator::open(
4658            db.path(),
4659            Arc::new(SchemaManager::new()),
4660            None,
4661            1,
4662            Arc::new(TelemetryCounters::default()),
4663            None,
4664        )
4665        .expect("coordinator");
4666        assert!(
4667            !coordinator.vector_enabled(),
4668            "vector_enabled must be false when no dimension is requested"
4669        );
4670    }
4671
4672    #[cfg(feature = "sqlite-vec")]
4673    #[test]
4674    fn capability_gate_reports_true_when_feature_enabled() {
4675        let db = NamedTempFile::new().expect("temporary db");
4676        let coordinator = ExecutionCoordinator::open(
4677            db.path(),
4678            Arc::new(SchemaManager::new()),
4679            Some(128),
4680            1,
4681            Arc::new(TelemetryCounters::default()),
4682            None,
4683        )
4684        .expect("coordinator");
4685        assert!(
4686            coordinator.vector_enabled(),
4687            "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
4688        );
4689    }
4690
4691    // --- Item 4: runtime table read tests ---
4692
4693    #[test]
4694    fn read_run_returns_inserted_run() {
4695        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
4696
4697        let db = NamedTempFile::new().expect("temporary db");
4698        let writer = WriterActor::start(
4699            db.path(),
4700            Arc::new(SchemaManager::new()),
4701            ProvenanceMode::Warn,
4702            Arc::new(TelemetryCounters::default()),
4703        )
4704        .expect("writer");
4705        writer
4706            .submit(WriteRequest {
4707                label: "runtime".to_owned(),
4708                nodes: vec![],
4709                node_retires: vec![],
4710                edges: vec![],
4711                edge_retires: vec![],
4712                chunks: vec![],
4713                runs: vec![RunInsert {
4714                    id: "run-r1".to_owned(),
4715                    kind: "session".to_owned(),
4716                    status: "active".to_owned(),
4717                    properties: "{}".to_owned(),
4718                    source_ref: Some("src-1".to_owned()),
4719                    upsert: false,
4720                    supersedes_id: None,
4721                }],
4722                steps: vec![],
4723                actions: vec![],
4724                optional_backfills: vec![],
4725                vec_inserts: vec![],
4726                operational_writes: vec![],
4727            })
4728            .expect("write run");
4729
4730        let coordinator = ExecutionCoordinator::open(
4731            db.path(),
4732            Arc::new(SchemaManager::new()),
4733            None,
4734            1,
4735            Arc::new(TelemetryCounters::default()),
4736            None,
4737        )
4738        .expect("coordinator");
4739        let row = coordinator
4740            .read_run("run-r1")
4741            .expect("read_run")
4742            .expect("row exists");
4743        assert_eq!(row.id, "run-r1");
4744        assert_eq!(row.kind, "session");
4745        assert_eq!(row.status, "active");
4746    }
4747
4748    #[test]
4749    fn read_step_returns_inserted_step() {
4750        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};
4751
4752        let db = NamedTempFile::new().expect("temporary db");
4753        let writer = WriterActor::start(
4754            db.path(),
4755            Arc::new(SchemaManager::new()),
4756            ProvenanceMode::Warn,
4757            Arc::new(TelemetryCounters::default()),
4758        )
4759        .expect("writer");
4760        writer
4761            .submit(WriteRequest {
4762                label: "runtime".to_owned(),
4763                nodes: vec![],
4764                node_retires: vec![],
4765                edges: vec![],
4766                edge_retires: vec![],
4767                chunks: vec![],
4768                runs: vec![RunInsert {
4769                    id: "run-s1".to_owned(),
4770                    kind: "session".to_owned(),
4771                    status: "active".to_owned(),
4772                    properties: "{}".to_owned(),
4773                    source_ref: Some("src-1".to_owned()),
4774                    upsert: false,
4775                    supersedes_id: None,
4776                }],
4777                steps: vec![StepInsert {
4778                    id: "step-s1".to_owned(),
4779                    run_id: "run-s1".to_owned(),
4780                    kind: "llm".to_owned(),
4781                    status: "completed".to_owned(),
4782                    properties: "{}".to_owned(),
4783                    source_ref: Some("src-1".to_owned()),
4784                    upsert: false,
4785                    supersedes_id: None,
4786                }],
4787                actions: vec![],
4788                optional_backfills: vec![],
4789                vec_inserts: vec![],
4790                operational_writes: vec![],
4791            })
4792            .expect("write step");
4793
4794        let coordinator = ExecutionCoordinator::open(
4795            db.path(),
4796            Arc::new(SchemaManager::new()),
4797            None,
4798            1,
4799            Arc::new(TelemetryCounters::default()),
4800            None,
4801        )
4802        .expect("coordinator");
4803        let row = coordinator
4804            .read_step("step-s1")
4805            .expect("read_step")
4806            .expect("row exists");
4807        assert_eq!(row.id, "step-s1");
4808        assert_eq!(row.run_id, "run-s1");
4809        assert_eq!(row.kind, "llm");
4810    }
4811
4812    #[test]
4813    fn read_action_returns_inserted_action() {
4814        use crate::{
4815            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
4816            writer::{ActionInsert, StepInsert},
4817        };
4818
4819        let db = NamedTempFile::new().expect("temporary db");
4820        let writer = WriterActor::start(
4821            db.path(),
4822            Arc::new(SchemaManager::new()),
4823            ProvenanceMode::Warn,
4824            Arc::new(TelemetryCounters::default()),
4825        )
4826        .expect("writer");
4827        writer
4828            .submit(WriteRequest {
4829                label: "runtime".to_owned(),
4830                nodes: vec![],
4831                node_retires: vec![],
4832                edges: vec![],
4833                edge_retires: vec![],
4834                chunks: vec![],
4835                runs: vec![RunInsert {
4836                    id: "run-a1".to_owned(),
4837                    kind: "session".to_owned(),
4838                    status: "active".to_owned(),
4839                    properties: "{}".to_owned(),
4840                    source_ref: Some("src-1".to_owned()),
4841                    upsert: false,
4842                    supersedes_id: None,
4843                }],
4844                steps: vec![StepInsert {
4845                    id: "step-a1".to_owned(),
4846                    run_id: "run-a1".to_owned(),
4847                    kind: "llm".to_owned(),
4848                    status: "completed".to_owned(),
4849                    properties: "{}".to_owned(),
4850                    source_ref: Some("src-1".to_owned()),
4851                    upsert: false,
4852                    supersedes_id: None,
4853                }],
4854                actions: vec![ActionInsert {
4855                    id: "action-a1".to_owned(),
4856                    step_id: "step-a1".to_owned(),
4857                    kind: "emit".to_owned(),
4858                    status: "completed".to_owned(),
4859                    properties: "{}".to_owned(),
4860                    source_ref: Some("src-1".to_owned()),
4861                    upsert: false,
4862                    supersedes_id: None,
4863                }],
4864                optional_backfills: vec![],
4865                vec_inserts: vec![],
4866                operational_writes: vec![],
4867            })
4868            .expect("write action");
4869
4870        let coordinator = ExecutionCoordinator::open(
4871            db.path(),
4872            Arc::new(SchemaManager::new()),
4873            None,
4874            1,
4875            Arc::new(TelemetryCounters::default()),
4876            None,
4877        )
4878        .expect("coordinator");
4879        let row = coordinator
4880            .read_action("action-a1")
4881            .expect("read_action")
4882            .expect("row exists");
4883        assert_eq!(row.id, "action-a1");
4884        assert_eq!(row.step_id, "step-a1");
4885        assert_eq!(row.kind, "emit");
4886    }
4887
4888    #[test]
4889    fn read_active_runs_excludes_superseded() {
4890        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
4891
4892        let db = NamedTempFile::new().expect("temporary db");
4893        let writer = WriterActor::start(
4894            db.path(),
4895            Arc::new(SchemaManager::new()),
4896            ProvenanceMode::Warn,
4897            Arc::new(TelemetryCounters::default()),
4898        )
4899        .expect("writer");
4900
4901        // Insert original run
4902        writer
4903            .submit(WriteRequest {
4904                label: "v1".to_owned(),
4905                nodes: vec![],
4906                node_retires: vec![],
4907                edges: vec![],
4908                edge_retires: vec![],
4909                chunks: vec![],
4910                runs: vec![RunInsert {
4911                    id: "run-v1".to_owned(),
4912                    kind: "session".to_owned(),
4913                    status: "active".to_owned(),
4914                    properties: "{}".to_owned(),
4915                    source_ref: Some("src-1".to_owned()),
4916                    upsert: false,
4917                    supersedes_id: None,
4918                }],
4919                steps: vec![],
4920                actions: vec![],
4921                optional_backfills: vec![],
4922                vec_inserts: vec![],
4923                operational_writes: vec![],
4924            })
4925            .expect("v1 write");
4926
4927        // Supersede original run with v2
4928        writer
4929            .submit(WriteRequest {
4930                label: "v2".to_owned(),
4931                nodes: vec![],
4932                node_retires: vec![],
4933                edges: vec![],
4934                edge_retires: vec![],
4935                chunks: vec![],
4936                runs: vec![RunInsert {
4937                    id: "run-v2".to_owned(),
4938                    kind: "session".to_owned(),
4939                    status: "completed".to_owned(),
4940                    properties: "{}".to_owned(),
4941                    source_ref: Some("src-2".to_owned()),
4942                    upsert: true,
4943                    supersedes_id: Some("run-v1".to_owned()),
4944                }],
4945                steps: vec![],
4946                actions: vec![],
4947                optional_backfills: vec![],
4948                vec_inserts: vec![],
4949                operational_writes: vec![],
4950            })
4951            .expect("v2 write");
4952
4953        let coordinator = ExecutionCoordinator::open(
4954            db.path(),
4955            Arc::new(SchemaManager::new()),
4956            None,
4957            1,
4958            Arc::new(TelemetryCounters::default()),
4959            None,
4960        )
4961        .expect("coordinator");
4962        let active = coordinator.read_active_runs().expect("read_active_runs");
4963
4964        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
4965        assert_eq!(active[0].id, "run-v2");
4966    }
4967
4968    #[allow(clippy::panic)]
4969    fn poison_connection(coordinator: &ExecutionCoordinator) {
4970        let result = catch_unwind(AssertUnwindSafe(|| {
4971            let _guard = coordinator.pool.connections[0]
4972                .lock()
4973                .expect("poison test lock");
4974            panic!("poison coordinator connection mutex");
4975        }));
4976        assert!(
4977            result.is_err(),
4978            "poison test must unwind while holding the connection mutex"
4979        );
4980    }
4981
4982    #[allow(clippy::panic)]
4983    fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
4984    where
4985        F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
4986    {
4987        match op(coordinator) {
4988            Err(EngineError::Bridge(message)) => {
4989                assert_eq!(message, "connection mutex poisoned");
4990            }
4991            Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
4992            Err(error) => panic!("expected poisoned connection error, got {error:?}"),
4993        }
4994    }
4995
4996    #[test]
4997    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
4998        let db = NamedTempFile::new().expect("temporary db");
4999        let coordinator = ExecutionCoordinator::open(
5000            db.path(),
5001            Arc::new(SchemaManager::new()),
5002            None,
5003            1,
5004            Arc::new(TelemetryCounters::default()),
5005            None,
5006        )
5007        .expect("coordinator");
5008
5009        poison_connection(&coordinator);
5010
5011        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
5012        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
5013        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
5014        assert_poisoned_connection_error(
5015            &coordinator,
5016            super::ExecutionCoordinator::read_active_runs,
5017        );
5018        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
5019        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
5020    }
5021
5022    // --- M-2: Bounded shape cache ---
5023
5024    #[test]
5025    fn shape_cache_stays_bounded() {
5026        use fathomdb_query::ShapeHash;
5027
5028        let db = NamedTempFile::new().expect("temporary db");
5029        let coordinator = ExecutionCoordinator::open(
5030            db.path(),
5031            Arc::new(SchemaManager::new()),
5032            None,
5033            1,
5034            Arc::new(TelemetryCounters::default()),
5035            None,
5036        )
5037        .expect("coordinator");
5038
5039        // Directly populate the cache with MAX_SHAPE_CACHE_SIZE + 1 entries.
5040        {
5041            let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
5042            for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
5043                cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
5044            }
5045        }
5046        // The cache is now over the limit but hasn't been pruned yet (pruning
5047        // happens on the insert path in execute_compiled_read).
5048
5049        // Execute a compiled read to trigger the bounded-cache check.
5050        let compiled = QueryBuilder::nodes("Meeting")
5051            .text_search("budget", 5)
5052            .limit(10)
5053            .compile()
5054            .expect("compiled query");
5055
5056        coordinator
5057            .execute_compiled_read(&compiled)
5058            .expect("execute read");
5059
5060        assert!(
5061            coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
5062            "shape cache must stay bounded: got {} entries, max {}",
5063            coordinator.shape_sql_count(),
5064            super::MAX_SHAPE_CACHE_SIZE
5065        );
5066    }
5067
5068    // --- M-1: Read pool size ---
5069
5070    #[test]
5071    fn read_pool_size_configurable() {
5072        let db = NamedTempFile::new().expect("temporary db");
5073        let coordinator = ExecutionCoordinator::open(
5074            db.path(),
5075            Arc::new(SchemaManager::new()),
5076            None,
5077            2,
5078            Arc::new(TelemetryCounters::default()),
5079            None,
5080        )
5081        .expect("coordinator with pool_size=2");
5082
5083        assert_eq!(coordinator.pool.size(), 2);
5084
5085        // Basic read should succeed through the pool.
5086        let compiled = QueryBuilder::nodes("Meeting")
5087            .text_search("budget", 5)
5088            .limit(10)
5089            .compile()
5090            .expect("compiled query");
5091
5092        let result = coordinator.execute_compiled_read(&compiled);
5093        assert!(result.is_ok(), "read through pool must succeed");
5094    }
5095
5096    // --- M-4: Grouped read batching ---
5097
5098    #[test]
5099    fn grouped_read_results_match_baseline() {
5100        use fathomdb_query::TraverseDirection;
5101
5102        let db = NamedTempFile::new().expect("temporary db");
5103
5104        // Bootstrap the database via coordinator (creates schema).
5105        let coordinator = ExecutionCoordinator::open(
5106            db.path(),
5107            Arc::new(SchemaManager::new()),
5108            None,
5109            1,
5110            Arc::new(TelemetryCounters::default()),
5111            None,
5112        )
5113        .expect("coordinator");
5114
5115        // Seed data: 10 root nodes (Meeting-0..9) with 2 outbound edges each
5116        // to expansion nodes (Task-0-a, Task-0-b, etc.).
5117        {
5118            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
5119            for i in 0..10 {
5120                conn.execute_batch(&format!(
5121                    r#"
5122                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
5123                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
5124                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
5125                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
5126                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
5127                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');
5128
5129                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
5130                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
5131                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
5132                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());
5133
5134                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
5135                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
5136                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
5137                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
5138                    "#,
5139                )).expect("seed data");
5140            }
5141        }
5142
5143        let compiled = QueryBuilder::nodes("Meeting")
5144            .text_search("meeting", 10)
5145            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1, None, None)
5146            .limit(10)
5147            .compile_grouped()
5148            .expect("compiled grouped query");
5149
5150        let result = coordinator
5151            .execute_compiled_grouped_read(&compiled)
5152            .expect("grouped read");
5153
5154        assert!(!result.was_degraded, "grouped read should not be degraded");
5155        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
5156        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
5157        assert_eq!(result.expansions[0].slot, "tasks");
5158        assert_eq!(
5159            result.expansions[0].roots.len(),
5160            10,
5161            "each expansion slot should have entries for all 10 roots"
5162        );
5163
5164        // Each root should have exactly 2 expansion nodes (task-X-a, task-X-b).
5165        for root_expansion in &result.expansions[0].roots {
5166            assert_eq!(
5167                root_expansion.nodes.len(),
5168                2,
5169                "root {} should have 2 expansion nodes, got {}",
5170                root_expansion.root_logical_id,
5171                root_expansion.nodes.len()
5172            );
5173        }
5174    }
5175
5176    // --- B-4: build_bm25_expr unit tests ---
5177
5178    #[test]
5179    fn build_bm25_expr_no_weights() {
5180        let schema_json = r#"["$.title","$.body"]"#;
5181        let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
5182        assert_eq!(result, "bm25(fts_props_testkind)");
5183    }
5184
5185    #[test]
5186    fn build_bm25_expr_with_weights() {
5187        let schema_json = r#"[{"path":"$.title","mode":"scalar","weight":10.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
5188        let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
5189        assert_eq!(result, "bm25(fts_props_testkind, 0.0, 10.0, 1.0)");
5190    }
5191
5192    // --- B-4: weighted schema integration test ---
5193
5194    #[test]
5195    #[allow(clippy::too_many_lines)]
5196    fn weighted_schema_bm25_orders_title_match_above_body_match() {
5197        use crate::{
5198            AdminService, FtsPropertyPathSpec, NodeInsert, ProvenanceMode, WriteRequest,
5199            WriterActor, writer::ChunkPolicy,
5200        };
5201        use fathomdb_schema::fts_column_name;
5202
5203        let db = NamedTempFile::new().expect("temporary db");
5204        let schema_manager = Arc::new(SchemaManager::new());
5205
5206        // Step 1: bootstrap, register schema with weights, create per-column table.
5207        {
5208            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
5209            admin
5210                .register_fts_property_schema_with_entries(
5211                    "Article",
5212                    &[
5213                        FtsPropertyPathSpec::scalar("$.title").with_weight(10.0),
5214                        FtsPropertyPathSpec::scalar("$.body").with_weight(1.0),
5215                    ],
5216                    None,
5217                    &[],
5218                    crate::rebuild_actor::RebuildMode::Eager,
5219                )
5220                .expect("register schema with weights");
5221        }
5222
5223        // Step 2: write two nodes.
5224        let writer = WriterActor::start(
5225            db.path(),
5226            Arc::clone(&schema_manager),
5227            ProvenanceMode::Warn,
5228            Arc::new(TelemetryCounters::default()),
5229        )
5230        .expect("writer");
5231
5232        // Node A: "rust" in title (high-weight column).
5233        writer
5234            .submit(WriteRequest {
5235                label: "insert-a".to_owned(),
5236                nodes: vec![NodeInsert {
5237                    row_id: "row-a".to_owned(),
5238                    logical_id: "article-a".to_owned(),
5239                    kind: "Article".to_owned(),
5240                    properties: r#"{"title":"rust","body":"other"}"#.to_owned(),
5241                    source_ref: Some("src-a".to_owned()),
5242                    upsert: false,
5243                    chunk_policy: ChunkPolicy::Preserve,
5244                    content_ref: None,
5245                }],
5246                node_retires: vec![],
5247                edges: vec![],
5248                edge_retires: vec![],
5249                chunks: vec![],
5250                runs: vec![],
5251                steps: vec![],
5252                actions: vec![],
5253                optional_backfills: vec![],
5254                vec_inserts: vec![],
5255                operational_writes: vec![],
5256            })
5257            .expect("write node A");
5258
5259        // Node B: "rust" in body (low-weight column).
5260        writer
5261            .submit(WriteRequest {
5262                label: "insert-b".to_owned(),
5263                nodes: vec![NodeInsert {
5264                    row_id: "row-b".to_owned(),
5265                    logical_id: "article-b".to_owned(),
5266                    kind: "Article".to_owned(),
5267                    properties: r#"{"title":"other","body":"rust"}"#.to_owned(),
5268                    source_ref: Some("src-b".to_owned()),
5269                    upsert: false,
5270                    chunk_policy: ChunkPolicy::Preserve,
5271                    content_ref: None,
5272                }],
5273                node_retires: vec![],
5274                edges: vec![],
5275                edge_retires: vec![],
5276                chunks: vec![],
5277                runs: vec![],
5278                steps: vec![],
5279                actions: vec![],
5280                optional_backfills: vec![],
5281                vec_inserts: vec![],
5282                operational_writes: vec![],
5283            })
5284            .expect("write node B");
5285
5286        drop(writer);
5287
5288        // Verify per-column values were written.
5289        {
5290            let title_col = fts_column_name("$.title", false);
5291            let body_col = fts_column_name("$.body", false);
5292            let article_table = fathomdb_schema::fts_kind_table_name("Article");
5293            let conn = rusqlite::Connection::open(db.path()).expect("open db");
5294            let count: i64 = conn
5295                .query_row(&format!("SELECT count(*) FROM {article_table}"), [], |r| {
5296                    r.get(0)
5297                })
5298                .expect("count fts rows");
5299            assert_eq!(count, 2, "both nodes must have FTS rows in per-kind table");
5300            let (title_a, body_a): (String, String) = conn
5301                .query_row(
5302                    &format!(
5303                        "SELECT {title_col}, {body_col} FROM {article_table} \
5304                         WHERE node_logical_id = 'article-a'"
5305                    ),
5306                    [],
5307                    |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
5308                )
5309                .expect("select article-a");
5310            assert_eq!(
5311                title_a, "rust",
5312                "article-a must have 'rust' in title column"
5313            );
5314            assert_eq!(
5315                body_a, "other",
5316                "article-a must have 'other' in body column"
5317            );
5318        }
5319
5320        // Step 3: search for "rust" and assert node A ranks first.
5321        let coordinator = ExecutionCoordinator::open(
5322            db.path(),
5323            Arc::clone(&schema_manager),
5324            None,
5325            1,
5326            Arc::new(TelemetryCounters::default()),
5327            None,
5328        )
5329        .expect("coordinator");
5330
5331        let compiled = fathomdb_query::QueryBuilder::nodes("Article")
5332            .text_search("rust", 5)
5333            .limit(10)
5334            .compile()
5335            .expect("compiled query");
5336
5337        let rows = coordinator
5338            .execute_compiled_read(&compiled)
5339            .expect("execute read");
5340
5341        assert_eq!(rows.nodes.len(), 2, "both nodes must be returned");
5342        assert_eq!(
5343            rows.nodes[0].logical_id, "article-a",
5344            "article-a (title match, weight 10) must rank above article-b (body match, weight 1)"
5345        );
5346    }
5347
5348    // --- C-1: matched_paths attribution tests ---
5349
5350    /// Property FTS hit: `matched_paths` must reflect the *actual* leaves
5351    /// that contributed match tokens, queried from
5352    /// `fts_node_property_positions` via the highlight + offset path.
5353    ///
5354    /// Setup: one node with two indexed leaves (`$.body` = "other",
5355    /// `$.title` = "searchterm"). Searching for "searchterm" must produce a
5356    /// hit whose `matched_paths` contains `"$.title"` and does NOT contain
5357    /// `"$.body"`.
5358    #[test]
5359    fn property_fts_hit_matched_paths_from_positions() {
5360        use crate::{AdminService, rebuild_actor::RebuildMode};
5361        use fathomdb_query::compile_search;
5362
5363        let db = NamedTempFile::new().expect("temporary db");
5364        let schema_manager = Arc::new(SchemaManager::new());
5365
5366        // Register FTS schema for "Item" to create the per-kind table
5367        // fts_props_item before opening the coordinator.
5368        {
5369            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
5370            admin
5371                .register_fts_property_schema_with_entries(
5372                    "Item",
5373                    &[
5374                        crate::FtsPropertyPathSpec::scalar("$.body"),
5375                        crate::FtsPropertyPathSpec::scalar("$.title"),
5376                    ],
5377                    None,
5378                    &[],
5379                    RebuildMode::Eager,
5380                )
5381                .expect("register Item FTS schema");
5382        }
5383
5384        let coordinator = ExecutionCoordinator::open(
5385            db.path(),
5386            Arc::clone(&schema_manager),
5387            None,
5388            1,
5389            Arc::new(TelemetryCounters::default()),
5390            None,
5391        )
5392        .expect("coordinator");
5393
5394        let conn = rusqlite::Connection::open(db.path()).expect("open db");
5395
5396        // The recursive walker emits leaves in alphabetical key order:
5397        //   "body"  → "other"      bytes  0..5
5398        //   LEAF_SEPARATOR (29 bytes)
5399        //   "title" → "searchterm" bytes 34..44
5400        let blob = format!("other{}searchterm", crate::writer::LEAF_SEPARATOR);
5401        // Verify the constant length assumption used in the position table.
5402        assert_eq!(
5403            crate::writer::LEAF_SEPARATOR.len(),
5404            29,
5405            "LEAF_SEPARATOR length changed; update position offsets"
5406        );
5407
5408        conn.execute(
5409            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5410             VALUES ('r1', 'item-1', 'Item', '{\"title\":\"searchterm\",\"body\":\"other\"}', 100)",
5411            [],
5412        )
5413        .expect("insert node");
5414        // Insert into the per-kind table (migration 23 dropped global fts_node_properties).
5415        let item_table = fathomdb_schema::fts_kind_table_name("Item");
5416        conn.execute(
5417            &format!(
5418                "INSERT INTO {item_table} (node_logical_id, text_content) \
5419                 VALUES ('item-1', ?1)"
5420            ),
5421            rusqlite::params![blob],
5422        )
5423        .expect("insert fts row");
5424        conn.execute(
5425            "INSERT INTO fts_node_property_positions \
5426             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5427             VALUES ('item-1', 'Item', 0, 5, '$.body')",
5428            [],
5429        )
5430        .expect("insert body position");
5431        conn.execute(
5432            "INSERT INTO fts_node_property_positions \
5433             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5434             VALUES ('item-1', 'Item', 34, 44, '$.title')",
5435            [],
5436        )
5437        .expect("insert title position");
5438
5439        let ast = QueryBuilder::nodes("Item").text_search("searchterm", 10);
5440        let mut compiled = compile_search(ast.ast()).expect("compile search");
5441        compiled.attribution_requested = true;
5442
5443        let rows = coordinator
5444            .execute_compiled_search(&compiled)
5445            .expect("search");
5446
5447        assert!(!rows.hits.is_empty(), "expected at least one hit");
5448        let hit = rows
5449            .hits
5450            .iter()
5451            .find(|h| h.node.logical_id == "item-1")
5452            .expect("item-1 must be in hits");
5453
5454        let att = hit
5455            .attribution
5456            .as_ref()
5457            .expect("attribution must be Some when attribution_requested");
5458        assert!(
5459            att.matched_paths.contains(&"$.title".to_owned()),
5460            "matched_paths must contain '$.title', got {:?}",
5461            att.matched_paths,
5462        );
5463        assert!(
5464            !att.matched_paths.contains(&"$.body".to_owned()),
5465            "matched_paths must NOT contain '$.body', got {:?}",
5466            att.matched_paths,
5467        );
5468    }
5469
5470    /// Vector hits must carry `attribution = None` regardless of the
5471    /// `attribution_requested` flag.  The vector retrieval path has no
5472    /// FTS5 match positions to attribute.
5473    ///
5474    /// This test exercises the degraded (no sqlite-vec) path which returns
5475    /// an empty hit list; the invariant is that `was_degraded = true` and
5476    /// no hits carry a non-None attribution.
5477    #[test]
5478    fn vector_hit_has_no_attribution() {
5479        use fathomdb_query::compile_vector_search;
5480
5481        let db = NamedTempFile::new().expect("temporary db");
5482        let coordinator = ExecutionCoordinator::open(
5483            db.path(),
5484            Arc::new(SchemaManager::new()),
5485            None,
5486            1,
5487            Arc::new(TelemetryCounters::default()),
5488            None,
5489        )
5490        .expect("coordinator");
5491
5492        // Compile a vector search with attribution requested.
5493        let ast = QueryBuilder::nodes("Document").vector_search("[1.0, 0.0]", 5);
5494        let mut compiled = compile_vector_search(ast.ast()).expect("compile vector search");
5495        compiled.attribution_requested = true;
5496
5497        // Without sqlite-vec the result degrades to empty; every hit
5498        // (vacuously) must carry attribution == None.
5499        let rows = coordinator
5500            .execute_compiled_vector_search(&compiled)
5501            .expect("vector search must not error");
5502
5503        assert!(
5504            rows.was_degraded,
5505            "vector search without vec table must degrade"
5506        );
5507        for hit in &rows.hits {
5508            assert!(
5509                hit.attribution.is_none(),
5510                "vector hits must carry attribution = None, got {:?}",
5511                hit.attribution
5512            );
5513        }
5514    }
5515
5516    /// Chunk-backed hits with attribution requested must carry
5517    /// `matched_paths = ["text_content"]` — they have no recursive-leaf
5518    /// structure, but callers need a non-empty signal that the match came
5519    /// from the chunk surface.
5520    ///
5521    /// NOTE: This test documents the desired target behavior per the C-1
5522    /// pack spec.  Implementing it requires updating the chunk-hit arm of
5523    /// `resolve_hit_attribution` to return `vec!["text_content"]`.  That
5524    /// change currently conflicts with integration tests in
5525    /// `crates/fathomdb/tests/text_search_surface.rs` which assert empty
5526    /// `matched_paths` for chunk hits.  Until those tests are updated this
5527    /// test verifies the *current* (placeholder) behavior: chunk hits carry
5528    /// `Some(HitAttribution { matched_paths: vec![] })`.
5529    #[test]
5530    fn chunk_hit_has_text_content_attribution() {
5531        use fathomdb_query::compile_search;
5532
5533        let db = NamedTempFile::new().expect("temporary db");
5534        let coordinator = ExecutionCoordinator::open(
5535            db.path(),
5536            Arc::new(SchemaManager::new()),
5537            None,
5538            1,
5539            Arc::new(TelemetryCounters::default()),
5540            None,
5541        )
5542        .expect("coordinator");
5543
5544        let conn = rusqlite::Connection::open(db.path()).expect("open db");
5545
5546        conn.execute_batch(
5547            r"
5548            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
5549            VALUES ('r1', 'chunk-node', 'Goal', '{}', 100);
5550            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
5551            VALUES ('c1', 'chunk-node', 'uniquesentinelterm', 100);
5552            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
5553            VALUES ('c1', 'chunk-node', 'Goal', 'uniquesentinelterm');
5554            ",
5555        )
5556        .expect("seed chunk node");
5557
5558        let ast = QueryBuilder::nodes("Goal").text_search("uniquesentinelterm", 10);
5559        let mut compiled = compile_search(ast.ast()).expect("compile search");
5560        compiled.attribution_requested = true;
5561
5562        let rows = coordinator
5563            .execute_compiled_search(&compiled)
5564            .expect("search");
5565
5566        assert!(!rows.hits.is_empty(), "expected chunk hit");
5567        let hit = rows
5568            .hits
5569            .iter()
5570            .find(|h| matches!(h.source, SearchHitSource::Chunk))
5571            .expect("must have a Chunk hit");
5572
5573        let att = hit
5574            .attribution
5575            .as_ref()
5576            .expect("attribution must be Some when attribution_requested");
5577        assert_eq!(
5578            att.matched_paths,
5579            vec!["text_content".to_owned()],
5580            "chunk matched_paths must be [\"text_content\"], got {:?}",
5581            att.matched_paths,
5582        );
5583    }
5584
5585    /// Property FTS hits from two different kinds must each carry
5586    /// `matched_paths` corresponding to their own kind's registered leaf
5587    /// paths, not those of the other kind.
5588    ///
5589    /// This pins the per-`(node_logical_id, kind)` isolation in the
5590    /// `load_position_map` query.
5591    #[test]
5592    #[allow(clippy::too_many_lines)]
5593    fn mixed_kind_results_get_per_kind_matched_paths() {
5594        use crate::{AdminService, rebuild_actor::RebuildMode};
5595        use fathomdb_query::compile_search;
5596
5597        let db = NamedTempFile::new().expect("temporary db");
5598        let schema_manager = Arc::new(SchemaManager::new());
5599
5600        // Register FTS schemas for KindA and KindB to create per-kind tables
5601        // fts_props_kinda and fts_props_kindb before opening the coordinator.
5602        {
5603            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
5604            admin
5605                .register_fts_property_schema_with_entries(
5606                    "KindA",
5607                    &[crate::FtsPropertyPathSpec::scalar("$.alpha")],
5608                    None,
5609                    &[],
5610                    RebuildMode::Eager,
5611                )
5612                .expect("register KindA FTS schema");
5613            admin
5614                .register_fts_property_schema_with_entries(
5615                    "KindB",
5616                    &[crate::FtsPropertyPathSpec::scalar("$.beta")],
5617                    None,
5618                    &[],
5619                    RebuildMode::Eager,
5620                )
5621                .expect("register KindB FTS schema");
5622        }
5623
5624        let coordinator = ExecutionCoordinator::open(
5625            db.path(),
5626            Arc::clone(&schema_manager),
5627            None,
5628            1,
5629            Arc::new(TelemetryCounters::default()),
5630            None,
5631        )
5632        .expect("coordinator");
5633
5634        let conn = rusqlite::Connection::open(db.path()).expect("open db");
5635
5636        // KindA: leaf "$.alpha" = "xenoterm" (start=0, end=8)
5637        conn.execute(
5638            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5639             VALUES ('rA', 'node-a', 'KindA', '{\"alpha\":\"xenoterm\"}', 100)",
5640            [],
5641        )
5642        .expect("insert KindA node");
5643        // Insert into per-kind table (migration 23 dropped global fts_node_properties).
5644        let table_for_a = fathomdb_schema::fts_kind_table_name("KindA");
5645        conn.execute(
5646            &format!(
5647                "INSERT INTO {table_for_a} (node_logical_id, text_content) \
5648                 VALUES ('node-a', 'xenoterm')"
5649            ),
5650            [],
5651        )
5652        .expect("insert KindA fts row");
5653        conn.execute(
5654            "INSERT INTO fts_node_property_positions \
5655             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5656             VALUES ('node-a', 'KindA', 0, 8, '$.alpha')",
5657            [],
5658        )
5659        .expect("insert KindA position");
5660
5661        // KindB: leaf "$.beta" = "xenoterm" (start=0, end=8)
5662        conn.execute(
5663            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5664             VALUES ('rB', 'node-b', 'KindB', '{\"beta\":\"xenoterm\"}', 100)",
5665            [],
5666        )
5667        .expect("insert KindB node");
5668        // Insert into per-kind table (migration 23 dropped global fts_node_properties).
5669        let table_for_b = fathomdb_schema::fts_kind_table_name("KindB");
5670        conn.execute(
5671            &format!(
5672                "INSERT INTO {table_for_b} (node_logical_id, text_content) \
5673                 VALUES ('node-b', 'xenoterm')"
5674            ),
5675            [],
5676        )
5677        .expect("insert KindB fts row");
5678        conn.execute(
5679            "INSERT INTO fts_node_property_positions \
5680             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5681             VALUES ('node-b', 'KindB', 0, 8, '$.beta')",
5682            [],
5683        )
5684        .expect("insert KindB position");
5685
5686        // Search across both kinds (empty root_kind = no kind filter).
5687        let ast = QueryBuilder::nodes("").text_search("xenoterm", 10);
5688        let mut compiled = compile_search(ast.ast()).expect("compile search");
5689        compiled.attribution_requested = true;
5690
5691        let rows = coordinator
5692            .execute_compiled_search(&compiled)
5693            .expect("search");
5694
5695        // Both nodes must appear.
5696        assert!(
5697            rows.hits.len() >= 2,
5698            "expected hits for both kinds, got {}",
5699            rows.hits.len()
5700        );
5701
5702        for hit in &rows.hits {
5703            let att = hit
5704                .attribution
5705                .as_ref()
5706                .expect("attribution must be Some when attribution_requested");
5707            match hit.node.kind.as_str() {
5708                "KindA" => {
5709                    assert_eq!(
5710                        att.matched_paths,
5711                        vec!["$.alpha".to_owned()],
5712                        "KindA hit must have matched_paths=['$.alpha'], got {:?}",
5713                        att.matched_paths,
5714                    );
5715                }
5716                "KindB" => {
5717                    assert_eq!(
5718                        att.matched_paths,
5719                        vec!["$.beta".to_owned()],
5720                        "KindB hit must have matched_paths=['$.beta'], got {:?}",
5721                        att.matched_paths,
5722                    );
5723                }
5724                other => {
5725                    // Only KindA and KindB are expected in this test.
5726                    assert_eq!(other, "KindA", "unexpected kind in result: {other}");
5727                }
5728            }
5729        }
5730    }
5731
5732    // --- Pack H: TokenizerStrategy tests ---
5733
5734    #[test]
5735    fn tokenizer_strategy_from_str() {
5736        use super::TokenizerStrategy;
5737        assert_eq!(
5738            TokenizerStrategy::from_str("porter unicode61 remove_diacritics 2"),
5739            TokenizerStrategy::RecallOptimizedEnglish,
5740        );
5741        assert_eq!(
5742            TokenizerStrategy::from_str("unicode61 remove_diacritics 2"),
5743            TokenizerStrategy::PrecisionOptimized,
5744        );
5745        assert_eq!(
5746            TokenizerStrategy::from_str("trigram"),
5747            TokenizerStrategy::SubstringTrigram,
5748        );
5749        assert_eq!(
5750            TokenizerStrategy::from_str("icu"),
5751            TokenizerStrategy::GlobalCjk,
5752        );
5753        assert_eq!(
5754            TokenizerStrategy::from_str("unicode61 tokenchars '.+-'"),
5755            TokenizerStrategy::SourceCode,
5756        );
5757        // Canonical source-code preset value
5758        assert_eq!(
5759            TokenizerStrategy::from_str("unicode61 tokenchars '._-$@'"),
5760            TokenizerStrategy::SourceCode,
5761        );
5762        assert_eq!(
5763            TokenizerStrategy::from_str("my_custom_tokenizer"),
5764            TokenizerStrategy::Custom("my_custom_tokenizer".to_owned()),
5765        );
5766    }
5767
5768    #[test]
5769    fn trigram_short_query_returns_empty() {
5770        use fathomdb_query::compile_search;
5771
5772        let db = NamedTempFile::new().expect("temporary db");
5773        let schema_manager = Arc::new(SchemaManager::new());
5774
5775        // First open: bootstrap the schema so projection_profiles table exists.
5776        {
5777            let bootstrap = ExecutionCoordinator::open(
5778                db.path(),
5779                Arc::clone(&schema_manager),
5780                None,
5781                1,
5782                Arc::new(TelemetryCounters::default()),
5783                None,
5784            )
5785            .expect("bootstrap coordinator");
5786            drop(bootstrap);
5787        }
5788
5789        // Insert a SubstringTrigram strategy for kind "Snippet" directly in the DB.
5790        {
5791            let conn = rusqlite::Connection::open(db.path()).expect("open db");
5792            conn.execute_batch(
5793                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
5794                 VALUES ('Snippet', 'fts', '{\"tokenizer\":\"trigram\"}');",
5795            )
5796            .expect("insert profile");
5797        }
5798
5799        // Reopen coordinator to load strategies.
5800        let coordinator = ExecutionCoordinator::open(
5801            db.path(),
5802            Arc::clone(&schema_manager),
5803            None,
5804            1,
5805            Arc::new(TelemetryCounters::default()),
5806            None,
5807        )
5808        .expect("coordinator reopen");
5809
5810        // 2-char query for a SubstringTrigram kind must return empty without error.
5811        let ast = QueryBuilder::nodes("Snippet").text_search("ab", 10);
5812        let compiled = compile_search(ast.ast()).expect("compile search");
5813        let rows = coordinator
5814            .execute_compiled_search(&compiled)
5815            .expect("short trigram query must not error");
5816        assert!(
5817            rows.hits.is_empty(),
5818            "2-char trigram query must return empty"
5819        );
5820    }
5821
5822    #[test]
5823    fn source_code_strategy_does_not_corrupt_fts5_syntax() {
5824        // Regression: escape_source_code_query must NOT be applied to the
5825        // already-rendered FTS5 expression. render_text_query_fts5 wraps every
5826        // term in double-quote delimiters (e.g. std.io -> "std.io"). Calling the
5827        // escape function on that output corrupts the expression: the escaper
5828        // sees the surrounding '"' as ordinary chars and produces '"std"."io""'
5829        // — malformed FTS5 syntax — causing searches to silently return empty.
5830        //
5831        // This test verifies a round-trip search for 'std.io' succeeds when the
5832        // SourceCode tokenizer strategy is active.
5833        use fathomdb_query::compile_search;
5834
5835        let db = NamedTempFile::new().expect("temporary db");
5836        let schema_manager = Arc::new(SchemaManager::new());
5837
5838        // Bootstrap the DB schema.
5839        {
5840            let bootstrap = ExecutionCoordinator::open(
5841                db.path(),
5842                Arc::clone(&schema_manager),
5843                None,
5844                1,
5845                Arc::new(TelemetryCounters::default()),
5846                None,
5847            )
5848            .expect("bootstrap coordinator");
5849            drop(bootstrap);
5850        }
5851
5852        // Insert the SourceCode tokenizer profile for kind "Symbol" and seed a document.
5853        {
5854            let conn = rusqlite::Connection::open(db.path()).expect("open db");
5855            conn.execute(
5856                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
5857                 VALUES ('Symbol', 'fts', json_object('tokenizer', 'unicode61 tokenchars ''._-$@'''))",
5858                [],
5859            )
5860            .expect("insert profile");
5861            conn.execute_batch(
5862                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5863                 VALUES ('row-sym-1', 'logical-sym-1', 'Symbol', '{}', 1); \
5864                 INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
5865                 VALUES ('chunk-sym-1', 'logical-sym-1', 'std.io is a rust crate', 1); \
5866                 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
5867                 VALUES ('chunk-sym-1', 'logical-sym-1', 'Symbol', 'std.io is a rust crate');",
5868            )
5869            .expect("insert node and fts row");
5870        }
5871
5872        // Reopen coordinator to pick up the SourceCode strategy.
5873        let coordinator = ExecutionCoordinator::open(
5874            db.path(),
5875            Arc::clone(&schema_manager),
5876            None,
5877            1,
5878            Arc::new(TelemetryCounters::default()),
5879            None,
5880        )
5881        .expect("coordinator reopen");
5882
5883        // Search for "std.io": must find the document (not error, not return empty).
5884        let ast = QueryBuilder::nodes("Symbol").text_search("std.io", 10);
5885        let compiled = compile_search(ast.ast()).expect("compile search");
5886        let rows = coordinator
5887            .execute_compiled_search(&compiled)
5888            .expect("source code search must not error");
5889        assert!(
5890            !rows.hits.is_empty(),
5891            "SourceCode strategy search for 'std.io' must return the document; \
5892             got empty — FTS5 expression was likely corrupted by post-render escaping"
5893        );
5894    }
5895
5896    // ---- Pack E: vec identity guard tests ----
5897
5898    #[derive(Debug)]
5899    struct StubEmbedder {
5900        model_identity: String,
5901        dimension: usize,
5902    }
5903
5904    impl StubEmbedder {
5905        fn new(model_identity: &str, dimension: usize) -> Self {
5906            Self {
5907                model_identity: model_identity.to_owned(),
5908                dimension,
5909            }
5910        }
5911    }
5912
5913    impl crate::embedder::QueryEmbedder for StubEmbedder {
5914        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, crate::embedder::EmbedderError> {
5915            Ok(vec![0.0; self.dimension])
5916        }
5917        fn identity(&self) -> crate::embedder::QueryEmbedderIdentity {
5918            crate::embedder::QueryEmbedderIdentity {
5919                model_identity: self.model_identity.clone(),
5920                model_version: "1.0".to_owned(),
5921                dimension: self.dimension,
5922                normalization_policy: "l2".to_owned(),
5923            }
5924        }
5925        fn max_tokens(&self) -> usize {
5926            512
5927        }
5928    }
5929
5930    fn make_in_memory_db_with_projection_profiles() -> rusqlite::Connection {
5931        let conn = rusqlite::Connection::open_in_memory().expect("in-memory db");
5932        conn.execute_batch(
5933            "CREATE TABLE IF NOT EXISTS projection_profiles (
5934                kind TEXT NOT NULL,
5935                facet TEXT NOT NULL,
5936                config_json TEXT NOT NULL,
5937                active_at INTEGER,
5938                created_at INTEGER,
5939                PRIMARY KEY (kind, facet)
5940            );",
5941        )
5942        .expect("create projection_profiles");
5943        conn
5944    }
5945
5946    #[test]
5947    fn check_vec_identity_no_profile_no_panic() {
5948        let conn = make_in_memory_db_with_projection_profiles();
5949        let embedder = StubEmbedder::new("bge-small", 384);
5950        let result = super::check_vec_identity_at_open(&conn, &embedder);
5951        assert!(result.is_ok(), "no profile row must return Ok(())");
5952    }
5953
5954    #[test]
5955    fn check_vec_identity_matching_identity_ok() {
5956        let conn = make_in_memory_db_with_projection_profiles();
5957        conn.execute(
5958            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5959             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5960            [],
5961        )
5962        .expect("insert profile");
5963        let embedder = StubEmbedder::new("bge-small", 384);
5964        let result = super::check_vec_identity_at_open(&conn, &embedder);
5965        assert!(result.is_ok(), "matching profile must return Ok(())");
5966    }
5967
5968    #[test]
5969    fn check_vec_identity_mismatched_dimensions_ok() {
5970        let conn = make_in_memory_db_with_projection_profiles();
5971        conn.execute(
5972            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5973             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5974            [],
5975        )
5976        .expect("insert profile");
5977        // embedder reports 768, profile says 384 — should warn but return Ok(())
5978        let embedder = StubEmbedder::new("bge-small", 768);
5979        let result = super::check_vec_identity_at_open(&conn, &embedder);
5980        assert!(
5981            result.is_ok(),
5982            "dimension mismatch must warn and return Ok(())"
5983        );
5984    }
5985
5986    #[test]
5987    fn custom_tokenizer_passthrough() {
5988        use super::TokenizerStrategy;
5989        let strategy = TokenizerStrategy::Custom("my_tok".to_owned());
5990        // Custom strategy is just stored — it doesn't modify queries.
5991        assert_eq!(strategy, TokenizerStrategy::Custom("my_tok".to_owned()));
5992        // Verify it does not match any known variant.
5993        assert_ne!(strategy, TokenizerStrategy::SubstringTrigram);
5994        assert_ne!(strategy, TokenizerStrategy::SourceCode);
5995    }
5996
5997    #[test]
5998    fn check_vec_identity_mismatched_model_ok() {
5999        let conn = make_in_memory_db_with_projection_profiles();
6000        conn.execute(
6001            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
6002             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
6003            [],
6004        )
6005        .expect("insert profile");
6006        // embedder reports bge-large, profile says bge-small — should warn but return Ok(())
6007        let embedder = StubEmbedder::new("bge-large", 384);
6008        let result = super::check_vec_identity_at_open(&conn, &embedder);
6009        assert!(
6010            result.is_ok(),
6011            "model_identity mismatch must warn and return Ok(())"
6012        );
6013    }
6014}