//! fathomdb_engine — coordinator.rs
1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8    BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9    CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, EdgeExpansionSlot,
10    ExpansionSlot, FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue,
11    SearchBranch, SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash,
12    render_text_query_fts5,
13};
14use fathomdb_schema::SchemaManager;
15use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
16
17use crate::embedder::QueryEmbedder;
18use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
19use crate::{EngineError, sqlite};
20
21/// Maximum number of cached shape-hash to SQL mappings before the cache is
22/// cleared entirely.  A clear-all strategy is simpler than partial eviction
23/// and the cost of re-compiling on a miss is negligible.
24const MAX_SHAPE_CACHE_SIZE: usize = 4096;
25
26/// Maximum number of root IDs per batched expansion query.  Kept well below
27/// `SQLITE_MAX_VARIABLE_NUMBER` (default 999) because each batch also binds
28/// the edge-kind parameter.  Larger root sets are chunked into multiple
29/// batches of this size rather than falling back to per-root queries.
30const BATCH_CHUNK_SIZE: usize = 200;
31
32/// Build an `IN (...)` SQL fragment and bind list for an expansion-slot filter.
33///
34/// Returns the SQL fragment `AND json_extract(n.properties, ?{p}) IN (?{p+1}, ...)`
35/// and the corresponding bind values `[path, val1, val2, ...]`.
36fn compile_expansion_in_filter(
37    p: usize,
38    path: &str,
39    value_binds: Vec<Value>,
40) -> (String, Vec<Value>) {
41    let first_val = p + 1;
42    let placeholders = (0..value_binds.len())
43        .map(|i| format!("?{}", first_val + i))
44        .collect::<Vec<_>>()
45        .join(", ");
46    let mut binds = vec![Value::Text(path.to_owned())];
47    binds.extend(value_binds);
48    (
49        format!("\n                  AND json_extract(n.properties, ?{p}) IN ({placeholders})"),
50        binds,
51    )
52}
53
54/// Compile an optional expansion-slot target-side filter predicate into a SQL
55/// fragment and bind values for injection into the `numbered` CTE's WHERE clause.
56///
57/// Returns `("", vec![])` when `filter` is `None` — preserving byte-for-byte
58/// identical SQL to pre-Pack-3 behavior. When `Some(predicate)`, returns an
59/// `AND …` fragment and the corresponding bind values starting at `first_param`.
60///
61/// Supported predicates and their targets in the `numbered` CTE:
62/// - `JsonPathEq`, `JsonPathCompare`, `JsonPathFusedEq`, `JsonPathFusedTimestampCmp`,
63///   `JsonPathFusedBoolEq`, `JsonPathFusedIn`, `JsonPathIn`: target `n.properties`.
64/// - `KindEq`, `LogicalIdEq`, `SourceRefEq`, `ContentRefEq`, `ContentRefNotNull`:
65///   column-direct predicates on `n.kind`, `n.logical_id`, `n.source_ref`, `n.content_ref`.
66/// - `EdgePropertyEq`, `EdgePropertyCompare`: rejected with `unreachable!` — these
67///   target edge properties and must be compiled via `compile_edge_filter` instead.
68#[allow(clippy::too_many_lines)]
69fn compile_expansion_filter(
70    filter: Option<&Predicate>,
71    first_param: usize,
72) -> (String, Vec<Value>) {
73    let Some(predicate) = filter else {
74        return (String::new(), vec![]);
75    };
76    let p = first_param;
77    match predicate {
78        Predicate::JsonPathEq { path, value } => {
79            let val = match value {
80                ScalarValue::Text(t) => Value::Text(t.clone()),
81                ScalarValue::Integer(i) => Value::Integer(*i),
82                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
83            };
84            (
85                format!(
86                    "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
87                    p + 1
88                ),
89                vec![Value::Text(path.clone()), val],
90            )
91        }
92        Predicate::JsonPathCompare { path, op, value } => {
93            let val = match value {
94                ScalarValue::Text(t) => Value::Text(t.clone()),
95                ScalarValue::Integer(i) => Value::Integer(*i),
96                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
97            };
98            let operator = match op {
99                ComparisonOp::Gt => ">",
100                ComparisonOp::Gte => ">=",
101                ComparisonOp::Lt => "<",
102                ComparisonOp::Lte => "<=",
103            };
104            (
105                format!(
106                    "\n                  AND json_extract(n.properties, ?{p}) {operator} ?{}",
107                    p + 1
108                ),
109                vec![Value::Text(path.clone()), val],
110            )
111        }
112        Predicate::JsonPathFusedEq { path, value } => (
113            format!(
114                "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
115                p + 1
116            ),
117            vec![Value::Text(path.clone()), Value::Text(value.clone())],
118        ),
119        Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
120            let operator = match op {
121                ComparisonOp::Gt => ">",
122                ComparisonOp::Gte => ">=",
123                ComparisonOp::Lt => "<",
124                ComparisonOp::Lte => "<=",
125            };
126            (
127                format!(
128                    "\n                  AND json_extract(n.properties, ?{p}) {operator} ?{}",
129                    p + 1
130                ),
131                vec![Value::Text(path.clone()), Value::Integer(*value)],
132            )
133        }
134        Predicate::JsonPathFusedBoolEq { path, value } => (
135            format!(
136                "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
137                p + 1
138            ),
139            vec![Value::Text(path.clone()), Value::Integer(i64::from(*value))],
140        ),
141        Predicate::KindEq(kind) => (
142            format!("\n                  AND n.kind = ?{p}"),
143            vec![Value::Text(kind.clone())],
144        ),
145        Predicate::LogicalIdEq(logical_id) => (
146            format!("\n                  AND n.logical_id = ?{p}"),
147            vec![Value::Text(logical_id.clone())],
148        ),
149        Predicate::SourceRefEq(source_ref) => (
150            format!("\n                  AND n.source_ref = ?{p}"),
151            vec![Value::Text(source_ref.clone())],
152        ),
153        Predicate::ContentRefEq(uri) => (
154            format!("\n                  AND n.content_ref = ?{p}"),
155            vec![Value::Text(uri.clone())],
156        ),
157        Predicate::ContentRefNotNull => (
158            "\n                  AND n.content_ref IS NOT NULL".to_owned(),
159            vec![],
160        ),
161        Predicate::EdgePropertyEq { .. } | Predicate::EdgePropertyCompare { .. } => {
162            unreachable!(
163                "compile_expansion_filter: EdgeProperty* variants must use compile_edge_filter"
164            );
165        }
166        Predicate::JsonPathFusedIn { path, values } => compile_expansion_in_filter(
167            p,
168            path,
169            values.iter().map(|v| Value::Text(v.clone())).collect(),
170        ),
171        Predicate::JsonPathIn { path, values } => compile_expansion_in_filter(
172            p,
173            path,
174            values
175                .iter()
176                .map(|v| match v {
177                    ScalarValue::Text(t) => Value::Text(t.clone()),
178                    ScalarValue::Integer(i) => Value::Integer(*i),
179                    ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
180                })
181                .collect(),
182        ),
183    }
184}
185
186/// Compile an optional edge filter predicate into a SQL fragment and bind values
187/// for injection into the `JOIN edges e ON ...` condition.
188///
189/// Returns `("", vec![])` when `filter` is `None`. When `Some(predicate)`,
190/// returns an `AND …` fragment targeting `e.properties` and the corresponding
191/// bind values starting at `first_param`.
192///
193/// Only `EdgePropertyEq` and `EdgePropertyCompare` are valid here.
194/// Non-edge predicates: `panic!("compile_edge_filter: non-edge predicate")`.
195fn compile_edge_filter(filter: Option<&Predicate>, first_param: usize) -> (String, Vec<Value>) {
196    let Some(predicate) = filter else {
197        return (String::new(), vec![]);
198    };
199    let p = first_param;
200    match predicate {
201        Predicate::EdgePropertyEq { path, value } => {
202            let val = match value {
203                ScalarValue::Text(t) => Value::Text(t.clone()),
204                ScalarValue::Integer(i) => Value::Integer(*i),
205                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
206            };
207            (
208                format!(
209                    "\n                    AND json_extract(e.properties, ?{p}) = ?{}",
210                    p + 1
211                ),
212                vec![Value::Text(path.clone()), val],
213            )
214        }
215        Predicate::EdgePropertyCompare { path, op, value } => {
216            let val = match value {
217                ScalarValue::Text(t) => Value::Text(t.clone()),
218                ScalarValue::Integer(i) => Value::Integer(*i),
219                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
220            };
221            let operator = match op {
222                ComparisonOp::Gt => ">",
223                ComparisonOp::Gte => ">=",
224                ComparisonOp::Lt => "<",
225                ComparisonOp::Lte => "<=",
226            };
227            (
228                format!(
229                    "\n                    AND json_extract(e.properties, ?{p}) {operator} ?{}",
230                    p + 1
231                ),
232                vec![Value::Text(path.clone()), val],
233            )
234        }
235        _ => {
236            unreachable!("compile_edge_filter: non-edge predicate {predicate:?}");
237        }
238    }
239}
240
/// FTS tokenizer strategy for a given node kind.
///
/// Loaded at coordinator open time from `projection_profiles` and used
/// during query dispatch to apply per-kind query adaptations.
/// See [`TokenizerStrategy::from_str`] for the raw-config-string mapping.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TokenizerStrategy {
    /// Porter stemming + unicode61, optimized for English recall.
    RecallOptimizedEnglish,
    /// Unicode61 without stemming, optimized for precision.
    PrecisionOptimized,
    /// Trigram tokenizer — queries shorter than 3 chars are skipped.
    SubstringTrigram,
    /// ICU tokenizer for global / CJK text.
    GlobalCjk,
    /// Unicode61 with custom token chars — query special chars are escaped.
    SourceCode,
    /// Any other tokenizer string (carried verbatim; no query-side adaptation).
    Custom(String),
}
260
261impl TokenizerStrategy {
262    /// Map a raw tokenizer config string (as stored in `projection_profiles`)
263    /// to the corresponding strategy variant.
264    pub fn from_str(s: &str) -> Self {
265        match s {
266            "porter unicode61 remove_diacritics 2" => Self::RecallOptimizedEnglish,
267            "unicode61 remove_diacritics 2" => Self::PrecisionOptimized,
268            "trigram" => Self::SubstringTrigram,
269            "icu" => Self::GlobalCjk,
270            s if s.starts_with("unicode61 tokenchars") => Self::SourceCode,
271            other => Self::Custom(other.to_string()),
272        }
273    }
274}
275
/// A pool of read-only `SQLite` connections for concurrent read access.
///
/// Each connection is wrapped in its own [`Mutex`] so multiple readers can
/// proceed in parallel when they happen to grab different slots.
struct ReadPool {
    /// One mutex per connection; slot order is also the blocking-fallback order.
    connections: Vec<Mutex<Connection>>,
}
283
impl fmt::Debug for ReadPool {
    // Manual impl: report only the pool size rather than dumping connections.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReadPool")
            .field("size", &self.connections.len())
            .finish()
    }
}
291
292impl ReadPool {
293    /// Open `pool_size` read-only connections to the database at `path`.
294    ///
295    /// Each connection has PRAGMAs initialized via
296    /// [`SchemaManager::initialize_connection`] and, when the `sqlite-vec`
297    /// feature is enabled and `vector_enabled` is true, the vec extension
298    /// auto-loaded.
299    ///
300    /// # Errors
301    ///
302    /// Returns [`EngineError`] if any connection fails to open or initialize.
303    fn new(
304        db_path: &Path,
305        pool_size: usize,
306        schema_manager: &SchemaManager,
307        vector_enabled: bool,
308    ) -> Result<Self, EngineError> {
309        let mut connections = Vec::with_capacity(pool_size);
310        for _ in 0..pool_size {
311            let conn = if vector_enabled {
312                #[cfg(feature = "sqlite-vec")]
313                {
314                    sqlite::open_readonly_connection_with_vec(db_path)?
315                }
316                #[cfg(not(feature = "sqlite-vec"))]
317                {
318                    sqlite::open_readonly_connection(db_path)?
319                }
320            } else {
321                sqlite::open_readonly_connection(db_path)?
322            };
323            schema_manager
324                .initialize_reader_connection(&conn)
325                .map_err(EngineError::Schema)?;
326            connections.push(Mutex::new(conn));
327        }
328        Ok(Self { connections })
329    }
330
331    /// Acquire a connection from the pool.
332    ///
333    /// Tries [`Mutex::try_lock`] on each slot first (fast non-blocking path).
334    /// If every slot is held, falls back to a blocking lock on the first slot.
335    ///
336    /// # Errors
337    ///
338    /// Returns [`EngineError::Bridge`] if the underlying mutex is poisoned.
339    fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
340        // Fast path: try each connection without blocking.
341        for conn in &self.connections {
342            if let Ok(guard) = conn.try_lock() {
343                return Ok(guard);
344            }
345        }
346        // Fallback: block on the first connection.
347        self.connections[0].lock().map_err(|_| {
348            trace_error!("read pool: connection mutex poisoned");
349            EngineError::Bridge("connection mutex poisoned".to_owned())
350        })
351    }
352
353    /// Return the number of connections in the pool.
354    #[cfg(test)]
355    fn size(&self) -> usize {
356        self.connections.len()
357    }
358}
359
/// Execution plan returned by [`ExecutionCoordinator::explain_compiled_read`].
///
/// This is a read-only introspection struct. It does not execute SQL.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// Final SQL text that would be executed.
    pub sql: String,
    /// Number of bind parameters accompanying `sql`.
    pub bind_count: usize,
    /// Driving table selected for the query.
    pub driving_table: DrivingTable,
    /// Shape hash of the query (the key of the shape-to-SQL cache).
    pub shape_hash: ShapeHash,
    /// `true` when the SQL was served from the shape-hash cache.
    pub cache_hit: bool,
}
371
/// A single node row returned from a query.
///
/// Columns mirror the `nodes` table; `properties` is raw JSON text.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row ID.
    pub row_id: String,
    /// Logical ID of the node.
    pub logical_id: String,
    /// Node kind.
    pub kind: String,
    /// JSON-encoded node properties.
    pub properties: String,
    /// Optional URI referencing external content.
    pub content_ref: Option<String>,
    /// Unix timestamp of last access, if tracked.
    pub last_accessed_at: Option<i64>,
}
388
/// A single edge row surfaced during edge-projecting traversal.
///
/// Columns are sourced directly from the `edges` table; identity
/// fields (`source_logical_id`, `target_logical_id`) are absolute
/// (tail/head as stored), not re-oriented to traversal direction.
///
/// Multi-hop semantics: for `max_depth > 1`, each emitted tuple
/// reflects the final-hop edge leading to the emitted endpoint node.
/// Full path enumeration is out of scope for 0.5.3.
///
/// Derives `PartialEq` only (no `Eq`) because `confidence` is an `Option<f64>`.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeRow {
    /// Physical row ID from the `edges` table.
    pub row_id: String,
    /// Logical ID of the edge.
    pub logical_id: String,
    /// Logical ID of the edge source (tail).
    pub source_logical_id: String,
    /// Logical ID of the edge target (head).
    pub target_logical_id: String,
    /// Edge kind (label).
    pub kind: String,
    /// JSON-encoded edge properties.
    pub properties: String,
    /// Optional source reference for provenance tracking.
    pub source_ref: Option<String>,
    /// Optional confidence score attached to the edge.
    pub confidence: Option<f64>,
}
417
/// A single run row returned from a query.
///
/// Surfaced alongside node rows in [`QueryRows`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Unique run ID.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded run properties.
    pub properties: String,
}
430
/// A single step row returned from a query.
///
/// Linked to its parent run via `run_id`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Unique step ID.
    pub id: String,
    /// ID of the parent run.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded step properties.
    pub properties: String,
}
445
/// A single action row returned from a query.
///
/// Linked to its parent step via `step_id`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Unique action ID.
    pub id: String,
    /// ID of the parent step.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded action properties.
    pub properties: String,
}
460
/// A single row from the `provenance_events` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Unique event ID.
    pub id: String,
    /// Event type discriminator.
    pub event_type: String,
    /// Subject the event refers to.
    pub subject: String,
    /// Optional source reference for provenance tracking.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation time as a Unix timestamp (assumed seconds — TODO confirm against writer).
    pub created_at: i64,
}
471
/// Result set from executing a flat (non-grouped) compiled query.
///
/// Empty collections with `was_degraded == true` indicate a query that was
/// degraded rather than failed.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Matched node rows.
    pub nodes: Vec<NodeRow>,
    /// Runs associated with the matched nodes.
    pub runs: Vec<RunRow>,
    /// Steps associated with the matched runs.
    pub steps: Vec<StepRow>,
    /// Actions associated with the matched steps.
    pub actions: Vec<ActionRow>,
    /// `true` when a capability miss (e.g. missing sqlite-vec) caused the query
    /// to degrade to an empty result instead of propagating an error.
    pub was_degraded: bool,
}
487
/// Expansion results for a single root node within a grouped query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical ID of the root node that seeded this expansion.
    pub root_logical_id: String,
    /// Nodes reached by traversing from the root.
    pub nodes: Vec<NodeRow>,
}
496
/// All expansion results for a single named slot across all roots.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot.
    pub slot: String,
    /// Per-root expansion results (one entry per seeding root).
    pub roots: Vec<ExpansionRootRows>,
}
505
/// Edge-expansion results for a single root node within a grouped query.
///
/// Derives `PartialEq` only — transitive through [`EdgeRow`] which carries
/// an `Option<f64>` confidence score.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeExpansionRootRows {
    /// Logical ID of the root node that seeded this expansion.
    pub root_logical_id: String,
    /// `(EdgeRow, NodeRow)` tuples reached by traversing from the root. For
    /// `max_depth > 1`, the edge is the final-hop edge leading to the
    /// emitted endpoint node.
    pub pairs: Vec<(EdgeRow, NodeRow)>,
}
519
/// All edge-expansion results for a single named slot across all roots.
///
/// Derives `PartialEq` only — transitive through [`EdgeRow`].
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeExpansionSlotRows {
    /// Name of the edge-expansion slot.
    pub slot: String,
    /// Per-root edge-expansion results (one entry per seeding root).
    pub roots: Vec<EdgeExpansionRootRows>,
}
530
/// Result set from executing a grouped compiled query.
///
/// Derives `PartialEq` only (not `Eq`) because `edge_expansions` transitively
/// contains `Option<f64>` via [`EdgeRow`]. A default (empty) value holds no
/// floats and would be `Eq`-able in practice; `PartialEq`-only keeps the
/// contract consistent regardless of contents.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GroupedQueryRows {
    /// Root node rows matched by the base query.
    pub roots: Vec<NodeRow>,
    /// Per-slot expansion results.
    pub expansions: Vec<ExpansionSlotRows>,
    /// Per-slot edge-expansion results.
    pub edge_expansions: Vec<EdgeExpansionSlotRows>,
    /// `true` when a capability miss caused the query to degrade to an empty result.
    pub was_degraded: bool,
}
548
/// Manages a pool of read-only `SQLite` connections and executes compiled queries.
pub struct ExecutionCoordinator {
    /// Filesystem path of the backing `SQLite` database.
    database_path: PathBuf,
    /// Schema manager; used at open time to bootstrap and to initialize
    /// each pooled reader connection.
    schema_manager: Arc<SchemaManager>,
    /// Pool of read-only connections shared by all read paths.
    pool: ReadPool,
    /// Cache mapping query shape hashes to compiled SQL; cleared wholesale
    /// once `MAX_SHAPE_CACHE_SIZE` is reached.
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// `true` when sqlite-vec was loaded and a vector profile is active.
    vector_enabled: bool,
    /// NOTE(review): presumably latches the vector-degradation warning so it
    /// fires at most once — confirm against the execute paths.
    vec_degradation_warned: AtomicBool,
    /// Shared telemetry counters (queries, errors, cache status).
    telemetry: Arc<TelemetryCounters>,
    /// Phase 12.5a: optional read-time query embedder. When present,
    /// [`Self::execute_retrieval_plan`] invokes it via
    /// [`Self::fill_vector_branch`] after compile to populate
    /// `plan.vector`. When `None`, the Phase 12 v1 vector-dormancy
    /// invariant on `search()` is preserved: the vector slot stays empty
    /// and the coordinator's stage-gating check skips the vector branch.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
    /// Per-kind FTS tokenizer strategies loaded from `projection_profiles`
    /// at open time. Used during query dispatch for query-side adaptations
    /// (e.g. short-query skip for trigram, token escaping for source-code).
    ///
    /// **Stale-state note**: This map is populated once at `open()` and is
    /// never refreshed during the lifetime of the coordinator.  If
    /// `AdminService::set_fts_profile` is called on a running engine, the
    /// in-memory strategy map will not reflect the change until the engine is
    /// reopened.  The DB-side profile row is always up-to-date; only the
    /// query-side adaptation (e.g. the trigram short-query guard) will be stale.
    fts_strategies: HashMap<String, TokenizerStrategy>,
}
577
impl fmt::Debug for ExecutionCoordinator {
    // Manual impl: only the database path is shown; the remaining fields
    // (pool, caches, embedder trait object) are elided via finish_non_exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ExecutionCoordinator")
            .field("database_path", &self.database_path)
            .finish_non_exhaustive()
    }
}
585
586impl ExecutionCoordinator {
    /// Open the coordinator: bootstrap the schema, run open-time guards for
    /// derived FTS state, load per-kind tokenizer strategies, then build the
    /// read-only connection pool.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database connection cannot be opened or schema bootstrap fails.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap connection: read-write, with the vec extension loaded only
        // when a vector dimension was requested and the feature is compiled in.
        #[cfg(feature = "sqlite-vec")]
        let mut conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let mut conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        // ----- Open-time rebuild guards for derived FTS state -----
        //
        // Property FTS data is derived state. Per-kind `fts_props_<kind>`
        // virtual tables are NOT source of truth — they must be rebuildable
        // from canonical `nodes.properties` + `fts_property_schemas` at any
        // time. After migration 23 the global `fts_node_properties` table no
        // longer exists; each registered kind has its own `fts_props_<kind>`
        // table created at first write or first rebuild.
        //
        // Guard 1: if any registered kind's per-kind table is missing or empty
        // while live nodes of that kind exist, do a synchronous full rebuild of
        // all per-kind FTS tables and position map rows.
        //
        // Guard 2: if any recursive schema has a populated per-kind table but
        // `fts_node_property_positions` is empty, do a synchronous full rebuild
        // to regenerate the position map from canonical state.
        //
        // Both guards are no-ops on a consistent database.
        run_open_time_fts_guards(&mut conn)?;

        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            let _ = &report;
            false
        };

        // 0.5.0: per-kind vec tables replace the global vec_nodes_active.
        // The vector_dimension parameter now only sets the vector_enabled flag —
        // no global table is created. Per-kind vec tables are created by
        // `regenerate_vector_embeddings` at regen time.
        if vector_dimension.is_some() {
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        // Vec identity guard: warn (never error) if the stored profile's
        // model_identity or dimensions differ from the configured embedder.
        if let Some(ref emb) = query_embedder {
            check_vec_identity_at_open(&conn, emb.as_ref())?;
        }

        // Load FTS tokenizer strategies from projection_profiles
        let fts_strategies: HashMap<String, TokenizerStrategy> = {
            let mut map = HashMap::new();
            let mut stmt = conn
                .prepare("SELECT kind, config_json FROM projection_profiles WHERE facet='fts'")?;
            let rows = stmt.query_map([], |row| {
                let kind: String = row.get(0)?;
                let config_json: String = row.get(1)?;
                Ok((kind, config_json))
            })?;
            for row in rows.flatten() {
                let (kind, config_json) = row;
                // Rows with unparseable JSON or a missing/non-string "tokenizer"
                // key are silently skipped (Value indexing yields Null).
                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&config_json)
                    && let Some(tok) = v["tokenizer"].as_str()
                {
                    map.insert(kind, TokenizerStrategy::from_str(tok));
                }
            }
            map
        };

        // Drop the bootstrap connection — pool connections are used for reads.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
            fts_strategies,
        })
    }
692
693    /// Returns the filesystem path to the `SQLite` database.
694    pub fn database_path(&self) -> &Path {
695        &self.database_path
696    }
697
    /// Returns `true` when sqlite-vec was loaded and a vector profile is active.
    ///
    /// Mirrors the `vector_enabled` flag computed once in [`Self::open`].
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
703
    /// Returns the configured read-time query embedder, if any.
    ///
    /// Accessor only — no embedding work happens here.
    ///
    /// The 0.4.0 write-time parity work reuses this same embedder for
    /// vector regeneration via [`Engine::regenerate_vector_embeddings`],
    /// so there is always exactly one source of truth for vector
    /// identity per [`Engine`] instance.
    #[must_use]
    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
        self.query_embedder.as_ref()
    }
714
    /// Acquire a pooled read-only connection (delegates to [`ReadPool::acquire`]).
    ///
    /// # Errors
    /// Returns [`EngineError::Bridge`] if the pool's mutex is poisoned.
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
718
719    /// Aggregate `SQLite` page-cache counters across all pool connections.
720    ///
721    /// Uses `try_lock` to avoid blocking reads for telemetry reporting.
722    /// Connections that are currently locked by a query are skipped — this
723    /// is acceptable for statistical counters.
724    #[must_use]
725    pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
726        let mut total = SqliteCacheStatus::default();
727        for conn_mutex in &self.pool.connections {
728            if let Ok(conn) = conn_mutex.try_lock() {
729                total.add(&read_db_cache_status(&conn));
730            }
731        }
732        total
733    }
734
    /// Execute a compiled single-shape read query and return its node rows.
    ///
    /// Before execution the compiled SQL is adapted to the current per-kind
    /// table layout (the fathomdb-query compiler still emits the legacy
    /// global `fts_node_properties` / `vec_nodes_active` names), the wrapped
    /// row SQL is cached by shape hash, and a missing per-kind vec table
    /// degrades to an empty result with `was_degraded = true` rather than
    /// erroring.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
    #[allow(clippy::expect_used, clippy::too_many_lines)]
    pub fn execute_compiled_read(
        &self,
        compiled: &CompiledQuery,
    ) -> Result<QueryRows, EngineError> {
        // Scan fallback for first-registration async rebuild: if the query uses the
        // FtsNodes driving table and the root kind has is_first_registration=1 with
        // state IN ('PENDING','BUILDING'), the per-kind table has no rows yet.
        // Route to a full-kind node scan so callers get results instead of empty.
        //
        // NOTE(review): both this arm and the VecNodes arm below assume the
        // root kind lives at binds[1] (the second positional bind) — confirm
        // against the fathomdb-query compiler's bind layout.
        if compiled.driving_table == DrivingTable::FtsNodes
            && let Some(BindValue::Text(root_kind)) = compiled.binds.get(1)
            && let Some(nodes) = self.scan_fallback_if_first_registration(root_kind)?
        {
            self.telemetry.increment_queries();
            return Ok(QueryRows {
                nodes,
                runs: Vec::new(),
                steps: Vec::new(),
                actions: Vec::new(),
                was_degraded: false,
            });
        }

        // For FtsNodes queries the fathomdb-query compile path generates SQL that
        // references the old global `fts_node_properties` table.  Since migration 23
        // dropped that table, we rewrite the SQL and binds here to use the per-kind
        // `fts_props_<kind>` table (or omit the property UNION arm entirely when the
        // per-kind table does not yet exist).
        //
        // For VecNodes queries the fathomdb-query compile path generates SQL that
        // hardcodes `vec_nodes_active`.  Since 0.5.0 uses per-kind tables, we rewrite
        // `vec_nodes_active` to `vec_<root_kind>` here.
        let (adapted_sql, adapted_binds) = if compiled.driving_table == DrivingTable::FtsNodes {
            // The adaptation needs a connection to probe table existence; take
            // the lock briefly and drop it before the execution lock below.
            let conn_check = match self.lock_connection() {
                Ok(g) => g,
                Err(e) => {
                    self.telemetry.increment_errors();
                    return Err(e);
                }
            };
            let result = adapt_fts_nodes_sql_for_per_kind_tables(compiled, &conn_check);
            drop(conn_check);
            result?
        } else if compiled.driving_table == DrivingTable::VecNodes {
            let root_kind = compiled
                .binds
                .get(1)
                .and_then(|b| {
                    if let BindValue::Text(k) = b {
                        Some(k.as_str())
                    } else {
                        None
                    }
                })
                .unwrap_or("");
            // An empty/missing root kind maps to a sentinel table name that
            // cannot exist, so prepare_cached below hits the
            // is_vec_table_absent branch and degrades gracefully.
            let vec_table = if root_kind.is_empty() {
                "vec__unknown".to_owned()
            } else {
                fathomdb_schema::vec_kind_table_name(root_kind)
            };
            let new_sql = compiled.sql.replace("vec_nodes_active", &vec_table);
            (new_sql, compiled.binds.clone())
        } else {
            (compiled.sql.clone(), compiled.binds.clone())
        };

        // Wrap the adapted SQL in the standard node-row projection so the
        // query_map below can read columns 0..=5 positionally.
        let row_sql = wrap_node_row_projection_sql(&adapted_sql);
        // FIX(review): was .expect() — panics on mutex poisoning, cascading failure.
        // Options: (A) into_inner() for all, (B) EngineError for all, (C) mixed.
        // Chose (C): shape_sql_map is a pure cache — into_inner() is safe to recover.
        // conn wraps a SQLite connection whose state may be corrupt after a thread panic,
        // so we propagate EngineError there instead.
        {
            let mut cache = self
                .shape_sql_map
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            // Clear-all eviction: see MAX_SHAPE_CACHE_SIZE rationale at the
            // top of this file.
            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
                cache.clear();
            }
            cache.insert(compiled.shape_hash, row_sql.clone());
        }

        let bind_values = adapted_binds
            .iter()
            .map(bind_value_to_sql)
            .collect::<Vec<_>>();

        // FIX(review) + Security fix M-8: was .expect() — panics on mutex poisoning.
        // shape_sql_map uses into_inner() (pure cache, safe to recover).
        // conn uses map_err → EngineError (connection state may be corrupt after panic;
        // into_inner() would risk using a connection with partial transaction state).
        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&row_sql) {
            Ok(stmt) => stmt,
            Err(e) if is_vec_table_absent(&e) => {
                // Capability miss, not a caller error: warn once (the flag is
                // sticky) and return an empty, flagged result.
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading to non-vector query");
                }
                return Ok(QueryRows {
                    was_degraded: true,
                    ..Default::default()
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        let nodes = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .and_then(Iterator::collect)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        self.telemetry.increment_queries();
        Ok(QueryRows {
            nodes,
            runs: Vec::new(),
            steps: Vec::new(),
            actions: Vec::new(),
            was_degraded: false,
        })
    }
882
883    /// Execute a compiled adaptive search and return matching hits.
884    ///
885    /// Phase 2 splits filters: fusable predicates (`KindEq`, `LogicalIdEq`,
886    /// `SourceRefEq`, `ContentRefEq`, `ContentRefNotNull`) are injected into
887    /// the `search_hits` CTE so the CTE `LIMIT` applies after filtering,
888    /// while residual predicates (JSON path filters) stay in the outer
889    /// `WHERE`. The chunk and property FTS
890    /// indexes are `UNION ALL`-ed, BM25-scored (flipped so larger values mean
891    /// better matches), ordered, and limited. All hits return
892    /// `match_mode = Strict` — the relaxed branch and fallback arrive in
893    /// later phases.
894    ///
895    /// # Errors
896    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
897    pub fn execute_compiled_search(
898        &self,
899        compiled: &CompiledSearch,
900    ) -> Result<SearchRows, EngineError> {
901        // Build the two-branch plan from the strict text query and delegate
902        // to the shared plan-execution routine. The relaxed branch is derived
903        // via `derive_relaxed` and only fires when strict returned fewer than
904        // `min(FALLBACK_TRIGGER_K, limit)` hits. With K = 1 this collapses to
905        // "relaxed iff strict is empty," but the routine spells the rule out
906        // explicitly so raising K later is a one-line constant bump.
907        let (relaxed_query, was_degraded_at_plan_time) =
908            fathomdb_query::derive_relaxed(&compiled.text_query);
909        let relaxed = relaxed_query.map(|q| CompiledSearch {
910            root_kind: compiled.root_kind.clone(),
911            text_query: q,
912            limit: compiled.limit,
913            fusable_filters: compiled.fusable_filters.clone(),
914            residual_filters: compiled.residual_filters.clone(),
915            attribution_requested: compiled.attribution_requested,
916        });
917        let plan = CompiledSearchPlan {
918            strict: compiled.clone(),
919            relaxed,
920            was_degraded_at_plan_time,
921        };
922        self.execute_compiled_search_plan(&plan)
923    }
924
925    /// Execute a two-branch [`CompiledSearchPlan`] and return the merged,
926    /// deduped result rows.
927    ///
928    /// This is the shared retrieval/merge routine that both
929    /// [`Self::execute_compiled_search`] (adaptive path) and
930    /// `Engine::fallback_search` (narrow two-shape path) call into. Strict
931    /// runs first; the relaxed branch only fires when it is present AND the
932    /// strict branch returned fewer than `min(FALLBACK_TRIGGER_K, limit)`
933    /// hits. Merge and dedup semantics are identical to the adaptive path
934    /// regardless of how the plan was constructed.
935    ///
936    /// Error contract: if the relaxed branch errors, the error propagates;
937    /// strict hits are not returned. This matches the rest of the engine's
938    /// fail-hard posture.
939    ///
940    /// # Errors
941    /// Returns [`EngineError`] if either branch's SQL cannot be prepared or
942    /// executed.
943    pub fn execute_compiled_search_plan(
944        &self,
945        plan: &CompiledSearchPlan,
946    ) -> Result<SearchRows, EngineError> {
947        let strict = &plan.strict;
948        let limit = strict.limit;
949        let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;
950
951        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
952        let strict_underfilled = strict_hits.len() < fallback_threshold;
953
954        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
955        let mut fallback_used = false;
956        let mut was_degraded = false;
957        if let Some(relaxed) = plan.relaxed.as_ref()
958            && strict_underfilled
959        {
960            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
961            fallback_used = true;
962            was_degraded = plan.was_degraded_at_plan_time;
963        }
964
965        let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
966        // Attribution runs AFTER dedup so that duplicate hits dropped by
967        // `merge_search_branches` do not waste a highlight+position-map
968        // lookup.
969        if strict.attribution_requested {
970            let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
971            self.populate_attribution_for_hits(
972                &mut merged,
973                &strict.text_query,
974                relaxed_text_query,
975            )?;
976        }
977        let strict_hit_count = merged
978            .iter()
979            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
980            .count();
981        let relaxed_hit_count = merged
982            .iter()
983            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
984            .count();
985        // Phase 10: no vector execution path yet, so vector_hit_count is
986        // always zero. Future phases that wire a vector branch will
987        // contribute here.
988        let vector_hit_count = 0;
989
990        Ok(SearchRows {
991            hits: merged,
992            strict_hit_count,
993            relaxed_hit_count,
994            vector_hit_count,
995            fallback_used,
996            was_degraded,
997        })
998    }
999
    /// Execute a compiled vector-only search and return matching hits.
    ///
    /// Phase 11 delivers the standalone vector retrieval path. The emitted
    /// SQL performs a vec0 KNN scan over the per-kind `vec_<root_kind>`
    /// table (an empty root kind maps to a nonexistent sentinel table so the
    /// query degrades instead of erroring), joins to `chunks` and `nodes`
    /// (active rows only), and pushes fusable filters into the candidate
    /// CTE. The outer `SELECT` applies residual JSON predicates and orders
    /// by score descending, where `score = -distance` (higher is better)
    /// per addendum 1 §Vector-Specific Behavior.
    ///
    /// ## Capability-miss handling
    ///
    /// If the `sqlite-vec` capability is absent (feature disabled or the
    /// per-kind vec virtual table has not been created because the
    /// engine was not opened with a `vector_dimension`), this method returns
    /// an empty [`SearchRows`] with `was_degraded = true`. This is
    /// **non-fatal** — the error does not propagate — matching the addendum's
    /// §Vector-Specific Behavior / Degradation.
    ///
    /// ## Attribution
    ///
    /// When `compiled.attribution_requested == true`, every returned hit
    /// carries `attribution: Some(HitAttribution { matched_paths: vec![] })`
    /// per addendum 1 §Attribution on vector hits (Phase 5 chunk-hit rule
    /// extended uniformly).
    ///
    /// # Errors
    /// Returns [`EngineError`] if the SQL statement cannot be prepared or
    /// executed for reasons other than a vec-table capability miss.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // Short-circuit zero-limit: callers that pass `limit == 0` expect an
        // empty result rather than a SQL error from `LIMIT 0` semantics in
        // the inner vec0 scan.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        // Bind layout is strictly positional: ?1 is the query text, ?2 the
        // root kind (only when non-empty), then fused-filter binds, then
        // residual-filter binds, then the outer limit — each clause records
        // `binds.len()` immediately after pushing to get its index.
        let filter_by_kind = !compiled.root_kind.is_empty();
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // Build fusable-filter clauses, aliased against `src` inside the
        // candidate CTE. Same predicate set the text path fuses.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses
                        .push_str("\n                      AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedBoolEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(i64::from(*value)));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(BindValue::Text(v.clone()));
                    }
                    // NOTE(review): an empty `values` list emits `IN ()`,
                    // which SQLite accepts (evaluates to false) — verify the
                    // compiler never produces an empty list if other SQL
                    // backends are ever targeted.
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                Predicate::JsonPathEq { .. }
                | Predicate::JsonPathCompare { .. }
                | Predicate::JsonPathIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                    // JSON predicates are residual; compile_vector_search
                    // guarantees they never appear here, but stay defensive.
                    // Edge property predicates are not valid in the search path.
                }
            }
        }

        // Build residual JSON clauses, aliased against `h` in the outer SELECT.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(scalar_to_bind(v));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. }
                | Predicate::JsonPathFusedBoolEq { .. }
                | Predicate::JsonPathFusedIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                    // Fusable predicates live in fused_clauses above.
                    // Edge property predicates are not valid in the search path.
                }
            }
        }

        // Bind the outer limit as a named parameter for prepare_cached
        // stability across calls that vary only by limit value.
        // NOTE(review): the inner vec0 `LIMIT {base_limit}` below is
        // string-interpolated, so cached-statement reuse actually holds only
        // between calls sharing the same limit — confirm whether binding the
        // inner k is supported before tightening this.
        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        // sqlite-vec requires the LIMIT/k constraint to be visible directly
        // on the vec0 KNN scan, so we isolate it in a sub-select. The vec0
        // LIMIT overfetches `base_limit` = limit (Phase 11 keeps it simple;
        // Phase 12's planner may raise this to compensate for fusion
        // narrowing the candidate pool).
        let base_limit = limit;
        let kind_clause = if filter_by_kind {
            "\n                      AND src.kind = ?2"
        } else {
            ""
        };

        // Derive the per-kind vec table name from the query's root kind.
        // An empty root_kind defaults to a sentinel that will not exist, causing
        // graceful degradation (was_degraded=true).
        let vec_table = if compiled.root_kind.is_empty() {
            "vec__unknown".to_owned()
        } else {
            fathomdb_schema::vec_kind_table_name(&compiled.root_kind)
        };

        let sql = format!(
            "WITH vector_hits AS (
                SELECT
                    src.row_id AS row_id,
                    src.logical_id AS logical_id,
                    src.kind AS kind,
                    src.properties AS properties,
                    src.source_ref AS source_ref,
                    src.content_ref AS content_ref,
                    src.created_at AS created_at,
                    vc.distance AS distance,
                    vc.chunk_id AS chunk_id
                FROM (
                    SELECT chunk_id, distance
                    FROM {vec_table}
                    WHERE embedding MATCH ?1
                    LIMIT {base_limit}
                ) vc
                JOIN chunks c ON c.id = vc.chunk_id
                JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
                WHERE 1 = 1{kind_clause}{fused_clauses}
            )
            SELECT
                h.row_id,
                h.logical_id,
                h.kind,
                h.properties,
                h.content_ref,
                am.last_accessed_at,
                h.created_at,
                h.distance,
                h.chunk_id
            FROM vector_hits h
            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
            WHERE 1 = 1{filter_clauses}
            ORDER BY h.distance ASC
            LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            Err(e) if is_vec_table_absent(&e) => {
                // Capability miss: non-fatal — surface as was_degraded.
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Score is the negated distance per addendum 1
                // §Vector-Specific Behavior / Score and distance. For
                // distance metrics (sqlite-vec's default) lower distance =
                // better match, so negating yields the higher-is-better
                // convention that dedup_branch_hits and the unified result
                // surface rely on.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    // Vector hits have no strict/relaxed notion.
                    match_mode: None,
                    // Vector hits have no snippet.
                    snippet: None,
                    // The matched chunk id (column 8) surfaces as the
                    // projection row id on the hit.
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // Some SQLite errors surface during row iteration (e.g. when
                // the vec0 extension is not loaded but the table exists as a
                // stub). Classify as capability-miss when the shape matches.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
1394
1395    /// Execute a unified [`CompiledRetrievalPlan`] (Phase 12 `search()`
1396    /// entry point) and return deterministically ranked, block-ordered
1397    /// [`SearchRows`].
1398    ///
1399    /// Stages, per addendum 1 §Retrieval Planner Model:
1400    ///
1401    /// 1. **Text strict.** Always runs (empty query short-circuits to an
1402    ///    empty branch result inside `run_search_branch`).
1403    /// 2. **Text relaxed.** Runs iff the plan carries a relaxed branch AND
1404    ///    the strict branch returned fewer than `min(FALLBACK_TRIGGER_K,
1405    ///    limit)` hits — same v1 (`K = 1`) zero-hits-only trigger as the
1406    ///    Phase 6 text-only path.
1407    /// 3. **Vector.** Runs iff text retrieval (strict + relaxed combined)
1408    ///    returned zero hits AND `plan.vector` is `Some`. **In v1 the
1409    ///    planner never wires a vector branch through `search()`, so this
1410    ///    code path is structurally present but dormant.** A future phase
1411    ///    that wires read-time embedding into `compile_retrieval_plan` will
1412    ///    immediately light it up.
1413    /// 4. **Fusion.** All collected hits are merged via
1414    ///    [`merge_search_branches_three`], which produces strict ->
1415    ///    relaxed -> vector block ordering with cross-branch dedup
1416    ///    resolved by branch precedence.
1417    ///
1418    /// `was_degraded` covers only the relaxed-branch cap miss in v1. The
1419    /// addendum's "vector capability miss => `was_degraded`" semantics
1420    /// applies to `search()` only when the unified planner actually fires
1421    /// the vector branch, which v1 never does.
1422    ///
1423    /// # Errors
1424    /// Returns [`EngineError`] if any stage's SQL cannot be prepared or
1425    /// executed for a non-capability-miss reason.
1426    pub fn execute_retrieval_plan(
1427        &self,
1428        plan: &CompiledRetrievalPlan,
1429        raw_query: &str,
1430    ) -> Result<SearchRows, EngineError> {
1431        // Phase 12.5a: we work against a local owned copy so
1432        // `fill_vector_branch` can mutate `plan.vector` and
1433        // `plan.was_degraded_at_plan_time`. Cloning is cheap (the plan is
1434        // a bounded set of predicates + text AST nodes) and avoids
1435        // forcing callers to hand us `&mut` access.
1436        let mut plan = plan.clone();
1437        let limit = plan.text.strict.limit;
1438
1439        // Stage 1: text strict.
1440        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;
1441
1442        // Stage 2: text relaxed. Same K=1 zero-hits-only trigger the Phase 6
1443        // path uses.
1444        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
1445        let strict_underfilled = strict_hits.len() < fallback_threshold;
1446        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
1447        let mut fallback_used = false;
1448        let mut was_degraded = false;
1449        if let Some(relaxed) = plan.text.relaxed.as_ref()
1450            && strict_underfilled
1451        {
1452            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
1453            fallback_used = true;
1454            was_degraded = plan.was_degraded_at_plan_time;
1455        }
1456
1457        // Phase 12.5a: fill the vector branch from the configured
1458        // read-time query embedder, if any. Option (b) from the spec:
1459        // only pay the embedding cost when the text branches returned
1460        // nothing, because the three-branch stage gate below only runs
1461        // the vector stage under exactly that condition. This keeps the
1462        // hot path (strict text matched) embedder-free.
1463        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
1464        if text_branches_empty && self.query_embedder.is_some() {
1465            self.fill_vector_branch(&mut plan, raw_query);
1466        }
1467
1468        // Stage 3: vector. Runs only when text retrieval is empty AND a
1469        // vector branch is present. When no embedder is configured (Phase
1470        // 12.5a default) `plan.vector` stays `None` and this stage is a
1471        // no-op, preserving the Phase 12 v1 dormancy invariant.
1472        let mut vector_hits: Vec<SearchHit> = Vec::new();
1473        if let Some(vector) = plan.vector.as_ref()
1474            && strict_hits.is_empty()
1475            && relaxed_hits.is_empty()
1476        {
1477            let vector_rows = self.execute_compiled_vector_search(vector)?;
1478            // `execute_compiled_vector_search` returns a fully populated
1479            // `SearchRows`. Promote its hits into the merge stage and lift
1480            // its capability-miss `was_degraded` flag onto the unified
1481            // result, per addendum §Vector-Specific Behavior.
1482            vector_hits = vector_rows.hits;
1483            if vector_rows.was_degraded {
1484                was_degraded = true;
1485            }
1486        }
1487        // Phase 12.5a: an embedder-reported capability miss surfaces as
1488        // `plan.was_degraded_at_plan_time = true` set inside
1489        // `fill_vector_branch`. Lift it onto the response so callers see
1490        // the graceful degradation even when the vector stage-gate never
1491        // had the chance to fire (the embedder call itself failed and
1492        // the vector slot stayed `None`).
1493        if text_branches_empty
1494            && plan.was_degraded_at_plan_time
1495            && plan.vector.is_none()
1496            && self.query_embedder.is_some()
1497        {
1498            was_degraded = true;
1499        }
1500
1501        // Stage 4: fusion.
1502        let strict = &plan.text.strict;
1503        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
1504        if strict.attribution_requested {
1505            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
1506            self.populate_attribution_for_hits(
1507                &mut merged,
1508                &strict.text_query,
1509                relaxed_text_query,
1510            )?;
1511        }
1512
1513        let strict_hit_count = merged
1514            .iter()
1515            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
1516            .count();
1517        let relaxed_hit_count = merged
1518            .iter()
1519            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
1520            .count();
1521        let vector_hit_count = merged
1522            .iter()
1523            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
1524            .count();
1525
1526        Ok(SearchRows {
1527            hits: merged,
1528            strict_hit_count,
1529            relaxed_hit_count,
1530            vector_hit_count,
1531            fallback_used,
1532            was_degraded,
1533        })
1534    }
1535
1536    /// Phase 12.5a: populate `plan.vector` from the configured read-time
1537    /// query embedder, if any.
1538    ///
1539    /// Preconditions (enforced by the caller in `execute_retrieval_plan`):
1540    /// - `self.query_embedder.is_some()` — no point calling otherwise.
1541    /// - Both the strict and relaxed text branches already ran and
1542    ///   returned zero hits, so the existing three-branch stage gate
1543    ///   will actually fire the vector stage once the slot is populated.
1544    ///   This is option (b) from the Phase 12.5a spec: skip the embedding
1545    ///   cost entirely when text retrieval already won.
1546    ///
1547    /// Contract: never panics, never returns an error. On embedder error
1548    /// it sets `plan.was_degraded_at_plan_time = true` and leaves
1549    /// `plan.vector` as `None`; the coordinator's normal error-free
1550    /// degradation path then reports `was_degraded` on the result.
1551    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
1552        let Some(embedder) = self.query_embedder.as_ref() else {
1553            return;
1554        };
1555        match embedder.embed_query(raw_query) {
1556            Ok(vec) => {
1557                // `CompiledVectorSearch::query_text` is a JSON float-array
1558                // literal at the time the coordinator binds it (see the
1559                // `CompiledVectorSearch` docs). `serde_json::to_string`
1560                // on a `Vec<f32>` produces exactly that shape — no
1561                // wire-format change required.
1562                let literal = match serde_json::to_string(&vec) {
1563                    Ok(s) => s,
1564                    Err(err) => {
1565                        trace_warn!(
1566                            error = %err,
1567                            "query embedder vector serialization failed; skipping vector branch"
1568                        );
1569                        let _ = err; // Used by trace_warn! when tracing feature is active
1570                        plan.was_degraded_at_plan_time = true;
1571                        return;
1572                    }
1573                };
1574                let strict = &plan.text.strict;
1575                plan.vector = Some(CompiledVectorSearch {
1576                    root_kind: strict.root_kind.clone(),
1577                    query_text: literal,
1578                    limit: strict.limit,
1579                    fusable_filters: strict.fusable_filters.clone(),
1580                    residual_filters: strict.residual_filters.clone(),
1581                    attribution_requested: strict.attribution_requested,
1582                });
1583            }
1584            Err(err) => {
1585                trace_warn!(
1586                    error = %err,
1587                    "query embedder unavailable, skipping vector branch"
1588                );
1589                let _ = err; // Used by trace_warn! when tracing feature is active
1590                plan.was_degraded_at_plan_time = true;
1591            }
1592        }
1593    }
1594
1595    /// Execute a single search branch against the underlying FTS surfaces.
1596    ///
1597    /// This is the shared SQL emission path used by
1598    /// [`Self::execute_compiled_search_plan`] to run strict and (when
1599    /// present) relaxed branches of a [`CompiledSearchPlan`] in sequence.
1600    /// The returned hits are tagged with `branch`'s corresponding
1601    /// [`SearchMatchMode`] and are **not** yet deduped or truncated — the
1602    /// caller is responsible for merging multiple branches.
1603    #[allow(clippy::too_many_lines)]
1604    fn run_search_branch(
1605        &self,
1606        compiled: &CompiledSearch,
1607        branch: SearchBranch,
1608    ) -> Result<Vec<SearchHit>, EngineError> {
1609        use std::fmt::Write as _;
1610        // Short-circuit an empty/whitespace-only query: rendering it would
1611        // yield `MATCH ""`, which FTS5 rejects as a syntax error. Callers
1612        // (including the adaptive path when strict is Empty and derive_relaxed
1613        // returns None) must see an empty result, not an error. Each branch
1614        // is short-circuited independently so a strict-Empty + relaxed-Some
1615        // plan still exercises the relaxed branch.
1616        // A top-level `TextQuery::Not` renders to an FTS5 expression that
1617        // matches "every row not containing X" — a complement-of-corpus scan
1618        // that no caller would intentionally want. Short-circuit to empty at
1619        // the root only; a `Not` nested inside an `And` is a legitimate
1620        // exclusion and must still run.
1621        if matches!(
1622            compiled.text_query,
1623            fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
1624        ) {
1625            return Ok(Vec::new());
1626        }
1627        let rendered_base = render_text_query_fts5(&compiled.text_query);
1628        // Apply per-kind query-side tokenizer adaptations.
1629        // SubstringTrigram: queries shorter than 3 chars produce no results
1630        // (trigram index cannot match them). Return empty instead of an FTS5
1631        // error or a full-table scan.
1632        // SourceCode: render_text_query_fts5 already wraps every term in FTS5
1633        // double-quote delimiters (e.g. "std.io"). Applying escape_source_code_query
1634        // on that already-rendered output corrupts the expression — the escape
1635        // function sees the surrounding '"' characters and '.' separator and
1636        // produces malformed syntax like '"std"."io""'. The phrase quoting from
1637        // render_text_query_fts5 is sufficient: FTS5 applies the tokenizer
1638        // (including custom tokenchars) inside double-quoted phrase expressions,
1639        // so "std.io" correctly matches the single token 'std.io'.
1640        let strategy = self.fts_strategies.get(compiled.root_kind.as_str());
1641        if matches!(strategy, Some(TokenizerStrategy::SubstringTrigram))
1642            && rendered_base
1643                .chars()
1644                .filter(|c| c.is_alphanumeric())
1645                .count()
1646                < 3
1647        {
1648            return Ok(Vec::new());
1649        }
1650        let rendered = rendered_base;
1651        // An empty `root_kind` means "unkind-filtered" — the fallback_search
1652        // helper uses this when the caller did not add `.filter_kind_eq(...)`.
1653        // The adaptive `text_search()` path never produces an empty root_kind
1654        // because `QueryBuilder::nodes(kind)` requires a non-empty string at
1655        // the entry point.
1656        let filter_by_kind = !compiled.root_kind.is_empty();
1657
1658        // Acquire the connection early so we can check per-kind table existence
1659        // before building the bind array and SQL. The bind numbering depends on
1660        // whether the property FTS UNION arm is included.
1661        let conn_guard = match self.lock_connection() {
1662            Ok(g) => g,
1663            Err(e) => {
1664                self.telemetry.increment_errors();
1665                return Err(e);
1666            }
1667        };
1668
1669        // Determine which per-kind property FTS tables to include in the UNION arm.
1670        //
1671        // filter_by_kind = true (root_kind set): include the single per-kind table
1672        //   for root_kind if it exists. Bind order: ?1=chunk_text, ?2=kind, ?3=prop_text.
1673        //
1674        // filter_by_kind = false (fallback search, root_kind empty): include per-kind tables
1675        //   based on fusable KindEq predicates or all registered tables from sqlite_master.
1676        //   A single KindEq in fusable_filters lets us use one specific table. With no KindEq
1677        //   we UNION all per-kind tables so kind-less fallback searches still find property
1678        //   hits (matching the behaviour of the former global fts_node_properties table).
1679        //   Bind order: ?1=chunk_text, ?2=prop_text (shared across all prop UNION arms).
1680        //
1681        // In both cases, per-kind tables are already kind-specific so no `fp.kind = ?` filter
1682        // is needed inside the inner arm; the outer `search_hits` CTE WHERE clause handles
1683        // any further kind narrowing via fused KindEq predicates.
1684        // prop_fts_tables is Vec<(kind, table_name)> so we can later look up per-kind
1685        // BM25 weights from fts_property_schemas when building the scoring expression.
1686        let prop_fts_tables: Vec<(String, String)> = if filter_by_kind {
1687            let kind = compiled.root_kind.clone();
1688            let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1689            let exists: bool = conn_guard
1690                .query_row(
1691                    "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1692                    rusqlite::params![prop_table],
1693                    |_| Ok(true),
1694                )
1695                .optional()
1696                .map_err(EngineError::Sqlite)?
1697                .unwrap_or(false);
1698            if exists {
1699                vec![(kind, prop_table)]
1700            } else {
1701                vec![]
1702            }
1703        } else {
1704            // Fallback / kind-less search: find the right per-kind tables.
1705            // If there is exactly one KindEq in fusable_filters, use that kind's table.
1706            // Otherwise, include all registered per-kind tables from sqlite_master so
1707            // that kind-less fallback searches can still return property FTS hits.
1708            let kind_eq_values: Vec<String> = compiled
1709                .fusable_filters
1710                .iter()
1711                .filter_map(|p| match p {
1712                    Predicate::KindEq(k) => Some(k.clone()),
1713                    _ => None,
1714                })
1715                .collect();
1716            if kind_eq_values.len() == 1 {
1717                let kind = kind_eq_values[0].clone();
1718                let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1719                let exists: bool = conn_guard
1720                    .query_row(
1721                        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1722                        rusqlite::params![prop_table],
1723                        |_| Ok(true),
1724                    )
1725                    .optional()
1726                    .map_err(EngineError::Sqlite)?
1727                    .unwrap_or(false);
1728                if exists {
1729                    vec![(kind, prop_table)]
1730                } else {
1731                    vec![]
1732                }
1733            } else {
1734                // No single KindEq: UNION all per-kind tables so kind-less fallback
1735                // searches behave like the former global fts_node_properties table.
1736                // Fetch registered kinds and compute/verify their per-kind table names.
1737                let mut stmt = conn_guard
1738                    .prepare("SELECT kind FROM fts_property_schemas")
1739                    .map_err(EngineError::Sqlite)?;
1740                let all_kinds: Vec<String> = stmt
1741                    .query_map([], |r| r.get::<_, String>(0))
1742                    .map_err(EngineError::Sqlite)?
1743                    .collect::<Result<Vec<_>, _>>()
1744                    .map_err(EngineError::Sqlite)?;
1745                drop(stmt);
1746                let mut result = Vec::new();
1747                for kind in all_kinds {
1748                    let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1749                    let exists: bool = conn_guard
1750                        .query_row(
1751                            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1752                            rusqlite::params![prop_table],
1753                            |_| Ok(true),
1754                        )
1755                        .optional()
1756                        .map_err(EngineError::Sqlite)?
1757                        .unwrap_or(false);
1758                    if exists {
1759                        result.push((kind, prop_table));
1760                    }
1761                }
1762                result
1763            }
1764        };
1765        let use_prop_fts = !prop_fts_tables.is_empty();
1766
1767        // Bind layout (before fused/residual predicates and limit):
1768        //   filter_by_kind = true,  use_prop_fts = true:  ?1=chunk_text, ?2=kind, ?3=prop_text
1769        //   filter_by_kind = true,  use_prop_fts = false: ?1=chunk_text, ?2=kind
1770        //   filter_by_kind = false, use_prop_fts = true:  ?1=chunk_text, ?2=prop_text
1771        //   filter_by_kind = false, use_prop_fts = false: ?1=chunk_text
1772        let mut binds: Vec<BindValue> = if filter_by_kind {
1773            if use_prop_fts {
1774                vec![
1775                    BindValue::Text(rendered.clone()),
1776                    BindValue::Text(compiled.root_kind.clone()),
1777                    BindValue::Text(rendered),
1778                ]
1779            } else {
1780                vec![
1781                    BindValue::Text(rendered.clone()),
1782                    BindValue::Text(compiled.root_kind.clone()),
1783                ]
1784            }
1785        } else if use_prop_fts {
1786            // fallback search with property FTS: ?1=chunk, ?2=prop (same query value)
1787            vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
1788        } else {
1789            vec![BindValue::Text(rendered)]
1790        };
1791
1792        // P2-5: both fusable and residual predicates now match against the
1793        // CTE's projected columns (`u.kind`, `u.logical_id`, `u.source_ref`,
1794        // `u.content_ref`, `u.properties`) because the inner UNION arms
1795        // project the full active-row column set through the
1796        // `JOIN nodes src` already present in each arm. The previous
1797        // implementation re-joined `nodes hn` at the CTE level and
1798        // `nodes n` again at the outer SELECT, which was triple work on
1799        // the hot search path.
1800        let mut fused_clauses = String::new();
1801        for predicate in &compiled.fusable_filters {
1802            match predicate {
1803                Predicate::KindEq(kind) => {
1804                    binds.push(BindValue::Text(kind.clone()));
1805                    let idx = binds.len();
1806                    let _ = write!(fused_clauses, "\n                  AND u.kind = ?{idx}");
1807                }
1808                Predicate::LogicalIdEq(logical_id) => {
1809                    binds.push(BindValue::Text(logical_id.clone()));
1810                    let idx = binds.len();
1811                    let _ = write!(
1812                        fused_clauses,
1813                        "\n                  AND u.logical_id = ?{idx}"
1814                    );
1815                }
1816                Predicate::SourceRefEq(source_ref) => {
1817                    binds.push(BindValue::Text(source_ref.clone()));
1818                    let idx = binds.len();
1819                    let _ = write!(
1820                        fused_clauses,
1821                        "\n                  AND u.source_ref = ?{idx}"
1822                    );
1823                }
1824                Predicate::ContentRefEq(uri) => {
1825                    binds.push(BindValue::Text(uri.clone()));
1826                    let idx = binds.len();
1827                    let _ = write!(
1828                        fused_clauses,
1829                        "\n                  AND u.content_ref = ?{idx}"
1830                    );
1831                }
1832                Predicate::ContentRefNotNull => {
1833                    fused_clauses.push_str("\n                  AND u.content_ref IS NOT NULL");
1834                }
1835                Predicate::JsonPathFusedEq { path, value } => {
1836                    binds.push(BindValue::Text(path.clone()));
1837                    let path_idx = binds.len();
1838                    binds.push(BindValue::Text(value.clone()));
1839                    let value_idx = binds.len();
1840                    let _ = write!(
1841                        fused_clauses,
1842                        "\n                  AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
1843                    );
1844                }
1845                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
1846                    binds.push(BindValue::Text(path.clone()));
1847                    let path_idx = binds.len();
1848                    binds.push(BindValue::Integer(*value));
1849                    let value_idx = binds.len();
1850                    let operator = match op {
1851                        ComparisonOp::Gt => ">",
1852                        ComparisonOp::Gte => ">=",
1853                        ComparisonOp::Lt => "<",
1854                        ComparisonOp::Lte => "<=",
1855                    };
1856                    let _ = write!(
1857                        fused_clauses,
1858                        "\n                  AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
1859                    );
1860                }
1861                Predicate::JsonPathFusedBoolEq { path, value } => {
1862                    binds.push(BindValue::Text(path.clone()));
1863                    let path_idx = binds.len();
1864                    binds.push(BindValue::Integer(i64::from(*value)));
1865                    let value_idx = binds.len();
1866                    let _ = write!(
1867                        fused_clauses,
1868                        "\n                  AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
1869                    );
1870                }
1871                Predicate::JsonPathFusedIn { path, values } => {
1872                    binds.push(BindValue::Text(path.clone()));
1873                    let first_param = binds.len();
1874                    for v in values {
1875                        binds.push(BindValue::Text(v.clone()));
1876                    }
1877                    let placeholders = (1..=values.len())
1878                        .map(|i| format!("?{}", first_param + i))
1879                        .collect::<Vec<_>>()
1880                        .join(", ");
1881                    let _ = write!(
1882                        fused_clauses,
1883                        "\n                  AND json_extract(u.properties, ?{first_param}) IN ({placeholders})"
1884                    );
1885                }
1886                Predicate::JsonPathEq { .. }
1887                | Predicate::JsonPathCompare { .. }
1888                | Predicate::JsonPathIn { .. }
1889                | Predicate::EdgePropertyEq { .. }
1890                | Predicate::EdgePropertyCompare { .. } => {
1891                    // Should be in residual_filters; compile_search guarantees
1892                    // this, but stay defensive.
1893                    // Edge property predicates are not valid in the search path.
1894                }
1895            }
1896        }
1897
1898        let mut filter_clauses = String::new();
1899        for predicate in &compiled.residual_filters {
1900            match predicate {
1901                Predicate::JsonPathEq { path, value } => {
1902                    binds.push(BindValue::Text(path.clone()));
1903                    let path_idx = binds.len();
1904                    binds.push(scalar_to_bind(value));
1905                    let value_idx = binds.len();
1906                    let _ = write!(
1907                        filter_clauses,
1908                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
1909                    );
1910                }
1911                Predicate::JsonPathCompare { path, op, value } => {
1912                    binds.push(BindValue::Text(path.clone()));
1913                    let path_idx = binds.len();
1914                    binds.push(scalar_to_bind(value));
1915                    let value_idx = binds.len();
1916                    let operator = match op {
1917                        ComparisonOp::Gt => ">",
1918                        ComparisonOp::Gte => ">=",
1919                        ComparisonOp::Lt => "<",
1920                        ComparisonOp::Lte => "<=",
1921                    };
1922                    let _ = write!(
1923                        filter_clauses,
1924                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
1925                    );
1926                }
1927                Predicate::JsonPathIn { path, values } => {
1928                    binds.push(BindValue::Text(path.clone()));
1929                    let first_param = binds.len();
1930                    for v in values {
1931                        binds.push(scalar_to_bind(v));
1932                    }
1933                    let placeholders = (1..=values.len())
1934                        .map(|i| format!("?{}", first_param + i))
1935                        .collect::<Vec<_>>()
1936                        .join(", ");
1937                    let _ = write!(
1938                        filter_clauses,
1939                        "\n  AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
1940                    );
1941                }
1942                Predicate::KindEq(_)
1943                | Predicate::LogicalIdEq(_)
1944                | Predicate::SourceRefEq(_)
1945                | Predicate::ContentRefEq(_)
1946                | Predicate::ContentRefNotNull
1947                | Predicate::JsonPathFusedEq { .. }
1948                | Predicate::JsonPathFusedTimestampCmp { .. }
1949                | Predicate::JsonPathFusedBoolEq { .. }
1950                | Predicate::JsonPathFusedIn { .. }
1951                | Predicate::EdgePropertyEq { .. }
1952                | Predicate::EdgePropertyCompare { .. } => {
1953                    // Fusable predicates live in fused_clauses; compile_search
1954                    // partitions them out of residual_filters.
1955                    // Edge property predicates are not valid in the search path.
1956                }
1957            }
1958        }
1959
1960        // Bind `limit` as an integer parameter rather than formatting it into
1961        // the SQL string. Interpolating the limit made the prepared-statement
1962        // SQL vary by limit value, so rusqlite's default 16-slot
1963        // `prepare_cached` cache thrashed for paginated callers that varied
1964        // limits per call. With the bind the SQL is structurally stable for
1965        // a given filter shape regardless of `limit` value.
1966        let limit = compiled.limit;
1967        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
1968        let limit_idx = binds.len();
1969        // P2-5: the inner UNION arms project the full active-row column
1970        // set through `JOIN nodes src` (kind, row_id, source_ref,
1971        // content_ref, content_hash, created_at, properties). Both the
1972        // CTE's outer WHERE and the final SELECT consume those columns
1973        // directly, which eliminates the previous `JOIN nodes hn` at the
1974        // CTE level and `JOIN nodes n` at the outer SELECT — saving two
1975        // redundant joins on the hot search path. `src.superseded_at IS
1976        // NULL` in each arm already filters retired rows, which is what
1977        // the dropped outer joins used to do.
1978        //
1979        // Property FTS uses per-kind tables (fts_props_<kind>). One UNION arm
1980        // is generated per table in prop_fts_tables. The prop text bind index
1981        // is ?3 when filter_by_kind=true (after chunk_text and kind binds) or
1982        // ?2 when filter_by_kind=false (after chunk_text only). The same bind
1983        // position is reused by every prop arm, which is valid in SQLite.
1984        let prop_bind_idx: usize = if filter_by_kind { 3 } else { 2 };
1985        let prop_arm_sql: String = if use_prop_fts {
1986            prop_fts_tables.iter().fold(String::new(), |mut acc, (kind, prop_table)| {
1987                // Load schema for this kind to compute BM25 weights.
1988                let bm25_expr = conn_guard
1989                    .query_row(
1990                        "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
1991                        rusqlite::params![kind],
1992                        |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1993                    )
1994                    .ok()
1995                    .map_or_else(
1996                        || format!("bm25({prop_table})"),
1997                        |(json, sep)| build_bm25_expr(prop_table, &json, &sep),
1998                    );
1999                // For weighted (per-column) schemas text_content does not exist;
2000                // use an empty snippet rather than a column reference that would fail.
2001                let is_weighted = bm25_expr != format!("bm25({prop_table})");
2002                let snippet_expr = if is_weighted {
2003                    "'' AS snippet".to_owned()
2004                } else {
2005                    "substr(fp.text_content, 1, 200) AS snippet".to_owned()
2006                };
2007                let _ = write!(
2008                    acc,
2009                    "
2010                    UNION ALL
2011                    SELECT
2012                        src.row_id AS row_id,
2013                        fp.node_logical_id AS logical_id,
2014                        src.kind AS kind,
2015                        src.properties AS properties,
2016                        src.source_ref AS source_ref,
2017                        src.content_ref AS content_ref,
2018                        src.created_at AS created_at,
2019                        -{bm25_expr} AS score,
2020                        'property' AS source,
2021                        {snippet_expr},
2022                        CAST(fp.rowid AS TEXT) AS projection_row_id
2023                    FROM {prop_table} fp
2024                    JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
2025                    WHERE {prop_table} MATCH ?{prop_bind_idx}"
2026                );
2027                acc
2028            })
2029        } else {
2030            String::new()
2031        };
2032        let (chunk_fts_bind, chunk_kind_clause) = if filter_by_kind {
2033            ("?1", "\n                      AND src.kind = ?2")
2034        } else {
2035            ("?1", "")
2036        };
2037        let sql = format!(
2038            "WITH search_hits AS (
2039                SELECT
2040                    u.row_id AS row_id,
2041                    u.logical_id AS logical_id,
2042                    u.kind AS kind,
2043                    u.properties AS properties,
2044                    u.source_ref AS source_ref,
2045                    u.content_ref AS content_ref,
2046                    u.created_at AS created_at,
2047                    u.score AS score,
2048                    u.source AS source,
2049                    u.snippet AS snippet,
2050                    u.projection_row_id AS projection_row_id
2051                FROM (
2052                    SELECT
2053                        src.row_id AS row_id,
2054                        c.node_logical_id AS logical_id,
2055                        src.kind AS kind,
2056                        src.properties AS properties,
2057                        src.source_ref AS source_ref,
2058                        src.content_ref AS content_ref,
2059                        src.created_at AS created_at,
2060                        -bm25(fts_nodes) AS score,
2061                        'chunk' AS source,
2062                        snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
2063                        f.chunk_id AS projection_row_id
2064                    FROM fts_nodes f
2065                    JOIN chunks c ON c.id = f.chunk_id
2066                    JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
2067                    WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}{prop_arm_sql}
2068                ) u
2069                WHERE 1 = 1{fused_clauses}
2070                ORDER BY u.score DESC
2071                LIMIT ?{limit_idx}
2072            )
2073            SELECT
2074                h.row_id,
2075                h.logical_id,
2076                h.kind,
2077                h.properties,
2078                h.content_ref,
2079                am.last_accessed_at,
2080                h.created_at,
2081                h.score,
2082                h.source,
2083                h.snippet,
2084                h.projection_row_id
2085            FROM search_hits h
2086            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
2087            WHERE 1 = 1{filter_clauses}
2088            ORDER BY h.score DESC"
2089        );
2090
2091        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
2092
2093        let mut statement = match conn_guard.prepare_cached(&sql) {
2094            Ok(stmt) => stmt,
2095            Err(e) => {
2096                self.telemetry.increment_errors();
2097                return Err(EngineError::Sqlite(e));
2098            }
2099        };
2100
2101        let hits = match statement
2102            .query_map(params_from_iter(bind_values.iter()), |row| {
2103                let source_str: String = row.get(8)?;
2104                // The CTE emits only two literal values here: `'chunk'` and
2105                // `'property'`. Default to `Chunk` on anything unexpected so a
2106                // schema drift surfaces as a mislabelled hit rather than a
2107                // row-level error.
2108                let source = if source_str == "property" {
2109                    SearchHitSource::Property
2110                } else {
2111                    SearchHitSource::Chunk
2112                };
2113                let match_mode = match branch {
2114                    SearchBranch::Strict => SearchMatchMode::Strict,
2115                    SearchBranch::Relaxed => SearchMatchMode::Relaxed,
2116                };
2117                Ok(SearchHit {
2118                    node: fathomdb_query::NodeRowLite {
2119                        row_id: row.get(0)?,
2120                        logical_id: row.get(1)?,
2121                        kind: row.get(2)?,
2122                        properties: row.get(3)?,
2123                        content_ref: row.get(4)?,
2124                        last_accessed_at: row.get(5)?,
2125                    },
2126                    written_at: row.get(6)?,
2127                    score: row.get(7)?,
2128                    // Phase 10: every branch currently emits text hits.
2129                    modality: RetrievalModality::Text,
2130                    source,
2131                    match_mode: Some(match_mode),
2132                    snippet: row.get(9)?,
2133                    projection_row_id: row.get(10)?,
2134                    vector_distance: None,
2135                    attribution: None,
2136                })
2137            })
2138            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
2139        {
2140            Ok(rows) => rows,
2141            Err(e) => {
2142                self.telemetry.increment_errors();
2143                return Err(EngineError::Sqlite(e));
2144            }
2145        };
2146
2147        // Drop the statement so `conn_guard` is free (attribution is
2148        // resolved after dedup in `execute_compiled_search_plan` to avoid
2149        // spending highlight lookups on hits that will be discarded).
2150        drop(statement);
2151        drop(conn_guard);
2152
2153        self.telemetry.increment_queries();
2154        Ok(hits)
2155    }
2156
2157    /// Populate per-hit attribution for the given deduped merged hits.
2158    /// Runs after [`merge_search_branches`] so dropped duplicates do not
2159    /// incur the highlight+position-map lookup cost.
2160    fn populate_attribution_for_hits(
2161        &self,
2162        hits: &mut [SearchHit],
2163        strict_text_query: &fathomdb_query::TextQuery,
2164        relaxed_text_query: Option<&fathomdb_query::TextQuery>,
2165    ) -> Result<(), EngineError> {
2166        let conn_guard = match self.lock_connection() {
2167            Ok(g) => g,
2168            Err(e) => {
2169                self.telemetry.increment_errors();
2170                return Err(e);
2171            }
2172        };
2173        let strict_expr = render_text_query_fts5(strict_text_query);
2174        let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
2175        for hit in hits.iter_mut() {
2176            // Phase 10: text hits always carry `Some(match_mode)`. Vector
2177            // hits (when a future phase adds them) have `None` here and
2178            // are skipped by the attribution resolver because attribution
2179            // is meaningless for vector matches.
2180            let match_expr = match hit.match_mode {
2181                Some(SearchMatchMode::Strict) => strict_expr.as_str(),
2182                Some(SearchMatchMode::Relaxed) => {
2183                    relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
2184                }
2185                None => continue,
2186            };
2187            match resolve_hit_attribution(&conn_guard, hit, match_expr) {
2188                Ok(att) => hit.attribution = Some(att),
2189                Err(e) => {
2190                    self.telemetry.increment_errors();
2191                    return Err(e);
2192                }
2193            }
2194        }
2195        Ok(())
2196    }
2197
2198    /// # Errors
2199    /// Returns [`EngineError`] if the root query or any bounded expansion
2200    /// query cannot be prepared or executed.
2201    pub fn execute_compiled_grouped_read(
2202        &self,
2203        compiled: &CompiledGroupedQuery,
2204    ) -> Result<GroupedQueryRows, EngineError> {
2205        let root_rows = self.execute_compiled_read(&compiled.root)?;
2206        if root_rows.was_degraded {
2207            return Ok(GroupedQueryRows {
2208                roots: Vec::new(),
2209                expansions: Vec::new(),
2210                edge_expansions: Vec::new(),
2211                was_degraded: true,
2212            });
2213        }
2214
2215        let roots = root_rows.nodes;
2216        let mut expansions = Vec::with_capacity(compiled.expansions.len());
2217        for expansion in &compiled.expansions {
2218            let slot_rows = if roots.is_empty() {
2219                Vec::new()
2220            } else {
2221                self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
2222            };
2223            expansions.push(ExpansionSlotRows {
2224                slot: expansion.slot.clone(),
2225                roots: slot_rows,
2226            });
2227        }
2228
2229        let mut edge_expansions = Vec::with_capacity(compiled.edge_expansions.len());
2230        for edge_expansion in &compiled.edge_expansions {
2231            let slot_rows = if roots.is_empty() {
2232                Vec::new()
2233            } else {
2234                self.read_edge_expansion_chunked(&roots, edge_expansion, compiled.hints.hard_limit)?
2235            };
2236            edge_expansions.push(EdgeExpansionSlotRows {
2237                slot: edge_expansion.slot.clone(),
2238                roots: slot_rows,
2239            });
2240        }
2241
2242        Ok(GroupedQueryRows {
2243            roots,
2244            expansions,
2245            edge_expansions,
2246            was_degraded: false,
2247        })
2248    }
2249
2250    /// Chunked batched expansion: splits roots into chunks of
2251    /// `BATCH_CHUNK_SIZE` and runs one batched query per chunk, then merges
2252    /// results while preserving root ordering.  This keeps bind-parameter
2253    /// counts within `SQLite` limits while avoiding the N+1 per-root pattern
2254    /// for large result sets.
2255    fn read_expansion_nodes_chunked(
2256        &self,
2257        roots: &[NodeRow],
2258        expansion: &ExpansionSlot,
2259        hard_limit: usize,
2260    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
2261        if roots.len() <= BATCH_CHUNK_SIZE {
2262            return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
2263        }
2264
2265        // Merge chunk results keyed by root logical_id, then reassemble in
2266        // root order.
2267        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
2268        for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
2269            for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
2270                per_root
2271                    .entry(group.root_logical_id)
2272                    .or_default()
2273                    .extend(group.nodes);
2274            }
2275        }
2276
2277        Ok(roots
2278            .iter()
2279            .map(|root| ExpansionRootRows {
2280                root_logical_id: root.logical_id.clone(),
2281                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
2282            })
2283            .collect())
2284    }
2285
    /// Batched expansion: one recursive CTE query per expansion slot that
    /// processes all root IDs at once. Uses `ROW_NUMBER() OVER (PARTITION BY
    /// root_id ...)` to enforce the per-root hard limit inside the
    /// database rather than in Rust.
    ///
    /// NOTE(review): the cycle guard (`visited`) is carried per traversal
    /// path, so a node reachable from the same root via multiple distinct
    /// paths can appear more than once in the output — confirm callers
    /// deduplicate downstream if uniqueness matters.
    #[allow(clippy::too_many_lines)]
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Direction picks which edge endpoint must match the current frontier
        // node and which endpoint becomes the next frontier.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // EXECUTE-TIME VALIDATION: fused filter against kinds without an FTS schema.
        // This check runs at execute time (not builder time) because the target kind
        // set for an expand slot is edge-label-scoped, not kind-scoped, and multiple
        // target kinds may be reachable via one edge label. See Pack 12 docs.
        if expansion.filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. }
                    | Predicate::JsonPathFusedTimestampCmp { .. }
                    | Predicate::JsonPathFusedIn { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }

        // Build a UNION ALL of SELECT literals for the root seed rows.
        // SQLite does not support `VALUES ... AS alias(col)` in older versions,
        // so we use `SELECT ?1 UNION ALL SELECT ?2 ...` instead.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        // Bind params: root IDs occupy ?1..=?N, edge kind is ?(N+1).
        // Edge filter params (if any) follow at ?(N+2)..=?M.
        // Node filter params (if any) follow at ?(M+1)....
        let edge_kind_param = root_ids.len() + 1;
        let edge_filter_param_start = root_ids.len() + 2;

        // Compile the optional edge filter to a SQL fragment + bind values.
        // Injected into the JOIN condition so only matching edges are traversed.
        let (edge_filter_sql, edge_filter_binds) =
            compile_edge_filter(expansion.edge_filter.as_ref(), edge_filter_param_start);

        let filter_param_start = edge_filter_param_start + edge_filter_binds.len();

        // Compile the optional target-side filter to a SQL fragment + bind values.
        // The fragment is injected into the `numbered` CTE's WHERE clause BEFORE
        // the ROW_NUMBER() window so the per-originator limit counts only matching rows.
        let (filter_sql, filter_binds) =
            compile_expansion_filter(expansion.filter.as_ref(), filter_param_start);

        // The `root_id` column tracks which root each traversal path
        // originated from. The `ROW_NUMBER()` window in the outer query
        // enforces the per-root hard limit. Cycle guard: `visited` is a
        // comma-delimited id list per path and `instr(...)` rejects a hop
        // back onto any id already on that path. `WHERE t.depth > 0`
        // excludes the depth-0 seed rows (the roots themselves).
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted) AS (
                SELECT rid, rid, 0, printf(',%s,', rid), 0
                FROM root_ids
                UNION ALL
                SELECT
                    t.root_id,
                    {next_logical_id},
                    t.depth + 1,
                    t.visited || {next_logical_id} || ',',
                    t.emitted + 1
                FROM traversed t
                JOIN edges e ON {join_condition}
                    AND e.kind = ?{edge_kind_param}
                    AND e.superseded_at IS NULL{edge_filter_sql}
                WHERE t.depth < {max_depth}
                  AND t.emitted < {hard_limit}
                  AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
                     , n.content_ref, am.last_accessed_at
                     , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
                FROM traversed t
                JOIN nodes n ON n.logical_id = t.logical_id
                    AND n.superseded_at IS NULL
                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
                WHERE t.depth > 0{filter_sql}
            )
            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, logical_id",
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind root IDs (1..=N), edge kind (N+1), edge filter params (N+2..M),
        // then node filter params (M+1...). Order must mirror the parameter
        // layout computed above.
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(edge_filter_binds);
        bind_values.extend(filter_binds);

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, // root_id
                    NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Partition results back into per-root groups, preserving root order.
        // Roots with no matches get an empty group rather than being omitted.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
2438
2439    /// Chunked batched edge-expansion: splits roots into chunks of
2440    /// `BATCH_CHUNK_SIZE` and runs one batched query per chunk, then merges
2441    /// results while preserving root ordering. Parallel to
2442    /// [`Self::read_expansion_nodes_chunked`] but emits `(EdgeRow, NodeRow)`
2443    /// pairs instead of plain nodes.
2444    fn read_edge_expansion_chunked(
2445        &self,
2446        roots: &[NodeRow],
2447        expansion: &EdgeExpansionSlot,
2448        hard_limit: usize,
2449    ) -> Result<Vec<EdgeExpansionRootRows>, EngineError> {
2450        if roots.len() <= BATCH_CHUNK_SIZE {
2451            return self.read_edge_expansion_batched(roots, expansion, hard_limit);
2452        }
2453
2454        let mut per_root: HashMap<String, Vec<(EdgeRow, NodeRow)>> = HashMap::new();
2455        for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
2456            for group in self.read_edge_expansion_batched(chunk, expansion, hard_limit)? {
2457                per_root
2458                    .entry(group.root_logical_id)
2459                    .or_default()
2460                    .extend(group.pairs);
2461            }
2462        }
2463
2464        Ok(roots
2465            .iter()
2466            .map(|root| EdgeExpansionRootRows {
2467                root_logical_id: root.logical_id.clone(),
2468                pairs: per_root.remove(&root.logical_id).unwrap_or_default(),
2469            })
2470            .collect())
2471    }
2472
    /// Batched edge-expansion: one recursive CTE per slot that joins the
    /// final-hop edge row back onto the endpoint node, producing
    /// `(EdgeRow, NodeRow)` pairs per root.
    ///
    /// Security: `edge_filter` and `endpoint_filter` are compiled to
    /// parameterized SQL fragments via [`compile_edge_filter`] and
    /// [`compile_expansion_filter`]; user-authored values always arrive as
    /// bind parameters, never as interpolated SQL text.
    ///
    /// # Errors
    /// Returns [`EngineError`] if fused-filter validation fails or the
    /// expansion query cannot be prepared or executed.
    #[allow(clippy::too_many_lines)]
    fn read_edge_expansion_batched(
        &self,
        roots: &[NodeRow],
        expansion: &EdgeExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<EdgeExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Direction picks which edge endpoint must match the current frontier
        // node and which endpoint becomes the next frontier.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // Execute-time validation: fused endpoint filter requires a registered
        // property-FTS schema covering every reachable target kind.
        if expansion.endpoint_filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. }
                    | Predicate::JsonPathFusedTimestampCmp { .. }
                    | Predicate::JsonPathFusedIn { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }

        // Root seed rows as `SELECT ?1 UNION ALL SELECT ?2 ...` — older
        // SQLite versions lack `VALUES ... AS alias(col)`.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        // Bind layout: root IDs ?1..=?N, edge kind ?(N+1), edge-filter params
        // next, endpoint-filter params after those.
        let edge_kind_param = root_ids.len() + 1;
        let edge_filter_param_start = root_ids.len() + 2;

        let (edge_filter_sql, edge_filter_binds) =
            compile_edge_filter(expansion.edge_filter.as_ref(), edge_filter_param_start);

        let endpoint_filter_param_start = edge_filter_param_start + edge_filter_binds.len();
        let (endpoint_filter_sql, endpoint_filter_binds) = compile_expansion_filter(
            expansion.endpoint_filter.as_ref(),
            endpoint_filter_param_start,
        );

        // The `traversed` CTE carries the final-hop edge's row_id so we can
        // rejoin onto `edges` to project the full edge shape. Seeds use
        // `NULL` for edge_row_id; `WHERE t.depth > 0` in the `numbered` CTE
        // excludes the seeds so we never emit a NULL-edge row.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted, edge_row_id) AS (
                SELECT rid, rid, 0, printf(',%s,', rid), 0, NULL AS edge_row_id
                FROM root_ids
                UNION ALL
                SELECT
                    t.root_id,
                    {next_logical_id},
                    t.depth + 1,
                    t.visited || {next_logical_id} || ',',
                    t.emitted + 1,
                    e.row_id AS edge_row_id
                FROM traversed t
                JOIN edges e ON {join_condition}
                    AND e.kind = ?{edge_kind_param}
                    AND e.superseded_at IS NULL{edge_filter_sql}
                WHERE t.depth < {max_depth}
                  AND t.emitted < {hard_limit}
                  AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
                SELECT t.root_id,
                       e.row_id AS e_row_id,
                       e.logical_id AS e_logical_id,
                       e.source_logical_id AS e_source,
                       e.target_logical_id AS e_target,
                       e.kind AS e_kind,
                       e.properties AS e_properties,
                       e.source_ref AS e_source_ref,
                       e.confidence AS e_confidence,
                       n.row_id AS n_row_id,
                       n.logical_id AS n_logical_id,
                       n.kind AS n_kind,
                       n.properties AS n_properties,
                       n.content_ref AS n_content_ref,
                       am.last_accessed_at AS n_last_accessed_at,
                       ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id, e.row_id) AS rn
                FROM traversed t
                JOIN edges e ON e.row_id = t.edge_row_id
                JOIN nodes n ON n.logical_id = t.logical_id
                    AND n.superseded_at IS NULL
                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
                WHERE t.depth > 0{endpoint_filter_sql}
            )
            SELECT root_id,
                   e_row_id, e_logical_id, e_source, e_target, e_kind, e_properties,
                   e_source_ref, e_confidence,
                   n_row_id, n_logical_id, n_kind, n_properties, n_content_ref,
                   n_last_accessed_at
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, n_logical_id, e_row_id",
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind in the same order as the parameter layout computed above:
        // root IDs, edge kind, edge-filter values, endpoint-filter values.
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(edge_filter_binds);
        bind_values.extend(endpoint_filter_binds);

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let root_id: String = row.get(0)?;
                // Columns 1..=8 project the final-hop edge; 9..=14 the endpoint node.
                let edge_row = EdgeRow {
                    row_id: row.get(1)?,
                    logical_id: row.get(2)?,
                    source_logical_id: row.get(3)?,
                    target_logical_id: row.get(4)?,
                    kind: row.get(5)?,
                    properties: row.get(6)?,
                    source_ref: row.get(7)?,
                    confidence: row.get(8)?,
                };
                let node_row = NodeRow {
                    row_id: row.get(9)?,
                    logical_id: row.get(10)?,
                    kind: row.get(11)?,
                    properties: row.get(12)?,
                    content_ref: row.get(13)?,
                    last_accessed_at: row.get(14)?,
                };
                Ok((root_id, edge_row, node_row))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Partition results back into per-root groups, preserving root order.
        // Roots with no matches get an empty group rather than being omitted.
        let mut per_root: HashMap<String, Vec<(EdgeRow, NodeRow)>> = HashMap::new();
        for (root_id, edge, node) in rows {
            per_root.entry(root_id).or_default().push((edge, node));
        }

        let root_groups = roots
            .iter()
            .map(|root| EdgeExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                pairs: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
2643
2644    /// Validate that all target node kinds reachable via `edge_label` have a
2645    /// registered property-FTS schema. Called at execute time when an expansion
2646    /// slot carries a fused filter predicate.
2647    ///
2648    /// EXECUTE-TIME VALIDATION: this check runs at execute time (not builder
2649    /// time) for expand slots because the target kind set is edge-label-scoped
2650    /// rather than kind-scoped, and is not statically knowable at builder time
2651    /// when multiple target kinds may be reachable via the same label.
2652    /// See Pack 12 docs.
2653    ///
2654    /// # Errors
2655    /// Returns `EngineError::InvalidConfig` if any reachable target kind lacks
2656    /// a registered property-FTS schema.
2657    fn validate_fused_filter_for_edge_label(&self, edge_label: &str) -> Result<(), EngineError> {
2658        let conn = self.lock_connection()?;
2659        // Collect the distinct node kinds reachable as targets of this edge label.
2660        let mut stmt = conn
2661            .prepare_cached(
2662                "SELECT DISTINCT n.kind \
2663                 FROM edges e \
2664                 JOIN nodes n ON n.logical_id = e.target_logical_id \
2665                 WHERE e.kind = ?1 AND e.superseded_at IS NULL",
2666            )
2667            .map_err(EngineError::Sqlite)?;
2668        let target_kinds: Vec<String> = stmt
2669            .query_map(rusqlite::params![edge_label], |row| row.get(0))
2670            .map_err(EngineError::Sqlite)?
2671            .collect::<Result<Vec<_>, _>>()
2672            .map_err(EngineError::Sqlite)?;
2673
2674        for kind in &target_kinds {
2675            let has_schema: bool = conn
2676                .query_row(
2677                    "SELECT COUNT(*) > 0 FROM fts_property_schemas WHERE kind = ?1",
2678                    rusqlite::params![kind],
2679                    |row| row.get(0),
2680                )
2681                .map_err(EngineError::Sqlite)?;
2682            if !has_schema {
2683                return Err(EngineError::InvalidConfig(format!(
2684                    "kind {kind:?} has no registered property-FTS schema; register one with \
2685                     admin.register_fts_property_schema(..) before using fused filters on \
2686                     expansion slots, or use JsonPathEq for non-fused semantics \
2687                     (expand slot uses edge label {edge_label:?})"
2688                )));
2689            }
2690        }
2691        Ok(())
2692    }
2693
2694    /// Read a single run by id.
2695    ///
2696    /// # Errors
2697    /// Returns [`EngineError`] if the query fails or if the connection mutex
2698    /// has been poisoned.
2699    pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
2700        let conn = self.lock_connection()?;
2701        conn.query_row(
2702            "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
2703            rusqlite::params![id],
2704            |row| {
2705                Ok(RunRow {
2706                    id: row.get(0)?,
2707                    kind: row.get(1)?,
2708                    status: row.get(2)?,
2709                    properties: row.get(3)?,
2710                })
2711            },
2712        )
2713        .optional()
2714        .map_err(EngineError::Sqlite)
2715    }
2716
2717    /// Read a single step by id.
2718    ///
2719    /// # Errors
2720    /// Returns [`EngineError`] if the query fails or if the connection mutex
2721    /// has been poisoned.
2722    pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
2723        let conn = self.lock_connection()?;
2724        conn.query_row(
2725            "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
2726            rusqlite::params![id],
2727            |row| {
2728                Ok(StepRow {
2729                    id: row.get(0)?,
2730                    run_id: row.get(1)?,
2731                    kind: row.get(2)?,
2732                    status: row.get(3)?,
2733                    properties: row.get(4)?,
2734                })
2735            },
2736        )
2737        .optional()
2738        .map_err(EngineError::Sqlite)
2739    }
2740
2741    /// Read a single action by id.
2742    ///
2743    /// # Errors
2744    /// Returns [`EngineError`] if the query fails or if the connection mutex
2745    /// has been poisoned.
2746    pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
2747        let conn = self.lock_connection()?;
2748        conn.query_row(
2749            "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
2750            rusqlite::params![id],
2751            |row| {
2752                Ok(ActionRow {
2753                    id: row.get(0)?,
2754                    step_id: row.get(1)?,
2755                    kind: row.get(2)?,
2756                    status: row.get(3)?,
2757                    properties: row.get(4)?,
2758                })
2759            },
2760        )
2761        .optional()
2762        .map_err(EngineError::Sqlite)
2763    }
2764
2765    /// Read all active (non-superseded) runs.
2766    ///
2767    /// # Errors
2768    /// Returns [`EngineError`] if the query fails or if the connection mutex
2769    /// has been poisoned.
2770    pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
2771        let conn = self.lock_connection()?;
2772        let mut stmt = conn
2773            .prepare_cached(
2774                "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
2775            )
2776            .map_err(EngineError::Sqlite)?;
2777        let rows = stmt
2778            .query_map([], |row| {
2779                Ok(RunRow {
2780                    id: row.get(0)?,
2781                    kind: row.get(1)?,
2782                    status: row.get(2)?,
2783                    properties: row.get(3)?,
2784                })
2785            })
2786            .map_err(EngineError::Sqlite)?
2787            .collect::<Result<Vec<_>, _>>()
2788            .map_err(EngineError::Sqlite)?;
2789        Ok(rows)
2790    }
2791
2792    /// Returns the number of shape→SQL entries currently indexed.
2793    ///
2794    /// Each distinct query shape (structural hash of kind + steps + limits)
2795    /// maps to exactly one SQL string.  This is a test-oriented introspection
2796    /// helper; it does not reflect rusqlite's internal prepared-statement
2797    /// cache, which is keyed by SQL text.
2798    ///
2799    /// # Panics
2800    /// Panics if the internal shape-SQL-map mutex is poisoned.
2801    #[must_use]
2802    #[allow(clippy::expect_used)]
2803    pub fn shape_sql_count(&self) -> usize {
2804        self.shape_sql_map
2805            .lock()
2806            .unwrap_or_else(PoisonError::into_inner)
2807            .len()
2808    }
2809
2810    /// Returns a cloned `Arc` to the schema manager.
2811    #[must_use]
2812    pub fn schema_manager(&self) -> Arc<SchemaManager> {
2813        Arc::clone(&self.schema_manager)
2814    }
2815
2816    /// Return the execution plan for a compiled query without executing it.
2817    ///
2818    /// Useful for debugging, testing shape-hash caching, and operator
2819    /// diagnostics. Does not open a transaction or touch the database beyond
2820    /// checking the statement cache.
2821    ///
2822    /// # Panics
2823    /// Panics if the internal shape-SQL-map mutex is poisoned.
2824    #[must_use]
2825    pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
2826        let cache_hit = self
2827            .shape_sql_map
2828            .lock()
2829            .unwrap_or_else(PoisonError::into_inner)
2830            .contains_key(&compiled.shape_hash);
2831        QueryPlan {
2832            sql: wrap_node_row_projection_sql(&compiled.sql),
2833            bind_count: compiled.binds.len(),
2834            driving_table: compiled.driving_table,
2835            shape_hash: compiled.shape_hash,
2836            cache_hit,
2837        }
2838    }
2839
2840    /// Execute a named PRAGMA and return the result as a String.
2841    /// Used by Layer 1 tests to verify startup pragma initialization.
2842    ///
2843    /// # Errors
2844    /// Returns [`EngineError`] if the PRAGMA query fails or if the connection
2845    /// mutex has been poisoned.
2846    #[doc(hidden)]
2847    pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
2848        let conn = self.lock_connection()?;
2849        let result = conn
2850            .query_row(&format!("PRAGMA {name}"), [], |row| {
2851                // PRAGMAs may return TEXT or INTEGER; normalise to String.
2852                row.get::<_, rusqlite::types::Value>(0)
2853            })
2854            .map_err(EngineError::Sqlite)?;
2855        let s = match result {
2856            rusqlite::types::Value::Text(t) => t,
2857            rusqlite::types::Value::Integer(i) => i.to_string(),
2858            rusqlite::types::Value::Real(f) => f.to_string(),
2859            rusqlite::types::Value::Blob(_) => {
2860                return Err(EngineError::InvalidWrite(format!(
2861                    "PRAGMA {name} returned an unexpected BLOB value"
2862                )));
2863            }
2864            rusqlite::types::Value::Null => String::new(),
2865        };
2866        Ok(s)
2867    }
2868
2869    /// Return all provenance events whose `subject` matches the given value.
2870    ///
2871    /// Subjects are logical node IDs (for retire/upsert events) or `source_ref`
2872    /// values (for excise events).
2873    ///
2874    /// # Errors
2875    /// Returns [`EngineError`] if the query fails or if the connection mutex
2876    /// has been poisoned.
2877    pub fn query_provenance_events(
2878        &self,
2879        subject: &str,
2880    ) -> Result<Vec<ProvenanceEvent>, EngineError> {
2881        let conn = self.lock_connection()?;
2882        let mut stmt = conn
2883            .prepare_cached(
2884                "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
2885                 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
2886            )
2887            .map_err(EngineError::Sqlite)?;
2888        let events = stmt
2889            .query_map(rusqlite::params![subject], |row| {
2890                Ok(ProvenanceEvent {
2891                    id: row.get(0)?,
2892                    event_type: row.get(1)?,
2893                    subject: row.get(2)?,
2894                    source_ref: row.get(3)?,
2895                    metadata_json: row.get(4)?,
2896                    created_at: row.get(5)?,
2897                })
2898            })
2899            .map_err(EngineError::Sqlite)?
2900            .collect::<Result<Vec<_>, _>>()
2901            .map_err(EngineError::Sqlite)?;
2902        Ok(events)
2903    }
2904
    /// Check if `kind` has a first-registration async rebuild in progress
    /// (`is_first_registration=1` with state PENDING/BUILDING/SWAPPING and no
    /// rows yet in the per-kind `fts_props_<kind>` table). If so, execute a
    /// full-kind scan and return the nodes. Returns `None` when the normal
    /// FTS5 path should run.
    ///
    /// # Errors
    /// Returns [`EngineError`] if any lookup fails or if the connection mutex
    /// has been poisoned.
    fn scan_fallback_if_first_registration(
        &self,
        kind: &str,
    ) -> Result<Option<Vec<NodeRow>>, EngineError> {
        let conn = self.lock_connection()?;

        // Quick point-lookup: kind has a first-registration rebuild in a
        // pre-complete state AND the per-kind table has no rows yet.
        let prop_table = fathomdb_schema::fts_kind_table_name(kind);
        // Check whether the per-kind table exists before querying its row count.
        let table_exists: bool = conn
            .query_row(
                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                rusqlite::params![prop_table],
                |_| Ok(true),
            )
            .optional()?
            .unwrap_or(false);
        // A missing per-kind table counts as "empty": the projection has not
        // been materialised at all yet.
        let prop_empty = if table_exists {
            let cnt: i64 =
                conn.query_row(&format!("SELECT COUNT(*) FROM {prop_table}"), [], |r| {
                    r.get(0)
                })?;
            cnt == 0
        } else {
            true
        };
        // Only consult rebuild bookkeeping when the projection is empty; any
        // populated table means the normal FTS5 path is already usable.
        let needs_scan: bool = if prop_empty {
            conn.query_row(
                "SELECT 1 FROM fts_property_rebuild_state \
                 WHERE kind = ?1 AND is_first_registration = 1 \
                 AND state IN ('PENDING','BUILDING','SWAPPING') \
                 LIMIT 1",
                rusqlite::params![kind],
                |_| Ok(true),
            )
            .optional()?
            .unwrap_or(false)
        } else {
            false
        };

        if !needs_scan {
            return Ok(None);
        }

        // Scan fallback: return all active nodes of this kind.
        // Intentionally unindexed — acceptable for first-registration window.
        let mut stmt = conn
            .prepare_cached(
                "SELECT n.row_id, n.logical_id, n.kind, n.properties, n.content_ref, \
                 am.last_accessed_at \
                 FROM nodes n \
                 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id \
                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
            )
            .map_err(EngineError::Sqlite)?;

        let nodes = stmt
            .query_map(rusqlite::params![kind], |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        Ok(Some(nodes))
    }
2985
2986    /// Return the current rebuild progress for a kind, or `None` if no rebuild
2987    /// has been registered for that kind.
2988    ///
2989    /// # Errors
2990    /// Returns [`EngineError`] if the database query fails.
2991    pub fn get_property_fts_rebuild_progress(
2992        &self,
2993        kind: &str,
2994    ) -> Result<Option<crate::rebuild_actor::RebuildProgress>, EngineError> {
2995        let conn = self.lock_connection()?;
2996        let row = conn
2997            .query_row(
2998                "SELECT state, rows_total, rows_done, started_at, last_progress_at, error_message \
2999                 FROM fts_property_rebuild_state WHERE kind = ?1",
3000                rusqlite::params![kind],
3001                |r| {
3002                    Ok(crate::rebuild_actor::RebuildProgress {
3003                        state: r.get(0)?,
3004                        rows_total: r.get(1)?,
3005                        rows_done: r.get(2)?,
3006                        started_at: r.get(3)?,
3007                        last_progress_at: r.get(4)?,
3008                        error_message: r.get(5)?,
3009                    })
3010                },
3011            )
3012            .optional()?;
3013        Ok(row)
3014    }
3015}
3016
3017/// Rewrite a `CompiledQuery` whose SQL references the legacy `fts_node_properties`
3018/// table to use the per-kind `fts_props_<kind>` table, or strip the property FTS
3019/// arm entirely when the per-kind table does not exist in `sqlite_master`.
3020///
3021/// Returns `(adapted_sql, adapted_binds)`.
3022///
3023/// This wrapper owns the `sqlite_master` existence check (rusqlite stays out of
3024/// `fathomdb-query`) and delegates the SQL/bind rewriting to
3025/// [`CompiledQuery::adapt_fts_for_kind`].
3026fn adapt_fts_nodes_sql_for_per_kind_tables(
3027    compiled: &CompiledQuery,
3028    conn: &rusqlite::Connection,
3029) -> Result<(String, Vec<BindValue>), EngineError> {
3030    let root_kind = compiled
3031        .binds
3032        .get(1)
3033        .and_then(|b| {
3034            if let BindValue::Text(k) = b {
3035                Some(k.as_str())
3036            } else {
3037                None
3038            }
3039        })
3040        .unwrap_or("");
3041    let prop_table = fathomdb_schema::fts_kind_table_name(root_kind);
3042    let prop_table_exists: bool = conn
3043        .query_row(
3044            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
3045            rusqlite::params![prop_table],
3046            |_| Ok(true),
3047        )
3048        .optional()
3049        .map_err(EngineError::Sqlite)?
3050        .unwrap_or(false);
3051
3052    Ok(compiled.adapt_fts_for_kind(prop_table_exists, &prop_table))
3053}
3054
3055/// Check that the active vector profile's model identity and dimensions match
3056/// the supplied embedder. If either differs, emit a warning via `trace_warn!`
3057/// but always return `Ok(())`. This function NEVER returns `Err`.
3058///
3059/// Queries `projection_profiles` directly — no `AdminService` indirection.
3060#[allow(clippy::unnecessary_wraps)]
3061fn check_vec_identity_at_open(
3062    conn: &rusqlite::Connection,
3063    embedder: &dyn QueryEmbedder,
3064) -> Result<(), EngineError> {
3065    let row: Option<String> = conn
3066        .query_row(
3067            "SELECT config_json FROM projection_profiles WHERE kind='*' AND facet='vec'",
3068            [],
3069            |row| row.get(0),
3070        )
3071        .optional()
3072        .unwrap_or(None);
3073
3074    let Some(config_json) = row else {
3075        return Ok(());
3076    };
3077
3078    // Parse leniently — any error means we just skip the check.
3079    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&config_json) else {
3080        return Ok(());
3081    };
3082
3083    let identity = embedder.identity();
3084
3085    if let Some(stored_model) = parsed
3086        .get("model_identity")
3087        .and_then(serde_json::Value::as_str)
3088        && stored_model != identity.model_identity
3089    {
3090        trace_warn!(
3091            stored_model_identity = stored_model,
3092            embedder_model_identity = %identity.model_identity,
3093            "vec identity mismatch at open: model_identity differs"
3094        );
3095    }
3096
3097    if let Some(stored_dim) = parsed.get("dimensions").and_then(serde_json::Value::as_u64) {
3098        let stored_dim = usize::try_from(stored_dim).unwrap_or(usize::MAX);
3099        if stored_dim != identity.dimension {
3100            trace_warn!(
3101                stored_dimensions = stored_dim,
3102                embedder_dimensions = identity.dimension,
3103                "vec identity mismatch at open: dimensions differ"
3104            );
3105        }
3106    }
3107
3108    Ok(())
3109}
3110
/// Open-time FTS rebuild guards (Guard 1 + Guard 2).
///
/// Guard 1: if any registered kind's per-kind `fts_props_<kind>` table is
/// missing or empty while live nodes of that kind exist, do a synchronous
/// full rebuild.
///
/// Guard 2: if any recursive schema is registered but
/// `fts_node_property_positions` is empty, do a synchronous full rebuild to
/// regenerate the position map.
///
/// Both guards are no-ops on a consistent database.
///
/// # Errors
/// Returns [`EngineError`] if any guard query, projection delete, or the
/// rebuild transaction fails.
fn run_open_time_fts_guards(conn: &mut rusqlite::Connection) -> Result<(), EngineError> {
    // Fast exit: with no registered property schemas there is nothing to
    // rebuild, so skip both guard checks entirely.
    let schema_count: i64 = conn
        .query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
            row.get(0)
        })
        .map_err(EngineError::Sqlite)?;
    if schema_count == 0 {
        return Ok(());
    }

    // Guard 1 subsumes Guard 2: the full rebuild below repopulates the
    // position map as well, so Guard 2 is only evaluated when Guard 1 passes.
    let needs_fts_rebuild = open_guard_check_fts_empty(conn)?;
    let needs_position_backfill = if needs_fts_rebuild {
        false
    } else {
        open_guard_check_positions_empty(conn)?
    };

    if needs_fts_rebuild || needs_position_backfill {
        // Enumerate the per-kind FTS5 virtual tables first; this read must
        // finish before the immediate transaction takes the write lock.
        let per_kind_tables: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT name FROM sqlite_master \
                     WHERE type='table' AND name LIKE 'fts_props_%' \
                     AND sql LIKE 'CREATE VIRTUAL TABLE%'",
                )
                .map_err(EngineError::Sqlite)?;
            stmt.query_map([], |r| r.get::<_, String>(0))
                .map_err(EngineError::Sqlite)?
                .collect::<Result<Vec<_>, _>>()
                .map_err(EngineError::Sqlite)?
        };
        // IMMEDIATE: acquire the write lock up front so the clear-then-
        // repopulate sequence commits atomically.
        let tx = conn
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(EngineError::Sqlite)?;
        for table in &per_kind_tables {
            tx.execute_batch(&format!("DELETE FROM {table}"))
                .map_err(EngineError::Sqlite)?;
        }
        tx.execute("DELETE FROM fts_node_property_positions", [])
            .map_err(EngineError::Sqlite)?;
        // NOTE(review): the template binds `?1` for kind — presumably
        // `insert_property_fts_rows` runs it once per registered kind and
        // supplies the bind; confirm against `crate::projection`.
        crate::projection::insert_property_fts_rows(
            &tx,
            "SELECT logical_id, properties FROM nodes \
             WHERE kind = ?1 AND superseded_at IS NULL",
        )
        .map_err(EngineError::Sqlite)?;
        tx.commit().map_err(EngineError::Sqlite)?;
    }
    Ok(())
}
3172
3173fn open_guard_check_fts_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
3174    let kinds: Vec<String> = {
3175        let mut stmt = conn
3176            .prepare("SELECT kind FROM fts_property_schemas")
3177            .map_err(EngineError::Sqlite)?;
3178        stmt.query_map([], |row| row.get::<_, String>(0))
3179            .map_err(EngineError::Sqlite)?
3180            .collect::<Result<Vec<_>, _>>()
3181            .map_err(EngineError::Sqlite)?
3182    };
3183    for kind in &kinds {
3184        let table = fathomdb_schema::fts_kind_table_name(kind);
3185        let table_exists: bool = conn
3186            .query_row(
3187                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
3188                rusqlite::params![table],
3189                |_| Ok(true),
3190            )
3191            .optional()
3192            .map_err(EngineError::Sqlite)?
3193            .unwrap_or(false);
3194        let fts_count: i64 = if table_exists {
3195            conn.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
3196                row.get(0)
3197            })
3198            .map_err(EngineError::Sqlite)?
3199        } else {
3200            0
3201        };
3202        if fts_count == 0 {
3203            let node_count: i64 = conn
3204                .query_row(
3205                    "SELECT COUNT(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
3206                    rusqlite::params![kind],
3207                    |row| row.get(0),
3208                )
3209                .map_err(EngineError::Sqlite)?;
3210            if node_count > 0 {
3211                return Ok(true);
3212            }
3213        }
3214    }
3215    Ok(false)
3216}
3217
3218fn open_guard_check_positions_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
3219    let recursive_count: i64 = conn
3220        .query_row(
3221            "SELECT COUNT(*) FROM fts_property_schemas \
3222             WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
3223            [],
3224            |row| row.get(0),
3225        )
3226        .map_err(EngineError::Sqlite)?;
3227    if recursive_count == 0 {
3228        return Ok(false);
3229    }
3230    let pos_count: i64 = conn
3231        .query_row(
3232            "SELECT COUNT(*) FROM fts_node_property_positions",
3233            [],
3234            |row| row.get(0),
3235        )
3236        .map_err(EngineError::Sqlite)?;
3237    Ok(pos_count == 0)
3238}
3239
/// Wrap a node-selecting subquery so every row also carries its
/// `last_accessed_at` access metadata (NULL when the node has never been
/// read). The inner query must project `row_id`, `logical_id`, `kind`,
/// `properties` and `content_ref`.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut sql = String::with_capacity(base_sql.len() + 180);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, am.last_accessed_at FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
3247
3248/// Returns `true` when `err` indicates the vec virtual table is absent.
3249///
3250/// Matches:
3251/// - "no such table: vec_<kind>" (per-kind table not yet created by regeneration)
3252/// - "no such module: vec0" (sqlite-vec extension not available)
3253/// - Legacy `"no such table: vec_nodes_active"` (for backward compatibility with
3254///   any remaining tests or DB files referencing the old global table)
3255pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
3256    match err {
3257        rusqlite::Error::SqliteFailure(_, Some(msg)) => {
3258            // Per-kind tables start with "vec_"
3259            (msg.contains("no such table: vec_") && !msg.contains("vec_embedding"))
3260                || msg.contains("no such module: vec0")
3261        }
3262        _ => false,
3263    }
3264}
3265
3266fn scalar_to_bind(value: &ScalarValue) -> BindValue {
3267    match value {
3268        ScalarValue::Text(text) => BindValue::Text(text.clone()),
3269        ScalarValue::Integer(integer) => BindValue::Integer(*integer),
3270        ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
3271    }
3272}
3273
3274/// Merge strict and relaxed search branches into a single block-ordered,
3275/// deduplicated, limit-truncated hit list.
3276///
3277/// Phase 3 rules, in order:
3278///
3279/// 1. Each branch is sorted internally by score descending with `logical_id`
3280///    ascending as the deterministic tiebreak.
3281/// 2. Within a single branch, if the same `logical_id` appears twice (e.g.
3282///    once from the chunk surface and once from the property surface) the
3283///    higher-score row wins, then chunk > property > vector, then declaration
3284///    order (chunk first).
3285/// 3. Strict hits form one block and relaxed hits form the next. Strict
3286///    always precedes relaxed in the merged output regardless of per-hit
3287///    score.
3288/// 4. Cross-branch dedup is strict-wins: any relaxed hit whose `logical_id`
3289///    already appears in the strict block is dropped.
3290/// 5. The merged output is truncated to `limit`.
3291fn merge_search_branches(
3292    strict: Vec<SearchHit>,
3293    relaxed: Vec<SearchHit>,
3294    limit: usize,
3295) -> Vec<SearchHit> {
3296    merge_search_branches_three(strict, relaxed, Vec::new(), limit)
3297}
3298
3299/// Three-branch generalization of [`merge_search_branches`]: orders hits as
3300/// (strict block, relaxed block, vector block) per addendum 1 §Fusion
3301/// Semantics, with cross-branch dedup resolved by branch precedence
3302/// (strict > relaxed > vector). Within each block the existing
3303/// [`dedup_branch_hits`] rule applies (score desc, `logical_id` asc, source
3304/// priority chunk > property > vector).
3305///
3306/// Phase 12 (the unified `search()` entry point) calls this directly. The
3307/// two-branch [`merge_search_branches`] wrapper is preserved as a
3308/// convenience for the text-only `execute_compiled_search_plan` path; both
3309/// reduce to the same code.
3310fn merge_search_branches_three(
3311    strict: Vec<SearchHit>,
3312    relaxed: Vec<SearchHit>,
3313    vector: Vec<SearchHit>,
3314    limit: usize,
3315) -> Vec<SearchHit> {
3316    let strict_block = dedup_branch_hits(strict);
3317    let relaxed_block = dedup_branch_hits(relaxed);
3318    let vector_block = dedup_branch_hits(vector);
3319
3320    let mut seen: std::collections::HashSet<String> = strict_block
3321        .iter()
3322        .map(|h| h.node.logical_id.clone())
3323        .collect();
3324
3325    let mut merged = strict_block;
3326    for hit in relaxed_block {
3327        if seen.insert(hit.node.logical_id.clone()) {
3328            merged.push(hit);
3329        }
3330    }
3331    for hit in vector_block {
3332        if seen.insert(hit.node.logical_id.clone()) {
3333            merged.push(hit);
3334        }
3335    }
3336
3337    if merged.len() > limit {
3338        merged.truncate(limit);
3339    }
3340    merged
3341}
3342
3343/// Sort a branch's hits by score descending + `logical_id` ascending, then
3344/// dedup duplicate `logical_id`s within the branch using source priority
3345/// (chunk > property > vector) and declaration order.
3346fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
3347    hits.sort_by(|a, b| {
3348        b.score
3349            .partial_cmp(&a.score)
3350            .unwrap_or(std::cmp::Ordering::Equal)
3351            .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
3352            .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
3353    });
3354
3355    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
3356    hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
3357    hits
3358}
3359
3360fn source_priority(source: SearchHitSource) -> u8 {
3361    // Lower is better. Chunk is declared before property in the CTE; vector
3362    // is reserved for future wiring but comes last among the known variants.
3363    match source {
3364        SearchHitSource::Chunk => 0,
3365        SearchHitSource::Property => 1,
3366        SearchHitSource::Vector => 2,
3367    }
3368}
3369
/// Sentinel markers used to wrap FTS5-matched terms in the `highlight()`
/// output so the coordinator can recover per-term byte offsets in the
/// original `text_content` column.
///
/// Each sentinel is a single `U+0001` ("start of heading") / `U+0002`
/// ("start of text") byte. These bytes are safe for all valid JSON text
/// *except* deliberately escape-injected `\u0001` / `\u0002` sequences: a
/// payload like `{"x":"\u0001"}` decodes to a literal 0x01 byte in the
/// extracted blob, making the sentinel ambiguous for that row. Such input
/// is treated as out of scope for attribution correctness — hits on those
/// rows may have misattributed `matched_paths`, but no panic or query
/// failure occurs. A future hardening step could strip bytes < 0x20 at
/// blob-emission time in `RecursiveWalker::emit_leaf` to close this gap.
///
/// Using one-byte markers keeps the original-to-highlighted offset
/// accounting trivial: every sentinel adds exactly one byte to the
/// highlighted string.
// Decoded back into offsets by `parse_highlight_offsets`; consumed by
// `resolve_hit_attribution` when it calls FTS5 `highlight()`.
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
3389
3390/// Load the `fts_node_property_positions` sidecar rows for a given
3391/// `(logical_id, kind)` ordered by `start_offset`. Returns a vector of
3392/// `(start_offset, end_offset, leaf_path)` tuples ready for binary search.
3393fn load_position_map(
3394    conn: &Connection,
3395    logical_id: &str,
3396    kind: &str,
3397) -> Result<Vec<(usize, usize, String)>, EngineError> {
3398    let mut stmt = conn
3399        .prepare_cached(
3400            "SELECT start_offset, end_offset, leaf_path \
3401             FROM fts_node_property_positions \
3402             WHERE node_logical_id = ?1 AND kind = ?2 \
3403             ORDER BY start_offset ASC",
3404        )
3405        .map_err(EngineError::Sqlite)?;
3406    let rows = stmt
3407        .query_map(rusqlite::params![logical_id, kind], |row| {
3408            let start: i64 = row.get(0)?;
3409            let end: i64 = row.get(1)?;
3410            let path: String = row.get(2)?;
3411            // Offsets are non-negative and within blob byte limits; on the
3412            // off chance a corrupt row is encountered, fall back to 0 so
3413            // lookups silently skip it rather than panicking.
3414            let start = usize::try_from(start).unwrap_or(0);
3415            let end = usize::try_from(end).unwrap_or(0);
3416            Ok((start, end, path))
3417        })
3418        .map_err(EngineError::Sqlite)?;
3419    let mut out = Vec::new();
3420    for row in rows {
3421        out.push(row.map_err(EngineError::Sqlite)?);
3422    }
3423    Ok(out)
3424}
3425
/// Parse a `highlight()`-wrapped string, returning the list of original-text
/// byte offsets at which matched terms begin. `wrapped` is the
/// highlight-decorated form of a text column; `open` / `close` are the
/// sentinel markers passed to `highlight()`. The returned offsets refer to
/// positions in the *original* text (i.e. the column as it would be stored
/// without highlight decoration).
///
/// Returns an empty vector when either marker is empty: a zero-length
/// marker would match at every position without advancing the cursor,
/// which previously caused an infinite loop.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    // Guard against degenerate sentinels — the scan below advances by the
    // marker length on a match, so an empty marker would never make progress.
    if open.is_empty() || close.is_empty() {
        return Vec::new();
    }
    let mut offsets = Vec::new();
    let bytes = wrapped.as_bytes();
    let open_bytes = open.as_bytes();
    let close_bytes = close.as_bytes();
    let mut i = 0usize;
    // Number of sentinel bytes consumed so far — every marker encountered
    // subtracts from the wrapped-string index to get the original offset.
    let mut marker_bytes_seen = 0usize;
    while i < bytes.len() {
        if bytes[i..].starts_with(open_bytes) {
            // Record the original-text offset of the term following the open
            // marker.
            let original_offset = i - marker_bytes_seen;
            offsets.push(original_offset);
            i += open_bytes.len();
            marker_bytes_seen += open_bytes.len();
        } else if bytes[i..].starts_with(close_bytes) {
            i += close_bytes.len();
            marker_bytes_seen += close_bytes.len();
        } else {
            i += 1;
        }
    }
    offsets
}
3458
/// Binary-search the position map for the leaf whose `[start, end)` range
/// contains `offset`. Returns `None` if no leaf covers the offset.
/// `positions` must be sorted ascending by start offset (as produced by
/// `load_position_map`).
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    // Count the entries starting at or before `offset`; the last of them is
    // the only candidate whose half-open range could cover it.
    let upto = positions.partition_point(|entry| entry.0 <= offset);
    let (start, end, path) = positions.get(upto.checked_sub(1)?)?;
    if (*start..*end).contains(&offset) {
        Some(path.as_str())
    } else {
        None
    }
}
3475
/// Resolve per-hit match attribution by introspecting the FTS5 match state
/// for the hit's row via `highlight()` and mapping the resulting original-
/// text offsets back to recursive-leaf paths via the Phase 4 position map.
///
/// Chunk-backed hits carry no leaf structure and always return
/// `matched_paths = ["text_content"]`. Property-backed hits without a `projection_row_id`
/// (which should not happen — the search CTE always populates it) also
/// return empty attribution rather than erroring.
///
/// # Errors
/// Returns [`EngineError`] if the highlight query or position-map load fails.
fn resolve_hit_attribution(
    conn: &Connection,
    hit: &SearchHit,
    match_expr: &str,
) -> Result<HitAttribution, EngineError> {
    if matches!(hit.source, SearchHitSource::Chunk) {
        return Ok(HitAttribution {
            matched_paths: vec!["text_content".to_owned()],
        });
    }
    // Only Property hits beyond this point; any other source (e.g. Vector)
    // carries no FTS match state and degrades to empty attribution.
    if !matches!(hit.source, SearchHitSource::Property) {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }
    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };
    // A non-numeric projection_row_id is malformed; degrade rather than error.
    let rowid: i64 = match rowid_str.parse() {
        Ok(v) => v,
        Err(_) => {
            return Ok(HitAttribution {
                matched_paths: Vec::new(),
            });
        }
    };

    // Fetch the highlight-wrapped text_content for this hit's FTS row. The
    // FTS5 MATCH in the WHERE clause re-establishes the match state that
    // `highlight()` needs to decorate the returned text.
    // Per-kind tables have schema (node_logical_id UNINDEXED, text_content),
    // so text_content is column index 1 (0-based).
    let prop_table = fathomdb_schema::fts_kind_table_name(&hit.node.kind);
    let highlight_sql = format!(
        "SELECT highlight({prop_table}, 1, ?1, ?2) \
         FROM {prop_table} \
         WHERE rowid = ?3 AND {prop_table} MATCH ?4"
    );
    let mut stmt = conn.prepare(&highlight_sql).map_err(EngineError::Sqlite)?;
    let wrapped: Option<String> = stmt
        .query_row(
            rusqlite::params![
                ATTRIBUTION_HIGHLIGHT_OPEN,
                ATTRIBUTION_HIGHLIGHT_CLOSE,
                rowid,
                match_expr,
            ],
            |row| row.get(0),
        )
        .optional()
        .map_err(EngineError::Sqlite)?;
    // No row back means the rowid no longer matches the expression (e.g. the
    // projection changed underneath us); degrade to empty attribution.
    let Some(wrapped) = wrapped else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };

    let offsets = parse_highlight_offsets(
        &wrapped,
        ATTRIBUTION_HIGHLIGHT_OPEN,
        ATTRIBUTION_HIGHLIGHT_CLOSE,
    );
    if offsets.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
    if positions.is_empty() {
        // Scalar-only schemas have no position-map entries; attribution
        // degrades to an empty vector rather than erroring.
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    // Map each matched-term offset to its covering leaf, keeping first-seen
    // order and dropping duplicate paths.
    let mut matched_paths: Vec<String> = Vec::new();
    for offset in offsets {
        if let Some(path) = find_leaf_for_offset(&positions, offset)
            && !matched_paths.iter().any(|p| p == path)
        {
            matched_paths.push(path.to_owned());
        }
    }
    Ok(HitAttribution { matched_paths })
}
3573
3574/// Build a BM25 scoring expression for a per-kind FTS5 table.
3575///
3576/// If the schema has no weighted specs (all weights None), returns `bm25({table})`.
3577/// Otherwise returns `bm25({table}, 0.0, w1, w2, ...)` where the first weight
3578/// (0.0) is for the `node_logical_id UNINDEXED` column (which BM25 should ignore),
3579/// then one weight per spec in schema order.
3580fn build_bm25_expr(table: &str, schema_json: &str, sep: &str) -> String {
3581    let schema = crate::writer::parse_property_schema_json(schema_json, sep);
3582    let any_weighted = schema.paths.iter().any(|p| p.weight.is_some());
3583    if !any_weighted {
3584        return format!("bm25({table})");
3585    }
3586    // node_logical_id is UNINDEXED — weight 0.0 tells BM25 to ignore it.
3587    let weights: Vec<String> = std::iter::once("0.0".to_owned())
3588        .chain(
3589            schema
3590                .paths
3591                .iter()
3592                .map(|p| format!("{:.1}", p.weight.unwrap_or(1.0))),
3593        )
3594        .collect();
3595    format!("bm25({table}, {})", weights.join(", "))
3596}
3597
3598fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
3599    match value {
3600        fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
3601        fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
3602        fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
3603    }
3604}
3605
3606#[cfg(test)]
3607#[allow(clippy::expect_used)]
3608mod tests {
3609    use std::panic::{AssertUnwindSafe, catch_unwind};
3610    use std::sync::Arc;
3611
3612    use fathomdb_query::{BindValue, QueryBuilder};
3613    use fathomdb_schema::SchemaManager;
3614    use rusqlite::types::Value;
3615    use tempfile::NamedTempFile;
3616
3617    use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
3618
3619    use fathomdb_query::{
3620        NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
3621    };
3622
3623    use super::{
3624        bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
3625        wrap_node_row_projection_sql,
3626    };
3627
3628    fn mk_hit(
3629        logical_id: &str,
3630        score: f64,
3631        match_mode: SearchMatchMode,
3632        source: SearchHitSource,
3633    ) -> SearchHit {
3634        SearchHit {
3635            node: NodeRowLite {
3636                row_id: format!("{logical_id}-row"),
3637                logical_id: logical_id.to_owned(),
3638                kind: "Goal".to_owned(),
3639                properties: "{}".to_owned(),
3640                content_ref: None,
3641                last_accessed_at: None,
3642            },
3643            score,
3644            modality: RetrievalModality::Text,
3645            source,
3646            match_mode: Some(match_mode),
3647            snippet: None,
3648            written_at: 0,
3649            projection_row_id: None,
3650            vector_distance: None,
3651            attribution: None,
3652        }
3653    }
3654
3655    #[test]
3656    fn merge_places_strict_block_before_relaxed_regardless_of_score() {
3657        let strict = vec![mk_hit(
3658            "a",
3659            1.0,
3660            SearchMatchMode::Strict,
3661            SearchHitSource::Chunk,
3662        )];
3663        // Relaxed has a higher score but must still come second.
3664        let relaxed = vec![mk_hit(
3665            "b",
3666            9.9,
3667            SearchMatchMode::Relaxed,
3668            SearchHitSource::Chunk,
3669        )];
3670        let merged = merge_search_branches(strict, relaxed, 10);
3671        assert_eq!(merged.len(), 2);
3672        assert_eq!(merged[0].node.logical_id, "a");
3673        assert!(matches!(
3674            merged[0].match_mode,
3675            Some(SearchMatchMode::Strict)
3676        ));
3677        assert_eq!(merged[1].node.logical_id, "b");
3678        assert!(matches!(
3679            merged[1].match_mode,
3680            Some(SearchMatchMode::Relaxed)
3681        ));
3682    }
3683
3684    #[test]
3685    fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
3686        let strict = vec![mk_hit(
3687            "shared",
3688            1.0,
3689            SearchMatchMode::Strict,
3690            SearchHitSource::Chunk,
3691        )];
3692        let relaxed = vec![
3693            mk_hit(
3694                "shared",
3695                9.9,
3696                SearchMatchMode::Relaxed,
3697                SearchHitSource::Chunk,
3698            ),
3699            mk_hit(
3700                "other",
3701                2.0,
3702                SearchMatchMode::Relaxed,
3703                SearchHitSource::Chunk,
3704            ),
3705        ];
3706        let merged = merge_search_branches(strict, relaxed, 10);
3707        assert_eq!(merged.len(), 2);
3708        assert_eq!(merged[0].node.logical_id, "shared");
3709        assert!(matches!(
3710            merged[0].match_mode,
3711            Some(SearchMatchMode::Strict)
3712        ));
3713        assert_eq!(merged[1].node.logical_id, "other");
3714        assert!(matches!(
3715            merged[1].match_mode,
3716            Some(SearchMatchMode::Relaxed)
3717        ));
3718    }
3719
3720    #[test]
3721    fn merge_sorts_within_block_by_score_desc_then_logical_id() {
3722        let strict = vec![
3723            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3724            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3725            mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3726        ];
3727        let merged = merge_search_branches(strict, vec![], 10);
3728        assert_eq!(
3729            merged
3730                .iter()
3731                .map(|h| &h.node.logical_id)
3732                .collect::<Vec<_>>(),
3733            vec!["a", "c", "b"]
3734        );
3735    }
3736
3737    #[test]
3738    fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
3739        let strict = vec![
3740            mk_hit(
3741                "shared",
3742                1.0,
3743                SearchMatchMode::Strict,
3744                SearchHitSource::Property,
3745            ),
3746            mk_hit(
3747                "shared",
3748                1.0,
3749                SearchMatchMode::Strict,
3750                SearchHitSource::Chunk,
3751            ),
3752        ];
3753        let merged = merge_search_branches(strict, vec![], 10);
3754        assert_eq!(merged.len(), 1);
3755        assert!(matches!(merged[0].source, SearchHitSource::Chunk));
3756    }
3757
3758    #[test]
3759    fn merge_truncates_to_limit_after_block_merge() {
3760        let strict = vec![
3761            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3762            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3763        ];
3764        let relaxed = vec![mk_hit(
3765            "c",
3766            9.0,
3767            SearchMatchMode::Relaxed,
3768            SearchHitSource::Chunk,
3769        )];
3770        let merged = merge_search_branches(strict, relaxed, 2);
3771        assert_eq!(merged.len(), 2);
3772        assert_eq!(merged[0].node.logical_id, "a");
3773        assert_eq!(merged[1].node.logical_id, "b");
3774    }
3775
3776    /// P12 architectural pin: the generalized three-branch merger must
3777    /// produce strict -> relaxed -> vector block ordering with cross-branch
3778    /// dedup resolved by branch precedence (strict > relaxed > vector).
3779    /// v1 `search()` policy never fires the vector branch through the
3780    /// unified planner because read-time embedding is deferred, but the
3781    /// merge helper itself must be ready for the day the planner does so —
3782    /// otherwise wiring the future phase requires touching the core merge
3783    /// code as well as the planner.
3784    #[test]
3785    fn search_architecturally_supports_three_branch_fusion() {
3786        let strict = vec![mk_hit(
3787            "alpha",
3788            1.0,
3789            SearchMatchMode::Strict,
3790            SearchHitSource::Chunk,
3791        )];
3792        let relaxed = vec![mk_hit(
3793            "bravo",
3794            5.0,
3795            SearchMatchMode::Relaxed,
3796            SearchHitSource::Chunk,
3797        )];
3798        // Synthetic vector hit with the highest score. Three-block ordering
3799        // must still place it last.
3800        let mut vector_hit = mk_hit(
3801            "charlie",
3802            9.9,
3803            SearchMatchMode::Strict,
3804            SearchHitSource::Vector,
3805        );
3806        // Vector hits actually carry match_mode=None per the addendum, but
3807        // the merge helper's ordering is mode-agnostic; we override here to
3808        // pin the modality field for the test.
3809        vector_hit.match_mode = None;
3810        vector_hit.modality = RetrievalModality::Vector;
3811        let vector = vec![vector_hit];
3812
3813        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
3814        assert_eq!(merged.len(), 3);
3815        assert_eq!(merged[0].node.logical_id, "alpha");
3816        assert_eq!(merged[1].node.logical_id, "bravo");
3817        assert_eq!(merged[2].node.logical_id, "charlie");
3818        // Vector block comes last regardless of its higher score.
3819        assert!(matches!(merged[2].source, SearchHitSource::Vector));
3820
3821        // Cross-branch dedup: a logical_id that appears in multiple branches
3822        // is attributed to its highest-priority originating branch only.
3823        let strict2 = vec![mk_hit(
3824            "shared",
3825            0.5,
3826            SearchMatchMode::Strict,
3827            SearchHitSource::Chunk,
3828        )];
3829        let relaxed2 = vec![mk_hit(
3830            "shared",
3831            5.0,
3832            SearchMatchMode::Relaxed,
3833            SearchHitSource::Chunk,
3834        )];
3835        let mut vshared = mk_hit(
3836            "shared",
3837            9.9,
3838            SearchMatchMode::Strict,
3839            SearchHitSource::Vector,
3840        );
3841        vshared.match_mode = None;
3842        vshared.modality = RetrievalModality::Vector;
3843        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
3844        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
3845        assert!(matches!(
3846            merged2[0].match_mode,
3847            Some(SearchMatchMode::Strict)
3848        ));
3849        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));
3850
3851        // Relaxed wins over vector when strict is absent.
3852        let mut vshared2 = mk_hit(
3853            "shared",
3854            9.9,
3855            SearchMatchMode::Strict,
3856            SearchHitSource::Vector,
3857        );
3858        vshared2.match_mode = None;
3859        vshared2.modality = RetrievalModality::Vector;
3860        let merged3 = merge_search_branches_three(
3861            vec![],
3862            vec![mk_hit(
3863                "shared",
3864                1.0,
3865                SearchMatchMode::Relaxed,
3866                SearchHitSource::Chunk,
3867            )],
3868            vec![vshared2],
3869            10,
3870        );
3871        assert_eq!(merged3.len(), 1);
3872        assert!(matches!(
3873            merged3[0].match_mode,
3874            Some(SearchMatchMode::Relaxed)
3875        ));
3876    }
3877
3878    /// P12-N-3: production-realistic vector-only fusion. The v1 planner
3879    /// never fires this shape today (read-time embedding is deferred), but
3880    /// when it does the merger will see empty strict + empty relaxed + a
3881    /// non-empty vector block. The three-branch merger must pass that
3882    /// block through unchanged, preserving `RetrievalModality::Vector`,
3883    /// `SearchHitSource::Vector`, and `match_mode == None` semantics.
3884    ///
3885    /// Note: the review spec asked for `vector_hit_count == 1` /
3886    /// `strict_hit_count == 0` assertions. Those are fields on
3887    /// `SearchRows`, which is assembled one layer up in
3888    /// `execute_compiled_retrieval_plan`. The merger returns a bare
3889    /// `Vec<SearchHit>`, so this test asserts the corresponding invariants
3890    /// directly on the returned vec (block shape + per-hit fields).
3891    #[test]
3892    fn merge_search_branches_three_vector_only_preserves_vector_block() {
3893        let mut vector_hit = mk_hit(
3894            "solo",
3895            0.75,
3896            SearchMatchMode::Strict,
3897            SearchHitSource::Vector,
3898        );
3899        vector_hit.match_mode = None;
3900        vector_hit.modality = RetrievalModality::Vector;
3901
3902        let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);
3903
3904        assert_eq!(merged.len(), 1);
3905        assert_eq!(merged[0].node.logical_id, "solo");
3906        assert!(matches!(merged[0].source, SearchHitSource::Vector));
3907        assert!(matches!(merged[0].modality, RetrievalModality::Vector));
3908        assert!(
3909            merged[0].match_mode.is_none(),
3910            "vector hits carry match_mode=None per addendum 1"
3911        );
3912    }
3913
3914    /// P12-N-3: limit truncation must preserve block precedence — when the
3915    /// strict block alone already exceeds the limit, relaxed and vector
3916    /// hits must be dropped entirely even if they have higher raw scores.
3917    ///
3918    /// Note: the review spec asked for `strict_hit_count == 2` /
3919    /// `relaxed_hit_count == 0` / `vector_hit_count == 0` assertions, which
3920    /// are `SearchRows` fields assembled one layer up. Since
3921    /// `merge_search_branches_three` only returns a `Vec<SearchHit>`, this
3922    /// test asserts the corresponding invariants directly: the returned
3923    /// vec contains exactly the top two strict hits, with no relaxed or
3924    /// vector hits leaking past the limit.
3925    #[test]
3926    fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
3927        let strict = vec![
3928            mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3929            mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3930            mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3931        ];
3932        let relaxed = vec![mk_hit(
3933            "d",
3934            9.0,
3935            SearchMatchMode::Relaxed,
3936            SearchHitSource::Chunk,
3937        )];
3938        let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
3939        vector_hit.match_mode = None;
3940        vector_hit.modality = RetrievalModality::Vector;
3941        let vector = vec![vector_hit];
3942
3943        let merged = merge_search_branches_three(strict, relaxed, vector, 2);
3944
3945        assert_eq!(merged.len(), 2);
3946        assert_eq!(merged[0].node.logical_id, "a");
3947        assert_eq!(merged[1].node.logical_id, "b");
3948        // Neither relaxed nor vector hits made it past the limit.
3949        assert!(
3950            merged
3951                .iter()
3952                .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
3953            "strict block must win limit contention against higher-scored relaxed/vector hits"
3954        );
3955        assert!(
3956            merged
3957                .iter()
3958                .all(|h| matches!(h.source, SearchHitSource::Chunk)),
3959            "no vector source hits should leak past the limit"
3960        );
3961    }
3962
3963    #[test]
3964    fn is_vec_table_absent_matches_known_error_messages() {
3965        use rusqlite::ffi;
3966        fn make_err(msg: &str) -> rusqlite::Error {
3967            rusqlite::Error::SqliteFailure(
3968                ffi::Error {
3969                    code: ffi::ErrorCode::Unknown,
3970                    extended_code: 1,
3971                },
3972                Some(msg.to_owned()),
3973            )
3974        }
3975        assert!(is_vec_table_absent(&make_err(
3976            "no such table: vec_nodes_active"
3977        )));
3978        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
3979        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
3980        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
3981        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
3982    }
3983
3984    // --- 0.5.0 item 6: per-kind vec table in coordinator ---
3985
3986    #[test]
3987    fn vector_search_uses_per_kind_table_and_degrades_when_table_absent() {
3988        // Without sqlite-vec feature or without creating vec_mykind, querying
3989        // for kind "MyKind" should degrade gracefully (was_degraded=true).
3990        let db = NamedTempFile::new().expect("temporary db");
3991        let coordinator = ExecutionCoordinator::open(
3992            db.path(),
3993            Arc::new(SchemaManager::new()),
3994            None,
3995            1,
3996            Arc::new(TelemetryCounters::default()),
3997            None,
3998        )
3999        .expect("coordinator");
4000
4001        let compiled = QueryBuilder::nodes("MyKind")
4002            .vector_search("some query", 5)
4003            .compile()
4004            .expect("vector query compiles");
4005
4006        let rows = coordinator
4007            .execute_compiled_read(&compiled)
4008            .expect("degraded read must succeed");
4009        assert!(
4010            rows.was_degraded,
4011            "must degrade when vec_mykind table does not exist"
4012        );
4013        assert!(
4014            rows.nodes.is_empty(),
4015            "degraded result must return empty nodes"
4016        );
4017    }
4018
4019    #[test]
4020    fn bind_value_text_maps_to_sql_text() {
4021        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
4022        assert_eq!(val, Value::Text("hello".to_owned()));
4023    }
4024
4025    #[test]
4026    fn bind_value_integer_maps_to_sql_integer() {
4027        let val = bind_value_to_sql(&BindValue::Integer(42));
4028        assert_eq!(val, Value::Integer(42));
4029    }
4030
4031    #[test]
4032    fn bind_value_bool_true_maps_to_integer_one() {
4033        let val = bind_value_to_sql(&BindValue::Bool(true));
4034        assert_eq!(val, Value::Integer(1));
4035    }
4036
4037    #[test]
4038    fn bind_value_bool_false_maps_to_integer_zero() {
4039        let val = bind_value_to_sql(&BindValue::Bool(false));
4040        assert_eq!(val, Value::Integer(0));
4041    }
4042
    #[test]
    fn same_shape_queries_share_one_cache_entry() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Two queries that differ only in the bound search term ("budget"
        // vs "standup") — structurally identical shapes.
        let compiled_a = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(10)
            .compile()
            .expect("compiled a");
        let compiled_b = QueryBuilder::nodes("Meeting")
            .text_search("standup", 5)
            .limit(10)
            .compile()
            .expect("compiled b");

        coordinator
            .execute_compiled_read(&compiled_a)
            .expect("read a");
        coordinator
            .execute_compiled_read(&compiled_b)
            .expect("read b");

        assert_eq!(
            compiled_a.shape_hash, compiled_b.shape_hash,
            "different bind values, same structural shape → same hash"
        );
        // Both executions must have gone through one shared cache entry.
        assert_eq!(coordinator.shape_sql_count(), 1);
    }
4080
4081    #[test]
4082    fn vector_read_degrades_gracefully_when_vec_table_absent() {
4083        let db = NamedTempFile::new().expect("temporary db");
4084        let coordinator = ExecutionCoordinator::open(
4085            db.path(),
4086            Arc::new(SchemaManager::new()),
4087            None,
4088            1,
4089            Arc::new(TelemetryCounters::default()),
4090            None,
4091        )
4092        .expect("coordinator");
4093
4094        let compiled = QueryBuilder::nodes("Meeting")
4095            .vector_search("budget embeddings", 5)
4096            .compile()
4097            .expect("vector query compiles");
4098
4099        let result = coordinator.execute_compiled_read(&compiled);
4100        let rows = result.expect("degraded read must succeed, not error");
4101        assert!(
4102            rows.was_degraded,
4103            "result must be flagged as degraded when vec_nodes_active is absent"
4104        );
4105        assert!(
4106            rows.nodes.is_empty(),
4107            "degraded result must return empty nodes"
4108        );
4109    }
4110
4111    #[test]
4112    fn coordinator_caches_by_shape_hash() {
4113        let db = NamedTempFile::new().expect("temporary db");
4114        let coordinator = ExecutionCoordinator::open(
4115            db.path(),
4116            Arc::new(SchemaManager::new()),
4117            None,
4118            1,
4119            Arc::new(TelemetryCounters::default()),
4120            None,
4121        )
4122        .expect("coordinator");
4123
4124        let compiled = QueryBuilder::nodes("Meeting")
4125            .text_search("budget", 5)
4126            .compile()
4127            .expect("compiled query");
4128
4129        coordinator
4130            .execute_compiled_read(&compiled)
4131            .expect("execute compiled read");
4132        assert_eq!(coordinator.shape_sql_count(), 1);
4133    }
4134
4135    // --- Item 6: explain_compiled_read tests ---
4136
4137    #[test]
4138    fn explain_returns_correct_sql() {
4139        let db = NamedTempFile::new().expect("temporary db");
4140        let coordinator = ExecutionCoordinator::open(
4141            db.path(),
4142            Arc::new(SchemaManager::new()),
4143            None,
4144            1,
4145            Arc::new(TelemetryCounters::default()),
4146            None,
4147        )
4148        .expect("coordinator");
4149
4150        let compiled = QueryBuilder::nodes("Meeting")
4151            .text_search("budget", 5)
4152            .compile()
4153            .expect("compiled query");
4154
4155        let plan = coordinator.explain_compiled_read(&compiled);
4156
4157        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
4158    }
4159
4160    #[test]
4161    fn explain_returns_correct_driving_table() {
4162        use fathomdb_query::DrivingTable;
4163
4164        let db = NamedTempFile::new().expect("temporary db");
4165        let coordinator = ExecutionCoordinator::open(
4166            db.path(),
4167            Arc::new(SchemaManager::new()),
4168            None,
4169            1,
4170            Arc::new(TelemetryCounters::default()),
4171            None,
4172        )
4173        .expect("coordinator");
4174
4175        let compiled = QueryBuilder::nodes("Meeting")
4176            .text_search("budget", 5)
4177            .compile()
4178            .expect("compiled query");
4179
4180        let plan = coordinator.explain_compiled_read(&compiled);
4181
4182        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
4183    }
4184
    // Explain must mirror the shape cache's state for a given compiled
    // query: a miss before anything has executed, a hit once execution has
    // populated the cache.
    #[test]
    fn explain_reports_cache_miss_then_hit() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        // Before execution: cache miss
        let plan_before = coordinator.explain_compiled_read(&compiled);
        assert!(
            !plan_before.cache_hit,
            "cache miss expected before first execute"
        );

        // Execute to populate cache
        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // After execution: cache hit
        let plan_after = coordinator.explain_compiled_read(&compiled);
        assert!(
            plan_after.cache_hit,
            "cache hit expected after first execute"
        );
    }
4222
    #[test]
    fn explain_does_not_execute_query() {
        // Call explain_compiled_read on an empty database. If explain were
        // actually executing SQL, it would return Ok with 0 rows. But the
        // key assertion is that it returns a QueryPlan (not an error) even
        // without touching the database.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("anything", 5)
            .compile()
            .expect("compiled query");

        // This must not error, even though the database is empty
        let plan = coordinator.explain_compiled_read(&compiled);

        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
        // The plan reports the bind count straight from the compiled query
        // without binding (or running) anything.
        assert_eq!(plan.bind_count, compiled.binds.len());
    }
4251
    #[test]
    fn coordinator_executes_compiled_read() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // A second raw connection to the same file, used only for seeding.
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Seed one node, its chunk, and the matching fts_nodes row directly
        // with SQL so the text search below has something to hit.
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // The text search must surface exactly the seeded node.
        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
4291
    #[test]
    fn text_search_finds_structured_only_node_via_property_fts() {
        // A node with no chunks at all must still be findable through its
        // per-kind property FTS table (fts_props_goal).
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Insert a structured-only node (no chunks) with a property FTS row.
        // Per-kind table fts_props_goal must be created before inserting.
        conn.execute_batch(
            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
                node_logical_id UNINDEXED, text_content, \
                tokenize = 'porter unicode61 remove_diacritics 2'\
            )",
        )
        .expect("create per-kind fts table");
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
            INSERT INTO fts_props_goal (node_logical_id, text_content)
            VALUES ('goal-1', 'Ship v2');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Goal")
            .text_search("Ship", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // The property-FTS branch must surface the chunkless node.
        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "goal-1");
    }
4338
4339    #[test]
4340    fn text_search_returns_both_chunk_and_property_backed_hits() {
4341        let db = NamedTempFile::new().expect("temporary db");
4342        let coordinator = ExecutionCoordinator::open(
4343            db.path(),
4344            Arc::new(SchemaManager::new()),
4345            None,
4346            1,
4347            Arc::new(TelemetryCounters::default()),
4348            None,
4349        )
4350        .expect("coordinator");
4351        let conn = rusqlite::Connection::open(db.path()).expect("open db");
4352
4353        // Chunk-backed hit: a Meeting with a chunk containing "quarterly".
4354        conn.execute_batch(
4355            r"
4356            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
4357            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
4358            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
4359            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
4360            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
4361            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
4362            ",
4363        )
4364        .expect("seed chunk-backed node");
4365
4366        // Property-backed hit: a Meeting with property FTS containing "quarterly".
4367        // Per-kind table fts_props_meeting must be created before inserting.
4368        conn.execute_batch(
4369            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_meeting USING fts5(\
4370                node_logical_id UNINDEXED, text_content, \
4371                tokenize = 'porter unicode61 remove_diacritics 2'\
4372            )",
4373        )
4374        .expect("create per-kind fts table");
4375        conn.execute_batch(
4376            r#"
4377            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
4378            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
4379            INSERT INTO fts_props_meeting (node_logical_id, text_content)
4380            VALUES ('meeting-2', 'quarterly sync');
4381            "#,
4382        )
4383        .expect("seed property-backed node");
4384
4385        let compiled = QueryBuilder::nodes("Meeting")
4386            .text_search("quarterly", 10)
4387            .limit(10)
4388            .compile()
4389            .expect("compiled query");
4390
4391        let rows = coordinator
4392            .execute_compiled_read(&compiled)
4393            .expect("execute read");
4394
4395        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
4396        ids.sort_unstable();
4397        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
4398    }
4399
4400    #[test]
4401    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
4402        let db = NamedTempFile::new().expect("temporary db");
4403        let coordinator = ExecutionCoordinator::open(
4404            db.path(),
4405            Arc::new(SchemaManager::new()),
4406            None,
4407            1,
4408            Arc::new(TelemetryCounters::default()),
4409            None,
4410        )
4411        .expect("coordinator");
4412        let conn = rusqlite::Connection::open(db.path()).expect("open db");
4413
4414        conn.execute_batch(
4415            r"
4416            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
4417            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
4418            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
4419            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
4420            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
4421            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
4422            ",
4423        )
4424        .expect("seed chunk-backed node");
4425
4426        let compiled = QueryBuilder::nodes("Meeting")
4427            .text_search("not a ship", 10)
4428            .limit(10)
4429            .compile()
4430            .expect("compiled query");
4431
4432        let rows = coordinator
4433            .execute_compiled_read(&compiled)
4434            .expect("execute read");
4435
4436        assert_eq!(rows.nodes.len(), 1);
4437        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
4438    }
4439
4440    // --- Item 1: capability gate tests ---
4441
4442    #[test]
4443    fn capability_gate_reports_false_without_feature() {
4444        let db = NamedTempFile::new().expect("temporary db");
4445        // Open without vector_dimension: regardless of feature flag, vector_enabled must be false
4446        // when no dimension is requested (the vector profile is never bootstrapped).
4447        let coordinator = ExecutionCoordinator::open(
4448            db.path(),
4449            Arc::new(SchemaManager::new()),
4450            None,
4451            1,
4452            Arc::new(TelemetryCounters::default()),
4453            None,
4454        )
4455        .expect("coordinator");
4456        assert!(
4457            !coordinator.vector_enabled(),
4458            "vector_enabled must be false when no dimension is requested"
4459        );
4460    }
4461
4462    #[cfg(feature = "sqlite-vec")]
4463    #[test]
4464    fn capability_gate_reports_true_when_feature_enabled() {
4465        let db = NamedTempFile::new().expect("temporary db");
4466        let coordinator = ExecutionCoordinator::open(
4467            db.path(),
4468            Arc::new(SchemaManager::new()),
4469            Some(128),
4470            1,
4471            Arc::new(TelemetryCounters::default()),
4472            None,
4473        )
4474        .expect("coordinator");
4475        assert!(
4476            coordinator.vector_enabled(),
4477            "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
4478        );
4479    }
4480
4481    // --- Item 4: runtime table read tests ---
4482
    #[test]
    fn read_run_returns_inserted_run() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        // Write path: a WriterActor owns the database file and applies a
        // single WriteRequest carrying one run row (all other write kinds
        // left empty).
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        // Read path: a fresh coordinator over the same file looks the run
        // up by id. NOTE(review): this assumes submit() has committed by
        // the time it returns — confirm against WriterActor semantics.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // read_run returns a nested Result/Option: outer = query success,
        // inner = row presence.
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
4537
    #[test]
    fn read_step_returns_inserted_step() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        let db = NamedTempFile::new().expect("temporary db");
        // Write path: one request inserts the parent run and its step
        // together (a step references its run via run_id).
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // Parent run the step below points at.
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        // Read path: a fresh coordinator looks the step up by id and must
        // see the id / parent run id / kind exactly as written.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
4601
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        let db = NamedTempFile::new().expect("temporary db");
        // Write path: build the full run -> step -> action chain in a single
        // request (an action references its parent step via step_id).
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // Root of the chain: the run.
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // Middle of the chain: the step under run-a1.
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // Leaf of the chain: the action under step-a1.
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        // Read path: a fresh coordinator looks the action up by id and must
        // see the id / parent step id / kind exactly as written.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
4677
    #[test]
    fn read_active_runs_excludes_superseded() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert original run
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Supersede original run with v2
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // The v2 row names run-v1 in supersedes_id (with upsert set),
                // which is what should drop run-v1 from the active set below.
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        // Read path: read_active_runs must return only run-v2; the
        // superseded run-v1 is filtered out.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
4757
4758    #[allow(clippy::panic)]
4759    fn poison_connection(coordinator: &ExecutionCoordinator) {
4760        let result = catch_unwind(AssertUnwindSafe(|| {
4761            let _guard = coordinator.pool.connections[0]
4762                .lock()
4763                .expect("poison test lock");
4764            panic!("poison coordinator connection mutex");
4765        }));
4766        assert!(
4767            result.is_err(),
4768            "poison test must unwind while holding the connection mutex"
4769        );
4770    }
4771
4772    #[allow(clippy::panic)]
4773    fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
4774    where
4775        F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
4776    {
4777        match op(coordinator) {
4778            Err(EngineError::Bridge(message)) => {
4779                assert_eq!(message, "connection mutex poisoned");
4780            }
4781            Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
4782            Err(error) => panic!("expected poisoned connection error, got {error:?}"),
4783        }
4784    }
4785
4786    #[test]
4787    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
4788        let db = NamedTempFile::new().expect("temporary db");
4789        let coordinator = ExecutionCoordinator::open(
4790            db.path(),
4791            Arc::new(SchemaManager::new()),
4792            None,
4793            1,
4794            Arc::new(TelemetryCounters::default()),
4795            None,
4796        )
4797        .expect("coordinator");
4798
4799        poison_connection(&coordinator);
4800
4801        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
4802        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
4803        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
4804        assert_poisoned_connection_error(
4805            &coordinator,
4806            super::ExecutionCoordinator::read_active_runs,
4807        );
4808        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
4809        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
4810    }
4811
4812    // --- M-2: Bounded shape cache ---
4813
4814    #[test]
4815    fn shape_cache_stays_bounded() {
4816        use fathomdb_query::ShapeHash;
4817
4818        let db = NamedTempFile::new().expect("temporary db");
4819        let coordinator = ExecutionCoordinator::open(
4820            db.path(),
4821            Arc::new(SchemaManager::new()),
4822            None,
4823            1,
4824            Arc::new(TelemetryCounters::default()),
4825            None,
4826        )
4827        .expect("coordinator");
4828
4829        // Directly populate the cache with MAX_SHAPE_CACHE_SIZE + 1 entries.
4830        {
4831            let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
4832            for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
4833                cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
4834            }
4835        }
4836        // The cache is now over the limit but hasn't been pruned yet (pruning
4837        // happens on the insert path in execute_compiled_read).
4838
4839        // Execute a compiled read to trigger the bounded-cache check.
4840        let compiled = QueryBuilder::nodes("Meeting")
4841            .text_search("budget", 5)
4842            .limit(10)
4843            .compile()
4844            .expect("compiled query");
4845
4846        coordinator
4847            .execute_compiled_read(&compiled)
4848            .expect("execute read");
4849
4850        assert!(
4851            coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
4852            "shape cache must stay bounded: got {} entries, max {}",
4853            coordinator.shape_sql_count(),
4854            super::MAX_SHAPE_CACHE_SIZE
4855        );
4856    }
4857
4858    // --- M-1: Read pool size ---
4859
4860    #[test]
4861    fn read_pool_size_configurable() {
4862        let db = NamedTempFile::new().expect("temporary db");
4863        let coordinator = ExecutionCoordinator::open(
4864            db.path(),
4865            Arc::new(SchemaManager::new()),
4866            None,
4867            2,
4868            Arc::new(TelemetryCounters::default()),
4869            None,
4870        )
4871        .expect("coordinator with pool_size=2");
4872
4873        assert_eq!(coordinator.pool.size(), 2);
4874
4875        // Basic read should succeed through the pool.
4876        let compiled = QueryBuilder::nodes("Meeting")
4877            .text_search("budget", 5)
4878            .limit(10)
4879            .compile()
4880            .expect("compiled query");
4881
4882        let result = coordinator.execute_compiled_read(&compiled);
4883        assert!(result.is_ok(), "read through pool must succeed");
4884    }
4885
4886    // --- M-4: Grouped read batching ---
4887
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        let db = NamedTempFile::new().expect("temporary db");

        // Bootstrap the database via coordinator (creates schema).
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed data: 10 root nodes (Meeting-0..9) with 2 outbound edges each
        // to expansion nodes (Task-0-a, Task-0-b, etc.).
        // Note the doubled braces in the format! string: '{{...}}' renders
        // as literal JSON braces in the SQL.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        // Grouped query: text-search roots plus one outbound HAS_TASK
        // expansion slot named "tasks".
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1, None, None)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        // Grouped result shape: 10 roots, 1 slot, one entry per root.
        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Each root should have exactly 2 expansion nodes (task-X-a, task-X-b).
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
4965
4966    // --- B-4: build_bm25_expr unit tests ---
4967
4968    #[test]
4969    fn build_bm25_expr_no_weights() {
4970        let schema_json = r#"["$.title","$.body"]"#;
4971        let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4972        assert_eq!(result, "bm25(fts_props_testkind)");
4973    }
4974
4975    #[test]
4976    fn build_bm25_expr_with_weights() {
4977        let schema_json = r#"[{"path":"$.title","mode":"scalar","weight":10.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
4978        let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4979        assert_eq!(result, "bm25(fts_props_testkind, 0.0, 10.0, 1.0)");
4980    }
4981
4982    // --- B-4: weighted schema integration test ---
4983
    #[test]
    #[allow(clippy::too_many_lines)]
    fn weighted_schema_bm25_orders_title_match_above_body_match() {
        use crate::{
            AdminService, FtsPropertyPathSpec, NodeInsert, ProvenanceMode, WriteRequest,
            WriterActor, writer::ChunkPolicy,
        };
        use fathomdb_schema::fts_column_name;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Step 1: bootstrap, register schema with weights, create per-column table.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Article",
                    &[
                        // $.title is weighted 10x over $.body, so a title
                        // match should dominate the bm25 ranking below.
                        FtsPropertyPathSpec::scalar("$.title").with_weight(10.0),
                        FtsPropertyPathSpec::scalar("$.body").with_weight(1.0),
                    ],
                    None,
                    &[],
                    crate::rebuild_actor::RebuildMode::Eager,
                )
                .expect("register schema with weights");
        }

        // Step 2: write two nodes.
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Node A: "rust" in title (high-weight column).
        writer
            .submit(WriteRequest {
                label: "insert-a".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-a".to_owned(),
                    logical_id: "article-a".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"rust","body":"other"}"#.to_owned(),
                    source_ref: Some("src-a".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node A");

        // Node B: "rust" in body (low-weight column).
        writer
            .submit(WriteRequest {
                label: "insert-b".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-b".to_owned(),
                    logical_id: "article-b".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"other","body":"rust"}"#.to_owned(),
                    source_ref: Some("src-b".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node B");

        drop(writer);

        // Verify per-column values were written.
        // Sanity-check the write path before asserting on ranking, so a
        // failure here points at indexing rather than at bm25 ordering.
        {
            let title_col = fts_column_name("$.title", false);
            let body_col = fts_column_name("$.body", false);
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            let count: i64 = conn
                .query_row("SELECT count(*) FROM fts_props_article", [], |r| r.get(0))
                .expect("count fts rows");
            assert_eq!(count, 2, "both nodes must have FTS rows in per-kind table");
            let (title_a, body_a): (String, String) = conn
                .query_row(
                    &format!(
                        "SELECT {title_col}, {body_col} FROM fts_props_article \
                         WHERE node_logical_id = 'article-a'"
                    ),
                    [],
                    |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
                )
                .expect("select article-a");
            assert_eq!(
                title_a, "rust",
                "article-a must have 'rust' in title column"
            );
            assert_eq!(
                body_a, "other",
                "article-a must have 'other' in body column"
            );
        }

        // Step 3: search for "rust" and assert node A ranks first.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = fathomdb_query::QueryBuilder::nodes("Article")
            .text_search("rust", 5)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 2, "both nodes must be returned");
        assert_eq!(
            rows.nodes[0].logical_id, "article-a",
            "article-a (title match, weight 10) must rank above article-b (body match, weight 1)"
        );
    }
5134
5135    // --- C-1: matched_paths attribution tests ---
5136
    /// Property FTS hit: `matched_paths` must reflect the *actual* leaves
    /// that contributed match tokens, queried from
    /// `fts_node_property_positions` via the highlight + offset path.
    ///
    /// Setup: one node with two indexed leaves (`$.body` = "other",
    /// `$.title` = "searchterm"). Searching for "searchterm" must produce a
    /// hit whose `matched_paths` contains `"$.title"` and does NOT contain
    /// `"$.body"`.
    #[test]
    fn property_fts_hit_matched_paths_from_positions() {
        use crate::{AdminService, rebuild_actor::RebuildMode};
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Register FTS schema for "Item" to create the per-kind table
        // fts_props_item before opening the coordinator.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Item",
                    &[
                        crate::FtsPropertyPathSpec::scalar("$.body"),
                        crate::FtsPropertyPathSpec::scalar("$.title"),
                    ],
                    None,
                    &[],
                    RebuildMode::Eager,
                )
                .expect("register Item FTS schema");
        }

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // The recursive walker emits leaves in alphabetical key order:
        //   "body"  → "other"      bytes  0..5
        //   LEAF_SEPARATOR (29 bytes)
        //   "title" → "searchterm" bytes 34..44
        let blob = format!("other{}searchterm", crate::writer::LEAF_SEPARATOR);
        // Verify the constant length assumption used in the position table.
        // The hard-coded offsets below (0..5 and 34..44) silently go stale if
        // the separator ever changes length, so pin it here.
        assert_eq!(
            crate::writer::LEAF_SEPARATOR.len(),
            29,
            "LEAF_SEPARATOR length changed; update position offsets"
        );

        // Seed the node plus its FTS and position rows by hand rather than
        // going through the writer, so this test controls the exact offsets.
        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
             VALUES ('r1', 'item-1', 'Item', '{\"title\":\"searchterm\",\"body\":\"other\"}', 100)",
            [],
        )
        .expect("insert node");
        // Insert into the per-kind table (migration 23 dropped global fts_node_properties).
        conn.execute(
            "INSERT INTO fts_props_item (node_logical_id, text_content) \
             VALUES ('item-1', ?1)",
            rusqlite::params![blob],
        )
        .expect("insert fts row");
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 0, 5, '$.body')",
            [],
        )
        .expect("insert body position");
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 34, 44, '$.title')",
            [],
        )
        .expect("insert title position");

        // Attribution is opt-in: flip the flag on the compiled plan.
        let ast = QueryBuilder::nodes("Item").text_search("searchterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected at least one hit");
        let hit = rows
            .hits
            .iter()
            .find(|h| h.node.logical_id == "item-1")
            .expect("item-1 must be in hits");

        // Only the leaf whose byte range contains the match token may be
        // attributed.
        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        assert!(
            att.matched_paths.contains(&"$.title".to_owned()),
            "matched_paths must contain '$.title', got {:?}",
            att.matched_paths,
        );
        assert!(
            !att.matched_paths.contains(&"$.body".to_owned()),
            "matched_paths must NOT contain '$.body', got {:?}",
            att.matched_paths,
        );
    }
5253
5254    /// Vector hits must carry `attribution = None` regardless of the
5255    /// `attribution_requested` flag.  The vector retrieval path has no
5256    /// FTS5 match positions to attribute.
5257    ///
5258    /// This test exercises the degraded (no sqlite-vec) path which returns
5259    /// an empty hit list; the invariant is that `was_degraded = true` and
5260    /// no hits carry a non-None attribution.
5261    #[test]
5262    fn vector_hit_has_no_attribution() {
5263        use fathomdb_query::compile_vector_search;
5264
5265        let db = NamedTempFile::new().expect("temporary db");
5266        let coordinator = ExecutionCoordinator::open(
5267            db.path(),
5268            Arc::new(SchemaManager::new()),
5269            None,
5270            1,
5271            Arc::new(TelemetryCounters::default()),
5272            None,
5273        )
5274        .expect("coordinator");
5275
5276        // Compile a vector search with attribution requested.
5277        let ast = QueryBuilder::nodes("Document").vector_search("[1.0, 0.0]", 5);
5278        let mut compiled = compile_vector_search(ast.ast()).expect("compile vector search");
5279        compiled.attribution_requested = true;
5280
5281        // Without sqlite-vec the result degrades to empty; every hit
5282        // (vacuously) must carry attribution == None.
5283        let rows = coordinator
5284            .execute_compiled_vector_search(&compiled)
5285            .expect("vector search must not error");
5286
5287        assert!(
5288            rows.was_degraded,
5289            "vector search without vec table must degrade"
5290        );
5291        for hit in &rows.hits {
5292            assert!(
5293                hit.attribution.is_none(),
5294                "vector hits must carry attribution = None, got {:?}",
5295                hit.attribution
5296            );
5297        }
5298    }
5299
    /// Chunk-backed hits with attribution requested must carry
    /// `matched_paths = ["text_content"]` — they have no recursive-leaf
    /// structure, but callers need a non-empty signal that the match came
    /// from the chunk surface.
    ///
    /// NOTE(review): the assertion below pins the C-1 pack-spec target
    /// behavior — `matched_paths == ["text_content"]` — i.e. the chunk-hit
    /// arm of `resolve_hit_attribution` is expected to return
    /// `vec!["text_content"]`.  A previous revision of this comment claimed
    /// the test pinned a placeholder empty `matched_paths`; that no longer
    /// matches the code.  If the integration tests in
    /// `crates/fathomdb/tests/text_search_surface.rs` still assert empty
    /// `matched_paths` for chunk hits, they conflict with this expectation
    /// and must be updated together — TODO confirm the two suites agree.
    #[test]
    fn chunk_hit_has_text_content_attribution() {
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Seed a node whose searchable text lives in a chunk row (plus the
        // matching fts_nodes row) rather than in indexed properties.
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('r1', 'chunk-node', 'Goal', '{}', 100);
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('c1', 'chunk-node', 'uniquesentinelterm', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('c1', 'chunk-node', 'Goal', 'uniquesentinelterm');
            ",
        )
        .expect("seed chunk node");

        // Attribution is opt-in: flip the flag on the compiled plan.
        let ast = QueryBuilder::nodes("Goal").text_search("uniquesentinelterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected chunk hit");
        let hit = rows
            .hits
            .iter()
            .find(|h| matches!(h.source, SearchHitSource::Chunk))
            .expect("must have a Chunk hit");

        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        assert_eq!(
            att.matched_paths,
            vec!["text_content".to_owned()],
            "chunk matched_paths must be [\"text_content\"], got {:?}",
            att.matched_paths,
        );
    }
5368
5369    /// Property FTS hits from two different kinds must each carry
5370    /// `matched_paths` corresponding to their own kind's registered leaf
5371    /// paths, not those of the other kind.
5372    ///
5373    /// This pins the per-`(node_logical_id, kind)` isolation in the
5374    /// `load_position_map` query.
5375    #[test]
5376    #[allow(clippy::too_many_lines)]
5377    fn mixed_kind_results_get_per_kind_matched_paths() {
5378        use crate::{AdminService, rebuild_actor::RebuildMode};
5379        use fathomdb_query::compile_search;
5380
5381        let db = NamedTempFile::new().expect("temporary db");
5382        let schema_manager = Arc::new(SchemaManager::new());
5383
5384        // Register FTS schemas for KindA and KindB to create per-kind tables
5385        // fts_props_kinda and fts_props_kindb before opening the coordinator.
5386        {
5387            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
5388            admin
5389                .register_fts_property_schema_with_entries(
5390                    "KindA",
5391                    &[crate::FtsPropertyPathSpec::scalar("$.alpha")],
5392                    None,
5393                    &[],
5394                    RebuildMode::Eager,
5395                )
5396                .expect("register KindA FTS schema");
5397            admin
5398                .register_fts_property_schema_with_entries(
5399                    "KindB",
5400                    &[crate::FtsPropertyPathSpec::scalar("$.beta")],
5401                    None,
5402                    &[],
5403                    RebuildMode::Eager,
5404                )
5405                .expect("register KindB FTS schema");
5406        }
5407
5408        let coordinator = ExecutionCoordinator::open(
5409            db.path(),
5410            Arc::clone(&schema_manager),
5411            None,
5412            1,
5413            Arc::new(TelemetryCounters::default()),
5414            None,
5415        )
5416        .expect("coordinator");
5417
5418        let conn = rusqlite::Connection::open(db.path()).expect("open db");
5419
5420        // KindA: leaf "$.alpha" = "xenoterm" (start=0, end=8)
5421        conn.execute(
5422            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5423             VALUES ('rA', 'node-a', 'KindA', '{\"alpha\":\"xenoterm\"}', 100)",
5424            [],
5425        )
5426        .expect("insert KindA node");
5427        // Insert into per-kind table (migration 23 dropped global fts_node_properties).
5428        conn.execute(
5429            "INSERT INTO fts_props_kinda (node_logical_id, text_content) \
5430             VALUES ('node-a', 'xenoterm')",
5431            [],
5432        )
5433        .expect("insert KindA fts row");
5434        conn.execute(
5435            "INSERT INTO fts_node_property_positions \
5436             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5437             VALUES ('node-a', 'KindA', 0, 8, '$.alpha')",
5438            [],
5439        )
5440        .expect("insert KindA position");
5441
5442        // KindB: leaf "$.beta" = "xenoterm" (start=0, end=8)
5443        conn.execute(
5444            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5445             VALUES ('rB', 'node-b', 'KindB', '{\"beta\":\"xenoterm\"}', 100)",
5446            [],
5447        )
5448        .expect("insert KindB node");
5449        // Insert into per-kind table (migration 23 dropped global fts_node_properties).
5450        conn.execute(
5451            "INSERT INTO fts_props_kindb (node_logical_id, text_content) \
5452             VALUES ('node-b', 'xenoterm')",
5453            [],
5454        )
5455        .expect("insert KindB fts row");
5456        conn.execute(
5457            "INSERT INTO fts_node_property_positions \
5458             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5459             VALUES ('node-b', 'KindB', 0, 8, '$.beta')",
5460            [],
5461        )
5462        .expect("insert KindB position");
5463
5464        // Search across both kinds (empty root_kind = no kind filter).
5465        let ast = QueryBuilder::nodes("").text_search("xenoterm", 10);
5466        let mut compiled = compile_search(ast.ast()).expect("compile search");
5467        compiled.attribution_requested = true;
5468
5469        let rows = coordinator
5470            .execute_compiled_search(&compiled)
5471            .expect("search");
5472
5473        // Both nodes must appear.
5474        assert!(
5475            rows.hits.len() >= 2,
5476            "expected hits for both kinds, got {}",
5477            rows.hits.len()
5478        );
5479
5480        for hit in &rows.hits {
5481            let att = hit
5482                .attribution
5483                .as_ref()
5484                .expect("attribution must be Some when attribution_requested");
5485            match hit.node.kind.as_str() {
5486                "KindA" => {
5487                    assert_eq!(
5488                        att.matched_paths,
5489                        vec!["$.alpha".to_owned()],
5490                        "KindA hit must have matched_paths=['$.alpha'], got {:?}",
5491                        att.matched_paths,
5492                    );
5493                }
5494                "KindB" => {
5495                    assert_eq!(
5496                        att.matched_paths,
5497                        vec!["$.beta".to_owned()],
5498                        "KindB hit must have matched_paths=['$.beta'], got {:?}",
5499                        att.matched_paths,
5500                    );
5501                }
5502                other => {
5503                    // Only KindA and KindB are expected in this test.
5504                    assert_eq!(other, "KindA", "unexpected kind in result: {other}");
5505                }
5506            }
5507        }
5508    }
5509
5510    // --- Pack H: TokenizerStrategy tests ---
5511
5512    #[test]
5513    fn tokenizer_strategy_from_str() {
5514        use super::TokenizerStrategy;
5515        assert_eq!(
5516            TokenizerStrategy::from_str("porter unicode61 remove_diacritics 2"),
5517            TokenizerStrategy::RecallOptimizedEnglish,
5518        );
5519        assert_eq!(
5520            TokenizerStrategy::from_str("unicode61 remove_diacritics 2"),
5521            TokenizerStrategy::PrecisionOptimized,
5522        );
5523        assert_eq!(
5524            TokenizerStrategy::from_str("trigram"),
5525            TokenizerStrategy::SubstringTrigram,
5526        );
5527        assert_eq!(
5528            TokenizerStrategy::from_str("icu"),
5529            TokenizerStrategy::GlobalCjk,
5530        );
5531        assert_eq!(
5532            TokenizerStrategy::from_str("unicode61 tokenchars '.+-'"),
5533            TokenizerStrategy::SourceCode,
5534        );
5535        // Canonical source-code preset value
5536        assert_eq!(
5537            TokenizerStrategy::from_str("unicode61 tokenchars '._-$@'"),
5538            TokenizerStrategy::SourceCode,
5539        );
5540        assert_eq!(
5541            TokenizerStrategy::from_str("my_custom_tokenizer"),
5542            TokenizerStrategy::Custom("my_custom_tokenizer".to_owned()),
5543        );
5544    }
5545
5546    #[test]
5547    fn trigram_short_query_returns_empty() {
5548        use fathomdb_query::compile_search;
5549
5550        let db = NamedTempFile::new().expect("temporary db");
5551        let schema_manager = Arc::new(SchemaManager::new());
5552
5553        // First open: bootstrap the schema so projection_profiles table exists.
5554        {
5555            let bootstrap = ExecutionCoordinator::open(
5556                db.path(),
5557                Arc::clone(&schema_manager),
5558                None,
5559                1,
5560                Arc::new(TelemetryCounters::default()),
5561                None,
5562            )
5563            .expect("bootstrap coordinator");
5564            drop(bootstrap);
5565        }
5566
5567        // Insert a SubstringTrigram strategy for kind "Snippet" directly in the DB.
5568        {
5569            let conn = rusqlite::Connection::open(db.path()).expect("open db");
5570            conn.execute_batch(
5571                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
5572                 VALUES ('Snippet', 'fts', '{\"tokenizer\":\"trigram\"}');",
5573            )
5574            .expect("insert profile");
5575        }
5576
5577        // Reopen coordinator to load strategies.
5578        let coordinator = ExecutionCoordinator::open(
5579            db.path(),
5580            Arc::clone(&schema_manager),
5581            None,
5582            1,
5583            Arc::new(TelemetryCounters::default()),
5584            None,
5585        )
5586        .expect("coordinator reopen");
5587
5588        // 2-char query for a SubstringTrigram kind must return empty without error.
5589        let ast = QueryBuilder::nodes("Snippet").text_search("ab", 10);
5590        let compiled = compile_search(ast.ast()).expect("compile search");
5591        let rows = coordinator
5592            .execute_compiled_search(&compiled)
5593            .expect("short trigram query must not error");
5594        assert!(
5595            rows.hits.is_empty(),
5596            "2-char trigram query must return empty"
5597        );
5598    }
5599
    #[test]
    fn source_code_strategy_does_not_corrupt_fts5_syntax() {
        // Regression: escape_source_code_query must NOT be applied to the
        // already-rendered FTS5 expression. render_text_query_fts5 wraps every
        // term in double-quote delimiters (e.g. std.io -> "std.io"). Calling the
        // escape function on that output corrupts the expression: the escaper
        // sees the surrounding '"' as ordinary chars and produces '"std"."io""'
        // — malformed FTS5 syntax — causing searches to silently return empty.
        //
        // This test verifies a round-trip search for 'std.io' succeeds when the
        // SourceCode tokenizer strategy is active.
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap the DB schema.
        {
            let bootstrap = ExecutionCoordinator::open(
                db.path(),
                Arc::clone(&schema_manager),
                None,
                1,
                Arc::new(TelemetryCounters::default()),
                None,
            )
            .expect("bootstrap coordinator");
            drop(bootstrap);
        }

        // Insert the SourceCode tokenizer profile for kind "Symbol" and seed a document.
        // The doubled single quotes inside json_object(...) are SQL escaping
        // for the tokenizer value `unicode61 tokenchars '._-$@'`.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            conn.execute(
                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
                 VALUES ('Symbol', 'fts', json_object('tokenizer', 'unicode61 tokenchars ''._-$@'''))",
                [],
            )
            .expect("insert profile");
            conn.execute_batch(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
                 VALUES ('row-sym-1', 'logical-sym-1', 'Symbol', '{}', 1); \
                 INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'std.io is a rust crate', 1); \
                 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'Symbol', 'std.io is a rust crate');",
            )
            .expect("insert node and fts row");
        }

        // Reopen coordinator to pick up the SourceCode strategy.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator reopen");

        // Search for "std.io": must find the document (not error, not return empty).
        let ast = QueryBuilder::nodes("Symbol").text_search("std.io", 10);
        let compiled = compile_search(ast.ast()).expect("compile search");
        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("source code search must not error");
        assert!(
            !rows.hits.is_empty(),
            "SourceCode strategy search for 'std.io' must return the document; \
             got empty — FTS5 expression was likely corrupted by post-render escaping"
        );
    }
5673
5674    // ---- Pack E: vec identity guard tests ----
5675
    /// Minimal `QueryEmbedder` test double: reports a configurable identity
    /// and produces all-zero embeddings.
    #[derive(Debug)]
    struct StubEmbedder {
        // Echoed back as `model_identity` by `identity()`.
        model_identity: String,
        // Length of the zero vector returned by `embed_query` and the
        // `dimension` reported by `identity()`.
        dimension: usize,
    }
5681
5682    impl StubEmbedder {
5683        fn new(model_identity: &str, dimension: usize) -> Self {
5684            Self {
5685                model_identity: model_identity.to_owned(),
5686                dimension,
5687            }
5688        }
5689    }
5690
5691    impl crate::embedder::QueryEmbedder for StubEmbedder {
5692        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, crate::embedder::EmbedderError> {
5693            Ok(vec![0.0; self.dimension])
5694        }
5695        fn identity(&self) -> crate::embedder::QueryEmbedderIdentity {
5696            crate::embedder::QueryEmbedderIdentity {
5697                model_identity: self.model_identity.clone(),
5698                model_version: "1.0".to_owned(),
5699                dimension: self.dimension,
5700                normalization_policy: "l2".to_owned(),
5701            }
5702        }
5703        fn max_tokens(&self) -> usize {
5704            512
5705        }
5706    }
5707
5708    fn make_in_memory_db_with_projection_profiles() -> rusqlite::Connection {
5709        let conn = rusqlite::Connection::open_in_memory().expect("in-memory db");
5710        conn.execute_batch(
5711            "CREATE TABLE IF NOT EXISTS projection_profiles (
5712                kind TEXT NOT NULL,
5713                facet TEXT NOT NULL,
5714                config_json TEXT NOT NULL,
5715                active_at INTEGER,
5716                created_at INTEGER,
5717                PRIMARY KEY (kind, facet)
5718            );",
5719        )
5720        .expect("create projection_profiles");
5721        conn
5722    }
5723
5724    #[test]
5725    fn check_vec_identity_no_profile_no_panic() {
5726        let conn = make_in_memory_db_with_projection_profiles();
5727        let embedder = StubEmbedder::new("bge-small", 384);
5728        let result = super::check_vec_identity_at_open(&conn, &embedder);
5729        assert!(result.is_ok(), "no profile row must return Ok(())");
5730    }
5731
5732    #[test]
5733    fn check_vec_identity_matching_identity_ok() {
5734        let conn = make_in_memory_db_with_projection_profiles();
5735        conn.execute(
5736            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5737             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5738            [],
5739        )
5740        .expect("insert profile");
5741        let embedder = StubEmbedder::new("bge-small", 384);
5742        let result = super::check_vec_identity_at_open(&conn, &embedder);
5743        assert!(result.is_ok(), "matching profile must return Ok(())");
5744    }
5745
5746    #[test]
5747    fn check_vec_identity_mismatched_dimensions_ok() {
5748        let conn = make_in_memory_db_with_projection_profiles();
5749        conn.execute(
5750            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5751             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5752            [],
5753        )
5754        .expect("insert profile");
5755        // embedder reports 768, profile says 384 — should warn but return Ok(())
5756        let embedder = StubEmbedder::new("bge-small", 768);
5757        let result = super::check_vec_identity_at_open(&conn, &embedder);
5758        assert!(
5759            result.is_ok(),
5760            "dimension mismatch must warn and return Ok(())"
5761        );
5762    }
5763
5764    #[test]
5765    fn custom_tokenizer_passthrough() {
5766        use super::TokenizerStrategy;
5767        let strategy = TokenizerStrategy::Custom("my_tok".to_owned());
5768        // Custom strategy is just stored — it doesn't modify queries.
5769        assert_eq!(strategy, TokenizerStrategy::Custom("my_tok".to_owned()));
5770        // Verify it does not match any known variant.
5771        assert_ne!(strategy, TokenizerStrategy::SubstringTrigram);
5772        assert_ne!(strategy, TokenizerStrategy::SourceCode);
5773    }
5774
5775    #[test]
5776    fn check_vec_identity_mismatched_model_ok() {
5777        let conn = make_in_memory_db_with_projection_profiles();
5778        conn.execute(
5779            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5780             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5781            [],
5782        )
5783        .expect("insert profile");
5784        // embedder reports bge-large, profile says bge-small — should warn but return Ok(())
5785        let embedder = StubEmbedder::new("bge-large", 384);
5786        let result = super::check_vec_identity_at_open(&conn, &embedder);
5787        assert!(
5788            result.is_ok(),
5789            "model_identity mismatch must warn and return Ok(())"
5790        );
5791    }
5792}