Skip to main content

fathomdb_engine/
coordinator.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8    BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9    CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, ExpansionSlot,
10    FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch,
11    SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
12};
13use fathomdb_schema::SchemaManager;
14use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
15
16use crate::embedder::QueryEmbedder;
17use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
18use crate::{EngineError, sqlite};
19
/// Maximum number of cached shape-hash to SQL mappings before the cache is
/// cleared entirely.  A clear-all strategy is simpler than partial eviction
/// and the cost of re-compiling on a miss is negligible.
/// Enforced by the `shape_sql_map` insertion path in [`ExecutionCoordinator`].
const MAX_SHAPE_CACHE_SIZE: usize = 4096;

/// Maximum number of root IDs per batched expansion query.  Kept well below
/// `SQLITE_MAX_VARIABLE_NUMBER` (default 999) because each batch also binds
/// the edge-kind parameter.  Larger root sets are chunked into multiple
/// batches of this size rather than falling back to per-root queries.
const BATCH_CHUNK_SIZE: usize = 200;
30
31/// Build an `IN (...)` SQL fragment and bind list for an expansion-slot filter.
32///
33/// Returns the SQL fragment `AND json_extract(n.properties, ?{p}) IN (?{p+1}, ...)`
34/// and the corresponding bind values `[path, val1, val2, ...]`.
35fn compile_expansion_in_filter(
36    p: usize,
37    path: &str,
38    value_binds: Vec<Value>,
39) -> (String, Vec<Value>) {
40    let first_val = p + 1;
41    let placeholders = (0..value_binds.len())
42        .map(|i| format!("?{}", first_val + i))
43        .collect::<Vec<_>>()
44        .join(", ");
45    let mut binds = vec![Value::Text(path.to_owned())];
46    binds.extend(value_binds);
47    (
48        format!("\n                  AND json_extract(n.properties, ?{p}) IN ({placeholders})"),
49        binds,
50    )
51}
52
53/// Compile an optional expansion-slot target-side filter predicate into a SQL
54/// fragment and bind values for injection into the `numbered` CTE's WHERE clause.
55///
56/// Returns `("", vec![])` when `filter` is `None` — preserving byte-for-byte
57/// identical SQL to pre-Pack-3 behavior. When `Some(predicate)`, returns an
58/// `AND …` fragment and the corresponding bind values starting at `first_param`.
59///
60/// Supported predicates and their targets in the `numbered` CTE:
61/// - `JsonPathEq`, `JsonPathCompare`, `JsonPathFusedEq`, `JsonPathFusedTimestampCmp`,
62///   `JsonPathFusedBoolEq`, `JsonPathFusedIn`, `JsonPathIn`: target `n.properties`.
63/// - `KindEq`, `LogicalIdEq`, `SourceRefEq`, `ContentRefEq`, `ContentRefNotNull`:
64///   column-direct predicates on `n.kind`, `n.logical_id`, `n.source_ref`, `n.content_ref`.
65/// - `EdgePropertyEq`, `EdgePropertyCompare`: rejected with `unreachable!` — these
66///   target edge properties and must be compiled via `compile_edge_filter` instead.
67#[allow(clippy::too_many_lines)]
68fn compile_expansion_filter(
69    filter: Option<&Predicate>,
70    first_param: usize,
71) -> (String, Vec<Value>) {
72    let Some(predicate) = filter else {
73        return (String::new(), vec![]);
74    };
75    let p = first_param;
76    match predicate {
77        Predicate::JsonPathEq { path, value } => {
78            let val = match value {
79                ScalarValue::Text(t) => Value::Text(t.clone()),
80                ScalarValue::Integer(i) => Value::Integer(*i),
81                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
82            };
83            (
84                format!(
85                    "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
86                    p + 1
87                ),
88                vec![Value::Text(path.clone()), val],
89            )
90        }
91        Predicate::JsonPathCompare { path, op, value } => {
92            let val = match value {
93                ScalarValue::Text(t) => Value::Text(t.clone()),
94                ScalarValue::Integer(i) => Value::Integer(*i),
95                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
96            };
97            let operator = match op {
98                ComparisonOp::Gt => ">",
99                ComparisonOp::Gte => ">=",
100                ComparisonOp::Lt => "<",
101                ComparisonOp::Lte => "<=",
102            };
103            (
104                format!(
105                    "\n                  AND json_extract(n.properties, ?{p}) {operator} ?{}",
106                    p + 1
107                ),
108                vec![Value::Text(path.clone()), val],
109            )
110        }
111        Predicate::JsonPathFusedEq { path, value } => (
112            format!(
113                "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
114                p + 1
115            ),
116            vec![Value::Text(path.clone()), Value::Text(value.clone())],
117        ),
118        Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
119            let operator = match op {
120                ComparisonOp::Gt => ">",
121                ComparisonOp::Gte => ">=",
122                ComparisonOp::Lt => "<",
123                ComparisonOp::Lte => "<=",
124            };
125            (
126                format!(
127                    "\n                  AND json_extract(n.properties, ?{p}) {operator} ?{}",
128                    p + 1
129                ),
130                vec![Value::Text(path.clone()), Value::Integer(*value)],
131            )
132        }
133        Predicate::JsonPathFusedBoolEq { path, value } => (
134            format!(
135                "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
136                p + 1
137            ),
138            vec![Value::Text(path.clone()), Value::Integer(i64::from(*value))],
139        ),
140        Predicate::KindEq(kind) => (
141            format!("\n                  AND n.kind = ?{p}"),
142            vec![Value::Text(kind.clone())],
143        ),
144        Predicate::LogicalIdEq(logical_id) => (
145            format!("\n                  AND n.logical_id = ?{p}"),
146            vec![Value::Text(logical_id.clone())],
147        ),
148        Predicate::SourceRefEq(source_ref) => (
149            format!("\n                  AND n.source_ref = ?{p}"),
150            vec![Value::Text(source_ref.clone())],
151        ),
152        Predicate::ContentRefEq(uri) => (
153            format!("\n                  AND n.content_ref = ?{p}"),
154            vec![Value::Text(uri.clone())],
155        ),
156        Predicate::ContentRefNotNull => (
157            "\n                  AND n.content_ref IS NOT NULL".to_owned(),
158            vec![],
159        ),
160        Predicate::EdgePropertyEq { .. } | Predicate::EdgePropertyCompare { .. } => {
161            unreachable!(
162                "compile_expansion_filter: EdgeProperty* variants must use compile_edge_filter"
163            );
164        }
165        Predicate::JsonPathFusedIn { path, values } => compile_expansion_in_filter(
166            p,
167            path,
168            values.iter().map(|v| Value::Text(v.clone())).collect(),
169        ),
170        Predicate::JsonPathIn { path, values } => compile_expansion_in_filter(
171            p,
172            path,
173            values
174                .iter()
175                .map(|v| match v {
176                    ScalarValue::Text(t) => Value::Text(t.clone()),
177                    ScalarValue::Integer(i) => Value::Integer(*i),
178                    ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
179                })
180                .collect(),
181        ),
182    }
183}
184
185/// Compile an optional edge filter predicate into a SQL fragment and bind values
186/// for injection into the `JOIN edges e ON ...` condition.
187///
188/// Returns `("", vec![])` when `filter` is `None`. When `Some(predicate)`,
189/// returns an `AND …` fragment targeting `e.properties` and the corresponding
190/// bind values starting at `first_param`.
191///
192/// Only `EdgePropertyEq` and `EdgePropertyCompare` are valid here.
193/// Non-edge predicates: `panic!("compile_edge_filter: non-edge predicate")`.
194fn compile_edge_filter(filter: Option<&Predicate>, first_param: usize) -> (String, Vec<Value>) {
195    let Some(predicate) = filter else {
196        return (String::new(), vec![]);
197    };
198    let p = first_param;
199    match predicate {
200        Predicate::EdgePropertyEq { path, value } => {
201            let val = match value {
202                ScalarValue::Text(t) => Value::Text(t.clone()),
203                ScalarValue::Integer(i) => Value::Integer(*i),
204                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
205            };
206            (
207                format!(
208                    "\n                    AND json_extract(e.properties, ?{p}) = ?{}",
209                    p + 1
210                ),
211                vec![Value::Text(path.clone()), val],
212            )
213        }
214        Predicate::EdgePropertyCompare { path, op, value } => {
215            let val = match value {
216                ScalarValue::Text(t) => Value::Text(t.clone()),
217                ScalarValue::Integer(i) => Value::Integer(*i),
218                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
219            };
220            let operator = match op {
221                ComparisonOp::Gt => ">",
222                ComparisonOp::Gte => ">=",
223                ComparisonOp::Lt => "<",
224                ComparisonOp::Lte => "<=",
225            };
226            (
227                format!(
228                    "\n                    AND json_extract(e.properties, ?{p}) {operator} ?{}",
229                    p + 1
230                ),
231                vec![Value::Text(path.clone()), val],
232            )
233        }
234        _ => {
235            unreachable!("compile_edge_filter: non-edge predicate {predicate:?}");
236        }
237    }
238}
239
/// FTS tokenizer strategy for a given node kind.
///
/// Loaded at coordinator open time from `projection_profiles` and used
/// during query dispatch to apply per-kind query adaptations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TokenizerStrategy {
    /// Porter stemming + unicode61, optimized for English recall.
    RecallOptimizedEnglish,
    /// Unicode61 without stemming, optimized for precision.
    PrecisionOptimized,
    /// Trigram tokenizer — queries shorter than 3 chars are skipped.
    SubstringTrigram,
    /// ICU tokenizer for global / CJK text.
    GlobalCjk,
    /// Unicode61 with custom token chars — query special chars are escaped.
    SourceCode,
    /// Any other tokenizer string.
    Custom(String),
}

impl TokenizerStrategy {
    /// Map a raw tokenizer config string (as stored in `projection_profiles`)
    /// to the corresponding strategy variant.
    ///
    /// Exact config strings are recognized first; any string beginning with
    /// `unicode61 tokenchars` maps to [`Self::SourceCode`]; everything else
    /// is preserved verbatim in [`Self::Custom`].
    pub fn from_str(s: &str) -> Self {
        if s == "porter unicode61 remove_diacritics 2" {
            Self::RecallOptimizedEnglish
        } else if s == "unicode61 remove_diacritics 2" {
            Self::PrecisionOptimized
        } else if s == "trigram" {
            Self::SubstringTrigram
        } else if s == "icu" {
            Self::GlobalCjk
        } else if s.starts_with("unicode61 tokenchars") {
            Self::SourceCode
        } else {
            Self::Custom(s.to_string())
        }
    }
}
274
/// A pool of read-only `SQLite` connections for concurrent read access.
///
/// Each connection is wrapped in its own [`Mutex`] so multiple readers can
/// proceed in parallel when they happen to grab different slots.
struct ReadPool {
    /// One independently-lockable slot per pooled connection.
    connections: Vec<Mutex<Connection>>,
}
282
283impl fmt::Debug for ReadPool {
284    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285        f.debug_struct("ReadPool")
286            .field("size", &self.connections.len())
287            .finish()
288    }
289}
290
291impl ReadPool {
292    /// Open `pool_size` read-only connections to the database at `path`.
293    ///
294    /// Each connection has PRAGMAs initialized via
295    /// [`SchemaManager::initialize_connection`] and, when the `sqlite-vec`
296    /// feature is enabled and `vector_enabled` is true, the vec extension
297    /// auto-loaded.
298    ///
299    /// # Errors
300    ///
301    /// Returns [`EngineError`] if any connection fails to open or initialize.
302    fn new(
303        db_path: &Path,
304        pool_size: usize,
305        schema_manager: &SchemaManager,
306        vector_enabled: bool,
307    ) -> Result<Self, EngineError> {
308        let mut connections = Vec::with_capacity(pool_size);
309        for _ in 0..pool_size {
310            let conn = if vector_enabled {
311                #[cfg(feature = "sqlite-vec")]
312                {
313                    sqlite::open_readonly_connection_with_vec(db_path)?
314                }
315                #[cfg(not(feature = "sqlite-vec"))]
316                {
317                    sqlite::open_readonly_connection(db_path)?
318                }
319            } else {
320                sqlite::open_readonly_connection(db_path)?
321            };
322            schema_manager
323                .initialize_reader_connection(&conn)
324                .map_err(EngineError::Schema)?;
325            connections.push(Mutex::new(conn));
326        }
327        Ok(Self { connections })
328    }
329
330    /// Acquire a connection from the pool.
331    ///
332    /// Tries [`Mutex::try_lock`] on each slot first (fast non-blocking path).
333    /// If every slot is held, falls back to a blocking lock on the first slot.
334    ///
335    /// # Errors
336    ///
337    /// Returns [`EngineError::Bridge`] if the underlying mutex is poisoned.
338    fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
339        // Fast path: try each connection without blocking.
340        for conn in &self.connections {
341            if let Ok(guard) = conn.try_lock() {
342                return Ok(guard);
343            }
344        }
345        // Fallback: block on the first connection.
346        self.connections[0].lock().map_err(|_| {
347            trace_error!("read pool: connection mutex poisoned");
348            EngineError::Bridge("connection mutex poisoned".to_owned())
349        })
350    }
351
352    /// Return the number of connections in the pool.
353    #[cfg(test)]
354    fn size(&self) -> usize {
355        self.connections.len()
356    }
357}
358
/// Execution plan returned by [`ExecutionCoordinator::explain_compiled_read`].
///
/// This is a read-only introspection struct. It does not execute SQL.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// Final SQL text that would be executed.
    pub sql: String,
    /// Number of bind parameters the SQL expects.
    pub bind_count: usize,
    /// Driving table chosen by the query compiler.
    pub driving_table: DrivingTable,
    /// Shape hash of the compiled query (the shape-SQL cache key).
    pub shape_hash: ShapeHash,
    /// `true` when the SQL was served from the shape-hash cache.
    pub cache_hit: bool,
}
370
/// A single node row returned from a query.
///
/// Field order mirrors the row projection read back in
/// `execute_compiled_read` (`row_id, logical_id, kind, properties,
/// content_ref, last_accessed_at`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row ID.
    pub row_id: String,
    /// Logical ID of the node.
    pub logical_id: String,
    /// Node kind.
    pub kind: String,
    /// JSON-encoded node properties.
    pub properties: String,
    /// Optional URI referencing external content.
    pub content_ref: Option<String>,
    /// Unix timestamp of last access, if tracked.
    pub last_accessed_at: Option<i64>,
    /// JSON-encoded properties of the edge that connected this node in a
    /// traversal expansion. `None` for root nodes and non-expansion results.
    pub edge_properties: Option<String>,
}
390
/// A single run row returned from a query.
///
/// Runs are returned alongside matched nodes in [`QueryRows::runs`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Unique run ID.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded run properties.
    pub properties: String,
}
403
/// A single step row returned from a query.
///
/// Steps are returned alongside their runs in [`QueryRows::steps`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Unique step ID.
    pub id: String,
    /// ID of the parent run.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded step properties.
    pub properties: String,
}
418
/// A single action row returned from a query.
///
/// Actions are returned alongside their steps in [`QueryRows::actions`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Unique action ID.
    pub id: String,
    /// ID of the parent step.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded action properties.
    pub properties: String,
}
433
/// A single row from the `provenance_events` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Unique event ID.
    pub id: String,
    /// Event type discriminator, as stored in the table.
    pub event_type: String,
    /// Subject the event refers to.
    pub subject: String,
    /// Optional reference to the source that produced the event.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation timestamp (presumably Unix seconds — confirm against the writer).
    pub created_at: i64,
}
444
/// Result set from executing a flat (non-grouped) compiled query.
///
/// `Default` yields an empty result set; the coordinator uses this for the
/// degraded path (see `was_degraded`).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Matched node rows.
    pub nodes: Vec<NodeRow>,
    /// Runs associated with the matched nodes.
    pub runs: Vec<RunRow>,
    /// Steps associated with the matched runs.
    pub steps: Vec<StepRow>,
    /// Actions associated with the matched steps.
    pub actions: Vec<ActionRow>,
    /// `true` when a capability miss (e.g. missing sqlite-vec) caused the query
    /// to degrade to an empty result instead of propagating an error.
    pub was_degraded: bool,
}
460
/// Expansion results for a single root node within a grouped query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical ID of the root node that seeded this expansion.
    pub root_logical_id: String,
    /// Nodes reached by traversing from the root.
    pub nodes: Vec<NodeRow>,
}
469
/// All expansion results for a single named slot across all roots.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot.
    pub slot: String,
    /// Per-root expansion results, one entry per seeding root.
    pub roots: Vec<ExpansionRootRows>,
}
478
/// Result set from executing a grouped compiled query.
///
/// `Default` yields an empty result set, mirroring [`QueryRows`]'s degraded path.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// Root node rows matched by the base query.
    pub roots: Vec<NodeRow>,
    /// Per-slot expansion results.
    pub expansions: Vec<ExpansionSlotRows>,
    /// `true` when a capability miss caused the query to degrade to an empty result.
    pub was_degraded: bool,
}
489
/// Manages a pool of read-only `SQLite` connections and executes compiled queries.
pub struct ExecutionCoordinator {
    /// Filesystem path of the backing `SQLite` database.
    database_path: PathBuf,
    /// Schema manager used at open time for bootstrap and to initialize each
    /// reader connection's PRAGMAs.
    schema_manager: Arc<SchemaManager>,
    /// Pool of read-only connections used for all query execution.
    pool: ReadPool,
    /// Cache of shape-hash → generated row SQL. Cleared wholesale once it
    /// exceeds [`MAX_SHAPE_CACHE_SIZE`].
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// `true` when sqlite-vec was loaded and a vector profile is active;
    /// fixed at open time.
    vector_enabled: bool,
    /// One-shot latch so the "vector table absent" degradation warning is
    /// logged at most once per coordinator lifetime.
    vec_degradation_warned: AtomicBool,
    /// Shared telemetry counters (queries, errors, …).
    telemetry: Arc<TelemetryCounters>,
    /// Phase 12.5a: optional read-time query embedder. When present,
    /// [`Self::execute_retrieval_plan`] invokes it via
    /// [`Self::fill_vector_branch`] after compile to populate
    /// `plan.vector`. When `None`, the Phase 12 v1 vector-dormancy
    /// invariant on `search()` is preserved: the vector slot stays empty
    /// and the coordinator's stage-gating check skips the vector branch.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
    /// Per-kind FTS tokenizer strategies loaded from `projection_profiles`
    /// at open time. Used during query dispatch for query-side adaptations
    /// (e.g. short-query skip for trigram, token escaping for source-code).
    ///
    /// **Stale-state note**: This map is populated once at `open()` and is
    /// never refreshed during the lifetime of the coordinator.  If
    /// `AdminService::set_fts_profile` is called on a running engine, the
    /// in-memory strategy map will not reflect the change until the engine is
    /// reopened.  The DB-side profile row is always up-to-date; only the
    /// query-side adaptation (e.g. the trigram short-query guard) will be stale.
    fts_strategies: HashMap<String, TokenizerStrategy>,
}
518
519impl fmt::Debug for ExecutionCoordinator {
520    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
521        f.debug_struct("ExecutionCoordinator")
522            .field("database_path", &self.database_path)
523            .finish_non_exhaustive()
524    }
525}
526
527impl ExecutionCoordinator {
    /// Open the coordinator: bootstrap the schema on a temporary read-write
    /// connection, run the open-time FTS rebuild guards, load per-kind FTS
    /// tokenizer strategies, then build the read-only connection pool.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database connection cannot be opened or schema bootstrap fails.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap connection: opened with the vec extension only when a
        // vector dimension is configured and the feature is compiled in.
        #[cfg(feature = "sqlite-vec")]
        let mut conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let mut conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        // ----- Open-time rebuild guards for derived FTS state -----
        //
        // Property FTS data is derived state. Per-kind `fts_props_<kind>`
        // virtual tables are NOT source of truth — they must be rebuildable
        // from canonical `nodes.properties` + `fts_property_schemas` at any
        // time. After migration 23 the global `fts_node_properties` table no
        // longer exists; each registered kind has its own `fts_props_<kind>`
        // table created at first write or first rebuild.
        //
        // Guard 1: if any registered kind's per-kind table is missing or empty
        // while live nodes of that kind exist, do a synchronous full rebuild of
        // all per-kind FTS tables and position map rows.
        //
        // Guard 2: if any recursive schema has a populated per-kind table but
        // `fts_node_property_positions` is empty, do a synchronous full rebuild
        // to regenerate the position map from canonical state.
        //
        // Both guards are no-ops on a consistent database.
        run_open_time_fts_guards(&mut conn)?;

        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            // Consume `report` so the no-vec build compiles without warnings.
            let _ = &report;
            false
        };

        // 0.5.0: per-kind vec tables replace the global vec_nodes_active.
        // The vector_dimension parameter now only sets the vector_enabled flag —
        // no global table is created. Per-kind vec tables are created by
        // `regenerate_vector_embeddings` at regen time.
        if vector_dimension.is_some() {
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        // Vec identity guard: warn (never error) if the stored profile's
        // model_identity or dimensions differ from the configured embedder.
        if let Some(ref emb) = query_embedder {
            check_vec_identity_at_open(&conn, emb.as_ref())?;
        }

        // Load FTS tokenizer strategies from projection_profiles
        let fts_strategies: HashMap<String, TokenizerStrategy> = {
            let mut map = HashMap::new();
            let mut stmt = conn
                .prepare("SELECT kind, config_json FROM projection_profiles WHERE facet='fts'")?;
            let rows = stmt.query_map([], |row| {
                let kind: String = row.get(0)?;
                let config_json: String = row.get(1)?;
                Ok((kind, config_json))
            })?;
            // NOTE(review): `flatten()` silently skips rows whose columns fail
            // to read, and malformed config_json / missing "tokenizer" keys are
            // likewise skipped — strategy loading is best-effort by design;
            // affected kinds simply get no query-side adaptation.
            for row in rows.flatten() {
                let (kind, config_json) = row;
                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&config_json)
                    && let Some(tok) = v["tokenizer"].as_str()
                {
                    map.insert(kind, TokenizerStrategy::from_str(tok));
                }
            }
            map
        };

        // Drop the bootstrap connection — pool connections are used for reads.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
            fts_strategies,
        })
    }
633
634    /// Returns the filesystem path to the `SQLite` database.
635    pub fn database_path(&self) -> &Path {
636        &self.database_path
637    }
638
    /// Returns `true` when sqlite-vec was loaded and a vector profile is active.
    ///
    /// The flag is determined once during [`Self::open`] and never changes
    /// for the lifetime of the coordinator.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
644
    /// Returns the configured read-time query embedder, if any.
    ///
    /// `None` means the vector-dormancy invariant is in effect (see the
    /// `query_embedder` field docs).
    ///
    /// The 0.4.0 write-time parity work reuses this same embedder for
    /// vector regeneration via [`Engine::regenerate_vector_embeddings`],
    /// so there is always exactly one source of truth for vector
    /// identity per [`Engine`] instance.
    #[must_use]
    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
        self.query_embedder.as_ref()
    }
655
    /// Acquire a pooled read connection; thin delegate to [`ReadPool::acquire`].
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
659
660    /// Aggregate `SQLite` page-cache counters across all pool connections.
661    ///
662    /// Uses `try_lock` to avoid blocking reads for telemetry reporting.
663    /// Connections that are currently locked by a query are skipped — this
664    /// is acceptable for statistical counters.
665    #[must_use]
666    pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
667        let mut total = SqliteCacheStatus::default();
668        for conn_mutex in &self.pool.connections {
669            if let Ok(conn) = conn_mutex.try_lock() {
670                total.add(&read_db_cache_status(&conn));
671            }
672        }
673        total
674    }
675
676    /// # Errors
677    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
678    #[allow(clippy::expect_used, clippy::too_many_lines)]
679    pub fn execute_compiled_read(
680        &self,
681        compiled: &CompiledQuery,
682    ) -> Result<QueryRows, EngineError> {
683        // Scan fallback for first-registration async rebuild: if the query uses the
684        // FtsNodes driving table and the root kind has is_first_registration=1 with
685        // state IN ('PENDING','BUILDING'), the per-kind table has no rows yet.
686        // Route to a full-kind node scan so callers get results instead of empty.
687        if compiled.driving_table == DrivingTable::FtsNodes
688            && let Some(BindValue::Text(root_kind)) = compiled.binds.get(1)
689            && let Some(nodes) = self.scan_fallback_if_first_registration(root_kind)?
690        {
691            self.telemetry.increment_queries();
692            return Ok(QueryRows {
693                nodes,
694                runs: Vec::new(),
695                steps: Vec::new(),
696                actions: Vec::new(),
697                was_degraded: false,
698            });
699        }
700
701        // For FtsNodes queries the fathomdb-query compile path generates SQL that
702        // references the old global `fts_node_properties` table.  Since migration 23
703        // dropped that table, we rewrite the SQL and binds here to use the per-kind
704        // `fts_props_<kind>` table (or omit the property UNION arm entirely when the
705        // per-kind table does not yet exist).
706        //
707        // For VecNodes queries the fathomdb-query compile path generates SQL that
708        // hardcodes `vec_nodes_active`.  Since 0.5.0 uses per-kind tables, we rewrite
709        // `vec_nodes_active` to `vec_<root_kind>` here.
710        let (adapted_sql, adapted_binds) = if compiled.driving_table == DrivingTable::FtsNodes {
711            let conn_check = match self.lock_connection() {
712                Ok(g) => g,
713                Err(e) => {
714                    self.telemetry.increment_errors();
715                    return Err(e);
716                }
717            };
718            let result = adapt_fts_nodes_sql_for_per_kind_tables(compiled, &conn_check);
719            drop(conn_check);
720            result?
721        } else if compiled.driving_table == DrivingTable::VecNodes {
722            let root_kind = compiled
723                .binds
724                .get(1)
725                .and_then(|b| {
726                    if let BindValue::Text(k) = b {
727                        Some(k.as_str())
728                    } else {
729                        None
730                    }
731                })
732                .unwrap_or("");
733            let vec_table = if root_kind.is_empty() {
734                "vec__unknown".to_owned()
735            } else {
736                fathomdb_schema::vec_kind_table_name(root_kind)
737            };
738            let new_sql = compiled.sql.replace("vec_nodes_active", &vec_table);
739            (new_sql, compiled.binds.clone())
740        } else {
741            (compiled.sql.clone(), compiled.binds.clone())
742        };
743
744        let row_sql = wrap_node_row_projection_sql(&adapted_sql);
745        // FIX(review): was .expect() — panics on mutex poisoning, cascading failure.
746        // Options: (A) into_inner() for all, (B) EngineError for all, (C) mixed.
747        // Chose (C): shape_sql_map is a pure cache — into_inner() is safe to recover.
748        // conn wraps a SQLite connection whose state may be corrupt after a thread panic,
749        // so we propagate EngineError there instead.
750        {
751            let mut cache = self
752                .shape_sql_map
753                .lock()
754                .unwrap_or_else(PoisonError::into_inner);
755            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
756                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
757                cache.clear();
758            }
759            cache.insert(compiled.shape_hash, row_sql.clone());
760        }
761
762        let bind_values = adapted_binds
763            .iter()
764            .map(bind_value_to_sql)
765            .collect::<Vec<_>>();
766
767        // FIX(review) + Security fix M-8: was .expect() — panics on mutex poisoning.
768        // shape_sql_map uses into_inner() (pure cache, safe to recover).
769        // conn uses map_err → EngineError (connection state may be corrupt after panic;
770        // into_inner() would risk using a connection with partial transaction state).
771        let conn_guard = match self.lock_connection() {
772            Ok(g) => g,
773            Err(e) => {
774                self.telemetry.increment_errors();
775                return Err(e);
776            }
777        };
778        let mut statement = match conn_guard.prepare_cached(&row_sql) {
779            Ok(stmt) => stmt,
780            Err(e) if is_vec_table_absent(&e) => {
781                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
782                    trace_warn!("vector table absent, degrading to non-vector query");
783                }
784                return Ok(QueryRows {
785                    was_degraded: true,
786                    ..Default::default()
787                });
788            }
789            Err(e) => {
790                self.telemetry.increment_errors();
791                return Err(EngineError::Sqlite(e));
792            }
793        };
794        let nodes = match statement
795            .query_map(params_from_iter(bind_values.iter()), |row| {
796                Ok(NodeRow {
797                    row_id: row.get(0)?,
798                    logical_id: row.get(1)?,
799                    kind: row.get(2)?,
800                    properties: row.get(3)?,
801                    content_ref: row.get(4)?,
802                    last_accessed_at: row.get(5)?,
803                    edge_properties: None,
804                })
805            })
806            .and_then(Iterator::collect)
807        {
808            Ok(rows) => rows,
809            Err(e) => {
810                self.telemetry.increment_errors();
811                return Err(EngineError::Sqlite(e));
812            }
813        };
814
815        self.telemetry.increment_queries();
816        Ok(QueryRows {
817            nodes,
818            runs: Vec::new(),
819            steps: Vec::new(),
820            actions: Vec::new(),
821            was_degraded: false,
822        })
823    }
824
825    /// Execute a compiled adaptive search and return matching hits.
826    ///
827    /// Phase 2 splits filters: fusable predicates (`KindEq`, `LogicalIdEq`,
828    /// `SourceRefEq`, `ContentRefEq`, `ContentRefNotNull`) are injected into
829    /// the `search_hits` CTE so the CTE `LIMIT` applies after filtering,
830    /// while residual predicates (JSON path filters) stay in the outer
831    /// `WHERE`. The chunk and property FTS
832    /// indexes are `UNION ALL`-ed, BM25-scored (flipped so larger values mean
833    /// better matches), ordered, and limited. All hits return
834    /// `match_mode = Strict` — the relaxed branch and fallback arrive in
835    /// later phases.
836    ///
837    /// # Errors
838    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
839    pub fn execute_compiled_search(
840        &self,
841        compiled: &CompiledSearch,
842    ) -> Result<SearchRows, EngineError> {
843        // Build the two-branch plan from the strict text query and delegate
844        // to the shared plan-execution routine. The relaxed branch is derived
845        // via `derive_relaxed` and only fires when strict returned fewer than
846        // `min(FALLBACK_TRIGGER_K, limit)` hits. With K = 1 this collapses to
847        // "relaxed iff strict is empty," but the routine spells the rule out
848        // explicitly so raising K later is a one-line constant bump.
849        let (relaxed_query, was_degraded_at_plan_time) =
850            fathomdb_query::derive_relaxed(&compiled.text_query);
851        let relaxed = relaxed_query.map(|q| CompiledSearch {
852            root_kind: compiled.root_kind.clone(),
853            text_query: q,
854            limit: compiled.limit,
855            fusable_filters: compiled.fusable_filters.clone(),
856            residual_filters: compiled.residual_filters.clone(),
857            attribution_requested: compiled.attribution_requested,
858        });
859        let plan = CompiledSearchPlan {
860            strict: compiled.clone(),
861            relaxed,
862            was_degraded_at_plan_time,
863        };
864        self.execute_compiled_search_plan(&plan)
865    }
866
867    /// Execute a two-branch [`CompiledSearchPlan`] and return the merged,
868    /// deduped result rows.
869    ///
870    /// This is the shared retrieval/merge routine that both
871    /// [`Self::execute_compiled_search`] (adaptive path) and
872    /// `Engine::fallback_search` (narrow two-shape path) call into. Strict
873    /// runs first; the relaxed branch only fires when it is present AND the
874    /// strict branch returned fewer than `min(FALLBACK_TRIGGER_K, limit)`
875    /// hits. Merge and dedup semantics are identical to the adaptive path
876    /// regardless of how the plan was constructed.
877    ///
878    /// Error contract: if the relaxed branch errors, the error propagates;
879    /// strict hits are not returned. This matches the rest of the engine's
880    /// fail-hard posture.
881    ///
882    /// # Errors
883    /// Returns [`EngineError`] if either branch's SQL cannot be prepared or
884    /// executed.
885    pub fn execute_compiled_search_plan(
886        &self,
887        plan: &CompiledSearchPlan,
888    ) -> Result<SearchRows, EngineError> {
889        let strict = &plan.strict;
890        let limit = strict.limit;
891        let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;
892
893        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
894        let strict_underfilled = strict_hits.len() < fallback_threshold;
895
896        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
897        let mut fallback_used = false;
898        let mut was_degraded = false;
899        if let Some(relaxed) = plan.relaxed.as_ref()
900            && strict_underfilled
901        {
902            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
903            fallback_used = true;
904            was_degraded = plan.was_degraded_at_plan_time;
905        }
906
907        let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
908        // Attribution runs AFTER dedup so that duplicate hits dropped by
909        // `merge_search_branches` do not waste a highlight+position-map
910        // lookup.
911        if strict.attribution_requested {
912            let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
913            self.populate_attribution_for_hits(
914                &mut merged,
915                &strict.text_query,
916                relaxed_text_query,
917            )?;
918        }
919        let strict_hit_count = merged
920            .iter()
921            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
922            .count();
923        let relaxed_hit_count = merged
924            .iter()
925            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
926            .count();
927        // Phase 10: no vector execution path yet, so vector_hit_count is
928        // always zero. Future phases that wire a vector branch will
929        // contribute here.
930        let vector_hit_count = 0;
931
932        Ok(SearchRows {
933            hits: merged,
934            strict_hit_count,
935            relaxed_hit_count,
936            vector_hit_count,
937            fallback_used,
938            was_degraded,
939        })
940    }
941
    /// Execute a compiled vector-only search and return matching hits.
    ///
    /// Phase 11 delivers the standalone vector retrieval path. The emitted
    /// SQL performs a vec0 KNN scan over the kind-specific vec table
    /// (derived via `fathomdb_schema::vec_kind_table_name`; an empty
    /// `root_kind` falls back to the nonexistent sentinel `vec__unknown`),
    /// joins to `chunks` and `nodes` (active rows only), and pushes fusable
    /// filters into the candidate CTE. The outer `SELECT` applies residual
    /// JSON predicates and orders by score descending, where
    /// `score = -distance` (higher is better) per addendum 1
    /// §Vector-Specific Behavior.
    ///
    /// ## Capability-miss handling
    ///
    /// If the `sqlite-vec` capability is absent (feature disabled, or the
    /// kind's vec virtual table has not been created because the engine was
    /// not opened with a `vector_dimension`), this method returns an empty
    /// [`SearchRows`] with `was_degraded = true`. This is **non-fatal** —
    /// the error does not propagate — matching the addendum's
    /// §Vector-Specific Behavior / Degradation.
    ///
    /// ## Attribution
    ///
    /// When `compiled.attribution_requested == true`, every returned hit
    /// carries `attribution: Some(HitAttribution { matched_paths: vec![] })`
    /// per addendum 1 §Attribution on vector hits (Phase 5 chunk-hit rule
    /// extended uniformly).
    ///
    /// # Errors
    /// Returns [`EngineError`] if the SQL statement cannot be prepared or
    /// executed for reasons other than a vec-table capability miss.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // Short-circuit zero-limit: callers that pass `limit == 0` expect an
        // empty result rather than a SQL error from `LIMIT 0` semantics in
        // the inner vec0 scan.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        let filter_by_kind = !compiled.root_kind.is_empty();
        // Bind-index bookkeeping: SQLite `?N` parameters are 1-based, so
        // immediately after a push `binds.len()` IS that value's index.
        // Every clause-building arm below relies on this invariant. `?1` is
        // always the query text; `?2` is the root kind, but only when
        // `filter_by_kind` is true (see `kind_clause` below).
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // Build fusable-filter clauses, aliased against `src` inside the
        // candidate CTE. Same predicate set the text path fuses.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    // No bind needed: pure IS NOT NULL predicate.
                    fused_clauses
                        .push_str("\n                      AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedBoolEq { path, value } => {
                    // Booleans are stored as 0/1 integers in the JSON blob.
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(i64::from(*value)));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedIn { path, values } => {
                    // NOTE(review): an empty `values` list would render
                    // `IN ()`, a SQLite syntax error. Presumably the query
                    // compiler never emits an empty list — confirm that
                    // invariant upstream.
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(BindValue::Text(v.clone()));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                Predicate::JsonPathEq { .. }
                | Predicate::JsonPathCompare { .. }
                | Predicate::JsonPathIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                    // JSON predicates are residual; compile_vector_search
                    // guarantees they never appear here, but stay defensive.
                    // Edge property predicates are not valid in the search path.
                }
            }
        }

        // Build residual JSON clauses, aliased against `h` in the outer SELECT.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathIn { path, values } => {
                    // NOTE(review): same empty-`values` `IN ()` caveat as the
                    // fused arm above — confirm the compiler's invariant.
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(scalar_to_bind(v));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. }
                | Predicate::JsonPathFusedBoolEq { .. }
                | Predicate::JsonPathFusedIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                    // Fusable predicates live in fused_clauses above.
                    // Edge property predicates are not valid in the search path.
                }
            }
        }

        // Bind the outer LIMIT as a `?N` parameter. NOTE(review): the inner
        // vec0 `LIMIT {base_limit}` below is interpolated into the SQL text,
        // so `prepare_cached` still keys a distinct statement per limit
        // value; this bind only keeps the *outer* clause stable. If
        // sqlite-vec accepts a bound `k` constraint, both could become
        // parameters — confirm before changing.
        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        // sqlite-vec requires the LIMIT/k constraint to be visible directly
        // on the vec0 KNN scan, so we isolate it in a sub-select. The vec0
        // LIMIT overfetches `base_limit` = limit (Phase 11 keeps it simple;
        // Phase 12's planner may raise this to compensate for fusion
        // narrowing the candidate pool).
        let base_limit = limit;
        // `?2` is valid here precisely because root_kind was pushed as the
        // second bind iff `filter_by_kind` (see the binds setup above).
        let kind_clause = if filter_by_kind {
            "\n                      AND src.kind = ?2"
        } else {
            ""
        };

        // Derive the per-kind vec table name from the query's root kind.
        // An empty root_kind defaults to a sentinel that will not exist, causing
        // graceful degradation (was_degraded=true).
        let vec_table = if compiled.root_kind.is_empty() {
            "vec__unknown".to_owned()
        } else {
            fathomdb_schema::vec_kind_table_name(&compiled.root_kind)
        };

        let sql = format!(
            "WITH vector_hits AS (
                SELECT
                    src.row_id AS row_id,
                    src.logical_id AS logical_id,
                    src.kind AS kind,
                    src.properties AS properties,
                    src.source_ref AS source_ref,
                    src.content_ref AS content_ref,
                    src.created_at AS created_at,
                    vc.distance AS distance,
                    vc.chunk_id AS chunk_id
                FROM (
                    SELECT chunk_id, distance
                    FROM {vec_table}
                    WHERE embedding MATCH ?1
                    LIMIT {base_limit}
                ) vc
                JOIN chunks c ON c.id = vc.chunk_id
                JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
                WHERE 1 = 1{kind_clause}{fused_clauses}
            )
            SELECT
                h.row_id,
                h.logical_id,
                h.kind,
                h.properties,
                h.content_ref,
                am.last_accessed_at,
                h.created_at,
                h.distance,
                h.chunk_id
            FROM vector_hits h
            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
            WHERE 1 = 1{filter_clauses}
            ORDER BY h.distance ASC
            LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            Err(e) if is_vec_table_absent(&e) => {
                // Capability miss: non-fatal — surface as was_degraded.
                // The warning fires only once per engine lifetime (swap).
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Score is the negated distance per addendum 1
                // §Vector-Specific Behavior / Score and distance. For
                // distance metrics (sqlite-vec's default) lower distance =
                // better match, so negating yields the higher-is-better
                // convention that dedup_branch_hits and the unified result
                // surface rely on.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    // Vector hits have no strict/relaxed notion.
                    match_mode: None,
                    // Vector hits have no snippet.
                    snippet: None,
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // Some SQLite errors surface during row iteration (e.g. when
                // the vec0 extension is not loaded but the table exists as a
                // stub). Classify as capability-miss when the shape matches.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    // `statement` borrows `conn_guard`; drop both before the
                    // early return so the connection mutex is released.
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        // `statement` borrows `conn_guard`; drop both explicitly so the
        // connection lock is released before telemetry bookkeeping.
        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
1336
1337    /// Execute a unified [`CompiledRetrievalPlan`] (Phase 12 `search()`
1338    /// entry point) and return deterministically ranked, block-ordered
1339    /// [`SearchRows`].
1340    ///
1341    /// Stages, per addendum 1 §Retrieval Planner Model:
1342    ///
1343    /// 1. **Text strict.** Always runs (empty query short-circuits to an
1344    ///    empty branch result inside `run_search_branch`).
1345    /// 2. **Text relaxed.** Runs iff the plan carries a relaxed branch AND
1346    ///    the strict branch returned fewer than `min(FALLBACK_TRIGGER_K,
1347    ///    limit)` hits — same v1 (`K = 1`) zero-hits-only trigger as the
1348    ///    Phase 6 text-only path.
1349    /// 3. **Vector.** Runs iff text retrieval (strict + relaxed combined)
1350    ///    returned zero hits AND `plan.vector` is `Some`. **In v1 the
1351    ///    planner never wires a vector branch through `search()`, so this
1352    ///    code path is structurally present but dormant.** A future phase
1353    ///    that wires read-time embedding into `compile_retrieval_plan` will
1354    ///    immediately light it up.
1355    /// 4. **Fusion.** All collected hits are merged via
1356    ///    [`merge_search_branches_three`], which produces strict ->
1357    ///    relaxed -> vector block ordering with cross-branch dedup
1358    ///    resolved by branch precedence.
1359    ///
1360    /// `was_degraded` covers only the relaxed-branch cap miss in v1. The
1361    /// addendum's "vector capability miss => `was_degraded`" semantics
1362    /// applies to `search()` only when the unified planner actually fires
1363    /// the vector branch, which v1 never does.
1364    ///
1365    /// # Errors
1366    /// Returns [`EngineError`] if any stage's SQL cannot be prepared or
1367    /// executed for a non-capability-miss reason.
1368    pub fn execute_retrieval_plan(
1369        &self,
1370        plan: &CompiledRetrievalPlan,
1371        raw_query: &str,
1372    ) -> Result<SearchRows, EngineError> {
1373        // Phase 12.5a: we work against a local owned copy so
1374        // `fill_vector_branch` can mutate `plan.vector` and
1375        // `plan.was_degraded_at_plan_time`. Cloning is cheap (the plan is
1376        // a bounded set of predicates + text AST nodes) and avoids
1377        // forcing callers to hand us `&mut` access.
1378        let mut plan = plan.clone();
1379        let limit = plan.text.strict.limit;
1380
1381        // Stage 1: text strict.
1382        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;
1383
1384        // Stage 2: text relaxed. Same K=1 zero-hits-only trigger the Phase 6
1385        // path uses.
1386        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
1387        let strict_underfilled = strict_hits.len() < fallback_threshold;
1388        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
1389        let mut fallback_used = false;
1390        let mut was_degraded = false;
1391        if let Some(relaxed) = plan.text.relaxed.as_ref()
1392            && strict_underfilled
1393        {
1394            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
1395            fallback_used = true;
1396            was_degraded = plan.was_degraded_at_plan_time;
1397        }
1398
1399        // Phase 12.5a: fill the vector branch from the configured
1400        // read-time query embedder, if any. Option (b) from the spec:
1401        // only pay the embedding cost when the text branches returned
1402        // nothing, because the three-branch stage gate below only runs
1403        // the vector stage under exactly that condition. This keeps the
1404        // hot path (strict text matched) embedder-free.
1405        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
1406        if text_branches_empty && self.query_embedder.is_some() {
1407            self.fill_vector_branch(&mut plan, raw_query);
1408        }
1409
1410        // Stage 3: vector. Runs only when text retrieval is empty AND a
1411        // vector branch is present. When no embedder is configured (Phase
1412        // 12.5a default) `plan.vector` stays `None` and this stage is a
1413        // no-op, preserving the Phase 12 v1 dormancy invariant.
1414        let mut vector_hits: Vec<SearchHit> = Vec::new();
1415        if let Some(vector) = plan.vector.as_ref()
1416            && strict_hits.is_empty()
1417            && relaxed_hits.is_empty()
1418        {
1419            let vector_rows = self.execute_compiled_vector_search(vector)?;
1420            // `execute_compiled_vector_search` returns a fully populated
1421            // `SearchRows`. Promote its hits into the merge stage and lift
1422            // its capability-miss `was_degraded` flag onto the unified
1423            // result, per addendum §Vector-Specific Behavior.
1424            vector_hits = vector_rows.hits;
1425            if vector_rows.was_degraded {
1426                was_degraded = true;
1427            }
1428        }
1429        // Phase 12.5a: an embedder-reported capability miss surfaces as
1430        // `plan.was_degraded_at_plan_time = true` set inside
1431        // `fill_vector_branch`. Lift it onto the response so callers see
1432        // the graceful degradation even when the vector stage-gate never
1433        // had the chance to fire (the embedder call itself failed and
1434        // the vector slot stayed `None`).
1435        if text_branches_empty
1436            && plan.was_degraded_at_plan_time
1437            && plan.vector.is_none()
1438            && self.query_embedder.is_some()
1439        {
1440            was_degraded = true;
1441        }
1442
1443        // Stage 4: fusion.
1444        let strict = &plan.text.strict;
1445        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
1446        if strict.attribution_requested {
1447            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
1448            self.populate_attribution_for_hits(
1449                &mut merged,
1450                &strict.text_query,
1451                relaxed_text_query,
1452            )?;
1453        }
1454
1455        let strict_hit_count = merged
1456            .iter()
1457            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
1458            .count();
1459        let relaxed_hit_count = merged
1460            .iter()
1461            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
1462            .count();
1463        let vector_hit_count = merged
1464            .iter()
1465            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
1466            .count();
1467
1468        Ok(SearchRows {
1469            hits: merged,
1470            strict_hit_count,
1471            relaxed_hit_count,
1472            vector_hit_count,
1473            fallback_used,
1474            was_degraded,
1475        })
1476    }
1477
1478    /// Phase 12.5a: populate `plan.vector` from the configured read-time
1479    /// query embedder, if any.
1480    ///
1481    /// Preconditions (enforced by the caller in `execute_retrieval_plan`):
1482    /// - `self.query_embedder.is_some()` — no point calling otherwise.
1483    /// - Both the strict and relaxed text branches already ran and
1484    ///   returned zero hits, so the existing three-branch stage gate
1485    ///   will actually fire the vector stage once the slot is populated.
1486    ///   This is option (b) from the Phase 12.5a spec: skip the embedding
1487    ///   cost entirely when text retrieval already won.
1488    ///
1489    /// Contract: never panics, never returns an error. On embedder error
1490    /// it sets `plan.was_degraded_at_plan_time = true` and leaves
1491    /// `plan.vector` as `None`; the coordinator's normal error-free
1492    /// degradation path then reports `was_degraded` on the result.
1493    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
1494        let Some(embedder) = self.query_embedder.as_ref() else {
1495            return;
1496        };
1497        match embedder.embed_query(raw_query) {
1498            Ok(vec) => {
1499                // `CompiledVectorSearch::query_text` is a JSON float-array
1500                // literal at the time the coordinator binds it (see the
1501                // `CompiledVectorSearch` docs). `serde_json::to_string`
1502                // on a `Vec<f32>` produces exactly that shape — no
1503                // wire-format change required.
1504                let literal = match serde_json::to_string(&vec) {
1505                    Ok(s) => s,
1506                    Err(err) => {
1507                        trace_warn!(
1508                            error = %err,
1509                            "query embedder vector serialization failed; skipping vector branch"
1510                        );
1511                        let _ = err; // Used by trace_warn! when tracing feature is active
1512                        plan.was_degraded_at_plan_time = true;
1513                        return;
1514                    }
1515                };
1516                let strict = &plan.text.strict;
1517                plan.vector = Some(CompiledVectorSearch {
1518                    root_kind: strict.root_kind.clone(),
1519                    query_text: literal,
1520                    limit: strict.limit,
1521                    fusable_filters: strict.fusable_filters.clone(),
1522                    residual_filters: strict.residual_filters.clone(),
1523                    attribution_requested: strict.attribution_requested,
1524                });
1525            }
1526            Err(err) => {
1527                trace_warn!(
1528                    error = %err,
1529                    "query embedder unavailable, skipping vector branch"
1530                );
1531                let _ = err; // Used by trace_warn! when tracing feature is active
1532                plan.was_degraded_at_plan_time = true;
1533            }
1534        }
1535    }
1536
1537    /// Execute a single search branch against the underlying FTS surfaces.
1538    ///
1539    /// This is the shared SQL emission path used by
1540    /// [`Self::execute_compiled_search_plan`] to run strict and (when
1541    /// present) relaxed branches of a [`CompiledSearchPlan`] in sequence.
1542    /// The returned hits are tagged with `branch`'s corresponding
1543    /// [`SearchMatchMode`] and are **not** yet deduped or truncated — the
1544    /// caller is responsible for merging multiple branches.
1545    #[allow(clippy::too_many_lines)]
1546    fn run_search_branch(
1547        &self,
1548        compiled: &CompiledSearch,
1549        branch: SearchBranch,
1550    ) -> Result<Vec<SearchHit>, EngineError> {
1551        use std::fmt::Write as _;
1552        // Short-circuit an empty/whitespace-only query: rendering it would
1553        // yield `MATCH ""`, which FTS5 rejects as a syntax error. Callers
1554        // (including the adaptive path when strict is Empty and derive_relaxed
1555        // returns None) must see an empty result, not an error. Each branch
1556        // is short-circuited independently so a strict-Empty + relaxed-Some
1557        // plan still exercises the relaxed branch.
1558        // A top-level `TextQuery::Not` renders to an FTS5 expression that
1559        // matches "every row not containing X" — a complement-of-corpus scan
1560        // that no caller would intentionally want. Short-circuit to empty at
1561        // the root only; a `Not` nested inside an `And` is a legitimate
1562        // exclusion and must still run.
1563        if matches!(
1564            compiled.text_query,
1565            fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
1566        ) {
1567            return Ok(Vec::new());
1568        }
1569        let rendered_base = render_text_query_fts5(&compiled.text_query);
1570        // Apply per-kind query-side tokenizer adaptations.
1571        // SubstringTrigram: queries shorter than 3 chars produce no results
1572        // (trigram index cannot match them). Return empty instead of an FTS5
1573        // error or a full-table scan.
1574        // SourceCode: render_text_query_fts5 already wraps every term in FTS5
1575        // double-quote delimiters (e.g. "std.io"). Applying escape_source_code_query
1576        // on that already-rendered output corrupts the expression — the escape
1577        // function sees the surrounding '"' characters and '.' separator and
1578        // produces malformed syntax like '"std"."io""'. The phrase quoting from
1579        // render_text_query_fts5 is sufficient: FTS5 applies the tokenizer
1580        // (including custom tokenchars) inside double-quoted phrase expressions,
1581        // so "std.io" correctly matches the single token 'std.io'.
1582        let strategy = self.fts_strategies.get(compiled.root_kind.as_str());
1583        if matches!(strategy, Some(TokenizerStrategy::SubstringTrigram))
1584            && rendered_base
1585                .chars()
1586                .filter(|c| c.is_alphanumeric())
1587                .count()
1588                < 3
1589        {
1590            return Ok(Vec::new());
1591        }
1592        let rendered = rendered_base;
1593        // An empty `root_kind` means "unkind-filtered" — the fallback_search
1594        // helper uses this when the caller did not add `.filter_kind_eq(...)`.
1595        // The adaptive `text_search()` path never produces an empty root_kind
1596        // because `QueryBuilder::nodes(kind)` requires a non-empty string at
1597        // the entry point.
1598        let filter_by_kind = !compiled.root_kind.is_empty();
1599
1600        // Acquire the connection early so we can check per-kind table existence
1601        // before building the bind array and SQL. The bind numbering depends on
1602        // whether the property FTS UNION arm is included.
1603        let conn_guard = match self.lock_connection() {
1604            Ok(g) => g,
1605            Err(e) => {
1606                self.telemetry.increment_errors();
1607                return Err(e);
1608            }
1609        };
1610
1611        // Determine which per-kind property FTS tables to include in the UNION arm.
1612        //
1613        // filter_by_kind = true (root_kind set): include the single per-kind table
1614        //   for root_kind if it exists. Bind order: ?1=chunk_text, ?2=kind, ?3=prop_text.
1615        //
1616        // filter_by_kind = false (fallback search, root_kind empty): include per-kind tables
1617        //   based on fusable KindEq predicates or all registered tables from sqlite_master.
1618        //   A single KindEq in fusable_filters lets us use one specific table. With no KindEq
1619        //   we UNION all per-kind tables so kind-less fallback searches still find property
1620        //   hits (matching the behaviour of the former global fts_node_properties table).
1621        //   Bind order: ?1=chunk_text, ?2=prop_text (shared across all prop UNION arms).
1622        //
1623        // In both cases, per-kind tables are already kind-specific so no `fp.kind = ?` filter
1624        // is needed inside the inner arm; the outer `search_hits` CTE WHERE clause handles
1625        // any further kind narrowing via fused KindEq predicates.
1626        // prop_fts_tables is Vec<(kind, table_name)> so we can later look up per-kind
1627        // BM25 weights from fts_property_schemas when building the scoring expression.
1628        let prop_fts_tables: Vec<(String, String)> = if filter_by_kind {
1629            let kind = compiled.root_kind.clone();
1630            let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1631            let exists: bool = conn_guard
1632                .query_row(
1633                    "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1634                    rusqlite::params![prop_table],
1635                    |_| Ok(true),
1636                )
1637                .optional()
1638                .map_err(EngineError::Sqlite)?
1639                .unwrap_or(false);
1640            if exists {
1641                vec![(kind, prop_table)]
1642            } else {
1643                vec![]
1644            }
1645        } else {
1646            // Fallback / kind-less search: find the right per-kind tables.
1647            // If there is exactly one KindEq in fusable_filters, use that kind's table.
1648            // Otherwise, include all registered per-kind tables from sqlite_master so
1649            // that kind-less fallback searches can still return property FTS hits.
1650            let kind_eq_values: Vec<String> = compiled
1651                .fusable_filters
1652                .iter()
1653                .filter_map(|p| match p {
1654                    Predicate::KindEq(k) => Some(k.clone()),
1655                    _ => None,
1656                })
1657                .collect();
1658            if kind_eq_values.len() == 1 {
1659                let kind = kind_eq_values[0].clone();
1660                let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1661                let exists: bool = conn_guard
1662                    .query_row(
1663                        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1664                        rusqlite::params![prop_table],
1665                        |_| Ok(true),
1666                    )
1667                    .optional()
1668                    .map_err(EngineError::Sqlite)?
1669                    .unwrap_or(false);
1670                if exists {
1671                    vec![(kind, prop_table)]
1672                } else {
1673                    vec![]
1674                }
1675            } else {
1676                // No single KindEq: UNION all per-kind tables so kind-less fallback
1677                // searches behave like the former global fts_node_properties table.
1678                // Fetch registered kinds and compute/verify their per-kind table names.
1679                let mut stmt = conn_guard
1680                    .prepare("SELECT kind FROM fts_property_schemas")
1681                    .map_err(EngineError::Sqlite)?;
1682                let all_kinds: Vec<String> = stmt
1683                    .query_map([], |r| r.get::<_, String>(0))
1684                    .map_err(EngineError::Sqlite)?
1685                    .collect::<Result<Vec<_>, _>>()
1686                    .map_err(EngineError::Sqlite)?;
1687                drop(stmt);
1688                let mut result = Vec::new();
1689                for kind in all_kinds {
1690                    let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1691                    let exists: bool = conn_guard
1692                        .query_row(
1693                            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1694                            rusqlite::params![prop_table],
1695                            |_| Ok(true),
1696                        )
1697                        .optional()
1698                        .map_err(EngineError::Sqlite)?
1699                        .unwrap_or(false);
1700                    if exists {
1701                        result.push((kind, prop_table));
1702                    }
1703                }
1704                result
1705            }
1706        };
1707        let use_prop_fts = !prop_fts_tables.is_empty();
1708
1709        // Bind layout (before fused/residual predicates and limit):
1710        //   filter_by_kind = true,  use_prop_fts = true:  ?1=chunk_text, ?2=kind, ?3=prop_text
1711        //   filter_by_kind = true,  use_prop_fts = false: ?1=chunk_text, ?2=kind
1712        //   filter_by_kind = false, use_prop_fts = true:  ?1=chunk_text, ?2=prop_text
1713        //   filter_by_kind = false, use_prop_fts = false: ?1=chunk_text
1714        let mut binds: Vec<BindValue> = if filter_by_kind {
1715            if use_prop_fts {
1716                vec![
1717                    BindValue::Text(rendered.clone()),
1718                    BindValue::Text(compiled.root_kind.clone()),
1719                    BindValue::Text(rendered),
1720                ]
1721            } else {
1722                vec![
1723                    BindValue::Text(rendered.clone()),
1724                    BindValue::Text(compiled.root_kind.clone()),
1725                ]
1726            }
1727        } else if use_prop_fts {
1728            // fallback search with property FTS: ?1=chunk, ?2=prop (same query value)
1729            vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
1730        } else {
1731            vec![BindValue::Text(rendered)]
1732        };
1733
1734        // P2-5: both fusable and residual predicates now match against the
1735        // CTE's projected columns (`u.kind`, `u.logical_id`, `u.source_ref`,
1736        // `u.content_ref`, `u.properties`) because the inner UNION arms
1737        // project the full active-row column set through the
1738        // `JOIN nodes src` already present in each arm. The previous
1739        // implementation re-joined `nodes hn` at the CTE level and
1740        // `nodes n` again at the outer SELECT, which was triple work on
1741        // the hot search path.
1742        let mut fused_clauses = String::new();
1743        for predicate in &compiled.fusable_filters {
1744            match predicate {
1745                Predicate::KindEq(kind) => {
1746                    binds.push(BindValue::Text(kind.clone()));
1747                    let idx = binds.len();
1748                    let _ = write!(fused_clauses, "\n                  AND u.kind = ?{idx}");
1749                }
1750                Predicate::LogicalIdEq(logical_id) => {
1751                    binds.push(BindValue::Text(logical_id.clone()));
1752                    let idx = binds.len();
1753                    let _ = write!(
1754                        fused_clauses,
1755                        "\n                  AND u.logical_id = ?{idx}"
1756                    );
1757                }
1758                Predicate::SourceRefEq(source_ref) => {
1759                    binds.push(BindValue::Text(source_ref.clone()));
1760                    let idx = binds.len();
1761                    let _ = write!(
1762                        fused_clauses,
1763                        "\n                  AND u.source_ref = ?{idx}"
1764                    );
1765                }
1766                Predicate::ContentRefEq(uri) => {
1767                    binds.push(BindValue::Text(uri.clone()));
1768                    let idx = binds.len();
1769                    let _ = write!(
1770                        fused_clauses,
1771                        "\n                  AND u.content_ref = ?{idx}"
1772                    );
1773                }
1774                Predicate::ContentRefNotNull => {
1775                    fused_clauses.push_str("\n                  AND u.content_ref IS NOT NULL");
1776                }
1777                Predicate::JsonPathFusedEq { path, value } => {
1778                    binds.push(BindValue::Text(path.clone()));
1779                    let path_idx = binds.len();
1780                    binds.push(BindValue::Text(value.clone()));
1781                    let value_idx = binds.len();
1782                    let _ = write!(
1783                        fused_clauses,
1784                        "\n                  AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
1785                    );
1786                }
1787                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
1788                    binds.push(BindValue::Text(path.clone()));
1789                    let path_idx = binds.len();
1790                    binds.push(BindValue::Integer(*value));
1791                    let value_idx = binds.len();
1792                    let operator = match op {
1793                        ComparisonOp::Gt => ">",
1794                        ComparisonOp::Gte => ">=",
1795                        ComparisonOp::Lt => "<",
1796                        ComparisonOp::Lte => "<=",
1797                    };
1798                    let _ = write!(
1799                        fused_clauses,
1800                        "\n                  AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
1801                    );
1802                }
1803                Predicate::JsonPathFusedBoolEq { path, value } => {
1804                    binds.push(BindValue::Text(path.clone()));
1805                    let path_idx = binds.len();
1806                    binds.push(BindValue::Integer(i64::from(*value)));
1807                    let value_idx = binds.len();
1808                    let _ = write!(
1809                        fused_clauses,
1810                        "\n                  AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
1811                    );
1812                }
1813                Predicate::JsonPathFusedIn { path, values } => {
1814                    binds.push(BindValue::Text(path.clone()));
1815                    let first_param = binds.len();
1816                    for v in values {
1817                        binds.push(BindValue::Text(v.clone()));
1818                    }
1819                    let placeholders = (1..=values.len())
1820                        .map(|i| format!("?{}", first_param + i))
1821                        .collect::<Vec<_>>()
1822                        .join(", ");
1823                    let _ = write!(
1824                        fused_clauses,
1825                        "\n                  AND json_extract(u.properties, ?{first_param}) IN ({placeholders})"
1826                    );
1827                }
1828                Predicate::JsonPathEq { .. }
1829                | Predicate::JsonPathCompare { .. }
1830                | Predicate::JsonPathIn { .. }
1831                | Predicate::EdgePropertyEq { .. }
1832                | Predicate::EdgePropertyCompare { .. } => {
1833                    // Should be in residual_filters; compile_search guarantees
1834                    // this, but stay defensive.
1835                    // Edge property predicates are not valid in the search path.
1836                }
1837            }
1838        }
1839
1840        let mut filter_clauses = String::new();
1841        for predicate in &compiled.residual_filters {
1842            match predicate {
1843                Predicate::JsonPathEq { path, value } => {
1844                    binds.push(BindValue::Text(path.clone()));
1845                    let path_idx = binds.len();
1846                    binds.push(scalar_to_bind(value));
1847                    let value_idx = binds.len();
1848                    let _ = write!(
1849                        filter_clauses,
1850                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
1851                    );
1852                }
1853                Predicate::JsonPathCompare { path, op, value } => {
1854                    binds.push(BindValue::Text(path.clone()));
1855                    let path_idx = binds.len();
1856                    binds.push(scalar_to_bind(value));
1857                    let value_idx = binds.len();
1858                    let operator = match op {
1859                        ComparisonOp::Gt => ">",
1860                        ComparisonOp::Gte => ">=",
1861                        ComparisonOp::Lt => "<",
1862                        ComparisonOp::Lte => "<=",
1863                    };
1864                    let _ = write!(
1865                        filter_clauses,
1866                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
1867                    );
1868                }
1869                Predicate::JsonPathIn { path, values } => {
1870                    binds.push(BindValue::Text(path.clone()));
1871                    let first_param = binds.len();
1872                    for v in values {
1873                        binds.push(scalar_to_bind(v));
1874                    }
1875                    let placeholders = (1..=values.len())
1876                        .map(|i| format!("?{}", first_param + i))
1877                        .collect::<Vec<_>>()
1878                        .join(", ");
1879                    let _ = write!(
1880                        filter_clauses,
1881                        "\n  AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
1882                    );
1883                }
1884                Predicate::KindEq(_)
1885                | Predicate::LogicalIdEq(_)
1886                | Predicate::SourceRefEq(_)
1887                | Predicate::ContentRefEq(_)
1888                | Predicate::ContentRefNotNull
1889                | Predicate::JsonPathFusedEq { .. }
1890                | Predicate::JsonPathFusedTimestampCmp { .. }
1891                | Predicate::JsonPathFusedBoolEq { .. }
1892                | Predicate::JsonPathFusedIn { .. }
1893                | Predicate::EdgePropertyEq { .. }
1894                | Predicate::EdgePropertyCompare { .. } => {
1895                    // Fusable predicates live in fused_clauses; compile_search
1896                    // partitions them out of residual_filters.
1897                    // Edge property predicates are not valid in the search path.
1898                }
1899            }
1900        }
1901
1902        // Bind `limit` as an integer parameter rather than formatting it into
1903        // the SQL string. Interpolating the limit made the prepared-statement
1904        // SQL vary by limit value, so rusqlite's default 16-slot
1905        // `prepare_cached` cache thrashed for paginated callers that varied
1906        // limits per call. With the bind the SQL is structurally stable for
1907        // a given filter shape regardless of `limit` value.
1908        let limit = compiled.limit;
1909        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
1910        let limit_idx = binds.len();
1911        // P2-5: the inner UNION arms project the full active-row column
1912        // set through `JOIN nodes src` (kind, row_id, source_ref,
1913        // content_ref, content_hash, created_at, properties). Both the
1914        // CTE's outer WHERE and the final SELECT consume those columns
1915        // directly, which eliminates the previous `JOIN nodes hn` at the
1916        // CTE level and `JOIN nodes n` at the outer SELECT — saving two
1917        // redundant joins on the hot search path. `src.superseded_at IS
1918        // NULL` in each arm already filters retired rows, which is what
1919        // the dropped outer joins used to do.
1920        //
1921        // Property FTS uses per-kind tables (fts_props_<kind>). One UNION arm
1922        // is generated per table in prop_fts_tables. The prop text bind index
1923        // is ?3 when filter_by_kind=true (after chunk_text and kind binds) or
1924        // ?2 when filter_by_kind=false (after chunk_text only). The same bind
1925        // position is reused by every prop arm, which is valid in SQLite.
1926        let prop_bind_idx: usize = if filter_by_kind { 3 } else { 2 };
1927        let prop_arm_sql: String = if use_prop_fts {
1928            prop_fts_tables.iter().fold(String::new(), |mut acc, (kind, prop_table)| {
1929                // Load schema for this kind to compute BM25 weights.
1930                let bm25_expr = conn_guard
1931                    .query_row(
1932                        "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
1933                        rusqlite::params![kind],
1934                        |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1935                    )
1936                    .ok()
1937                    .map_or_else(
1938                        || format!("bm25({prop_table})"),
1939                        |(json, sep)| build_bm25_expr(prop_table, &json, &sep),
1940                    );
1941                // For weighted (per-column) schemas text_content does not exist;
1942                // use an empty snippet rather than a column reference that would fail.
1943                let is_weighted = bm25_expr != format!("bm25({prop_table})");
1944                let snippet_expr = if is_weighted {
1945                    "'' AS snippet".to_owned()
1946                } else {
1947                    "substr(fp.text_content, 1, 200) AS snippet".to_owned()
1948                };
1949                let _ = write!(
1950                    acc,
1951                    "
1952                    UNION ALL
1953                    SELECT
1954                        src.row_id AS row_id,
1955                        fp.node_logical_id AS logical_id,
1956                        src.kind AS kind,
1957                        src.properties AS properties,
1958                        src.source_ref AS source_ref,
1959                        src.content_ref AS content_ref,
1960                        src.created_at AS created_at,
1961                        -{bm25_expr} AS score,
1962                        'property' AS source,
1963                        {snippet_expr},
1964                        CAST(fp.rowid AS TEXT) AS projection_row_id
1965                    FROM {prop_table} fp
1966                    JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
1967                    WHERE {prop_table} MATCH ?{prop_bind_idx}"
1968                );
1969                acc
1970            })
1971        } else {
1972            String::new()
1973        };
1974        let (chunk_fts_bind, chunk_kind_clause) = if filter_by_kind {
1975            ("?1", "\n                      AND src.kind = ?2")
1976        } else {
1977            ("?1", "")
1978        };
1979        let sql = format!(
1980            "WITH search_hits AS (
1981                SELECT
1982                    u.row_id AS row_id,
1983                    u.logical_id AS logical_id,
1984                    u.kind AS kind,
1985                    u.properties AS properties,
1986                    u.source_ref AS source_ref,
1987                    u.content_ref AS content_ref,
1988                    u.created_at AS created_at,
1989                    u.score AS score,
1990                    u.source AS source,
1991                    u.snippet AS snippet,
1992                    u.projection_row_id AS projection_row_id
1993                FROM (
1994                    SELECT
1995                        src.row_id AS row_id,
1996                        c.node_logical_id AS logical_id,
1997                        src.kind AS kind,
1998                        src.properties AS properties,
1999                        src.source_ref AS source_ref,
2000                        src.content_ref AS content_ref,
2001                        src.created_at AS created_at,
2002                        -bm25(fts_nodes) AS score,
2003                        'chunk' AS source,
2004                        snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
2005                        f.chunk_id AS projection_row_id
2006                    FROM fts_nodes f
2007                    JOIN chunks c ON c.id = f.chunk_id
2008                    JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
2009                    WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}{prop_arm_sql}
2010                ) u
2011                WHERE 1 = 1{fused_clauses}
2012                ORDER BY u.score DESC
2013                LIMIT ?{limit_idx}
2014            )
2015            SELECT
2016                h.row_id,
2017                h.logical_id,
2018                h.kind,
2019                h.properties,
2020                h.content_ref,
2021                am.last_accessed_at,
2022                h.created_at,
2023                h.score,
2024                h.source,
2025                h.snippet,
2026                h.projection_row_id
2027            FROM search_hits h
2028            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
2029            WHERE 1 = 1{filter_clauses}
2030            ORDER BY h.score DESC"
2031        );
2032
2033        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
2034
2035        let mut statement = match conn_guard.prepare_cached(&sql) {
2036            Ok(stmt) => stmt,
2037            Err(e) => {
2038                self.telemetry.increment_errors();
2039                return Err(EngineError::Sqlite(e));
2040            }
2041        };
2042
2043        let hits = match statement
2044            .query_map(params_from_iter(bind_values.iter()), |row| {
2045                let source_str: String = row.get(8)?;
2046                // The CTE emits only two literal values here: `'chunk'` and
2047                // `'property'`. Default to `Chunk` on anything unexpected so a
2048                // schema drift surfaces as a mislabelled hit rather than a
2049                // row-level error.
2050                let source = if source_str == "property" {
2051                    SearchHitSource::Property
2052                } else {
2053                    SearchHitSource::Chunk
2054                };
2055                let match_mode = match branch {
2056                    SearchBranch::Strict => SearchMatchMode::Strict,
2057                    SearchBranch::Relaxed => SearchMatchMode::Relaxed,
2058                };
2059                Ok(SearchHit {
2060                    node: fathomdb_query::NodeRowLite {
2061                        row_id: row.get(0)?,
2062                        logical_id: row.get(1)?,
2063                        kind: row.get(2)?,
2064                        properties: row.get(3)?,
2065                        content_ref: row.get(4)?,
2066                        last_accessed_at: row.get(5)?,
2067                    },
2068                    written_at: row.get(6)?,
2069                    score: row.get(7)?,
2070                    // Phase 10: every branch currently emits text hits.
2071                    modality: RetrievalModality::Text,
2072                    source,
2073                    match_mode: Some(match_mode),
2074                    snippet: row.get(9)?,
2075                    projection_row_id: row.get(10)?,
2076                    vector_distance: None,
2077                    attribution: None,
2078                })
2079            })
2080            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
2081        {
2082            Ok(rows) => rows,
2083            Err(e) => {
2084                self.telemetry.increment_errors();
2085                return Err(EngineError::Sqlite(e));
2086            }
2087        };
2088
2089        // Drop the statement so `conn_guard` is free (attribution is
2090        // resolved after dedup in `execute_compiled_search_plan` to avoid
2091        // spending highlight lookups on hits that will be discarded).
2092        drop(statement);
2093        drop(conn_guard);
2094
2095        self.telemetry.increment_queries();
2096        Ok(hits)
2097    }
2098
2099    /// Populate per-hit attribution for the given deduped merged hits.
2100    /// Runs after [`merge_search_branches`] so dropped duplicates do not
2101    /// incur the highlight+position-map lookup cost.
2102    fn populate_attribution_for_hits(
2103        &self,
2104        hits: &mut [SearchHit],
2105        strict_text_query: &fathomdb_query::TextQuery,
2106        relaxed_text_query: Option<&fathomdb_query::TextQuery>,
2107    ) -> Result<(), EngineError> {
2108        let conn_guard = match self.lock_connection() {
2109            Ok(g) => g,
2110            Err(e) => {
2111                self.telemetry.increment_errors();
2112                return Err(e);
2113            }
2114        };
2115        let strict_expr = render_text_query_fts5(strict_text_query);
2116        let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
2117        for hit in hits.iter_mut() {
2118            // Phase 10: text hits always carry `Some(match_mode)`. Vector
2119            // hits (when a future phase adds them) have `None` here and
2120            // are skipped by the attribution resolver because attribution
2121            // is meaningless for vector matches.
2122            let match_expr = match hit.match_mode {
2123                Some(SearchMatchMode::Strict) => strict_expr.as_str(),
2124                Some(SearchMatchMode::Relaxed) => {
2125                    relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
2126                }
2127                None => continue,
2128            };
2129            match resolve_hit_attribution(&conn_guard, hit, match_expr) {
2130                Ok(att) => hit.attribution = Some(att),
2131                Err(e) => {
2132                    self.telemetry.increment_errors();
2133                    return Err(e);
2134                }
2135            }
2136        }
2137        Ok(())
2138    }
2139
2140    /// # Errors
2141    /// Returns [`EngineError`] if the root query or any bounded expansion
2142    /// query cannot be prepared or executed.
2143    pub fn execute_compiled_grouped_read(
2144        &self,
2145        compiled: &CompiledGroupedQuery,
2146    ) -> Result<GroupedQueryRows, EngineError> {
2147        let root_rows = self.execute_compiled_read(&compiled.root)?;
2148        if root_rows.was_degraded {
2149            return Ok(GroupedQueryRows {
2150                roots: Vec::new(),
2151                expansions: Vec::new(),
2152                was_degraded: true,
2153            });
2154        }
2155
2156        let roots = root_rows.nodes;
2157        let mut expansions = Vec::with_capacity(compiled.expansions.len());
2158        for expansion in &compiled.expansions {
2159            let slot_rows = if roots.is_empty() {
2160                Vec::new()
2161            } else {
2162                self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
2163            };
2164            expansions.push(ExpansionSlotRows {
2165                slot: expansion.slot.clone(),
2166                roots: slot_rows,
2167            });
2168        }
2169
2170        Ok(GroupedQueryRows {
2171            roots,
2172            expansions,
2173            was_degraded: false,
2174        })
2175    }
2176
2177    /// Chunked batched expansion: splits roots into chunks of
2178    /// `BATCH_CHUNK_SIZE` and runs one batched query per chunk, then merges
2179    /// results while preserving root ordering.  This keeps bind-parameter
2180    /// counts within `SQLite` limits while avoiding the N+1 per-root pattern
2181    /// for large result sets.
2182    fn read_expansion_nodes_chunked(
2183        &self,
2184        roots: &[NodeRow],
2185        expansion: &ExpansionSlot,
2186        hard_limit: usize,
2187    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
2188        if roots.len() <= BATCH_CHUNK_SIZE {
2189            return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
2190        }
2191
2192        // Merge chunk results keyed by root logical_id, then reassemble in
2193        // root order.
2194        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
2195        for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
2196            for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
2197                per_root
2198                    .entry(group.root_logical_id)
2199                    .or_default()
2200                    .extend(group.nodes);
2201            }
2202        }
2203
2204        Ok(roots
2205            .iter()
2206            .map(|root| ExpansionRootRows {
2207                root_logical_id: root.logical_id.clone(),
2208                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
2209            })
2210            .collect())
2211    }
2212
    /// Batched expansion: one recursive CTE query per expansion slot that
    /// processes all root IDs at once. Uses `ROW_NUMBER() OVER (PARTITION BY
    /// source_logical_id ...)` to enforce the per-root hard limit inside the
    /// database rather than in Rust.
    ///
    /// Returns one [`ExpansionRootRows`] group per input root, in input root
    /// order; roots that reach no nodes get an empty group.
    ///
    /// # Errors
    /// Returns [`EngineError`] if fused-filter validation fails, or if the
    /// CTE statement cannot be prepared or executed.
    #[allow(clippy::too_many_lines)]
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Traversal direction picks which edge endpoint joins the current
        // frontier and which endpoint becomes the next traversal node.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // EXECUTE-TIME VALIDATION: fused filter against kinds without an FTS schema.
        // This check runs at execute time (not builder time) because the target kind
        // set for an expand slot is edge-label-scoped, not kind-scoped, and multiple
        // target kinds may be reachable via one edge label. See Pack 12 docs.
        if expansion.filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. }
                    | Predicate::JsonPathFusedTimestampCmp { .. }
                    | Predicate::JsonPathFusedIn { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }

        // Build a UNION ALL of SELECT literals for the root seed rows.
        // SQLite does not support `VALUES ... AS alias(col)` in older versions,
        // so we use `SELECT ?1 UNION ALL SELECT ?2 ...` instead.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        // Bind params: root IDs occupy ?1..=?N, edge kind is ?(N+1).
        // Edge filter params (if any) follow at ?(N+2)..=?M.
        // Node filter params (if any) follow at ?(M+1)....
        let edge_kind_param = root_ids.len() + 1;
        let edge_filter_param_start = root_ids.len() + 2;

        // Compile the optional edge filter to a SQL fragment + bind values.
        // Injected into the JOIN condition so only matching edges are traversed.
        let (edge_filter_sql, edge_filter_binds) =
            compile_edge_filter(expansion.edge_filter.as_ref(), edge_filter_param_start);

        let filter_param_start = edge_filter_param_start + edge_filter_binds.len();

        // Compile the optional target-side filter to a SQL fragment + bind values.
        // The fragment is injected into the `numbered` CTE's WHERE clause BEFORE
        // the ROW_NUMBER() window so the per-originator limit counts only matching rows.
        let (filter_sql, filter_binds) =
            compile_expansion_filter(expansion.filter.as_ref(), filter_param_start);

        // The `root_id` column tracks which root each traversal path
        // originated from. The `ROW_NUMBER()` window in the outer query
        // enforces the per-root hard limit.
        // The `edge_properties` column carries the JSON properties of the
        // edge that was traversed to reach each node (NULL for the base case).
        // Cycle guard: `visited` is a comma-delimited CSV of the logical IDs
        // on the current path; the `instr` test rejects re-entry, and the
        // `emitted` counter bounds path length so the recursion terminates.
        // Ordering by `n.logical_id` inside the window makes which rows
        // survive the per-root limit deterministic.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted, edge_properties) AS (
                SELECT rid, rid, 0, printf(',%s,', rid), 0, NULL AS edge_properties
                FROM root_ids
                UNION ALL
                SELECT
                    t.root_id,
                    {next_logical_id},
                    t.depth + 1,
                    t.visited || {next_logical_id} || ',',
                    t.emitted + 1,
                    e.properties AS edge_properties
                FROM traversed t
                JOIN edges e ON {join_condition}
                    AND e.kind = ?{edge_kind_param}
                    AND e.superseded_at IS NULL{edge_filter_sql}
                WHERE t.depth < {max_depth}
                  AND t.emitted < {hard_limit}
                  AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
                     , n.content_ref, am.last_accessed_at
                     , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
                     , t.edge_properties
                FROM traversed t
                JOIN nodes n ON n.logical_id = t.logical_id
                    AND n.superseded_at IS NULL
                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
                WHERE t.depth > 0{filter_sql}
            )
            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
                 , edge_properties
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, logical_id",
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind root IDs (1..=N), edge kind (N+1), edge filter params (N+2..M),
        // then node filter params (M+1...). Order must match the parameter
        // numbering computed above.
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(edge_filter_binds);
        bind_values.extend(filter_binds);

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, // root_id
                    NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                        edge_properties: row.get(7)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Partition results back into per-root groups, preserving root order.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
2371
2372    /// Validate that all target node kinds reachable via `edge_label` have a
2373    /// registered property-FTS schema. Called at execute time when an expansion
2374    /// slot carries a fused filter predicate.
2375    ///
2376    /// EXECUTE-TIME VALIDATION: this check runs at execute time (not builder
2377    /// time) for expand slots because the target kind set is edge-label-scoped
2378    /// rather than kind-scoped, and is not statically knowable at builder time
2379    /// when multiple target kinds may be reachable via the same label.
2380    /// See Pack 12 docs.
2381    ///
2382    /// # Errors
2383    /// Returns `EngineError::InvalidConfig` if any reachable target kind lacks
2384    /// a registered property-FTS schema.
2385    fn validate_fused_filter_for_edge_label(&self, edge_label: &str) -> Result<(), EngineError> {
2386        let conn = self.lock_connection()?;
2387        // Collect the distinct node kinds reachable as targets of this edge label.
2388        let mut stmt = conn
2389            .prepare_cached(
2390                "SELECT DISTINCT n.kind \
2391                 FROM edges e \
2392                 JOIN nodes n ON n.logical_id = e.target_logical_id \
2393                 WHERE e.kind = ?1 AND e.superseded_at IS NULL",
2394            )
2395            .map_err(EngineError::Sqlite)?;
2396        let target_kinds: Vec<String> = stmt
2397            .query_map(rusqlite::params![edge_label], |row| row.get(0))
2398            .map_err(EngineError::Sqlite)?
2399            .collect::<Result<Vec<_>, _>>()
2400            .map_err(EngineError::Sqlite)?;
2401
2402        for kind in &target_kinds {
2403            let has_schema: bool = conn
2404                .query_row(
2405                    "SELECT COUNT(*) > 0 FROM fts_property_schemas WHERE kind = ?1",
2406                    rusqlite::params![kind],
2407                    |row| row.get(0),
2408                )
2409                .map_err(EngineError::Sqlite)?;
2410            if !has_schema {
2411                return Err(EngineError::InvalidConfig(format!(
2412                    "kind {kind:?} has no registered property-FTS schema; register one with \
2413                     admin.register_fts_property_schema(..) before using fused filters on \
2414                     expansion slots, or use JsonPathEq for non-fused semantics \
2415                     (expand slot uses edge label {edge_label:?})"
2416                )));
2417            }
2418        }
2419        Ok(())
2420    }
2421
2422    /// Read a single run by id.
2423    ///
2424    /// # Errors
2425    /// Returns [`EngineError`] if the query fails or if the connection mutex
2426    /// has been poisoned.
2427    pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
2428        let conn = self.lock_connection()?;
2429        conn.query_row(
2430            "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
2431            rusqlite::params![id],
2432            |row| {
2433                Ok(RunRow {
2434                    id: row.get(0)?,
2435                    kind: row.get(1)?,
2436                    status: row.get(2)?,
2437                    properties: row.get(3)?,
2438                })
2439            },
2440        )
2441        .optional()
2442        .map_err(EngineError::Sqlite)
2443    }
2444
2445    /// Read a single step by id.
2446    ///
2447    /// # Errors
2448    /// Returns [`EngineError`] if the query fails or if the connection mutex
2449    /// has been poisoned.
2450    pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
2451        let conn = self.lock_connection()?;
2452        conn.query_row(
2453            "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
2454            rusqlite::params![id],
2455            |row| {
2456                Ok(StepRow {
2457                    id: row.get(0)?,
2458                    run_id: row.get(1)?,
2459                    kind: row.get(2)?,
2460                    status: row.get(3)?,
2461                    properties: row.get(4)?,
2462                })
2463            },
2464        )
2465        .optional()
2466        .map_err(EngineError::Sqlite)
2467    }
2468
2469    /// Read a single action by id.
2470    ///
2471    /// # Errors
2472    /// Returns [`EngineError`] if the query fails or if the connection mutex
2473    /// has been poisoned.
2474    pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
2475        let conn = self.lock_connection()?;
2476        conn.query_row(
2477            "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
2478            rusqlite::params![id],
2479            |row| {
2480                Ok(ActionRow {
2481                    id: row.get(0)?,
2482                    step_id: row.get(1)?,
2483                    kind: row.get(2)?,
2484                    status: row.get(3)?,
2485                    properties: row.get(4)?,
2486                })
2487            },
2488        )
2489        .optional()
2490        .map_err(EngineError::Sqlite)
2491    }
2492
2493    /// Read all active (non-superseded) runs.
2494    ///
2495    /// # Errors
2496    /// Returns [`EngineError`] if the query fails or if the connection mutex
2497    /// has been poisoned.
2498    pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
2499        let conn = self.lock_connection()?;
2500        let mut stmt = conn
2501            .prepare_cached(
2502                "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
2503            )
2504            .map_err(EngineError::Sqlite)?;
2505        let rows = stmt
2506            .query_map([], |row| {
2507                Ok(RunRow {
2508                    id: row.get(0)?,
2509                    kind: row.get(1)?,
2510                    status: row.get(2)?,
2511                    properties: row.get(3)?,
2512                })
2513            })
2514            .map_err(EngineError::Sqlite)?
2515            .collect::<Result<Vec<_>, _>>()
2516            .map_err(EngineError::Sqlite)?;
2517        Ok(rows)
2518    }
2519
2520    /// Returns the number of shape→SQL entries currently indexed.
2521    ///
2522    /// Each distinct query shape (structural hash of kind + steps + limits)
2523    /// maps to exactly one SQL string.  This is a test-oriented introspection
2524    /// helper; it does not reflect rusqlite's internal prepared-statement
2525    /// cache, which is keyed by SQL text.
2526    ///
2527    /// # Panics
2528    /// Panics if the internal shape-SQL-map mutex is poisoned.
2529    #[must_use]
2530    #[allow(clippy::expect_used)]
2531    pub fn shape_sql_count(&self) -> usize {
2532        self.shape_sql_map
2533            .lock()
2534            .unwrap_or_else(PoisonError::into_inner)
2535            .len()
2536    }
2537
2538    /// Returns a cloned `Arc` to the schema manager.
2539    #[must_use]
2540    pub fn schema_manager(&self) -> Arc<SchemaManager> {
2541        Arc::clone(&self.schema_manager)
2542    }
2543
2544    /// Return the execution plan for a compiled query without executing it.
2545    ///
2546    /// Useful for debugging, testing shape-hash caching, and operator
2547    /// diagnostics. Does not open a transaction or touch the database beyond
2548    /// checking the statement cache.
2549    ///
2550    /// # Panics
2551    /// Panics if the internal shape-SQL-map mutex is poisoned.
2552    #[must_use]
2553    pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
2554        let cache_hit = self
2555            .shape_sql_map
2556            .lock()
2557            .unwrap_or_else(PoisonError::into_inner)
2558            .contains_key(&compiled.shape_hash);
2559        QueryPlan {
2560            sql: wrap_node_row_projection_sql(&compiled.sql),
2561            bind_count: compiled.binds.len(),
2562            driving_table: compiled.driving_table,
2563            shape_hash: compiled.shape_hash,
2564            cache_hit,
2565        }
2566    }
2567
2568    /// Execute a named PRAGMA and return the result as a String.
2569    /// Used by Layer 1 tests to verify startup pragma initialization.
2570    ///
2571    /// # Errors
2572    /// Returns [`EngineError`] if the PRAGMA query fails or if the connection
2573    /// mutex has been poisoned.
2574    #[doc(hidden)]
2575    pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
2576        let conn = self.lock_connection()?;
2577        let result = conn
2578            .query_row(&format!("PRAGMA {name}"), [], |row| {
2579                // PRAGMAs may return TEXT or INTEGER; normalise to String.
2580                row.get::<_, rusqlite::types::Value>(0)
2581            })
2582            .map_err(EngineError::Sqlite)?;
2583        let s = match result {
2584            rusqlite::types::Value::Text(t) => t,
2585            rusqlite::types::Value::Integer(i) => i.to_string(),
2586            rusqlite::types::Value::Real(f) => f.to_string(),
2587            rusqlite::types::Value::Blob(_) => {
2588                return Err(EngineError::InvalidWrite(format!(
2589                    "PRAGMA {name} returned an unexpected BLOB value"
2590                )));
2591            }
2592            rusqlite::types::Value::Null => String::new(),
2593        };
2594        Ok(s)
2595    }
2596
2597    /// Return all provenance events whose `subject` matches the given value.
2598    ///
2599    /// Subjects are logical node IDs (for retire/upsert events) or `source_ref`
2600    /// values (for excise events).
2601    ///
2602    /// # Errors
2603    /// Returns [`EngineError`] if the query fails or if the connection mutex
2604    /// has been poisoned.
2605    pub fn query_provenance_events(
2606        &self,
2607        subject: &str,
2608    ) -> Result<Vec<ProvenanceEvent>, EngineError> {
2609        let conn = self.lock_connection()?;
2610        let mut stmt = conn
2611            .prepare_cached(
2612                "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
2613                 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
2614            )
2615            .map_err(EngineError::Sqlite)?;
2616        let events = stmt
2617            .query_map(rusqlite::params![subject], |row| {
2618                Ok(ProvenanceEvent {
2619                    id: row.get(0)?,
2620                    event_type: row.get(1)?,
2621                    subject: row.get(2)?,
2622                    source_ref: row.get(3)?,
2623                    metadata_json: row.get(4)?,
2624                    created_at: row.get(5)?,
2625                })
2626            })
2627            .map_err(EngineError::Sqlite)?
2628            .collect::<Result<Vec<_>, _>>()
2629            .map_err(EngineError::Sqlite)?;
2630        Ok(events)
2631    }
2632
2633    /// Check if `kind` has a first-registration async rebuild in progress
2634    /// (`is_first_registration=1` with state PENDING/BUILDING/SWAPPING and no
2635    /// rows yet in the per-kind `fts_props_<kind>` table). If so, execute a
2636    /// full-kind scan and return the nodes. Returns `None` when the normal
2637    /// FTS5 path should run.
2638    fn scan_fallback_if_first_registration(
2639        &self,
2640        kind: &str,
2641    ) -> Result<Option<Vec<NodeRow>>, EngineError> {
2642        let conn = self.lock_connection()?;
2643
2644        // Quick point-lookup: kind has a first-registration rebuild in a
2645        // pre-complete state AND the per-kind table has no rows yet.
2646        let prop_table = fathomdb_schema::fts_kind_table_name(kind);
2647        // Check whether the per-kind table exists before querying its row count.
2648        let table_exists: bool = conn
2649            .query_row(
2650                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
2651                rusqlite::params![prop_table],
2652                |_| Ok(true),
2653            )
2654            .optional()?
2655            .unwrap_or(false);
2656        let prop_empty = if table_exists {
2657            let cnt: i64 =
2658                conn.query_row(&format!("SELECT COUNT(*) FROM {prop_table}"), [], |r| {
2659                    r.get(0)
2660                })?;
2661            cnt == 0
2662        } else {
2663            true
2664        };
2665        let needs_scan: bool = if prop_empty {
2666            conn.query_row(
2667                "SELECT 1 FROM fts_property_rebuild_state \
2668                 WHERE kind = ?1 AND is_first_registration = 1 \
2669                 AND state IN ('PENDING','BUILDING','SWAPPING') \
2670                 LIMIT 1",
2671                rusqlite::params![kind],
2672                |_| Ok(true),
2673            )
2674            .optional()?
2675            .unwrap_or(false)
2676        } else {
2677            false
2678        };
2679
2680        if !needs_scan {
2681            return Ok(None);
2682        }
2683
2684        // Scan fallback: return all active nodes of this kind.
2685        // Intentionally unindexed — acceptable for first-registration window.
2686        let mut stmt = conn
2687            .prepare_cached(
2688                "SELECT n.row_id, n.logical_id, n.kind, n.properties, n.content_ref, \
2689                 am.last_accessed_at \
2690                 FROM nodes n \
2691                 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id \
2692                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
2693            )
2694            .map_err(EngineError::Sqlite)?;
2695
2696        let nodes = stmt
2697            .query_map(rusqlite::params![kind], |row| {
2698                Ok(NodeRow {
2699                    row_id: row.get(0)?,
2700                    logical_id: row.get(1)?,
2701                    kind: row.get(2)?,
2702                    properties: row.get(3)?,
2703                    content_ref: row.get(4)?,
2704                    last_accessed_at: row.get(5)?,
2705                    edge_properties: None,
2706                })
2707            })
2708            .map_err(EngineError::Sqlite)?
2709            .collect::<Result<Vec<_>, _>>()
2710            .map_err(EngineError::Sqlite)?;
2711
2712        Ok(Some(nodes))
2713    }
2714
2715    /// Return the current rebuild progress for a kind, or `None` if no rebuild
2716    /// has been registered for that kind.
2717    ///
2718    /// # Errors
2719    /// Returns [`EngineError`] if the database query fails.
2720    pub fn get_property_fts_rebuild_progress(
2721        &self,
2722        kind: &str,
2723    ) -> Result<Option<crate::rebuild_actor::RebuildProgress>, EngineError> {
2724        let conn = self.lock_connection()?;
2725        let row = conn
2726            .query_row(
2727                "SELECT state, rows_total, rows_done, started_at, last_progress_at, error_message \
2728                 FROM fts_property_rebuild_state WHERE kind = ?1",
2729                rusqlite::params![kind],
2730                |r| {
2731                    Ok(crate::rebuild_actor::RebuildProgress {
2732                        state: r.get(0)?,
2733                        rows_total: r.get(1)?,
2734                        rows_done: r.get(2)?,
2735                        started_at: r.get(3)?,
2736                        last_progress_at: r.get(4)?,
2737                        error_message: r.get(5)?,
2738                    })
2739                },
2740            )
2741            .optional()?;
2742        Ok(row)
2743    }
2744}
2745
2746/// Rewrite a `CompiledQuery` whose SQL references the legacy `fts_node_properties`
2747/// table to use the per-kind `fts_props_<kind>` table, or strip the property FTS
2748/// arm entirely when the per-kind table does not exist in `sqlite_master`.
2749///
2750/// Returns `(adapted_sql, adapted_binds)`.
2751///
2752/// This wrapper owns the `sqlite_master` existence check (rusqlite stays out of
2753/// `fathomdb-query`) and delegates the SQL/bind rewriting to
2754/// [`CompiledQuery::adapt_fts_for_kind`].
2755fn adapt_fts_nodes_sql_for_per_kind_tables(
2756    compiled: &CompiledQuery,
2757    conn: &rusqlite::Connection,
2758) -> Result<(String, Vec<BindValue>), EngineError> {
2759    let root_kind = compiled
2760        .binds
2761        .get(1)
2762        .and_then(|b| {
2763            if let BindValue::Text(k) = b {
2764                Some(k.as_str())
2765            } else {
2766                None
2767            }
2768        })
2769        .unwrap_or("");
2770    let prop_table = fathomdb_schema::fts_kind_table_name(root_kind);
2771    let prop_table_exists: bool = conn
2772        .query_row(
2773            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
2774            rusqlite::params![prop_table],
2775            |_| Ok(true),
2776        )
2777        .optional()
2778        .map_err(EngineError::Sqlite)?
2779        .unwrap_or(false);
2780
2781    Ok(compiled.adapt_fts_for_kind(prop_table_exists, &prop_table))
2782}
2783
2784/// Check that the active vector profile's model identity and dimensions match
2785/// the supplied embedder. If either differs, emit a warning via `trace_warn!`
2786/// but always return `Ok(())`. This function NEVER returns `Err`.
2787///
2788/// Queries `projection_profiles` directly — no `AdminService` indirection.
2789#[allow(clippy::unnecessary_wraps)]
2790fn check_vec_identity_at_open(
2791    conn: &rusqlite::Connection,
2792    embedder: &dyn QueryEmbedder,
2793) -> Result<(), EngineError> {
2794    let row: Option<String> = conn
2795        .query_row(
2796            "SELECT config_json FROM projection_profiles WHERE kind='*' AND facet='vec'",
2797            [],
2798            |row| row.get(0),
2799        )
2800        .optional()
2801        .unwrap_or(None);
2802
2803    let Some(config_json) = row else {
2804        return Ok(());
2805    };
2806
2807    // Parse leniently — any error means we just skip the check.
2808    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&config_json) else {
2809        return Ok(());
2810    };
2811
2812    let identity = embedder.identity();
2813
2814    if let Some(stored_model) = parsed
2815        .get("model_identity")
2816        .and_then(serde_json::Value::as_str)
2817        && stored_model != identity.model_identity
2818    {
2819        trace_warn!(
2820            stored_model_identity = stored_model,
2821            embedder_model_identity = %identity.model_identity,
2822            "vec identity mismatch at open: model_identity differs"
2823        );
2824    }
2825
2826    if let Some(stored_dim) = parsed.get("dimensions").and_then(serde_json::Value::as_u64) {
2827        let stored_dim = usize::try_from(stored_dim).unwrap_or(usize::MAX);
2828        if stored_dim != identity.dimension {
2829            trace_warn!(
2830                stored_dimensions = stored_dim,
2831                embedder_dimensions = identity.dimension,
2832                "vec identity mismatch at open: dimensions differ"
2833            );
2834        }
2835    }
2836
2837    Ok(())
2838}
2839
2840/// Open-time FTS rebuild guards (Guard 1 + Guard 2).
2841///
2842/// Guard 1: if any registered kind's per-kind `fts_props_<kind>` table is
2843/// missing or empty while live nodes of that kind exist, do a synchronous
2844/// full rebuild.
2845///
2846/// Guard 2: if any recursive schema is registered but
2847/// `fts_node_property_positions` is empty, do a synchronous full rebuild to
2848/// regenerate the position map.
2849///
2850/// Both guards are no-ops on a consistent database.
2851fn run_open_time_fts_guards(conn: &mut rusqlite::Connection) -> Result<(), EngineError> {
2852    let schema_count: i64 = conn
2853        .query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
2854            row.get(0)
2855        })
2856        .map_err(EngineError::Sqlite)?;
2857    if schema_count == 0 {
2858        return Ok(());
2859    }
2860
2861    let needs_fts_rebuild = open_guard_check_fts_empty(conn)?;
2862    let needs_position_backfill = if needs_fts_rebuild {
2863        false
2864    } else {
2865        open_guard_check_positions_empty(conn)?
2866    };
2867
2868    if needs_fts_rebuild || needs_position_backfill {
2869        let per_kind_tables: Vec<String> = {
2870            let mut stmt = conn
2871                .prepare(
2872                    "SELECT name FROM sqlite_master \
2873                     WHERE type='table' AND name LIKE 'fts_props_%' \
2874                     AND sql LIKE 'CREATE VIRTUAL TABLE%'",
2875                )
2876                .map_err(EngineError::Sqlite)?;
2877            stmt.query_map([], |r| r.get::<_, String>(0))
2878                .map_err(EngineError::Sqlite)?
2879                .collect::<Result<Vec<_>, _>>()
2880                .map_err(EngineError::Sqlite)?
2881        };
2882        let tx = conn
2883            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
2884            .map_err(EngineError::Sqlite)?;
2885        for table in &per_kind_tables {
2886            tx.execute_batch(&format!("DELETE FROM {table}"))
2887                .map_err(EngineError::Sqlite)?;
2888        }
2889        tx.execute("DELETE FROM fts_node_property_positions", [])
2890            .map_err(EngineError::Sqlite)?;
2891        crate::projection::insert_property_fts_rows(
2892            &tx,
2893            "SELECT logical_id, properties FROM nodes \
2894             WHERE kind = ?1 AND superseded_at IS NULL",
2895        )
2896        .map_err(EngineError::Sqlite)?;
2897        tx.commit().map_err(EngineError::Sqlite)?;
2898    }
2899    Ok(())
2900}
2901
2902fn open_guard_check_fts_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
2903    let kinds: Vec<String> = {
2904        let mut stmt = conn
2905            .prepare("SELECT kind FROM fts_property_schemas")
2906            .map_err(EngineError::Sqlite)?;
2907        stmt.query_map([], |row| row.get::<_, String>(0))
2908            .map_err(EngineError::Sqlite)?
2909            .collect::<Result<Vec<_>, _>>()
2910            .map_err(EngineError::Sqlite)?
2911    };
2912    for kind in &kinds {
2913        let table = fathomdb_schema::fts_kind_table_name(kind);
2914        let table_exists: bool = conn
2915            .query_row(
2916                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
2917                rusqlite::params![table],
2918                |_| Ok(true),
2919            )
2920            .optional()
2921            .map_err(EngineError::Sqlite)?
2922            .unwrap_or(false);
2923        let fts_count: i64 = if table_exists {
2924            conn.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
2925                row.get(0)
2926            })
2927            .map_err(EngineError::Sqlite)?
2928        } else {
2929            0
2930        };
2931        if fts_count == 0 {
2932            let node_count: i64 = conn
2933                .query_row(
2934                    "SELECT COUNT(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
2935                    rusqlite::params![kind],
2936                    |row| row.get(0),
2937                )
2938                .map_err(EngineError::Sqlite)?;
2939            if node_count > 0 {
2940                return Ok(true);
2941            }
2942        }
2943    }
2944    Ok(false)
2945}
2946
2947fn open_guard_check_positions_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
2948    let recursive_count: i64 = conn
2949        .query_row(
2950            "SELECT COUNT(*) FROM fts_property_schemas \
2951             WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
2952            [],
2953            |row| row.get(0),
2954        )
2955        .map_err(EngineError::Sqlite)?;
2956    if recursive_count == 0 {
2957        return Ok(false);
2958    }
2959    let pos_count: i64 = conn
2960        .query_row(
2961            "SELECT COUNT(*) FROM fts_node_property_positions",
2962            [],
2963            |row| row.get(0),
2964        )
2965        .map_err(EngineError::Sqlite)?;
2966    Ok(pos_count == 0)
2967}
2968
/// Wrap a node-row query so each result row also carries `last_accessed_at`
/// from `node_access_metadata` (NULL when no access row exists for the node).
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut sql = String::with_capacity(base_sql.len() + 160);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, am.last_accessed_at \
         FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
2976
2977/// Returns `true` when `err` indicates the vec virtual table is absent.
2978///
2979/// Matches:
2980/// - "no such table: vec_<kind>" (per-kind table not yet created by regeneration)
2981/// - "no such module: vec0" (sqlite-vec extension not available)
2982/// - Legacy `"no such table: vec_nodes_active"` (for backward compatibility with
2983///   any remaining tests or DB files referencing the old global table)
2984pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
2985    match err {
2986        rusqlite::Error::SqliteFailure(_, Some(msg)) => {
2987            // Per-kind tables start with "vec_"
2988            (msg.contains("no such table: vec_") && !msg.contains("vec_embedding"))
2989                || msg.contains("no such module: vec0")
2990        }
2991        _ => false,
2992    }
2993}
2994
2995fn scalar_to_bind(value: &ScalarValue) -> BindValue {
2996    match value {
2997        ScalarValue::Text(text) => BindValue::Text(text.clone()),
2998        ScalarValue::Integer(integer) => BindValue::Integer(*integer),
2999        ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
3000    }
3001}
3002
3003/// Merge strict and relaxed search branches into a single block-ordered,
3004/// deduplicated, limit-truncated hit list.
3005///
3006/// Phase 3 rules, in order:
3007///
3008/// 1. Each branch is sorted internally by score descending with `logical_id`
3009///    ascending as the deterministic tiebreak.
3010/// 2. Within a single branch, if the same `logical_id` appears twice (e.g.
3011///    once from the chunk surface and once from the property surface) the
3012///    higher-score row wins, then chunk > property > vector, then declaration
3013///    order (chunk first).
3014/// 3. Strict hits form one block and relaxed hits form the next. Strict
3015///    always precedes relaxed in the merged output regardless of per-hit
3016///    score.
3017/// 4. Cross-branch dedup is strict-wins: any relaxed hit whose `logical_id`
3018///    already appears in the strict block is dropped.
3019/// 5. The merged output is truncated to `limit`.
3020fn merge_search_branches(
3021    strict: Vec<SearchHit>,
3022    relaxed: Vec<SearchHit>,
3023    limit: usize,
3024) -> Vec<SearchHit> {
3025    merge_search_branches_three(strict, relaxed, Vec::new(), limit)
3026}
3027
3028/// Three-branch generalization of [`merge_search_branches`]: orders hits as
3029/// (strict block, relaxed block, vector block) per addendum 1 §Fusion
3030/// Semantics, with cross-branch dedup resolved by branch precedence
3031/// (strict > relaxed > vector). Within each block the existing
3032/// [`dedup_branch_hits`] rule applies (score desc, `logical_id` asc, source
3033/// priority chunk > property > vector).
3034///
3035/// Phase 12 (the unified `search()` entry point) calls this directly. The
3036/// two-branch [`merge_search_branches`] wrapper is preserved as a
3037/// convenience for the text-only `execute_compiled_search_plan` path; both
3038/// reduce to the same code.
3039fn merge_search_branches_three(
3040    strict: Vec<SearchHit>,
3041    relaxed: Vec<SearchHit>,
3042    vector: Vec<SearchHit>,
3043    limit: usize,
3044) -> Vec<SearchHit> {
3045    let strict_block = dedup_branch_hits(strict);
3046    let relaxed_block = dedup_branch_hits(relaxed);
3047    let vector_block = dedup_branch_hits(vector);
3048
3049    let mut seen: std::collections::HashSet<String> = strict_block
3050        .iter()
3051        .map(|h| h.node.logical_id.clone())
3052        .collect();
3053
3054    let mut merged = strict_block;
3055    for hit in relaxed_block {
3056        if seen.insert(hit.node.logical_id.clone()) {
3057            merged.push(hit);
3058        }
3059    }
3060    for hit in vector_block {
3061        if seen.insert(hit.node.logical_id.clone()) {
3062            merged.push(hit);
3063        }
3064    }
3065
3066    if merged.len() > limit {
3067        merged.truncate(limit);
3068    }
3069    merged
3070}
3071
3072/// Sort a branch's hits by score descending + `logical_id` ascending, then
3073/// dedup duplicate `logical_id`s within the branch using source priority
3074/// (chunk > property > vector) and declaration order.
3075fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
3076    hits.sort_by(|a, b| {
3077        b.score
3078            .partial_cmp(&a.score)
3079            .unwrap_or(std::cmp::Ordering::Equal)
3080            .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
3081            .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
3082    });
3083
3084    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
3085    hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
3086    hits
3087}
3088
3089fn source_priority(source: SearchHitSource) -> u8 {
3090    // Lower is better. Chunk is declared before property in the CTE; vector
3091    // is reserved for future wiring but comes last among the known variants.
3092    match source {
3093        SearchHitSource::Chunk => 0,
3094        SearchHitSource::Property => 1,
3095        SearchHitSource::Vector => 2,
3096    }
3097}
3098
/// Sentinel markers used to wrap FTS5-matched terms in the `highlight()`
/// output so the coordinator can recover per-term byte offsets in the
/// original `text_content` column. `resolve_hit_attribution` passes them to
/// `highlight()` and `parse_highlight_offsets` scans for them afterwards.
///
/// Each sentinel is a single `U+0001` ("start of heading") / `U+0002`
/// ("start of text") byte. These bytes are safe for all valid JSON text
/// *except* deliberately escape-injected `\u0001` / `\u0002` sequences: a
/// payload like `{"x":"\u0001"}` decodes to a literal 0x01 byte in the
/// extracted blob, making the sentinel ambiguous for that row. Such input
/// is treated as out of scope for attribution correctness — hits on those
/// rows may have misattributed `matched_paths`, but no panic or query
/// failure occurs. A future hardening step could strip bytes < 0x20 at
/// blob-emission time in `RecursiveWalker::emit_leaf` to close this gap.
///
/// Using one-byte markers keeps the original-to-highlighted offset
/// accounting trivial: every sentinel adds exactly one byte to the
/// highlighted string.
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
3118
3119/// Load the `fts_node_property_positions` sidecar rows for a given
3120/// `(logical_id, kind)` ordered by `start_offset`. Returns a vector of
3121/// `(start_offset, end_offset, leaf_path)` tuples ready for binary search.
3122fn load_position_map(
3123    conn: &Connection,
3124    logical_id: &str,
3125    kind: &str,
3126) -> Result<Vec<(usize, usize, String)>, EngineError> {
3127    let mut stmt = conn
3128        .prepare_cached(
3129            "SELECT start_offset, end_offset, leaf_path \
3130             FROM fts_node_property_positions \
3131             WHERE node_logical_id = ?1 AND kind = ?2 \
3132             ORDER BY start_offset ASC",
3133        )
3134        .map_err(EngineError::Sqlite)?;
3135    let rows = stmt
3136        .query_map(rusqlite::params![logical_id, kind], |row| {
3137            let start: i64 = row.get(0)?;
3138            let end: i64 = row.get(1)?;
3139            let path: String = row.get(2)?;
3140            // Offsets are non-negative and within blob byte limits; on the
3141            // off chance a corrupt row is encountered, fall back to 0 so
3142            // lookups silently skip it rather than panicking.
3143            let start = usize::try_from(start).unwrap_or(0);
3144            let end = usize::try_from(end).unwrap_or(0);
3145            Ok((start, end, path))
3146        })
3147        .map_err(EngineError::Sqlite)?;
3148    let mut out = Vec::new();
3149    for row in rows {
3150        out.push(row.map_err(EngineError::Sqlite)?);
3151    }
3152    Ok(out)
3153}
3154
/// Parse a `highlight()`-wrapped string, returning the list of original-text
/// byte offsets at which matched terms begin. `wrapped` is the
/// highlight-decorated form of a text column; `open` / `close` are the
/// sentinel markers passed to `highlight()`. The returned offsets refer to
/// positions in the *original* text (i.e. the column as it would be stored
/// without highlight decoration).
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    let data = wrapped.as_bytes();
    let open_marker = open.as_bytes();
    let close_marker = close.as_bytes();
    let mut offsets = Vec::new();
    let mut pos = 0usize;
    // Running byte offset into the *undecorated* text: advances only when a
    // non-sentinel byte is consumed, so no subtraction bookkeeping is needed.
    let mut original = 0usize;
    while pos < data.len() {
        if data[pos..].starts_with(open_marker) {
            // A matched term begins immediately after the open sentinel.
            offsets.push(original);
            pos += open_marker.len();
        } else if data[pos..].starts_with(close_marker) {
            pos += close_marker.len();
        } else {
            pos += 1;
            original += 1;
        }
    }
    offsets
}
3187
/// Binary-search the position map for the leaf whose `[start, end)` range
/// contains `offset`. Returns `None` if no leaf covers the offset.
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    // partition_point yields the count of entries whose start <= offset;
    // the candidate leaf (greatest start not past the offset) sits just
    // before that boundary. checked_sub handles the "all starts are past
    // the offset" case.
    let boundary = positions.partition_point(|entry| entry.0 <= offset);
    let (start, end, path) = positions.get(boundary.checked_sub(1)?)?;
    ((*start..*end).contains(&offset)).then_some(path.as_str())
}
3204
3205/// Resolve per-hit match attribution by introspecting the FTS5 match state
3206/// for the hit's row via `highlight()` and mapping the resulting original-
3207/// text offsets back to recursive-leaf paths via the Phase 4 position map.
3208///
3209/// Chunk-backed hits carry no leaf structure and always return
3210/// `matched_paths = ["text_content"]`. Property-backed hits without a `projection_row_id`
3211/// (which should not happen — the search CTE always populates it) also
3212/// return empty attribution rather than erroring.
3213fn resolve_hit_attribution(
3214    conn: &Connection,
3215    hit: &SearchHit,
3216    match_expr: &str,
3217) -> Result<HitAttribution, EngineError> {
3218    if matches!(hit.source, SearchHitSource::Chunk) {
3219        return Ok(HitAttribution {
3220            matched_paths: vec!["text_content".to_owned()],
3221        });
3222    }
3223    if !matches!(hit.source, SearchHitSource::Property) {
3224        return Ok(HitAttribution {
3225            matched_paths: Vec::new(),
3226        });
3227    }
3228    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
3229        return Ok(HitAttribution {
3230            matched_paths: Vec::new(),
3231        });
3232    };
3233    let rowid: i64 = match rowid_str.parse() {
3234        Ok(v) => v,
3235        Err(_) => {
3236            return Ok(HitAttribution {
3237                matched_paths: Vec::new(),
3238            });
3239        }
3240    };
3241
3242    // Fetch the highlight-wrapped text_content for this hit's FTS row. The
3243    // FTS5 MATCH in the WHERE clause re-establishes the match state that
3244    // `highlight()` needs to decorate the returned text.
3245    // Per-kind tables have schema (node_logical_id UNINDEXED, text_content),
3246    // so text_content is column index 1 (0-based).
3247    let prop_table = fathomdb_schema::fts_kind_table_name(&hit.node.kind);
3248    let highlight_sql = format!(
3249        "SELECT highlight({prop_table}, 1, ?1, ?2) \
3250         FROM {prop_table} \
3251         WHERE rowid = ?3 AND {prop_table} MATCH ?4"
3252    );
3253    let mut stmt = conn.prepare(&highlight_sql).map_err(EngineError::Sqlite)?;
3254    let wrapped: Option<String> = stmt
3255        .query_row(
3256            rusqlite::params![
3257                ATTRIBUTION_HIGHLIGHT_OPEN,
3258                ATTRIBUTION_HIGHLIGHT_CLOSE,
3259                rowid,
3260                match_expr,
3261            ],
3262            |row| row.get(0),
3263        )
3264        .optional()
3265        .map_err(EngineError::Sqlite)?;
3266    let Some(wrapped) = wrapped else {
3267        return Ok(HitAttribution {
3268            matched_paths: Vec::new(),
3269        });
3270    };
3271
3272    let offsets = parse_highlight_offsets(
3273        &wrapped,
3274        ATTRIBUTION_HIGHLIGHT_OPEN,
3275        ATTRIBUTION_HIGHLIGHT_CLOSE,
3276    );
3277    if offsets.is_empty() {
3278        return Ok(HitAttribution {
3279            matched_paths: Vec::new(),
3280        });
3281    }
3282
3283    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
3284    if positions.is_empty() {
3285        // Scalar-only schemas have no position-map entries; attribution
3286        // degrades to an empty vector rather than erroring.
3287        return Ok(HitAttribution {
3288            matched_paths: Vec::new(),
3289        });
3290    }
3291
3292    let mut matched_paths: Vec<String> = Vec::new();
3293    for offset in offsets {
3294        if let Some(path) = find_leaf_for_offset(&positions, offset)
3295            && !matched_paths.iter().any(|p| p == path)
3296        {
3297            matched_paths.push(path.to_owned());
3298        }
3299    }
3300    Ok(HitAttribution { matched_paths })
3301}
3302
3303/// Build a BM25 scoring expression for a per-kind FTS5 table.
3304///
3305/// If the schema has no weighted specs (all weights None), returns `bm25({table})`.
3306/// Otherwise returns `bm25({table}, 0.0, w1, w2, ...)` where the first weight
3307/// (0.0) is for the `node_logical_id UNINDEXED` column (which BM25 should ignore),
3308/// then one weight per spec in schema order.
3309fn build_bm25_expr(table: &str, schema_json: &str, sep: &str) -> String {
3310    let schema = crate::writer::parse_property_schema_json(schema_json, sep);
3311    let any_weighted = schema.paths.iter().any(|p| p.weight.is_some());
3312    if !any_weighted {
3313        return format!("bm25({table})");
3314    }
3315    // node_logical_id is UNINDEXED — weight 0.0 tells BM25 to ignore it.
3316    let weights: Vec<String> = std::iter::once("0.0".to_owned())
3317        .chain(
3318            schema
3319                .paths
3320                .iter()
3321                .map(|p| format!("{:.1}", p.weight.unwrap_or(1.0))),
3322        )
3323        .collect();
3324    format!("bm25({table}, {})", weights.join(", "))
3325}
3326
3327fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
3328    match value {
3329        fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
3330        fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
3331        fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
3332    }
3333}
3334
3335#[cfg(test)]
3336#[allow(clippy::expect_used)]
3337mod tests {
3338    use std::panic::{AssertUnwindSafe, catch_unwind};
3339    use std::sync::Arc;
3340
3341    use fathomdb_query::{BindValue, QueryBuilder};
3342    use fathomdb_schema::SchemaManager;
3343    use rusqlite::types::Value;
3344    use tempfile::NamedTempFile;
3345
3346    use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
3347
3348    use fathomdb_query::{
3349        NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
3350    };
3351
3352    use super::{
3353        bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
3354        wrap_node_row_projection_sql,
3355    };
3356
3357    fn mk_hit(
3358        logical_id: &str,
3359        score: f64,
3360        match_mode: SearchMatchMode,
3361        source: SearchHitSource,
3362    ) -> SearchHit {
3363        SearchHit {
3364            node: NodeRowLite {
3365                row_id: format!("{logical_id}-row"),
3366                logical_id: logical_id.to_owned(),
3367                kind: "Goal".to_owned(),
3368                properties: "{}".to_owned(),
3369                content_ref: None,
3370                last_accessed_at: None,
3371            },
3372            score,
3373            modality: RetrievalModality::Text,
3374            source,
3375            match_mode: Some(match_mode),
3376            snippet: None,
3377            written_at: 0,
3378            projection_row_id: None,
3379            vector_distance: None,
3380            attribution: None,
3381        }
3382    }
3383
3384    #[test]
3385    fn merge_places_strict_block_before_relaxed_regardless_of_score() {
3386        let strict = vec![mk_hit(
3387            "a",
3388            1.0,
3389            SearchMatchMode::Strict,
3390            SearchHitSource::Chunk,
3391        )];
3392        // Relaxed has a higher score but must still come second.
3393        let relaxed = vec![mk_hit(
3394            "b",
3395            9.9,
3396            SearchMatchMode::Relaxed,
3397            SearchHitSource::Chunk,
3398        )];
3399        let merged = merge_search_branches(strict, relaxed, 10);
3400        assert_eq!(merged.len(), 2);
3401        assert_eq!(merged[0].node.logical_id, "a");
3402        assert!(matches!(
3403            merged[0].match_mode,
3404            Some(SearchMatchMode::Strict)
3405        ));
3406        assert_eq!(merged[1].node.logical_id, "b");
3407        assert!(matches!(
3408            merged[1].match_mode,
3409            Some(SearchMatchMode::Relaxed)
3410        ));
3411    }
3412
3413    #[test]
3414    fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
3415        let strict = vec![mk_hit(
3416            "shared",
3417            1.0,
3418            SearchMatchMode::Strict,
3419            SearchHitSource::Chunk,
3420        )];
3421        let relaxed = vec![
3422            mk_hit(
3423                "shared",
3424                9.9,
3425                SearchMatchMode::Relaxed,
3426                SearchHitSource::Chunk,
3427            ),
3428            mk_hit(
3429                "other",
3430                2.0,
3431                SearchMatchMode::Relaxed,
3432                SearchHitSource::Chunk,
3433            ),
3434        ];
3435        let merged = merge_search_branches(strict, relaxed, 10);
3436        assert_eq!(merged.len(), 2);
3437        assert_eq!(merged[0].node.logical_id, "shared");
3438        assert!(matches!(
3439            merged[0].match_mode,
3440            Some(SearchMatchMode::Strict)
3441        ));
3442        assert_eq!(merged[1].node.logical_id, "other");
3443        assert!(matches!(
3444            merged[1].match_mode,
3445            Some(SearchMatchMode::Relaxed)
3446        ));
3447    }
3448
3449    #[test]
3450    fn merge_sorts_within_block_by_score_desc_then_logical_id() {
3451        let strict = vec![
3452            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3453            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3454            mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3455        ];
3456        let merged = merge_search_branches(strict, vec![], 10);
3457        assert_eq!(
3458            merged
3459                .iter()
3460                .map(|h| &h.node.logical_id)
3461                .collect::<Vec<_>>(),
3462            vec!["a", "c", "b"]
3463        );
3464    }
3465
3466    #[test]
3467    fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
3468        let strict = vec![
3469            mk_hit(
3470                "shared",
3471                1.0,
3472                SearchMatchMode::Strict,
3473                SearchHitSource::Property,
3474            ),
3475            mk_hit(
3476                "shared",
3477                1.0,
3478                SearchMatchMode::Strict,
3479                SearchHitSource::Chunk,
3480            ),
3481        ];
3482        let merged = merge_search_branches(strict, vec![], 10);
3483        assert_eq!(merged.len(), 1);
3484        assert!(matches!(merged[0].source, SearchHitSource::Chunk));
3485    }
3486
3487    #[test]
3488    fn merge_truncates_to_limit_after_block_merge() {
3489        let strict = vec![
3490            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3491            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3492        ];
3493        let relaxed = vec![mk_hit(
3494            "c",
3495            9.0,
3496            SearchMatchMode::Relaxed,
3497            SearchHitSource::Chunk,
3498        )];
3499        let merged = merge_search_branches(strict, relaxed, 2);
3500        assert_eq!(merged.len(), 2);
3501        assert_eq!(merged[0].node.logical_id, "a");
3502        assert_eq!(merged[1].node.logical_id, "b");
3503    }
3504
3505    /// P12 architectural pin: the generalized three-branch merger must
3506    /// produce strict -> relaxed -> vector block ordering with cross-branch
3507    /// dedup resolved by branch precedence (strict > relaxed > vector).
3508    /// v1 `search()` policy never fires the vector branch through the
3509    /// unified planner because read-time embedding is deferred, but the
3510    /// merge helper itself must be ready for the day the planner does so —
3511    /// otherwise wiring the future phase requires touching the core merge
3512    /// code as well as the planner.
3513    #[test]
3514    fn search_architecturally_supports_three_branch_fusion() {
3515        let strict = vec![mk_hit(
3516            "alpha",
3517            1.0,
3518            SearchMatchMode::Strict,
3519            SearchHitSource::Chunk,
3520        )];
3521        let relaxed = vec![mk_hit(
3522            "bravo",
3523            5.0,
3524            SearchMatchMode::Relaxed,
3525            SearchHitSource::Chunk,
3526        )];
3527        // Synthetic vector hit with the highest score. Three-block ordering
3528        // must still place it last.
3529        let mut vector_hit = mk_hit(
3530            "charlie",
3531            9.9,
3532            SearchMatchMode::Strict,
3533            SearchHitSource::Vector,
3534        );
3535        // Vector hits actually carry match_mode=None per the addendum, but
3536        // the merge helper's ordering is mode-agnostic; we override here to
3537        // pin the modality field for the test.
3538        vector_hit.match_mode = None;
3539        vector_hit.modality = RetrievalModality::Vector;
3540        let vector = vec![vector_hit];
3541
3542        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
3543        assert_eq!(merged.len(), 3);
3544        assert_eq!(merged[0].node.logical_id, "alpha");
3545        assert_eq!(merged[1].node.logical_id, "bravo");
3546        assert_eq!(merged[2].node.logical_id, "charlie");
3547        // Vector block comes last regardless of its higher score.
3548        assert!(matches!(merged[2].source, SearchHitSource::Vector));
3549
3550        // Cross-branch dedup: a logical_id that appears in multiple branches
3551        // is attributed to its highest-priority originating branch only.
3552        let strict2 = vec![mk_hit(
3553            "shared",
3554            0.5,
3555            SearchMatchMode::Strict,
3556            SearchHitSource::Chunk,
3557        )];
3558        let relaxed2 = vec![mk_hit(
3559            "shared",
3560            5.0,
3561            SearchMatchMode::Relaxed,
3562            SearchHitSource::Chunk,
3563        )];
3564        let mut vshared = mk_hit(
3565            "shared",
3566            9.9,
3567            SearchMatchMode::Strict,
3568            SearchHitSource::Vector,
3569        );
3570        vshared.match_mode = None;
3571        vshared.modality = RetrievalModality::Vector;
3572        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
3573        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
3574        assert!(matches!(
3575            merged2[0].match_mode,
3576            Some(SearchMatchMode::Strict)
3577        ));
3578        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));
3579
3580        // Relaxed wins over vector when strict is absent.
3581        let mut vshared2 = mk_hit(
3582            "shared",
3583            9.9,
3584            SearchMatchMode::Strict,
3585            SearchHitSource::Vector,
3586        );
3587        vshared2.match_mode = None;
3588        vshared2.modality = RetrievalModality::Vector;
3589        let merged3 = merge_search_branches_three(
3590            vec![],
3591            vec![mk_hit(
3592                "shared",
3593                1.0,
3594                SearchMatchMode::Relaxed,
3595                SearchHitSource::Chunk,
3596            )],
3597            vec![vshared2],
3598            10,
3599        );
3600        assert_eq!(merged3.len(), 1);
3601        assert!(matches!(
3602            merged3[0].match_mode,
3603            Some(SearchMatchMode::Relaxed)
3604        ));
3605    }
3606
3607    /// P12-N-3: production-realistic vector-only fusion. The v1 planner
3608    /// never fires this shape today (read-time embedding is deferred), but
3609    /// when it does the merger will see empty strict + empty relaxed + a
3610    /// non-empty vector block. The three-branch merger must pass that
3611    /// block through unchanged, preserving `RetrievalModality::Vector`,
3612    /// `SearchHitSource::Vector`, and `match_mode == None` semantics.
3613    ///
3614    /// Note: the review spec asked for `vector_hit_count == 1` /
3615    /// `strict_hit_count == 0` assertions. Those are fields on
3616    /// `SearchRows`, which is assembled one layer up in
3617    /// `execute_compiled_retrieval_plan`. The merger returns a bare
3618    /// `Vec<SearchHit>`, so this test asserts the corresponding invariants
3619    /// directly on the returned vec (block shape + per-hit fields).
3620    #[test]
3621    fn merge_search_branches_three_vector_only_preserves_vector_block() {
3622        let mut vector_hit = mk_hit(
3623            "solo",
3624            0.75,
3625            SearchMatchMode::Strict,
3626            SearchHitSource::Vector,
3627        );
3628        vector_hit.match_mode = None;
3629        vector_hit.modality = RetrievalModality::Vector;
3630
3631        let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);
3632
3633        assert_eq!(merged.len(), 1);
3634        assert_eq!(merged[0].node.logical_id, "solo");
3635        assert!(matches!(merged[0].source, SearchHitSource::Vector));
3636        assert!(matches!(merged[0].modality, RetrievalModality::Vector));
3637        assert!(
3638            merged[0].match_mode.is_none(),
3639            "vector hits carry match_mode=None per addendum 1"
3640        );
3641    }
3642
3643    /// P12-N-3: limit truncation must preserve block precedence — when the
3644    /// strict block alone already exceeds the limit, relaxed and vector
3645    /// hits must be dropped entirely even if they have higher raw scores.
3646    ///
3647    /// Note: the review spec asked for `strict_hit_count == 2` /
3648    /// `relaxed_hit_count == 0` / `vector_hit_count == 0` assertions, which
3649    /// are `SearchRows` fields assembled one layer up. Since
3650    /// `merge_search_branches_three` only returns a `Vec<SearchHit>`, this
3651    /// test asserts the corresponding invariants directly: the returned
3652    /// vec contains exactly the top two strict hits, with no relaxed or
3653    /// vector hits leaking past the limit.
3654    #[test]
3655    fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
3656        let strict = vec![
3657            mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3658            mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3659            mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3660        ];
3661        let relaxed = vec![mk_hit(
3662            "d",
3663            9.0,
3664            SearchMatchMode::Relaxed,
3665            SearchHitSource::Chunk,
3666        )];
3667        let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
3668        vector_hit.match_mode = None;
3669        vector_hit.modality = RetrievalModality::Vector;
3670        let vector = vec![vector_hit];
3671
3672        let merged = merge_search_branches_three(strict, relaxed, vector, 2);
3673
3674        assert_eq!(merged.len(), 2);
3675        assert_eq!(merged[0].node.logical_id, "a");
3676        assert_eq!(merged[1].node.logical_id, "b");
3677        // Neither relaxed nor vector hits made it past the limit.
3678        assert!(
3679            merged
3680                .iter()
3681                .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
3682            "strict block must win limit contention against higher-scored relaxed/vector hits"
3683        );
3684        assert!(
3685            merged
3686                .iter()
3687                .all(|h| matches!(h.source, SearchHitSource::Chunk)),
3688            "no vector source hits should leak past the limit"
3689        );
3690    }
3691
3692    #[test]
3693    fn is_vec_table_absent_matches_known_error_messages() {
3694        use rusqlite::ffi;
3695        fn make_err(msg: &str) -> rusqlite::Error {
3696            rusqlite::Error::SqliteFailure(
3697                ffi::Error {
3698                    code: ffi::ErrorCode::Unknown,
3699                    extended_code: 1,
3700                },
3701                Some(msg.to_owned()),
3702            )
3703        }
3704        assert!(is_vec_table_absent(&make_err(
3705            "no such table: vec_nodes_active"
3706        )));
3707        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
3708        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
3709        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
3710        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
3711    }
3712
3713    // --- 0.5.0 item 6: per-kind vec table in coordinator ---
3714
3715    #[test]
3716    fn vector_search_uses_per_kind_table_and_degrades_when_table_absent() {
3717        // Without sqlite-vec feature or without creating vec_mykind, querying
3718        // for kind "MyKind" should degrade gracefully (was_degraded=true).
3719        let db = NamedTempFile::new().expect("temporary db");
3720        let coordinator = ExecutionCoordinator::open(
3721            db.path(),
3722            Arc::new(SchemaManager::new()),
3723            None,
3724            1,
3725            Arc::new(TelemetryCounters::default()),
3726            None,
3727        )
3728        .expect("coordinator");
3729
3730        let compiled = QueryBuilder::nodes("MyKind")
3731            .vector_search("some query", 5)
3732            .compile()
3733            .expect("vector query compiles");
3734
3735        let rows = coordinator
3736            .execute_compiled_read(&compiled)
3737            .expect("degraded read must succeed");
3738        assert!(
3739            rows.was_degraded,
3740            "must degrade when vec_mykind table does not exist"
3741        );
3742        assert!(
3743            rows.nodes.is_empty(),
3744            "degraded result must return empty nodes"
3745        );
3746    }
3747
3748    #[test]
3749    fn bind_value_text_maps_to_sql_text() {
3750        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
3751        assert_eq!(val, Value::Text("hello".to_owned()));
3752    }
3753
3754    #[test]
3755    fn bind_value_integer_maps_to_sql_integer() {
3756        let val = bind_value_to_sql(&BindValue::Integer(42));
3757        assert_eq!(val, Value::Integer(42));
3758    }
3759
3760    #[test]
3761    fn bind_value_bool_true_maps_to_integer_one() {
3762        let val = bind_value_to_sql(&BindValue::Bool(true));
3763        assert_eq!(val, Value::Integer(1));
3764    }
3765
3766    #[test]
3767    fn bind_value_bool_false_maps_to_integer_zero() {
3768        let val = bind_value_to_sql(&BindValue::Bool(false));
3769        assert_eq!(val, Value::Integer(0));
3770    }
3771
3772    #[test]
3773    fn same_shape_queries_share_one_cache_entry() {
3774        let db = NamedTempFile::new().expect("temporary db");
3775        let coordinator = ExecutionCoordinator::open(
3776            db.path(),
3777            Arc::new(SchemaManager::new()),
3778            None,
3779            1,
3780            Arc::new(TelemetryCounters::default()),
3781            None,
3782        )
3783        .expect("coordinator");
3784
3785        let compiled_a = QueryBuilder::nodes("Meeting")
3786            .text_search("budget", 5)
3787            .limit(10)
3788            .compile()
3789            .expect("compiled a");
3790        let compiled_b = QueryBuilder::nodes("Meeting")
3791            .text_search("standup", 5)
3792            .limit(10)
3793            .compile()
3794            .expect("compiled b");
3795
3796        coordinator
3797            .execute_compiled_read(&compiled_a)
3798            .expect("read a");
3799        coordinator
3800            .execute_compiled_read(&compiled_b)
3801            .expect("read b");
3802
3803        assert_eq!(
3804            compiled_a.shape_hash, compiled_b.shape_hash,
3805            "different bind values, same structural shape → same hash"
3806        );
3807        assert_eq!(coordinator.shape_sql_count(), 1);
3808    }
3809
3810    #[test]
3811    fn vector_read_degrades_gracefully_when_vec_table_absent() {
3812        let db = NamedTempFile::new().expect("temporary db");
3813        let coordinator = ExecutionCoordinator::open(
3814            db.path(),
3815            Arc::new(SchemaManager::new()),
3816            None,
3817            1,
3818            Arc::new(TelemetryCounters::default()),
3819            None,
3820        )
3821        .expect("coordinator");
3822
3823        let compiled = QueryBuilder::nodes("Meeting")
3824            .vector_search("budget embeddings", 5)
3825            .compile()
3826            .expect("vector query compiles");
3827
3828        let result = coordinator.execute_compiled_read(&compiled);
3829        let rows = result.expect("degraded read must succeed, not error");
3830        assert!(
3831            rows.was_degraded,
3832            "result must be flagged as degraded when vec_nodes_active is absent"
3833        );
3834        assert!(
3835            rows.nodes.is_empty(),
3836            "degraded result must return empty nodes"
3837        );
3838    }
3839
3840    #[test]
3841    fn coordinator_caches_by_shape_hash() {
3842        let db = NamedTempFile::new().expect("temporary db");
3843        let coordinator = ExecutionCoordinator::open(
3844            db.path(),
3845            Arc::new(SchemaManager::new()),
3846            None,
3847            1,
3848            Arc::new(TelemetryCounters::default()),
3849            None,
3850        )
3851        .expect("coordinator");
3852
3853        let compiled = QueryBuilder::nodes("Meeting")
3854            .text_search("budget", 5)
3855            .compile()
3856            .expect("compiled query");
3857
3858        coordinator
3859            .execute_compiled_read(&compiled)
3860            .expect("execute compiled read");
3861        assert_eq!(coordinator.shape_sql_count(), 1);
3862    }
3863
3864    // --- Item 6: explain_compiled_read tests ---
3865
3866    #[test]
3867    fn explain_returns_correct_sql() {
3868        let db = NamedTempFile::new().expect("temporary db");
3869        let coordinator = ExecutionCoordinator::open(
3870            db.path(),
3871            Arc::new(SchemaManager::new()),
3872            None,
3873            1,
3874            Arc::new(TelemetryCounters::default()),
3875            None,
3876        )
3877        .expect("coordinator");
3878
3879        let compiled = QueryBuilder::nodes("Meeting")
3880            .text_search("budget", 5)
3881            .compile()
3882            .expect("compiled query");
3883
3884        let plan = coordinator.explain_compiled_read(&compiled);
3885
3886        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
3887    }
3888
3889    #[test]
3890    fn explain_returns_correct_driving_table() {
3891        use fathomdb_query::DrivingTable;
3892
3893        let db = NamedTempFile::new().expect("temporary db");
3894        let coordinator = ExecutionCoordinator::open(
3895            db.path(),
3896            Arc::new(SchemaManager::new()),
3897            None,
3898            1,
3899            Arc::new(TelemetryCounters::default()),
3900            None,
3901        )
3902        .expect("coordinator");
3903
3904        let compiled = QueryBuilder::nodes("Meeting")
3905            .text_search("budget", 5)
3906            .compile()
3907            .expect("compiled query");
3908
3909        let plan = coordinator.explain_compiled_read(&compiled);
3910
3911        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
3912    }
3913
3914    #[test]
3915    fn explain_reports_cache_miss_then_hit() {
3916        let db = NamedTempFile::new().expect("temporary db");
3917        let coordinator = ExecutionCoordinator::open(
3918            db.path(),
3919            Arc::new(SchemaManager::new()),
3920            None,
3921            1,
3922            Arc::new(TelemetryCounters::default()),
3923            None,
3924        )
3925        .expect("coordinator");
3926
3927        let compiled = QueryBuilder::nodes("Meeting")
3928            .text_search("budget", 5)
3929            .compile()
3930            .expect("compiled query");
3931
3932        // Before execution: cache miss
3933        let plan_before = coordinator.explain_compiled_read(&compiled);
3934        assert!(
3935            !plan_before.cache_hit,
3936            "cache miss expected before first execute"
3937        );
3938
3939        // Execute to populate cache
3940        coordinator
3941            .execute_compiled_read(&compiled)
3942            .expect("execute read");
3943
3944        // After execution: cache hit
3945        let plan_after = coordinator.explain_compiled_read(&compiled);
3946        assert!(
3947            plan_after.cache_hit,
3948            "cache hit expected after first execute"
3949        );
3950    }
3951
3952    #[test]
3953    fn explain_does_not_execute_query() {
3954        // Call explain_compiled_read on an empty database. If explain were
3955        // actually executing SQL, it would return Ok with 0 rows. But the
3956        // key assertion is that it returns a QueryPlan (not an error) even
3957        // without touching the database.
3958        let db = NamedTempFile::new().expect("temporary db");
3959        let coordinator = ExecutionCoordinator::open(
3960            db.path(),
3961            Arc::new(SchemaManager::new()),
3962            None,
3963            1,
3964            Arc::new(TelemetryCounters::default()),
3965            None,
3966        )
3967        .expect("coordinator");
3968
3969        let compiled = QueryBuilder::nodes("Meeting")
3970            .text_search("anything", 5)
3971            .compile()
3972            .expect("compiled query");
3973
3974        // This must not error, even though the database is empty
3975        let plan = coordinator.explain_compiled_read(&compiled);
3976
3977        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
3978        assert_eq!(plan.bind_count, compiled.binds.len());
3979    }
3980
3981    #[test]
3982    fn coordinator_executes_compiled_read() {
3983        let db = NamedTempFile::new().expect("temporary db");
3984        let coordinator = ExecutionCoordinator::open(
3985            db.path(),
3986            Arc::new(SchemaManager::new()),
3987            None,
3988            1,
3989            Arc::new(TelemetryCounters::default()),
3990            None,
3991        )
3992        .expect("coordinator");
3993        let conn = rusqlite::Connection::open(db.path()).expect("open db");
3994
3995        conn.execute_batch(
3996            r#"
3997            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
3998            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
3999            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
4000            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
4001            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
4002            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
4003            "#,
4004        )
4005        .expect("seed data");
4006
4007        let compiled = QueryBuilder::nodes("Meeting")
4008            .text_search("budget", 5)
4009            .limit(5)
4010            .compile()
4011            .expect("compiled query");
4012
4013        let rows = coordinator
4014            .execute_compiled_read(&compiled)
4015            .expect("execute read");
4016
4017        assert_eq!(rows.nodes.len(), 1);
4018        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
4019    }
4020
4021    #[test]
4022    fn text_search_finds_structured_only_node_via_property_fts() {
4023        let db = NamedTempFile::new().expect("temporary db");
4024        let coordinator = ExecutionCoordinator::open(
4025            db.path(),
4026            Arc::new(SchemaManager::new()),
4027            None,
4028            1,
4029            Arc::new(TelemetryCounters::default()),
4030            None,
4031        )
4032        .expect("coordinator");
4033        let conn = rusqlite::Connection::open(db.path()).expect("open db");
4034
4035        // Insert a structured-only node (no chunks) with a property FTS row.
4036        // Per-kind table fts_props_goal must be created before inserting.
4037        conn.execute_batch(
4038            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
4039                node_logical_id UNINDEXED, text_content, \
4040                tokenize = 'porter unicode61 remove_diacritics 2'\
4041            )",
4042        )
4043        .expect("create per-kind fts table");
4044        conn.execute_batch(
4045            r#"
4046            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
4047            VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
4048            INSERT INTO fts_props_goal (node_logical_id, text_content)
4049            VALUES ('goal-1', 'Ship v2');
4050            "#,
4051        )
4052        .expect("seed data");
4053
4054        let compiled = QueryBuilder::nodes("Goal")
4055            .text_search("Ship", 5)
4056            .limit(5)
4057            .compile()
4058            .expect("compiled query");
4059
4060        let rows = coordinator
4061            .execute_compiled_read(&compiled)
4062            .expect("execute read");
4063
4064        assert_eq!(rows.nodes.len(), 1);
4065        assert_eq!(rows.nodes[0].logical_id, "goal-1");
4066    }
4067
4068    #[test]
4069    fn text_search_returns_both_chunk_and_property_backed_hits() {
4070        let db = NamedTempFile::new().expect("temporary db");
4071        let coordinator = ExecutionCoordinator::open(
4072            db.path(),
4073            Arc::new(SchemaManager::new()),
4074            None,
4075            1,
4076            Arc::new(TelemetryCounters::default()),
4077            None,
4078        )
4079        .expect("coordinator");
4080        let conn = rusqlite::Connection::open(db.path()).expect("open db");
4081
4082        // Chunk-backed hit: a Meeting with a chunk containing "quarterly".
4083        conn.execute_batch(
4084            r"
4085            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
4086            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
4087            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
4088            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
4089            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
4090            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
4091            ",
4092        )
4093        .expect("seed chunk-backed node");
4094
4095        // Property-backed hit: a Meeting with property FTS containing "quarterly".
4096        // Per-kind table fts_props_meeting must be created before inserting.
4097        conn.execute_batch(
4098            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_meeting USING fts5(\
4099                node_logical_id UNINDEXED, text_content, \
4100                tokenize = 'porter unicode61 remove_diacritics 2'\
4101            )",
4102        )
4103        .expect("create per-kind fts table");
4104        conn.execute_batch(
4105            r#"
4106            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
4107            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
4108            INSERT INTO fts_props_meeting (node_logical_id, text_content)
4109            VALUES ('meeting-2', 'quarterly sync');
4110            "#,
4111        )
4112        .expect("seed property-backed node");
4113
4114        let compiled = QueryBuilder::nodes("Meeting")
4115            .text_search("quarterly", 10)
4116            .limit(10)
4117            .compile()
4118            .expect("compiled query");
4119
4120        let rows = coordinator
4121            .execute_compiled_read(&compiled)
4122            .expect("execute read");
4123
4124        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
4125        ids.sort_unstable();
4126        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
4127    }
4128
4129    #[test]
4130    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
4131        let db = NamedTempFile::new().expect("temporary db");
4132        let coordinator = ExecutionCoordinator::open(
4133            db.path(),
4134            Arc::new(SchemaManager::new()),
4135            None,
4136            1,
4137            Arc::new(TelemetryCounters::default()),
4138            None,
4139        )
4140        .expect("coordinator");
4141        let conn = rusqlite::Connection::open(db.path()).expect("open db");
4142
4143        conn.execute_batch(
4144            r"
4145            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
4146            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
4147            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
4148            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
4149            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
4150            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
4151            ",
4152        )
4153        .expect("seed chunk-backed node");
4154
4155        let compiled = QueryBuilder::nodes("Meeting")
4156            .text_search("not a ship", 10)
4157            .limit(10)
4158            .compile()
4159            .expect("compiled query");
4160
4161        let rows = coordinator
4162            .execute_compiled_read(&compiled)
4163            .expect("execute read");
4164
4165        assert_eq!(rows.nodes.len(), 1);
4166        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
4167    }
4168
4169    // --- Item 1: capability gate tests ---
4170
4171    #[test]
4172    fn capability_gate_reports_false_without_feature() {
4173        let db = NamedTempFile::new().expect("temporary db");
4174        // Open without vector_dimension: regardless of feature flag, vector_enabled must be false
4175        // when no dimension is requested (the vector profile is never bootstrapped).
4176        let coordinator = ExecutionCoordinator::open(
4177            db.path(),
4178            Arc::new(SchemaManager::new()),
4179            None,
4180            1,
4181            Arc::new(TelemetryCounters::default()),
4182            None,
4183        )
4184        .expect("coordinator");
4185        assert!(
4186            !coordinator.vector_enabled(),
4187            "vector_enabled must be false when no dimension is requested"
4188        );
4189    }
4190
4191    #[cfg(feature = "sqlite-vec")]
4192    #[test]
4193    fn capability_gate_reports_true_when_feature_enabled() {
4194        let db = NamedTempFile::new().expect("temporary db");
4195        let coordinator = ExecutionCoordinator::open(
4196            db.path(),
4197            Arc::new(SchemaManager::new()),
4198            Some(128),
4199            1,
4200            Arc::new(TelemetryCounters::default()),
4201            None,
4202        )
4203        .expect("coordinator");
4204        assert!(
4205            coordinator.vector_enabled(),
4206            "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
4207        );
4208    }
4209
4210    // --- Item 4: runtime table read tests ---
4211
    #[test]
    fn read_run_returns_inserted_run() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        // Persist a single run through the writer actor, then read it back
        // via a coordinator opened on the same database file.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // Submit a write batch that is empty except for one run row.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        // Read side: a fresh coordinator must see the committed run row
        // with the exact field values that were written.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
4266
    #[test]
    fn read_step_returns_inserted_step() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        // Persist a run plus one step attached to it, then read the step
        // back through the coordinator.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // The step references run-s1, so the parent run is written in the
        // same batch.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        // Read side: the step row must come back with its run linkage intact.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
4330
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        // Persist the full run → step → action chain in one batch, then
        // read the leaf action back through the coordinator.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // The action references step-a1, which references run-a1, so all
        // three rows are written together.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        // Read side: the action row must come back with its step linkage
        // intact.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
4406
4407    #[test]
4408    fn read_active_runs_excludes_superseded() {
4409        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
4410
4411        let db = NamedTempFile::new().expect("temporary db");
4412        let writer = WriterActor::start(
4413            db.path(),
4414            Arc::new(SchemaManager::new()),
4415            ProvenanceMode::Warn,
4416            Arc::new(TelemetryCounters::default()),
4417        )
4418        .expect("writer");
4419
4420        // Insert original run
4421        writer
4422            .submit(WriteRequest {
4423                label: "v1".to_owned(),
4424                nodes: vec![],
4425                node_retires: vec![],
4426                edges: vec![],
4427                edge_retires: vec![],
4428                chunks: vec![],
4429                runs: vec![RunInsert {
4430                    id: "run-v1".to_owned(),
4431                    kind: "session".to_owned(),
4432                    status: "active".to_owned(),
4433                    properties: "{}".to_owned(),
4434                    source_ref: Some("src-1".to_owned()),
4435                    upsert: false,
4436                    supersedes_id: None,
4437                }],
4438                steps: vec![],
4439                actions: vec![],
4440                optional_backfills: vec![],
4441                vec_inserts: vec![],
4442                operational_writes: vec![],
4443            })
4444            .expect("v1 write");
4445
4446        // Supersede original run with v2
4447        writer
4448            .submit(WriteRequest {
4449                label: "v2".to_owned(),
4450                nodes: vec![],
4451                node_retires: vec![],
4452                edges: vec![],
4453                edge_retires: vec![],
4454                chunks: vec![],
4455                runs: vec![RunInsert {
4456                    id: "run-v2".to_owned(),
4457                    kind: "session".to_owned(),
4458                    status: "completed".to_owned(),
4459                    properties: "{}".to_owned(),
4460                    source_ref: Some("src-2".to_owned()),
4461                    upsert: true,
4462                    supersedes_id: Some("run-v1".to_owned()),
4463                }],
4464                steps: vec![],
4465                actions: vec![],
4466                optional_backfills: vec![],
4467                vec_inserts: vec![],
4468                operational_writes: vec![],
4469            })
4470            .expect("v2 write");
4471
4472        let coordinator = ExecutionCoordinator::open(
4473            db.path(),
4474            Arc::new(SchemaManager::new()),
4475            None,
4476            1,
4477            Arc::new(TelemetryCounters::default()),
4478            None,
4479        )
4480        .expect("coordinator");
4481        let active = coordinator.read_active_runs().expect("read_active_runs");
4482
4483        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
4484        assert_eq!(active[0].id, "run-v2");
4485    }
4486
4487    #[allow(clippy::panic)]
4488    fn poison_connection(coordinator: &ExecutionCoordinator) {
4489        let result = catch_unwind(AssertUnwindSafe(|| {
4490            let _guard = coordinator.pool.connections[0]
4491                .lock()
4492                .expect("poison test lock");
4493            panic!("poison coordinator connection mutex");
4494        }));
4495        assert!(
4496            result.is_err(),
4497            "poison test must unwind while holding the connection mutex"
4498        );
4499    }
4500
4501    #[allow(clippy::panic)]
4502    fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
4503    where
4504        F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
4505    {
4506        match op(coordinator) {
4507            Err(EngineError::Bridge(message)) => {
4508                assert_eq!(message, "connection mutex poisoned");
4509            }
4510            Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
4511            Err(error) => panic!("expected poisoned connection error, got {error:?}"),
4512        }
4513    }
4514
4515    #[test]
4516    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
4517        let db = NamedTempFile::new().expect("temporary db");
4518        let coordinator = ExecutionCoordinator::open(
4519            db.path(),
4520            Arc::new(SchemaManager::new()),
4521            None,
4522            1,
4523            Arc::new(TelemetryCounters::default()),
4524            None,
4525        )
4526        .expect("coordinator");
4527
4528        poison_connection(&coordinator);
4529
4530        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
4531        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
4532        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
4533        assert_poisoned_connection_error(
4534            &coordinator,
4535            super::ExecutionCoordinator::read_active_runs,
4536        );
4537        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
4538        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
4539    }
4540
4541    // --- M-2: Bounded shape cache ---
4542
4543    #[test]
4544    fn shape_cache_stays_bounded() {
4545        use fathomdb_query::ShapeHash;
4546
4547        let db = NamedTempFile::new().expect("temporary db");
4548        let coordinator = ExecutionCoordinator::open(
4549            db.path(),
4550            Arc::new(SchemaManager::new()),
4551            None,
4552            1,
4553            Arc::new(TelemetryCounters::default()),
4554            None,
4555        )
4556        .expect("coordinator");
4557
4558        // Directly populate the cache with MAX_SHAPE_CACHE_SIZE + 1 entries.
4559        {
4560            let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
4561            for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
4562                cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
4563            }
4564        }
4565        // The cache is now over the limit but hasn't been pruned yet (pruning
4566        // happens on the insert path in execute_compiled_read).
4567
4568        // Execute a compiled read to trigger the bounded-cache check.
4569        let compiled = QueryBuilder::nodes("Meeting")
4570            .text_search("budget", 5)
4571            .limit(10)
4572            .compile()
4573            .expect("compiled query");
4574
4575        coordinator
4576            .execute_compiled_read(&compiled)
4577            .expect("execute read");
4578
4579        assert!(
4580            coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
4581            "shape cache must stay bounded: got {} entries, max {}",
4582            coordinator.shape_sql_count(),
4583            super::MAX_SHAPE_CACHE_SIZE
4584        );
4585    }
4586
4587    // --- M-1: Read pool size ---
4588
4589    #[test]
4590    fn read_pool_size_configurable() {
4591        let db = NamedTempFile::new().expect("temporary db");
4592        let coordinator = ExecutionCoordinator::open(
4593            db.path(),
4594            Arc::new(SchemaManager::new()),
4595            None,
4596            2,
4597            Arc::new(TelemetryCounters::default()),
4598            None,
4599        )
4600        .expect("coordinator with pool_size=2");
4601
4602        assert_eq!(coordinator.pool.size(), 2);
4603
4604        // Basic read should succeed through the pool.
4605        let compiled = QueryBuilder::nodes("Meeting")
4606            .text_search("budget", 5)
4607            .limit(10)
4608            .compile()
4609            .expect("compiled query");
4610
4611        let result = coordinator.execute_compiled_read(&compiled);
4612        assert!(result.is_ok(), "read through pool must succeed");
4613    }
4614
4615    // --- M-4: Grouped read batching ---
4616
    /// Grouped (batched) expansion reads must return the same roots and
    /// per-root expansion nodes as the seeded baseline: 10 Meeting roots,
    /// each with exactly 2 HAS_TASK expansion targets.
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        let db = NamedTempFile::new().expect("temporary db");

        // Bootstrap the database via coordinator (creates schema).
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed data: 10 root nodes (Meeting-0..9) with 2 outbound edges each
        // to expansion nodes (Task-0-a, Task-0-b, etc.).
        // Seeding writes directly via rusqlite (separate connection) rather
        // than the WriterActor, so only the base tables and fts_nodes are
        // populated — exactly what the grouped read path consumes.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        // Text search matches all 10 meetings; one out-bound expansion slot
        // ("tasks") follows HAS_TASK edges at depth 1 with no filters.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1, None, None)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Each root should have exactly 2 expansion nodes (task-X-a, task-X-b).
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
4694
4695    // --- B-4: build_bm25_expr unit tests ---
4696
4697    #[test]
4698    fn build_bm25_expr_no_weights() {
4699        let schema_json = r#"["$.title","$.body"]"#;
4700        let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4701        assert_eq!(result, "bm25(fts_props_testkind)");
4702    }
4703
4704    #[test]
4705    fn build_bm25_expr_with_weights() {
4706        let schema_json = r#"[{"path":"$.title","mode":"scalar","weight":10.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
4707        let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4708        assert_eq!(result, "bm25(fts_props_testkind, 0.0, 10.0, 1.0)");
4709    }
4710
4711    // --- B-4: weighted schema integration test ---
4712
    /// End-to-end weighted-BM25 test: with `$.title` weighted 10.0 and
    /// `$.body` weighted 1.0, a node matching "rust" in the title column
    /// must rank above a node matching "rust" only in the body column.
    #[test]
    #[allow(clippy::too_many_lines)]
    fn weighted_schema_bm25_orders_title_match_above_body_match() {
        use crate::{
            AdminService, FtsPropertyPathSpec, NodeInsert, ProvenanceMode, WriteRequest,
            WriterActor, writer::ChunkPolicy,
        };
        use fathomdb_schema::fts_column_name;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Step 1: bootstrap, register schema with weights, create per-column table.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Article",
                    &[
                        FtsPropertyPathSpec::scalar("$.title").with_weight(10.0),
                        FtsPropertyPathSpec::scalar("$.body").with_weight(1.0),
                    ],
                    None,
                    &[],
                    crate::rebuild_actor::RebuildMode::Eager,
                )
                .expect("register schema with weights");
        }

        // Step 2: write two nodes.
        // Both writers and the coordinator below share `schema_manager` so
        // the weighted schema registered in step 1 is visible to each.
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Node A: "rust" in title (high-weight column).
        writer
            .submit(WriteRequest {
                label: "insert-a".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-a".to_owned(),
                    logical_id: "article-a".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"rust","body":"other"}"#.to_owned(),
                    source_ref: Some("src-a".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node A");

        // Node B: "rust" in body (low-weight column).
        writer
            .submit(WriteRequest {
                label: "insert-b".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-b".to_owned(),
                    logical_id: "article-b".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"other","body":"rust"}"#.to_owned(),
                    source_ref: Some("src-b".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node B");

        // Shut the writer down before reading so the reads below observe
        // fully committed state.
        drop(writer);

        // Verify per-column values were written.
        {
            let title_col = fts_column_name("$.title", false);
            let body_col = fts_column_name("$.body", false);
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            let count: i64 = conn
                .query_row("SELECT count(*) FROM fts_props_article", [], |r| r.get(0))
                .expect("count fts rows");
            assert_eq!(count, 2, "both nodes must have FTS rows in per-kind table");
            let (title_a, body_a): (String, String) = conn
                .query_row(
                    &format!(
                        "SELECT {title_col}, {body_col} FROM fts_props_article \
                         WHERE node_logical_id = 'article-a'"
                    ),
                    [],
                    |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
                )
                .expect("select article-a");
            assert_eq!(
                title_a, "rust",
                "article-a must have 'rust' in title column"
            );
            assert_eq!(
                body_a, "other",
                "article-a must have 'other' in body column"
            );
        }

        // Step 3: search for "rust" and assert node A ranks first.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = fathomdb_query::QueryBuilder::nodes("Article")
            .text_search("rust", 5)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 2, "both nodes must be returned");
        assert_eq!(
            rows.nodes[0].logical_id, "article-a",
            "article-a (title match, weight 10) must rank above article-b (body match, weight 1)"
        );
    }
4863
4864    // --- C-1: matched_paths attribution tests ---
4865
    /// Property FTS hit: `matched_paths` must reflect the *actual* leaves
    /// that contributed match tokens, queried from
    /// `fts_node_property_positions` via the highlight + offset path.
    ///
    /// Setup: one node with two indexed leaves (`$.body` = "other",
    /// `$.title` = "searchterm"). Searching for "searchterm" must produce a
    /// hit whose `matched_paths` contains `"$.title"` and does NOT contain
    /// `"$.body"`.
    #[test]
    fn property_fts_hit_matched_paths_from_positions() {
        use crate::{AdminService, rebuild_actor::RebuildMode};
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Register FTS schema for "Item" to create the per-kind table
        // fts_props_item before opening the coordinator.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Item",
                    &[
                        crate::FtsPropertyPathSpec::scalar("$.body"),
                        crate::FtsPropertyPathSpec::scalar("$.title"),
                    ],
                    None,
                    &[],
                    RebuildMode::Eager,
                )
                .expect("register Item FTS schema");
        }

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed rows directly (bypassing the writer) so the position table
        // offsets below are fully under the test's control.
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // The recursive walker emits leaves in alphabetical key order:
        //   "body"  → "other"      bytes  0..5
        //   LEAF_SEPARATOR (29 bytes)
        //   "title" → "searchterm" bytes 34..44
        let blob = format!("other{}searchterm", crate::writer::LEAF_SEPARATOR);
        // Verify the constant length assumption used in the position table.
        // If LEAF_SEPARATOR ever changes, the hard-coded 34/44 offsets in
        // the INSERTs below become wrong — fail loudly here instead.
        assert_eq!(
            crate::writer::LEAF_SEPARATOR.len(),
            29,
            "LEAF_SEPARATOR length changed; update position offsets"
        );

        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
             VALUES ('r1', 'item-1', 'Item', '{\"title\":\"searchterm\",\"body\":\"other\"}', 100)",
            [],
        )
        .expect("insert node");
        // Insert into the per-kind table (migration 23 dropped global fts_node_properties).
        conn.execute(
            "INSERT INTO fts_props_item (node_logical_id, text_content) \
             VALUES ('item-1', ?1)",
            rusqlite::params![blob],
        )
        .expect("insert fts row");
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 0, 5, '$.body')",
            [],
        )
        .expect("insert body position");
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 34, 44, '$.title')",
            [],
        )
        .expect("insert title position");

        // Compile the search and opt in to attribution explicitly.
        let ast = QueryBuilder::nodes("Item").text_search("searchterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected at least one hit");
        let hit = rows
            .hits
            .iter()
            .find(|h| h.node.logical_id == "item-1")
            .expect("item-1 must be in hits");

        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        assert!(
            att.matched_paths.contains(&"$.title".to_owned()),
            "matched_paths must contain '$.title', got {:?}",
            att.matched_paths,
        );
        assert!(
            !att.matched_paths.contains(&"$.body".to_owned()),
            "matched_paths must NOT contain '$.body', got {:?}",
            att.matched_paths,
        );
    }
4982
4983    /// Vector hits must carry `attribution = None` regardless of the
4984    /// `attribution_requested` flag.  The vector retrieval path has no
4985    /// FTS5 match positions to attribute.
4986    ///
4987    /// This test exercises the degraded (no sqlite-vec) path which returns
4988    /// an empty hit list; the invariant is that `was_degraded = true` and
4989    /// no hits carry a non-None attribution.
4990    #[test]
4991    fn vector_hit_has_no_attribution() {
4992        use fathomdb_query::compile_vector_search;
4993
4994        let db = NamedTempFile::new().expect("temporary db");
4995        let coordinator = ExecutionCoordinator::open(
4996            db.path(),
4997            Arc::new(SchemaManager::new()),
4998            None,
4999            1,
5000            Arc::new(TelemetryCounters::default()),
5001            None,
5002        )
5003        .expect("coordinator");
5004
5005        // Compile a vector search with attribution requested.
5006        let ast = QueryBuilder::nodes("Document").vector_search("[1.0, 0.0]", 5);
5007        let mut compiled = compile_vector_search(ast.ast()).expect("compile vector search");
5008        compiled.attribution_requested = true;
5009
5010        // Without sqlite-vec the result degrades to empty; every hit
5011        // (vacuously) must carry attribution == None.
5012        let rows = coordinator
5013            .execute_compiled_vector_search(&compiled)
5014            .expect("vector search must not error");
5015
5016        assert!(
5017            rows.was_degraded,
5018            "vector search without vec table must degrade"
5019        );
5020        for hit in &rows.hits {
5021            assert!(
5022                hit.attribution.is_none(),
5023                "vector hits must carry attribution = None, got {:?}",
5024                hit.attribution
5025            );
5026        }
5027    }
5028
    /// Chunk-backed hits with attribution requested must carry
    /// `matched_paths = ["text_content"]` — they have no recursive-leaf
    /// structure, but callers need a non-empty signal that the match came
    /// from the chunk surface.
    ///
    /// NOTE(review): an earlier revision of this comment said the test pins
    /// a *placeholder* behavior (`matched_paths: vec![]`), but the
    /// assertion below checks `["text_content"]` — the C-1 target behavior.
    /// The comment was stale relative to the code; if the integration tests
    /// in `crates/fathomdb/tests/text_search_surface.rs` still assert empty
    /// `matched_paths` for chunk hits, they need updating to match.
    #[test]
    fn chunk_hit_has_text_content_attribution() {
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed one node whose only searchable surface is a chunk row.
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('r1', 'chunk-node', 'Goal', '{}', 100);
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('c1', 'chunk-node', 'uniquesentinelterm', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('c1', 'chunk-node', 'Goal', 'uniquesentinelterm');
            ",
        )
        .expect("seed chunk node");

        // Compile the search and opt in to attribution explicitly.
        let ast = QueryBuilder::nodes("Goal").text_search("uniquesentinelterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected chunk hit");
        let hit = rows
            .hits
            .iter()
            .find(|h| matches!(h.source, SearchHitSource::Chunk))
            .expect("must have a Chunk hit");

        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        assert_eq!(
            att.matched_paths,
            vec!["text_content".to_owned()],
            "chunk matched_paths must be [\"text_content\"], got {:?}",
            att.matched_paths,
        );
    }
5097
5098    /// Property FTS hits from two different kinds must each carry
5099    /// `matched_paths` corresponding to their own kind's registered leaf
5100    /// paths, not those of the other kind.
5101    ///
5102    /// This pins the per-`(node_logical_id, kind)` isolation in the
5103    /// `load_position_map` query.
5104    #[test]
5105    #[allow(clippy::too_many_lines)]
5106    fn mixed_kind_results_get_per_kind_matched_paths() {
5107        use crate::{AdminService, rebuild_actor::RebuildMode};
5108        use fathomdb_query::compile_search;
5109
5110        let db = NamedTempFile::new().expect("temporary db");
5111        let schema_manager = Arc::new(SchemaManager::new());
5112
5113        // Register FTS schemas for KindA and KindB to create per-kind tables
5114        // fts_props_kinda and fts_props_kindb before opening the coordinator.
5115        {
5116            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
5117            admin
5118                .register_fts_property_schema_with_entries(
5119                    "KindA",
5120                    &[crate::FtsPropertyPathSpec::scalar("$.alpha")],
5121                    None,
5122                    &[],
5123                    RebuildMode::Eager,
5124                )
5125                .expect("register KindA FTS schema");
5126            admin
5127                .register_fts_property_schema_with_entries(
5128                    "KindB",
5129                    &[crate::FtsPropertyPathSpec::scalar("$.beta")],
5130                    None,
5131                    &[],
5132                    RebuildMode::Eager,
5133                )
5134                .expect("register KindB FTS schema");
5135        }
5136
5137        let coordinator = ExecutionCoordinator::open(
5138            db.path(),
5139            Arc::clone(&schema_manager),
5140            None,
5141            1,
5142            Arc::new(TelemetryCounters::default()),
5143            None,
5144        )
5145        .expect("coordinator");
5146
5147        let conn = rusqlite::Connection::open(db.path()).expect("open db");
5148
5149        // KindA: leaf "$.alpha" = "xenoterm" (start=0, end=8)
5150        conn.execute(
5151            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5152             VALUES ('rA', 'node-a', 'KindA', '{\"alpha\":\"xenoterm\"}', 100)",
5153            [],
5154        )
5155        .expect("insert KindA node");
5156        // Insert into per-kind table (migration 23 dropped global fts_node_properties).
5157        conn.execute(
5158            "INSERT INTO fts_props_kinda (node_logical_id, text_content) \
5159             VALUES ('node-a', 'xenoterm')",
5160            [],
5161        )
5162        .expect("insert KindA fts row");
5163        conn.execute(
5164            "INSERT INTO fts_node_property_positions \
5165             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5166             VALUES ('node-a', 'KindA', 0, 8, '$.alpha')",
5167            [],
5168        )
5169        .expect("insert KindA position");
5170
5171        // KindB: leaf "$.beta" = "xenoterm" (start=0, end=8)
5172        conn.execute(
5173            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5174             VALUES ('rB', 'node-b', 'KindB', '{\"beta\":\"xenoterm\"}', 100)",
5175            [],
5176        )
5177        .expect("insert KindB node");
5178        // Insert into per-kind table (migration 23 dropped global fts_node_properties).
5179        conn.execute(
5180            "INSERT INTO fts_props_kindb (node_logical_id, text_content) \
5181             VALUES ('node-b', 'xenoterm')",
5182            [],
5183        )
5184        .expect("insert KindB fts row");
5185        conn.execute(
5186            "INSERT INTO fts_node_property_positions \
5187             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5188             VALUES ('node-b', 'KindB', 0, 8, '$.beta')",
5189            [],
5190        )
5191        .expect("insert KindB position");
5192
5193        // Search across both kinds (empty root_kind = no kind filter).
5194        let ast = QueryBuilder::nodes("").text_search("xenoterm", 10);
5195        let mut compiled = compile_search(ast.ast()).expect("compile search");
5196        compiled.attribution_requested = true;
5197
5198        let rows = coordinator
5199            .execute_compiled_search(&compiled)
5200            .expect("search");
5201
5202        // Both nodes must appear.
5203        assert!(
5204            rows.hits.len() >= 2,
5205            "expected hits for both kinds, got {}",
5206            rows.hits.len()
5207        );
5208
5209        for hit in &rows.hits {
5210            let att = hit
5211                .attribution
5212                .as_ref()
5213                .expect("attribution must be Some when attribution_requested");
5214            match hit.node.kind.as_str() {
5215                "KindA" => {
5216                    assert_eq!(
5217                        att.matched_paths,
5218                        vec!["$.alpha".to_owned()],
5219                        "KindA hit must have matched_paths=['$.alpha'], got {:?}",
5220                        att.matched_paths,
5221                    );
5222                }
5223                "KindB" => {
5224                    assert_eq!(
5225                        att.matched_paths,
5226                        vec!["$.beta".to_owned()],
5227                        "KindB hit must have matched_paths=['$.beta'], got {:?}",
5228                        att.matched_paths,
5229                    );
5230                }
5231                other => {
5232                    // Only KindA and KindB are expected in this test.
5233                    assert_eq!(other, "KindA", "unexpected kind in result: {other}");
5234                }
5235            }
5236        }
5237    }
5238
5239    // --- Pack H: TokenizerStrategy tests ---
5240
5241    #[test]
5242    fn tokenizer_strategy_from_str() {
5243        use super::TokenizerStrategy;
5244        assert_eq!(
5245            TokenizerStrategy::from_str("porter unicode61 remove_diacritics 2"),
5246            TokenizerStrategy::RecallOptimizedEnglish,
5247        );
5248        assert_eq!(
5249            TokenizerStrategy::from_str("unicode61 remove_diacritics 2"),
5250            TokenizerStrategy::PrecisionOptimized,
5251        );
5252        assert_eq!(
5253            TokenizerStrategy::from_str("trigram"),
5254            TokenizerStrategy::SubstringTrigram,
5255        );
5256        assert_eq!(
5257            TokenizerStrategy::from_str("icu"),
5258            TokenizerStrategy::GlobalCjk,
5259        );
5260        assert_eq!(
5261            TokenizerStrategy::from_str("unicode61 tokenchars '.+-'"),
5262            TokenizerStrategy::SourceCode,
5263        );
5264        // Canonical source-code preset value
5265        assert_eq!(
5266            TokenizerStrategy::from_str("unicode61 tokenchars '._-$@'"),
5267            TokenizerStrategy::SourceCode,
5268        );
5269        assert_eq!(
5270            TokenizerStrategy::from_str("my_custom_tokenizer"),
5271            TokenizerStrategy::Custom("my_custom_tokenizer".to_owned()),
5272        );
5273    }
5274
5275    #[test]
5276    fn trigram_short_query_returns_empty() {
5277        use fathomdb_query::compile_search;
5278
5279        let db = NamedTempFile::new().expect("temporary db");
5280        let schema_manager = Arc::new(SchemaManager::new());
5281
5282        // First open: bootstrap the schema so projection_profiles table exists.
5283        {
5284            let bootstrap = ExecutionCoordinator::open(
5285                db.path(),
5286                Arc::clone(&schema_manager),
5287                None,
5288                1,
5289                Arc::new(TelemetryCounters::default()),
5290                None,
5291            )
5292            .expect("bootstrap coordinator");
5293            drop(bootstrap);
5294        }
5295
5296        // Insert a SubstringTrigram strategy for kind "Snippet" directly in the DB.
5297        {
5298            let conn = rusqlite::Connection::open(db.path()).expect("open db");
5299            conn.execute_batch(
5300                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
5301                 VALUES ('Snippet', 'fts', '{\"tokenizer\":\"trigram\"}');",
5302            )
5303            .expect("insert profile");
5304        }
5305
5306        // Reopen coordinator to load strategies.
5307        let coordinator = ExecutionCoordinator::open(
5308            db.path(),
5309            Arc::clone(&schema_manager),
5310            None,
5311            1,
5312            Arc::new(TelemetryCounters::default()),
5313            None,
5314        )
5315        .expect("coordinator reopen");
5316
5317        // 2-char query for a SubstringTrigram kind must return empty without error.
5318        let ast = QueryBuilder::nodes("Snippet").text_search("ab", 10);
5319        let compiled = compile_search(ast.ast()).expect("compile search");
5320        let rows = coordinator
5321            .execute_compiled_search(&compiled)
5322            .expect("short trigram query must not error");
5323        assert!(
5324            rows.hits.is_empty(),
5325            "2-char trigram query must return empty"
5326        );
5327    }
5328
    #[test]
    fn source_code_strategy_does_not_corrupt_fts5_syntax() {
        // Regression: escape_source_code_query must NOT be applied to the
        // already-rendered FTS5 expression. render_text_query_fts5 wraps every
        // term in double-quote delimiters (e.g. std.io -> "std.io"). Calling the
        // escape function on that output corrupts the expression: the escaper
        // sees the surrounding '"' as ordinary chars and produces '"std"."io""'
        // — malformed FTS5 syntax — causing searches to silently return empty.
        //
        // This test verifies a round-trip search for 'std.io' succeeds when the
        // SourceCode tokenizer strategy is active.
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap the DB schema. The coordinator is dropped immediately so
        // the reopen below re-reads tokenizer strategies after the profile
        // row is inserted in between.
        {
            let bootstrap = ExecutionCoordinator::open(
                db.path(),
                Arc::clone(&schema_manager),
                None,
                1,
                Arc::new(TelemetryCounters::default()),
                None,
            )
            .expect("bootstrap coordinator");
            drop(bootstrap);
        }

        // Insert the SourceCode tokenizer profile for kind "Symbol" and seed a document.
        // The json_object value uses doubled single-quotes to embed the
        // literal tokenchars string '._-$@' inside the SQL string.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            conn.execute(
                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
                 VALUES ('Symbol', 'fts', json_object('tokenizer', 'unicode61 tokenchars ''._-$@'''))",
                [],
            )
            .expect("insert profile");
            // Seed node -> chunk -> fts row so the search below has a match.
            conn.execute_batch(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
                 VALUES ('row-sym-1', 'logical-sym-1', 'Symbol', '{}', 1); \
                 INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'std.io is a rust crate', 1); \
                 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'Symbol', 'std.io is a rust crate');",
            )
            .expect("insert node and fts row");
        }

        // Reopen coordinator to pick up the SourceCode strategy.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator reopen");

        // Search for "std.io": must find the document (not error, not return empty).
        let ast = QueryBuilder::nodes("Symbol").text_search("std.io", 10);
        let compiled = compile_search(ast.ast()).expect("compile search");
        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("source code search must not error");
        assert!(
            !rows.hits.is_empty(),
            "SourceCode strategy search for 'std.io' must return the document; \
             got empty — FTS5 expression was likely corrupted by post-render escaping"
        );
    }
5402
5403    // ---- Pack E: vec identity guard tests ----
5404
    /// Minimal test double for the query embedder: reports a configurable
    /// identity and (per the impl below) emits zero vectors, so the vec
    /// identity-guard tests can run without a real embedding model.
    #[derive(Debug)]
    struct StubEmbedder {
        // Echoed back as `identity().model_identity`.
        model_identity: String,
        // Embedding width; also the length of vectors from `embed_query`.
        dimension: usize,
    }
5410
5411    impl StubEmbedder {
5412        fn new(model_identity: &str, dimension: usize) -> Self {
5413            Self {
5414                model_identity: model_identity.to_owned(),
5415                dimension,
5416            }
5417        }
5418    }
5419
    impl crate::embedder::QueryEmbedder for StubEmbedder {
        /// Returns an all-zero vector of the configured dimension; the tests
        /// here only exercise identity metadata, not embedding quality.
        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, crate::embedder::EmbedderError> {
            Ok(vec![0.0; self.dimension])
        }
        /// Reports the stub's identity. Version and normalization policy are
        /// fixed placeholder values; only `model_identity` and `dimension`
        /// vary across the tests in this module.
        fn identity(&self) -> crate::embedder::QueryEmbedderIdentity {
            crate::embedder::QueryEmbedderIdentity {
                model_identity: self.model_identity.clone(),
                model_version: "1.0".to_owned(),
                dimension: self.dimension,
                normalization_policy: "l2".to_owned(),
            }
        }
        /// Arbitrary fixed token budget; not exercised by these tests.
        fn max_tokens(&self) -> usize {
            512
        }
    }
5436
5437    fn make_in_memory_db_with_projection_profiles() -> rusqlite::Connection {
5438        let conn = rusqlite::Connection::open_in_memory().expect("in-memory db");
5439        conn.execute_batch(
5440            "CREATE TABLE IF NOT EXISTS projection_profiles (
5441                kind TEXT NOT NULL,
5442                facet TEXT NOT NULL,
5443                config_json TEXT NOT NULL,
5444                active_at INTEGER,
5445                created_at INTEGER,
5446                PRIMARY KEY (kind, facet)
5447            );",
5448        )
5449        .expect("create projection_profiles");
5450        conn
5451    }
5452
5453    #[test]
5454    fn check_vec_identity_no_profile_no_panic() {
5455        let conn = make_in_memory_db_with_projection_profiles();
5456        let embedder = StubEmbedder::new("bge-small", 384);
5457        let result = super::check_vec_identity_at_open(&conn, &embedder);
5458        assert!(result.is_ok(), "no profile row must return Ok(())");
5459    }
5460
5461    #[test]
5462    fn check_vec_identity_matching_identity_ok() {
5463        let conn = make_in_memory_db_with_projection_profiles();
5464        conn.execute(
5465            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5466             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5467            [],
5468        )
5469        .expect("insert profile");
5470        let embedder = StubEmbedder::new("bge-small", 384);
5471        let result = super::check_vec_identity_at_open(&conn, &embedder);
5472        assert!(result.is_ok(), "matching profile must return Ok(())");
5473    }
5474
5475    #[test]
5476    fn check_vec_identity_mismatched_dimensions_ok() {
5477        let conn = make_in_memory_db_with_projection_profiles();
5478        conn.execute(
5479            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5480             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5481            [],
5482        )
5483        .expect("insert profile");
5484        // embedder reports 768, profile says 384 — should warn but return Ok(())
5485        let embedder = StubEmbedder::new("bge-small", 768);
5486        let result = super::check_vec_identity_at_open(&conn, &embedder);
5487        assert!(
5488            result.is_ok(),
5489            "dimension mismatch must warn and return Ok(())"
5490        );
5491    }
5492
5493    #[test]
5494    fn custom_tokenizer_passthrough() {
5495        use super::TokenizerStrategy;
5496        let strategy = TokenizerStrategy::Custom("my_tok".to_owned());
5497        // Custom strategy is just stored — it doesn't modify queries.
5498        assert_eq!(strategy, TokenizerStrategy::Custom("my_tok".to_owned()));
5499        // Verify it does not match any known variant.
5500        assert_ne!(strategy, TokenizerStrategy::SubstringTrigram);
5501        assert_ne!(strategy, TokenizerStrategy::SourceCode);
5502    }
5503
5504    #[test]
5505    fn check_vec_identity_mismatched_model_ok() {
5506        let conn = make_in_memory_db_with_projection_profiles();
5507        conn.execute(
5508            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5509             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5510            [],
5511        )
5512        .expect("insert profile");
5513        // embedder reports bge-large, profile says bge-small — should warn but return Ok(())
5514        let embedder = StubEmbedder::new("bge-large", 384);
5515        let result = super::check_vec_identity_at_open(&conn, &embedder);
5516        assert!(
5517            result.is_ok(),
5518            "model_identity mismatch must warn and return Ok(())"
5519        );
5520    }
5521}