//! fathomdb_engine/coordinator.rs — execution coordinator: read-only
//! connection pooling and compiled-query execution.
1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8    BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9    CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, ExpansionSlot,
10    FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch,
11    SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
12};
13use fathomdb_schema::SchemaManager;
14use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
15
16use crate::embedder::QueryEmbedder;
17use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
18use crate::{EngineError, sqlite};
19
/// Maximum number of cached shape-hash to SQL mappings before the cache is
/// cleared entirely.  A clear-all strategy is simpler than partial eviction
/// and the cost of re-compiling on a miss is negligible.
/// Eviction is all-or-nothing: when the cache reaches this size it is
/// cleared wholesale on the next insert.
const MAX_SHAPE_CACHE_SIZE: usize = 4096;

/// Maximum number of root IDs per batched expansion query.  Kept well below
/// `SQLITE_MAX_VARIABLE_NUMBER` (default 999) because each batch also binds
/// the edge-kind parameter.  Larger root sets are chunked into multiple
/// batches of this size rather than falling back to per-root queries.
const BATCH_CHUNK_SIZE: usize = 200;
30
31/// Compile an optional expansion-slot target-side filter predicate into a SQL
32/// fragment and bind values for injection into the `numbered` CTE's WHERE clause.
33///
34/// Returns `("", vec![])` when `filter` is `None` — preserving byte-for-byte
35/// identical SQL to pre-Pack-3 behavior. When `Some(predicate)`, returns an
36/// `AND …` fragment and the corresponding bind values starting at `first_param`.
37///
38/// Only `JsonPathEq`, `JsonPathCompare`, `JsonPathFusedEq`, and
39/// `JsonPathFusedTimestampCmp` are supported here; each variant targets the
40/// `n.properties` column already present in the `numbered` CTE join.
41/// Column-direct predicates (`KindEq`, `LogicalIdEq`, etc.) reference `n.kind`
42/// and similar columns that are also available in the `numbered` CTE.
43fn compile_expansion_filter(
44    filter: Option<&Predicate>,
45    first_param: usize,
46) -> (String, Vec<Value>) {
47    let Some(predicate) = filter else {
48        return (String::new(), vec![]);
49    };
50    let p = first_param;
51    match predicate {
52        Predicate::JsonPathEq { path, value } => {
53            let val = match value {
54                ScalarValue::Text(t) => Value::Text(t.clone()),
55                ScalarValue::Integer(i) => Value::Integer(*i),
56                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
57            };
58            (
59                format!(
60                    "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
61                    p + 1
62                ),
63                vec![Value::Text(path.clone()), val],
64            )
65        }
66        Predicate::JsonPathCompare { path, op, value } => {
67            let val = match value {
68                ScalarValue::Text(t) => Value::Text(t.clone()),
69                ScalarValue::Integer(i) => Value::Integer(*i),
70                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
71            };
72            let operator = match op {
73                ComparisonOp::Gt => ">",
74                ComparisonOp::Gte => ">=",
75                ComparisonOp::Lt => "<",
76                ComparisonOp::Lte => "<=",
77            };
78            (
79                format!(
80                    "\n                  AND json_extract(n.properties, ?{p}) {operator} ?{}",
81                    p + 1
82                ),
83                vec![Value::Text(path.clone()), val],
84            )
85        }
86        Predicate::JsonPathFusedEq { path, value } => (
87            format!(
88                "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
89                p + 1
90            ),
91            vec![Value::Text(path.clone()), Value::Text(value.clone())],
92        ),
93        Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
94            let operator = match op {
95                ComparisonOp::Gt => ">",
96                ComparisonOp::Gte => ">=",
97                ComparisonOp::Lt => "<",
98                ComparisonOp::Lte => "<=",
99            };
100            (
101                format!(
102                    "\n                  AND json_extract(n.properties, ?{p}) {operator} ?{}",
103                    p + 1
104                ),
105                vec![Value::Text(path.clone()), Value::Integer(*value)],
106            )
107        }
108        Predicate::KindEq(kind) => (
109            format!("\n                  AND n.kind = ?{p}"),
110            vec![Value::Text(kind.clone())],
111        ),
112        Predicate::LogicalIdEq(logical_id) => (
113            format!("\n                  AND n.logical_id = ?{p}"),
114            vec![Value::Text(logical_id.clone())],
115        ),
116        Predicate::SourceRefEq(source_ref) => (
117            format!("\n                  AND n.source_ref = ?{p}"),
118            vec![Value::Text(source_ref.clone())],
119        ),
120        Predicate::ContentRefEq(uri) => (
121            format!("\n                  AND n.content_ref = ?{p}"),
122            vec![Value::Text(uri.clone())],
123        ),
124        Predicate::ContentRefNotNull => (
125            "\n                  AND n.content_ref IS NOT NULL".to_owned(),
126            vec![],
127        ),
128    }
129}
130
/// FTS tokenizer strategy for a given node kind.
///
/// Loaded at coordinator open time from `projection_profiles` and used
/// during query dispatch to apply per-kind query adaptations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TokenizerStrategy {
    /// Porter stemming + unicode61, optimized for English recall.
    RecallOptimizedEnglish,
    /// Unicode61 without stemming, optimized for precision.
    PrecisionOptimized,
    /// Trigram tokenizer — queries shorter than 3 chars are skipped.
    SubstringTrigram,
    /// ICU tokenizer for global / CJK text.
    GlobalCjk,
    /// Unicode61 with custom token chars — query special chars are escaped.
    SourceCode,
    /// Any other tokenizer string.
    Custom(String),
}

impl TokenizerStrategy {
    /// Map a raw tokenizer config string (as stored in `projection_profiles`)
    /// to the corresponding strategy variant.
    ///
    /// Exact config strings map to dedicated variants; any `unicode61
    /// tokenchars…` prefix maps to [`Self::SourceCode`]; everything else
    /// falls through to [`Self::Custom`].
    pub fn from_str(s: &str) -> Self {
        if s == "porter unicode61 remove_diacritics 2" {
            Self::RecallOptimizedEnglish
        } else if s == "unicode61 remove_diacritics 2" {
            Self::PrecisionOptimized
        } else if s == "trigram" {
            Self::SubstringTrigram
        } else if s == "icu" {
            Self::GlobalCjk
        } else if s.starts_with("unicode61 tokenchars") {
            Self::SourceCode
        } else {
            Self::Custom(s.to_string())
        }
    }
}
165
/// A pool of read-only `SQLite` connections for concurrent read access.
///
/// Each connection is wrapped in its own [`Mutex`] so multiple readers can
/// proceed in parallel when they happen to grab different slots.
struct ReadPool {
    /// One independently lockable slot per pooled connection.
    connections: Vec<Mutex<Connection>>,
}
173
174impl fmt::Debug for ReadPool {
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        f.debug_struct("ReadPool")
177            .field("size", &self.connections.len())
178            .finish()
179    }
180}
181
182impl ReadPool {
183    /// Open `pool_size` read-only connections to the database at `path`.
184    ///
185    /// Each connection has PRAGMAs initialized via
186    /// [`SchemaManager::initialize_connection`] and, when the `sqlite-vec`
187    /// feature is enabled and `vector_enabled` is true, the vec extension
188    /// auto-loaded.
189    ///
190    /// # Errors
191    ///
192    /// Returns [`EngineError`] if any connection fails to open or initialize.
193    fn new(
194        db_path: &Path,
195        pool_size: usize,
196        schema_manager: &SchemaManager,
197        vector_enabled: bool,
198    ) -> Result<Self, EngineError> {
199        let mut connections = Vec::with_capacity(pool_size);
200        for _ in 0..pool_size {
201            let conn = if vector_enabled {
202                #[cfg(feature = "sqlite-vec")]
203                {
204                    sqlite::open_readonly_connection_with_vec(db_path)?
205                }
206                #[cfg(not(feature = "sqlite-vec"))]
207                {
208                    sqlite::open_readonly_connection(db_path)?
209                }
210            } else {
211                sqlite::open_readonly_connection(db_path)?
212            };
213            schema_manager
214                .initialize_reader_connection(&conn)
215                .map_err(EngineError::Schema)?;
216            connections.push(Mutex::new(conn));
217        }
218        Ok(Self { connections })
219    }
220
221    /// Acquire a connection from the pool.
222    ///
223    /// Tries [`Mutex::try_lock`] on each slot first (fast non-blocking path).
224    /// If every slot is held, falls back to a blocking lock on the first slot.
225    ///
226    /// # Errors
227    ///
228    /// Returns [`EngineError::Bridge`] if the underlying mutex is poisoned.
229    fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
230        // Fast path: try each connection without blocking.
231        for conn in &self.connections {
232            if let Ok(guard) = conn.try_lock() {
233                return Ok(guard);
234            }
235        }
236        // Fallback: block on the first connection.
237        self.connections[0].lock().map_err(|_| {
238            trace_error!("read pool: connection mutex poisoned");
239            EngineError::Bridge("connection mutex poisoned".to_owned())
240        })
241    }
242
243    /// Return the number of connections in the pool.
244    #[cfg(test)]
245    fn size(&self) -> usize {
246        self.connections.len()
247    }
248}
249
/// Execution plan returned by [`ExecutionCoordinator::explain_compiled_read`].
///
/// This is a read-only introspection struct. It does not execute SQL.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// The SQL text that would be executed.
    pub sql: String,
    /// Number of bind parameters the SQL expects.
    pub bind_count: usize,
    /// Driving table of the compiled query.
    pub driving_table: DrivingTable,
    /// Shape hash used as the shape→SQL cache key.
    pub shape_hash: ShapeHash,
    /// Presumably `true` when the shape cache already held this shape's SQL —
    /// confirm against `explain_compiled_read` (not visible in this chunk).
    pub cache_hit: bool,
}
261
/// A single node row returned from a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row ID (stored as text, not an integer rowid).
    pub row_id: String,
    /// Logical ID of the node.
    pub logical_id: String,
    /// Node kind.
    pub kind: String,
    /// JSON-encoded node properties.
    pub properties: String,
    /// Optional URI referencing external content.
    pub content_ref: Option<String>,
    /// Unix timestamp of last access, if tracked.
    pub last_accessed_at: Option<i64>,
}
278
/// A single run row returned from a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Unique run ID.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Current status (stored as free-form text).
    pub status: String,
    /// JSON-encoded run properties.
    pub properties: String,
}
291
/// A single step row returned from a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Unique step ID.
    pub id: String,
    /// ID of the parent run.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Current status (stored as free-form text).
    pub status: String,
    /// JSON-encoded step properties.
    pub properties: String,
}
306
/// A single action row returned from a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Unique action ID.
    pub id: String,
    /// ID of the parent step.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Current status (stored as free-form text).
    pub status: String,
    /// JSON-encoded action properties.
    pub properties: String,
}
321
/// A single row from the `provenance_events` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Unique event ID.
    pub id: String,
    /// Event type discriminator (stored as text).
    pub event_type: String,
    /// Subject the event refers to.
    pub subject: String,
    /// Optional reference to the originating source.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation timestamp — assumed Unix epoch seconds; confirm against the
    /// writer of `provenance_events` (not visible in this chunk).
    pub created_at: i64,
}
332
/// Result set from executing a flat (non-grouped) compiled query.
///
/// Note: the read paths visible in this file only populate `nodes`; the
/// `runs`/`steps`/`actions` vectors are returned empty there.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Matched node rows.
    pub nodes: Vec<NodeRow>,
    /// Runs associated with the matched nodes.
    pub runs: Vec<RunRow>,
    /// Steps associated with the matched runs.
    pub steps: Vec<StepRow>,
    /// Actions associated with the matched steps.
    pub actions: Vec<ActionRow>,
    /// `true` when a capability miss (e.g. missing sqlite-vec) caused the query
    /// to degrade to an empty result instead of propagating an error.
    pub was_degraded: bool,
}
348
/// Expansion results for a single root node within a grouped query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical ID of the root node that seeded this expansion.
    pub root_logical_id: String,
    /// Nodes reached by traversing from the root.
    pub nodes: Vec<NodeRow>,
}
357
/// All expansion results for a single named slot across all roots.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot.
    pub slot: String,
    /// Per-root expansion results, one entry per root node.
    pub roots: Vec<ExpansionRootRows>,
}
366
/// Result set from executing a grouped compiled query.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// Root node rows matched by the base query.
    pub roots: Vec<NodeRow>,
    /// Per-slot expansion results.
    pub expansions: Vec<ExpansionSlotRows>,
    /// `true` when a capability miss caused the query to degrade to an empty result.
    pub was_degraded: bool,
}
377
/// Manages a pool of read-only `SQLite` connections and executes compiled queries.
pub struct ExecutionCoordinator {
    /// Filesystem path of the backing `SQLite` database.
    database_path: PathBuf,
    /// Shared schema manager; also used to initialize each pooled reader.
    schema_manager: Arc<SchemaManager>,
    /// Read-only connection pool used for all query execution.
    pool: ReadPool,
    /// Cache of shape-hash → generated SQL. Cleared wholesale once it reaches
    /// `MAX_SHAPE_CACHE_SIZE` (see `execute_compiled_read`).
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// `true` when sqlite-vec was loaded and a vector profile is active.
    /// Set once at `open()` and never changed afterwards.
    vector_enabled: bool,
    /// Latched once the first vector-degradation warning has been logged, so
    /// the warning fires at most once per coordinator instance.
    vec_degradation_warned: AtomicBool,
    /// Shared query/error counters incremented on the execution paths.
    telemetry: Arc<TelemetryCounters>,
    /// Phase 12.5a: optional read-time query embedder. When present,
    /// [`Self::execute_retrieval_plan`] invokes it via
    /// [`Self::fill_vector_branch`] after compile to populate
    /// `plan.vector`. When `None`, the Phase 12 v1 vector-dormancy
    /// invariant on `search()` is preserved: the vector slot stays empty
    /// and the coordinator's stage-gating check skips the vector branch.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
    /// Per-kind FTS tokenizer strategies loaded from `projection_profiles`
    /// at open time. Used during query dispatch for query-side adaptations
    /// (e.g. short-query skip for trigram, token escaping for source-code).
    ///
    /// **Stale-state note**: This map is populated once at `open()` and is
    /// never refreshed during the lifetime of the coordinator.  If
    /// `AdminService::set_fts_profile` is called on a running engine, the
    /// in-memory strategy map will not reflect the change until the engine is
    /// reopened.  The DB-side profile row is always up-to-date; only the
    /// query-side adaptation (e.g. the trigram short-query guard) will be stale.
    fts_strategies: HashMap<String, TokenizerStrategy>,
}
406
407impl fmt::Debug for ExecutionCoordinator {
408    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
409        f.debug_struct("ExecutionCoordinator")
410            .field("database_path", &self.database_path)
411            .finish_non_exhaustive()
412    }
413}
414
415impl ExecutionCoordinator {
    /// Open a coordinator against the database at `path`: bootstrap the
    /// schema, run the open-time FTS rebuild guards, load per-kind FTS
    /// tokenizer strategies, then build the read-only connection pool.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database connection cannot be opened or schema bootstrap fails.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap connection: vec-loaded only when a vector dimension is
        // requested AND the sqlite-vec feature is compiled in.
        #[cfg(feature = "sqlite-vec")]
        let mut conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let mut conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        // ----- Open-time rebuild guards for derived FTS state -----
        //
        // Property FTS data is derived state. Per-kind `fts_props_<kind>`
        // virtual tables are NOT source of truth — they must be rebuildable
        // from canonical `nodes.properties` + `fts_property_schemas` at any
        // time. After migration 23 the global `fts_node_properties` table no
        // longer exists; each registered kind has its own `fts_props_<kind>`
        // table created at first write or first rebuild.
        //
        // Guard 1: if any registered kind's per-kind table is missing or empty
        // while live nodes of that kind exist, do a synchronous full rebuild of
        // all per-kind FTS tables and position map rows.
        //
        // Guard 2: if any recursive schema has a populated per-kind table but
        // `fts_node_property_positions` is empty, do a synchronous full rebuild
        // to regenerate the position map from canonical state.
        //
        // Both guards are no-ops on a consistent database.
        run_open_time_fts_guards(&mut conn)?;

        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            // Suppress the unused-variable warning in the non-vec build.
            let _ = &report;
            false
        };

        if let Some(dim) = vector_dimension {
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", dim)
                .map_err(EngineError::Schema)?;
            // Profile was just created or updated — mark as enabled.
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        // Vec identity guard: warn (never error) if the stored profile's
        // model_identity or dimensions differ from the configured embedder.
        if let Some(ref emb) = query_embedder {
            check_vec_identity_at_open(&conn, emb.as_ref())?;
        }

        // Load FTS tokenizer strategies from projection_profiles
        let fts_strategies: HashMap<String, TokenizerStrategy> = {
            let mut map = HashMap::new();
            let mut stmt = conn
                .prepare("SELECT kind, config_json FROM projection_profiles WHERE facet='fts'")?;
            let rows = stmt.query_map([], |row| {
                let kind: String = row.get(0)?;
                let config_json: String = row.get(1)?;
                Ok((kind, config_json))
            })?;
            // NOTE(review): `flatten()` silently skips rows that failed to
            // map, and malformed config JSON is likewise skipped — looks like
            // deliberate best-effort loading; confirm.
            for row in rows.flatten() {
                let (kind, config_json) = row;
                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&config_json)
                    && let Some(tok) = v["tokenizer"].as_str()
                {
                    map.insert(kind, TokenizerStrategy::from_str(tok));
                }
            }
            map
        };

        // Drop the bootstrap connection — pool connections are used for reads.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
            fts_strategies,
        })
    }
521
522    /// Returns the filesystem path to the `SQLite` database.
523    pub fn database_path(&self) -> &Path {
524        &self.database_path
525    }
526
    /// Returns `true` when sqlite-vec was loaded and a vector profile is active.
    ///
    /// The flag is fixed at `open()` time and never changes for the lifetime
    /// of this coordinator.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
532
    /// Returns the configured read-time query embedder, if any.
    ///
    /// Returns a borrow of the `Arc`; callers clone it if they need ownership.
    ///
    /// The 0.4.0 write-time parity work reuses this same embedder for
    /// vector regeneration via [`Engine::regenerate_vector_embeddings`],
    /// so there is always exactly one source of truth for vector
    /// identity per [`Engine`] instance.
    #[must_use]
    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
        self.query_embedder.as_ref()
    }
543
    /// Acquire a pooled read-only connection (delegates to `ReadPool::acquire`).
    ///
    /// # Errors
    /// Returns [`EngineError`] when the pool cannot hand out a connection
    /// (e.g. a poisoned connection mutex).
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
547
548    /// Aggregate `SQLite` page-cache counters across all pool connections.
549    ///
550    /// Uses `try_lock` to avoid blocking reads for telemetry reporting.
551    /// Connections that are currently locked by a query are skipped — this
552    /// is acceptable for statistical counters.
553    #[must_use]
554    pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
555        let mut total = SqliteCacheStatus::default();
556        for conn_mutex in &self.pool.connections {
557            if let Ok(conn) = conn_mutex.try_lock() {
558                total.add(&read_db_cache_status(&conn));
559            }
560        }
561        total
562    }
563
    /// Execute a compiled flat (non-grouped) query and return matching node
    /// rows. Only `QueryRows::nodes` is populated here; the run/step/action
    /// vectors are always empty on this path.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
    // NOTE(review): the expect() calls this allow originally covered were
    // replaced (see FIX comments below); the attribute may be removable — confirm.
    #[allow(clippy::expect_used)]
    pub fn execute_compiled_read(
        &self,
        compiled: &CompiledQuery,
    ) -> Result<QueryRows, EngineError> {
        // Scan fallback for first-registration async rebuild: if the query uses the
        // FtsNodes driving table and the root kind has is_first_registration=1 with
        // state IN ('PENDING','BUILDING'), the per-kind table has no rows yet.
        // Route to a full-kind node scan so callers get results instead of empty.
        if compiled.driving_table == DrivingTable::FtsNodes
            && let Some(BindValue::Text(root_kind)) = compiled.binds.get(1)
            && let Some(nodes) = self.scan_fallback_if_first_registration(root_kind)?
        {
            self.telemetry.increment_queries();
            return Ok(QueryRows {
                nodes,
                runs: Vec::new(),
                steps: Vec::new(),
                actions: Vec::new(),
                was_degraded: false,
            });
        }

        // For FtsNodes queries the fathomdb-query compile path generates SQL that
        // references the old global `fts_node_properties` table.  Since migration 23
        // dropped that table, we rewrite the SQL and binds here to use the per-kind
        // `fts_props_<kind>` table (or omit the property UNION arm entirely when the
        // per-kind table does not yet exist).
        let (adapted_sql, adapted_binds) = if compiled.driving_table == DrivingTable::FtsNodes {
            let conn_check = match self.lock_connection() {
                Ok(g) => g,
                Err(e) => {
                    self.telemetry.increment_errors();
                    return Err(e);
                }
            };
            let result = adapt_fts_nodes_sql_for_per_kind_tables(compiled, &conn_check);
            // Release the connection before running the (possibly fallible)
            // adaptation result through `?` below.
            drop(conn_check);
            result?
        } else {
            (compiled.sql.clone(), compiled.binds.clone())
        };

        let row_sql = wrap_node_row_projection_sql(&adapted_sql);
        // FIX(review): was .expect() — panics on mutex poisoning, cascading failure.
        // Options: (A) into_inner() for all, (B) EngineError for all, (C) mixed.
        // Chose (C): shape_sql_map is a pure cache — into_inner() is safe to recover.
        // conn wraps a SQLite connection whose state may be corrupt after a thread panic,
        // so we propagate EngineError there instead.
        {
            let mut cache = self
                .shape_sql_map
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            // Clear-all eviction policy; see MAX_SHAPE_CACHE_SIZE docs.
            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
                cache.clear();
            }
            cache.insert(compiled.shape_hash, row_sql.clone());
        }

        let bind_values = adapted_binds
            .iter()
            .map(bind_value_to_sql)
            .collect::<Vec<_>>();

        // FIX(review) + Security fix M-8: was .expect() — panics on mutex poisoning.
        // shape_sql_map uses into_inner() (pure cache, safe to recover).
        // conn uses map_err → EngineError (connection state may be corrupt after panic;
        // into_inner() would risk using a connection with partial transaction state).
        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&row_sql) {
            Ok(stmt) => stmt,
            Err(e) if is_vec_table_absent(&e) => {
                // Graceful degradation: missing vec table yields an empty,
                // flagged result. Warn only once per coordinator instance.
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading to non-vector query");
                }
                return Ok(QueryRows {
                    was_degraded: true,
                    ..Default::default()
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        let nodes = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .and_then(Iterator::collect)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        self.telemetry.increment_queries();
        Ok(QueryRows {
            nodes,
            runs: Vec::new(),
            steps: Vec::new(),
            actions: Vec::new(),
            was_degraded: false,
        })
    }
688
689    /// Execute a compiled adaptive search and return matching hits.
690    ///
691    /// Phase 2 splits filters: fusable predicates (`KindEq`, `LogicalIdEq`,
692    /// `SourceRefEq`, `ContentRefEq`, `ContentRefNotNull`) are injected into
693    /// the `search_hits` CTE so the CTE `LIMIT` applies after filtering,
694    /// while residual predicates (JSON path filters) stay in the outer
695    /// `WHERE`. The chunk and property FTS
696    /// indexes are `UNION ALL`-ed, BM25-scored (flipped so larger values mean
697    /// better matches), ordered, and limited. All hits return
698    /// `match_mode = Strict` — the relaxed branch and fallback arrive in
699    /// later phases.
700    ///
701    /// # Errors
702    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
703    pub fn execute_compiled_search(
704        &self,
705        compiled: &CompiledSearch,
706    ) -> Result<SearchRows, EngineError> {
707        // Build the two-branch plan from the strict text query and delegate
708        // to the shared plan-execution routine. The relaxed branch is derived
709        // via `derive_relaxed` and only fires when strict returned fewer than
710        // `min(FALLBACK_TRIGGER_K, limit)` hits. With K = 1 this collapses to
711        // "relaxed iff strict is empty," but the routine spells the rule out
712        // explicitly so raising K later is a one-line constant bump.
713        let (relaxed_query, was_degraded_at_plan_time) =
714            fathomdb_query::derive_relaxed(&compiled.text_query);
715        let relaxed = relaxed_query.map(|q| CompiledSearch {
716            root_kind: compiled.root_kind.clone(),
717            text_query: q,
718            limit: compiled.limit,
719            fusable_filters: compiled.fusable_filters.clone(),
720            residual_filters: compiled.residual_filters.clone(),
721            attribution_requested: compiled.attribution_requested,
722        });
723        let plan = CompiledSearchPlan {
724            strict: compiled.clone(),
725            relaxed,
726            was_degraded_at_plan_time,
727        };
728        self.execute_compiled_search_plan(&plan)
729    }
730
    /// Execute a two-branch [`CompiledSearchPlan`] and return the merged,
    /// deduped result rows.
    ///
    /// This is the shared retrieval/merge routine that both
    /// [`Self::execute_compiled_search`] (adaptive path) and
    /// `Engine::fallback_search` (narrow two-shape path) call into. Strict
    /// runs first; the relaxed branch only fires when it is present AND the
    /// strict branch returned fewer than `min(FALLBACK_TRIGGER_K, limit)`
    /// hits. Merge and dedup semantics are identical to the adaptive path
    /// regardless of how the plan was constructed.
    ///
    /// Error contract: if the relaxed branch errors, the error propagates;
    /// strict hits are not returned. This matches the rest of the engine's
    /// fail-hard posture.
    ///
    /// # Errors
    /// Returns [`EngineError`] if either branch's SQL cannot be prepared or
    /// executed.
    pub fn execute_compiled_search_plan(
        &self,
        plan: &CompiledSearchPlan,
    ) -> Result<SearchRows, EngineError> {
        let strict = &plan.strict;
        let limit = strict.limit;
        let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;

        // Fallback rule: relaxed only fires when strict returned fewer than
        // min(FALLBACK_TRIGGER_K, limit) hits.
        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;

        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            // Plan-time degradation is only surfaced when the relaxed branch
            // actually ran.
            was_degraded = plan.was_degraded_at_plan_time;
        }

        let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
        // Attribution runs AFTER dedup so that duplicate hits dropped by
        // `merge_search_branches` do not waste a highlight+position-map
        // lookup.
        if strict.attribution_requested {
            let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }
        // Per-branch counts are recomputed from the merged set so that
        // deduplication is reflected in the totals.
        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        // Phase 10: no vector execution path yet, so vector_hit_count is
        // always zero. Future phases that wire a vector branch will
        // contribute here.
        let vector_hit_count = 0;

        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
805
    /// Execute a compiled vector-only search and return matching hits.
    ///
    /// Phase 11 delivers the standalone vector retrieval path. The emitted
    /// SQL performs a vec0 KNN scan over `vec_nodes_active`, joins to
    /// `chunks` and `nodes` (active rows only), and pushes fusable filters
    /// into the candidate CTE. The outer `SELECT` applies residual JSON
    /// predicates and orders by score descending, where `score = -distance`
    /// (higher is better) per addendum 1 §Vector-Specific Behavior.
    ///
    /// ## Bind-order contract
    ///
    /// Parameters are numbered positionally in the order they are pushed
    /// onto `binds`: `?1` is the query-vector literal, `?2` the root kind
    /// (only when a kind filter is present), then fused-filter binds,
    /// then residual-filter binds, and finally the outer `LIMIT`. Clause
    /// text and bind vector are built in lockstep (each clause reads
    /// `binds.len()` for its index), so reordering either side alone
    /// corrupts the parameter numbering.
    ///
    /// ## Capability-miss handling
    ///
    /// If the `sqlite-vec` capability is absent (feature disabled or the
    /// `vec_nodes_active` virtual table has not been created because the
    /// engine was not opened with a `vector_dimension`), this method returns
    /// an empty [`SearchRows`] with `was_degraded = true`. This is
    /// **non-fatal** — the error does not propagate — matching the addendum's
    /// §Vector-Specific Behavior / Degradation.
    ///
    /// ## Attribution
    ///
    /// When `compiled.attribution_requested == true`, every returned hit
    /// carries `attribution: Some(HitAttribution { matched_paths: vec![] })`
    /// per addendum 1 §Attribution on vector hits (Phase 5 chunk-hit rule
    /// extended uniformly).
    ///
    /// # Errors
    /// Returns [`EngineError`] if the SQL statement cannot be prepared or
    /// executed for reasons other than a vec-table capability miss.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // Short-circuit zero-limit: callers that pass `limit == 0` expect an
        // empty result rather than a SQL error from `LIMIT 0` semantics in
        // the inner vec0 scan.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        // ?1 = query vector literal; ?2 = root kind iff filtering by kind.
        // Everything appended after this point derives its `?N` index from
        // `binds.len()` at push time — see the bind-order contract above.
        let filter_by_kind = !compiled.root_kind.is_empty();
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // Build fusable-filter clauses, aliased against `src` inside the
        // candidate CTE. Same predicate set the text path fuses.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    // No bind value needed — pure SQL predicate.
                    fused_clauses
                        .push_str("\n                      AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
                    // JSON predicates are residual; compile_vector_search
                    // guarantees they never appear here, but stay defensive.
                }
            }
        }

        // Build residual JSON clauses, aliased against `h` in the outer SELECT.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. } => {
                    // Fusable predicates live in fused_clauses above.
                }
            }
        }

        // Bind the outer limit as a named parameter for prepare_cached
        // stability across calls that vary only by limit value.
        // NOTE(review): `base_limit` below is interpolated into the SQL
        // text (sqlite-vec needs the literal on the KNN scan), so the
        // statement cache in practice keys per distinct limit anyway —
        // this outer bind keeps only the bind shape stable. Presumably
        // intentional; confirm if cache churn ever matters.
        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        // sqlite-vec requires the LIMIT/k constraint to be visible directly
        // on the vec0 KNN scan, so we isolate it in a sub-select. The vec0
        // LIMIT overfetches `base_limit` = limit (Phase 11 keeps it simple;
        // Phase 12's planner may raise this to compensate for fusion
        // narrowing the candidate pool).
        let base_limit = limit;
        let kind_clause = if filter_by_kind {
            "\n                      AND src.kind = ?2"
        } else {
            ""
        };

        let sql = format!(
            "WITH vector_hits AS (
                SELECT
                    src.row_id AS row_id,
                    src.logical_id AS logical_id,
                    src.kind AS kind,
                    src.properties AS properties,
                    src.source_ref AS source_ref,
                    src.content_ref AS content_ref,
                    src.created_at AS created_at,
                    vc.distance AS distance,
                    vc.chunk_id AS chunk_id
                FROM (
                    SELECT chunk_id, distance
                    FROM vec_nodes_active
                    WHERE embedding MATCH ?1
                    LIMIT {base_limit}
                ) vc
                JOIN chunks c ON c.id = vc.chunk_id
                JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
                WHERE 1 = 1{kind_clause}{fused_clauses}
            )
            SELECT
                h.row_id,
                h.logical_id,
                h.kind,
                h.properties,
                h.content_ref,
                am.last_accessed_at,
                h.created_at,
                h.distance,
                h.chunk_id
            FROM vector_hits h
            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
            WHERE 1 = 1{filter_clauses}
            ORDER BY h.distance ASC
            LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            Err(e) if is_vec_table_absent(&e) => {
                // Capability miss: non-fatal — surface as was_degraded.
                // The warning is emitted at most once per engine instance
                // (swap-guarded by `vec_degradation_warned`).
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        // Column map for the outer SELECT (fixed by the SQL above):
        // 0 row_id, 1 logical_id, 2 kind, 3 properties, 4 content_ref,
        // 5 last_accessed_at, 6 created_at, 7 distance, 8 chunk_id.
        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Score is the negated distance per addendum 1
                // §Vector-Specific Behavior / Score and distance. For
                // distance metrics (sqlite-vec's default) lower distance =
                // better match, so negating yields the higher-is-better
                // convention that dedup_branch_hits and the unified result
                // surface rely on.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    // Vector hits have no strict/relaxed notion.
                    match_mode: None,
                    // Vector hits have no snippet.
                    snippet: None,
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // Some SQLite errors surface during row iteration (e.g. when
                // the vec0 extension is not loaded but the table exists as a
                // stub). Classify as capability-miss when the shape matches.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    // Release the statement before the guard it borrows.
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        drop(statement);
        drop(conn_guard);

        // Only successful, non-degraded executions count as queries.
        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
1141
1142    /// Execute a unified [`CompiledRetrievalPlan`] (Phase 12 `search()`
1143    /// entry point) and return deterministically ranked, block-ordered
1144    /// [`SearchRows`].
1145    ///
1146    /// Stages, per addendum 1 §Retrieval Planner Model:
1147    ///
1148    /// 1. **Text strict.** Always runs (empty query short-circuits to an
1149    ///    empty branch result inside `run_search_branch`).
1150    /// 2. **Text relaxed.** Runs iff the plan carries a relaxed branch AND
1151    ///    the strict branch returned fewer than `min(FALLBACK_TRIGGER_K,
1152    ///    limit)` hits — same v1 (`K = 1`) zero-hits-only trigger as the
1153    ///    Phase 6 text-only path.
1154    /// 3. **Vector.** Runs iff text retrieval (strict + relaxed combined)
1155    ///    returned zero hits AND `plan.vector` is `Some`. **In v1 the
1156    ///    planner never wires a vector branch through `search()`, so this
1157    ///    code path is structurally present but dormant.** A future phase
1158    ///    that wires read-time embedding into `compile_retrieval_plan` will
1159    ///    immediately light it up.
1160    /// 4. **Fusion.** All collected hits are merged via
1161    ///    [`merge_search_branches_three`], which produces strict ->
1162    ///    relaxed -> vector block ordering with cross-branch dedup
1163    ///    resolved by branch precedence.
1164    ///
1165    /// `was_degraded` covers only the relaxed-branch cap miss in v1. The
1166    /// addendum's "vector capability miss => `was_degraded`" semantics
1167    /// applies to `search()` only when the unified planner actually fires
1168    /// the vector branch, which v1 never does.
1169    ///
1170    /// # Errors
1171    /// Returns [`EngineError`] if any stage's SQL cannot be prepared or
1172    /// executed for a non-capability-miss reason.
1173    pub fn execute_retrieval_plan(
1174        &self,
1175        plan: &CompiledRetrievalPlan,
1176        raw_query: &str,
1177    ) -> Result<SearchRows, EngineError> {
1178        // Phase 12.5a: we work against a local owned copy so
1179        // `fill_vector_branch` can mutate `plan.vector` and
1180        // `plan.was_degraded_at_plan_time`. Cloning is cheap (the plan is
1181        // a bounded set of predicates + text AST nodes) and avoids
1182        // forcing callers to hand us `&mut` access.
1183        let mut plan = plan.clone();
1184        let limit = plan.text.strict.limit;
1185
1186        // Stage 1: text strict.
1187        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;
1188
1189        // Stage 2: text relaxed. Same K=1 zero-hits-only trigger the Phase 6
1190        // path uses.
1191        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
1192        let strict_underfilled = strict_hits.len() < fallback_threshold;
1193        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
1194        let mut fallback_used = false;
1195        let mut was_degraded = false;
1196        if let Some(relaxed) = plan.text.relaxed.as_ref()
1197            && strict_underfilled
1198        {
1199            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
1200            fallback_used = true;
1201            was_degraded = plan.was_degraded_at_plan_time;
1202        }
1203
1204        // Phase 12.5a: fill the vector branch from the configured
1205        // read-time query embedder, if any. Option (b) from the spec:
1206        // only pay the embedding cost when the text branches returned
1207        // nothing, because the three-branch stage gate below only runs
1208        // the vector stage under exactly that condition. This keeps the
1209        // hot path (strict text matched) embedder-free.
1210        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
1211        if text_branches_empty && self.query_embedder.is_some() {
1212            self.fill_vector_branch(&mut plan, raw_query);
1213        }
1214
1215        // Stage 3: vector. Runs only when text retrieval is empty AND a
1216        // vector branch is present. When no embedder is configured (Phase
1217        // 12.5a default) `plan.vector` stays `None` and this stage is a
1218        // no-op, preserving the Phase 12 v1 dormancy invariant.
1219        let mut vector_hits: Vec<SearchHit> = Vec::new();
1220        if let Some(vector) = plan.vector.as_ref()
1221            && strict_hits.is_empty()
1222            && relaxed_hits.is_empty()
1223        {
1224            let vector_rows = self.execute_compiled_vector_search(vector)?;
1225            // `execute_compiled_vector_search` returns a fully populated
1226            // `SearchRows`. Promote its hits into the merge stage and lift
1227            // its capability-miss `was_degraded` flag onto the unified
1228            // result, per addendum §Vector-Specific Behavior.
1229            vector_hits = vector_rows.hits;
1230            if vector_rows.was_degraded {
1231                was_degraded = true;
1232            }
1233        }
1234        // Phase 12.5a: an embedder-reported capability miss surfaces as
1235        // `plan.was_degraded_at_plan_time = true` set inside
1236        // `fill_vector_branch`. Lift it onto the response so callers see
1237        // the graceful degradation even when the vector stage-gate never
1238        // had the chance to fire (the embedder call itself failed and
1239        // the vector slot stayed `None`).
1240        if text_branches_empty
1241            && plan.was_degraded_at_plan_time
1242            && plan.vector.is_none()
1243            && self.query_embedder.is_some()
1244        {
1245            was_degraded = true;
1246        }
1247
1248        // Stage 4: fusion.
1249        let strict = &plan.text.strict;
1250        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
1251        if strict.attribution_requested {
1252            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
1253            self.populate_attribution_for_hits(
1254                &mut merged,
1255                &strict.text_query,
1256                relaxed_text_query,
1257            )?;
1258        }
1259
1260        let strict_hit_count = merged
1261            .iter()
1262            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
1263            .count();
1264        let relaxed_hit_count = merged
1265            .iter()
1266            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
1267            .count();
1268        let vector_hit_count = merged
1269            .iter()
1270            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
1271            .count();
1272
1273        Ok(SearchRows {
1274            hits: merged,
1275            strict_hit_count,
1276            relaxed_hit_count,
1277            vector_hit_count,
1278            fallback_used,
1279            was_degraded,
1280        })
1281    }
1282
1283    /// Phase 12.5a: populate `plan.vector` from the configured read-time
1284    /// query embedder, if any.
1285    ///
1286    /// Preconditions (enforced by the caller in `execute_retrieval_plan`):
1287    /// - `self.query_embedder.is_some()` — no point calling otherwise.
1288    /// - Both the strict and relaxed text branches already ran and
1289    ///   returned zero hits, so the existing three-branch stage gate
1290    ///   will actually fire the vector stage once the slot is populated.
1291    ///   This is option (b) from the Phase 12.5a spec: skip the embedding
1292    ///   cost entirely when text retrieval already won.
1293    ///
1294    /// Contract: never panics, never returns an error. On embedder error
1295    /// it sets `plan.was_degraded_at_plan_time = true` and leaves
1296    /// `plan.vector` as `None`; the coordinator's normal error-free
1297    /// degradation path then reports `was_degraded` on the result.
1298    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
1299        let Some(embedder) = self.query_embedder.as_ref() else {
1300            return;
1301        };
1302        match embedder.embed_query(raw_query) {
1303            Ok(vec) => {
1304                // `CompiledVectorSearch::query_text` is a JSON float-array
1305                // literal at the time the coordinator binds it (see the
1306                // `CompiledVectorSearch` docs). `serde_json::to_string`
1307                // on a `Vec<f32>` produces exactly that shape — no
1308                // wire-format change required.
1309                let literal = match serde_json::to_string(&vec) {
1310                    Ok(s) => s,
1311                    Err(err) => {
1312                        trace_warn!(
1313                            error = %err,
1314                            "query embedder vector serialization failed; skipping vector branch"
1315                        );
1316                        let _ = err; // Used by trace_warn! when tracing feature is active
1317                        plan.was_degraded_at_plan_time = true;
1318                        return;
1319                    }
1320                };
1321                let strict = &plan.text.strict;
1322                plan.vector = Some(CompiledVectorSearch {
1323                    root_kind: strict.root_kind.clone(),
1324                    query_text: literal,
1325                    limit: strict.limit,
1326                    fusable_filters: strict.fusable_filters.clone(),
1327                    residual_filters: strict.residual_filters.clone(),
1328                    attribution_requested: strict.attribution_requested,
1329                });
1330            }
1331            Err(err) => {
1332                trace_warn!(
1333                    error = %err,
1334                    "query embedder unavailable, skipping vector branch"
1335                );
1336                let _ = err; // Used by trace_warn! when tracing feature is active
1337                plan.was_degraded_at_plan_time = true;
1338            }
1339        }
1340    }
1341
1342    /// Execute a single search branch against the underlying FTS surfaces.
1343    ///
1344    /// This is the shared SQL emission path used by
1345    /// [`Self::execute_compiled_search_plan`] to run strict and (when
1346    /// present) relaxed branches of a [`CompiledSearchPlan`] in sequence.
1347    /// The returned hits are tagged with `branch`'s corresponding
1348    /// [`SearchMatchMode`] and are **not** yet deduped or truncated — the
1349    /// caller is responsible for merging multiple branches.
1350    #[allow(clippy::too_many_lines)]
1351    fn run_search_branch(
1352        &self,
1353        compiled: &CompiledSearch,
1354        branch: SearchBranch,
1355    ) -> Result<Vec<SearchHit>, EngineError> {
1356        use std::fmt::Write as _;
1357        // Short-circuit an empty/whitespace-only query: rendering it would
1358        // yield `MATCH ""`, which FTS5 rejects as a syntax error. Callers
1359        // (including the adaptive path when strict is Empty and derive_relaxed
1360        // returns None) must see an empty result, not an error. Each branch
1361        // is short-circuited independently so a strict-Empty + relaxed-Some
1362        // plan still exercises the relaxed branch.
1363        // A top-level `TextQuery::Not` renders to an FTS5 expression that
1364        // matches "every row not containing X" — a complement-of-corpus scan
1365        // that no caller would intentionally want. Short-circuit to empty at
1366        // the root only; a `Not` nested inside an `And` is a legitimate
1367        // exclusion and must still run.
1368        if matches!(
1369            compiled.text_query,
1370            fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
1371        ) {
1372            return Ok(Vec::new());
1373        }
1374        let rendered_base = render_text_query_fts5(&compiled.text_query);
1375        // Apply per-kind query-side tokenizer adaptations.
1376        // SubstringTrigram: queries shorter than 3 chars produce no results
1377        // (trigram index cannot match them). Return empty instead of an FTS5
1378        // error or a full-table scan.
1379        // SourceCode: render_text_query_fts5 already wraps every term in FTS5
1380        // double-quote delimiters (e.g. "std.io"). Applying escape_source_code_query
1381        // on that already-rendered output corrupts the expression — the escape
1382        // function sees the surrounding '"' characters and '.' separator and
1383        // produces malformed syntax like '"std"."io""'. The phrase quoting from
1384        // render_text_query_fts5 is sufficient: FTS5 applies the tokenizer
1385        // (including custom tokenchars) inside double-quoted phrase expressions,
1386        // so "std.io" correctly matches the single token 'std.io'.
1387        let strategy = self.fts_strategies.get(compiled.root_kind.as_str());
1388        if matches!(strategy, Some(TokenizerStrategy::SubstringTrigram))
1389            && rendered_base
1390                .chars()
1391                .filter(|c| c.is_alphanumeric())
1392                .count()
1393                < 3
1394        {
1395            return Ok(Vec::new());
1396        }
1397        let rendered = rendered_base;
1398        // An empty `root_kind` means "unkind-filtered" — the fallback_search
1399        // helper uses this when the caller did not add `.filter_kind_eq(...)`.
1400        // The adaptive `text_search()` path never produces an empty root_kind
1401        // because `QueryBuilder::nodes(kind)` requires a non-empty string at
1402        // the entry point.
1403        let filter_by_kind = !compiled.root_kind.is_empty();
1404
1405        // Acquire the connection early so we can check per-kind table existence
1406        // before building the bind array and SQL. The bind numbering depends on
1407        // whether the property FTS UNION arm is included.
1408        let conn_guard = match self.lock_connection() {
1409            Ok(g) => g,
1410            Err(e) => {
1411                self.telemetry.increment_errors();
1412                return Err(e);
1413            }
1414        };
1415
1416        // Determine which per-kind property FTS tables to include in the UNION arm.
1417        //
1418        // filter_by_kind = true (root_kind set): include the single per-kind table
1419        //   for root_kind if it exists. Bind order: ?1=chunk_text, ?2=kind, ?3=prop_text.
1420        //
1421        // filter_by_kind = false (fallback search, root_kind empty): include per-kind tables
1422        //   based on fusable KindEq predicates or all registered tables from sqlite_master.
1423        //   A single KindEq in fusable_filters lets us use one specific table. With no KindEq
1424        //   we UNION all per-kind tables so kind-less fallback searches still find property
1425        //   hits (matching the behaviour of the former global fts_node_properties table).
1426        //   Bind order: ?1=chunk_text, ?2=prop_text (shared across all prop UNION arms).
1427        //
1428        // In both cases, per-kind tables are already kind-specific so no `fp.kind = ?` filter
1429        // is needed inside the inner arm; the outer `search_hits` CTE WHERE clause handles
1430        // any further kind narrowing via fused KindEq predicates.
1431        // prop_fts_tables is Vec<(kind, table_name)> so we can later look up per-kind
1432        // BM25 weights from fts_property_schemas when building the scoring expression.
1433        let prop_fts_tables: Vec<(String, String)> = if filter_by_kind {
1434            let kind = compiled.root_kind.clone();
1435            let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1436            let exists: bool = conn_guard
1437                .query_row(
1438                    "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1439                    rusqlite::params![prop_table],
1440                    |_| Ok(true),
1441                )
1442                .optional()
1443                .map_err(EngineError::Sqlite)?
1444                .unwrap_or(false);
1445            if exists {
1446                vec![(kind, prop_table)]
1447            } else {
1448                vec![]
1449            }
1450        } else {
1451            // Fallback / kind-less search: find the right per-kind tables.
1452            // If there is exactly one KindEq in fusable_filters, use that kind's table.
1453            // Otherwise, include all registered per-kind tables from sqlite_master so
1454            // that kind-less fallback searches can still return property FTS hits.
1455            let kind_eq_values: Vec<String> = compiled
1456                .fusable_filters
1457                .iter()
1458                .filter_map(|p| match p {
1459                    Predicate::KindEq(k) => Some(k.clone()),
1460                    _ => None,
1461                })
1462                .collect();
1463            if kind_eq_values.len() == 1 {
1464                let kind = kind_eq_values[0].clone();
1465                let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1466                let exists: bool = conn_guard
1467                    .query_row(
1468                        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1469                        rusqlite::params![prop_table],
1470                        |_| Ok(true),
1471                    )
1472                    .optional()
1473                    .map_err(EngineError::Sqlite)?
1474                    .unwrap_or(false);
1475                if exists {
1476                    vec![(kind, prop_table)]
1477                } else {
1478                    vec![]
1479                }
1480            } else {
1481                // No single KindEq: UNION all per-kind tables so kind-less fallback
1482                // searches behave like the former global fts_node_properties table.
1483                // Fetch registered kinds and compute/verify their per-kind table names.
1484                let mut stmt = conn_guard
1485                    .prepare("SELECT kind FROM fts_property_schemas")
1486                    .map_err(EngineError::Sqlite)?;
1487                let all_kinds: Vec<String> = stmt
1488                    .query_map([], |r| r.get::<_, String>(0))
1489                    .map_err(EngineError::Sqlite)?
1490                    .collect::<Result<Vec<_>, _>>()
1491                    .map_err(EngineError::Sqlite)?;
1492                drop(stmt);
1493                let mut result = Vec::new();
1494                for kind in all_kinds {
1495                    let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1496                    let exists: bool = conn_guard
1497                        .query_row(
1498                            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1499                            rusqlite::params![prop_table],
1500                            |_| Ok(true),
1501                        )
1502                        .optional()
1503                        .map_err(EngineError::Sqlite)?
1504                        .unwrap_or(false);
1505                    if exists {
1506                        result.push((kind, prop_table));
1507                    }
1508                }
1509                result
1510            }
1511        };
1512        let use_prop_fts = !prop_fts_tables.is_empty();
1513
1514        // Bind layout (before fused/residual predicates and limit):
1515        //   filter_by_kind = true,  use_prop_fts = true:  ?1=chunk_text, ?2=kind, ?3=prop_text
1516        //   filter_by_kind = true,  use_prop_fts = false: ?1=chunk_text, ?2=kind
1517        //   filter_by_kind = false, use_prop_fts = true:  ?1=chunk_text, ?2=prop_text
1518        //   filter_by_kind = false, use_prop_fts = false: ?1=chunk_text
1519        let mut binds: Vec<BindValue> = if filter_by_kind {
1520            if use_prop_fts {
1521                vec![
1522                    BindValue::Text(rendered.clone()),
1523                    BindValue::Text(compiled.root_kind.clone()),
1524                    BindValue::Text(rendered),
1525                ]
1526            } else {
1527                vec![
1528                    BindValue::Text(rendered.clone()),
1529                    BindValue::Text(compiled.root_kind.clone()),
1530                ]
1531            }
1532        } else if use_prop_fts {
1533            // fallback search with property FTS: ?1=chunk, ?2=prop (same query value)
1534            vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
1535        } else {
1536            vec![BindValue::Text(rendered)]
1537        };
1538
1539        // P2-5: both fusable and residual predicates now match against the
1540        // CTE's projected columns (`u.kind`, `u.logical_id`, `u.source_ref`,
1541        // `u.content_ref`, `u.properties`) because the inner UNION arms
1542        // project the full active-row column set through the
1543        // `JOIN nodes src` already present in each arm. The previous
1544        // implementation re-joined `nodes hn` at the CTE level and
1545        // `nodes n` again at the outer SELECT, which was triple work on
1546        // the hot search path.
1547        let mut fused_clauses = String::new();
1548        for predicate in &compiled.fusable_filters {
1549            match predicate {
1550                Predicate::KindEq(kind) => {
1551                    binds.push(BindValue::Text(kind.clone()));
1552                    let idx = binds.len();
1553                    let _ = write!(fused_clauses, "\n                  AND u.kind = ?{idx}");
1554                }
1555                Predicate::LogicalIdEq(logical_id) => {
1556                    binds.push(BindValue::Text(logical_id.clone()));
1557                    let idx = binds.len();
1558                    let _ = write!(
1559                        fused_clauses,
1560                        "\n                  AND u.logical_id = ?{idx}"
1561                    );
1562                }
1563                Predicate::SourceRefEq(source_ref) => {
1564                    binds.push(BindValue::Text(source_ref.clone()));
1565                    let idx = binds.len();
1566                    let _ = write!(
1567                        fused_clauses,
1568                        "\n                  AND u.source_ref = ?{idx}"
1569                    );
1570                }
1571                Predicate::ContentRefEq(uri) => {
1572                    binds.push(BindValue::Text(uri.clone()));
1573                    let idx = binds.len();
1574                    let _ = write!(
1575                        fused_clauses,
1576                        "\n                  AND u.content_ref = ?{idx}"
1577                    );
1578                }
1579                Predicate::ContentRefNotNull => {
1580                    fused_clauses.push_str("\n                  AND u.content_ref IS NOT NULL");
1581                }
1582                Predicate::JsonPathFusedEq { path, value } => {
1583                    binds.push(BindValue::Text(path.clone()));
1584                    let path_idx = binds.len();
1585                    binds.push(BindValue::Text(value.clone()));
1586                    let value_idx = binds.len();
1587                    let _ = write!(
1588                        fused_clauses,
1589                        "\n                  AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
1590                    );
1591                }
1592                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
1593                    binds.push(BindValue::Text(path.clone()));
1594                    let path_idx = binds.len();
1595                    binds.push(BindValue::Integer(*value));
1596                    let value_idx = binds.len();
1597                    let operator = match op {
1598                        ComparisonOp::Gt => ">",
1599                        ComparisonOp::Gte => ">=",
1600                        ComparisonOp::Lt => "<",
1601                        ComparisonOp::Lte => "<=",
1602                    };
1603                    let _ = write!(
1604                        fused_clauses,
1605                        "\n                  AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
1606                    );
1607                }
1608                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
1609                    // Should be in residual_filters; compile_search guarantees
1610                    // this, but stay defensive.
1611                }
1612            }
1613        }
1614
1615        let mut filter_clauses = String::new();
1616        for predicate in &compiled.residual_filters {
1617            match predicate {
1618                Predicate::JsonPathEq { path, value } => {
1619                    binds.push(BindValue::Text(path.clone()));
1620                    let path_idx = binds.len();
1621                    binds.push(scalar_to_bind(value));
1622                    let value_idx = binds.len();
1623                    let _ = write!(
1624                        filter_clauses,
1625                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
1626                    );
1627                }
1628                Predicate::JsonPathCompare { path, op, value } => {
1629                    binds.push(BindValue::Text(path.clone()));
1630                    let path_idx = binds.len();
1631                    binds.push(scalar_to_bind(value));
1632                    let value_idx = binds.len();
1633                    let operator = match op {
1634                        ComparisonOp::Gt => ">",
1635                        ComparisonOp::Gte => ">=",
1636                        ComparisonOp::Lt => "<",
1637                        ComparisonOp::Lte => "<=",
1638                    };
1639                    let _ = write!(
1640                        filter_clauses,
1641                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
1642                    );
1643                }
1644                Predicate::KindEq(_)
1645                | Predicate::LogicalIdEq(_)
1646                | Predicate::SourceRefEq(_)
1647                | Predicate::ContentRefEq(_)
1648                | Predicate::ContentRefNotNull
1649                | Predicate::JsonPathFusedEq { .. }
1650                | Predicate::JsonPathFusedTimestampCmp { .. } => {
1651                    // Fusable predicates live in fused_clauses; compile_search
1652                    // partitions them out of residual_filters.
1653                }
1654            }
1655        }
1656
1657        // Bind `limit` as an integer parameter rather than formatting it into
1658        // the SQL string. Interpolating the limit made the prepared-statement
1659        // SQL vary by limit value, so rusqlite's default 16-slot
1660        // `prepare_cached` cache thrashed for paginated callers that varied
1661        // limits per call. With the bind the SQL is structurally stable for
1662        // a given filter shape regardless of `limit` value.
1663        let limit = compiled.limit;
1664        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
1665        let limit_idx = binds.len();
1666        // P2-5: the inner UNION arms project the full active-row column
1667        // set through `JOIN nodes src` (kind, row_id, source_ref,
1668        // content_ref, content_hash, created_at, properties). Both the
1669        // CTE's outer WHERE and the final SELECT consume those columns
1670        // directly, which eliminates the previous `JOIN nodes hn` at the
1671        // CTE level and `JOIN nodes n` at the outer SELECT — saving two
1672        // redundant joins on the hot search path. `src.superseded_at IS
1673        // NULL` in each arm already filters retired rows, which is what
1674        // the dropped outer joins used to do.
1675        //
1676        // Property FTS uses per-kind tables (fts_props_<kind>). One UNION arm
1677        // is generated per table in prop_fts_tables. The prop text bind index
1678        // is ?3 when filter_by_kind=true (after chunk_text and kind binds) or
1679        // ?2 when filter_by_kind=false (after chunk_text only). The same bind
1680        // position is reused by every prop arm, which is valid in SQLite.
1681        let prop_bind_idx: usize = if filter_by_kind { 3 } else { 2 };
1682        let prop_arm_sql: String = if use_prop_fts {
1683            prop_fts_tables.iter().fold(String::new(), |mut acc, (kind, prop_table)| {
1684                // Load schema for this kind to compute BM25 weights.
1685                let bm25_expr = conn_guard
1686                    .query_row(
1687                        "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
1688                        rusqlite::params![kind],
1689                        |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1690                    )
1691                    .ok()
1692                    .map_or_else(
1693                        || format!("bm25({prop_table})"),
1694                        |(json, sep)| build_bm25_expr(prop_table, &json, &sep),
1695                    );
1696                // For weighted (per-column) schemas text_content does not exist;
1697                // use an empty snippet rather than a column reference that would fail.
1698                let is_weighted = bm25_expr != format!("bm25({prop_table})");
1699                let snippet_expr = if is_weighted {
1700                    "'' AS snippet".to_owned()
1701                } else {
1702                    "substr(fp.text_content, 1, 200) AS snippet".to_owned()
1703                };
1704                let _ = write!(
1705                    acc,
1706                    "
1707                    UNION ALL
1708                    SELECT
1709                        src.row_id AS row_id,
1710                        fp.node_logical_id AS logical_id,
1711                        src.kind AS kind,
1712                        src.properties AS properties,
1713                        src.source_ref AS source_ref,
1714                        src.content_ref AS content_ref,
1715                        src.created_at AS created_at,
1716                        -{bm25_expr} AS score,
1717                        'property' AS source,
1718                        {snippet_expr},
1719                        CAST(fp.rowid AS TEXT) AS projection_row_id
1720                    FROM {prop_table} fp
1721                    JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
1722                    WHERE {prop_table} MATCH ?{prop_bind_idx}"
1723                );
1724                acc
1725            })
1726        } else {
1727            String::new()
1728        };
1729        let (chunk_fts_bind, chunk_kind_clause) = if filter_by_kind {
1730            ("?1", "\n                      AND src.kind = ?2")
1731        } else {
1732            ("?1", "")
1733        };
1734        let sql = format!(
1735            "WITH search_hits AS (
1736                SELECT
1737                    u.row_id AS row_id,
1738                    u.logical_id AS logical_id,
1739                    u.kind AS kind,
1740                    u.properties AS properties,
1741                    u.source_ref AS source_ref,
1742                    u.content_ref AS content_ref,
1743                    u.created_at AS created_at,
1744                    u.score AS score,
1745                    u.source AS source,
1746                    u.snippet AS snippet,
1747                    u.projection_row_id AS projection_row_id
1748                FROM (
1749                    SELECT
1750                        src.row_id AS row_id,
1751                        c.node_logical_id AS logical_id,
1752                        src.kind AS kind,
1753                        src.properties AS properties,
1754                        src.source_ref AS source_ref,
1755                        src.content_ref AS content_ref,
1756                        src.created_at AS created_at,
1757                        -bm25(fts_nodes) AS score,
1758                        'chunk' AS source,
1759                        snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
1760                        f.chunk_id AS projection_row_id
1761                    FROM fts_nodes f
1762                    JOIN chunks c ON c.id = f.chunk_id
1763                    JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
1764                    WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}{prop_arm_sql}
1765                ) u
1766                WHERE 1 = 1{fused_clauses}
1767                ORDER BY u.score DESC
1768                LIMIT ?{limit_idx}
1769            )
1770            SELECT
1771                h.row_id,
1772                h.logical_id,
1773                h.kind,
1774                h.properties,
1775                h.content_ref,
1776                am.last_accessed_at,
1777                h.created_at,
1778                h.score,
1779                h.source,
1780                h.snippet,
1781                h.projection_row_id
1782            FROM search_hits h
1783            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
1784            WHERE 1 = 1{filter_clauses}
1785            ORDER BY h.score DESC"
1786        );
1787
1788        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
1789
1790        let mut statement = match conn_guard.prepare_cached(&sql) {
1791            Ok(stmt) => stmt,
1792            Err(e) => {
1793                self.telemetry.increment_errors();
1794                return Err(EngineError::Sqlite(e));
1795            }
1796        };
1797
1798        let hits = match statement
1799            .query_map(params_from_iter(bind_values.iter()), |row| {
1800                let source_str: String = row.get(8)?;
1801                // The CTE emits only two literal values here: `'chunk'` and
1802                // `'property'`. Default to `Chunk` on anything unexpected so a
1803                // schema drift surfaces as a mislabelled hit rather than a
1804                // row-level error.
1805                let source = if source_str == "property" {
1806                    SearchHitSource::Property
1807                } else {
1808                    SearchHitSource::Chunk
1809                };
1810                let match_mode = match branch {
1811                    SearchBranch::Strict => SearchMatchMode::Strict,
1812                    SearchBranch::Relaxed => SearchMatchMode::Relaxed,
1813                };
1814                Ok(SearchHit {
1815                    node: fathomdb_query::NodeRowLite {
1816                        row_id: row.get(0)?,
1817                        logical_id: row.get(1)?,
1818                        kind: row.get(2)?,
1819                        properties: row.get(3)?,
1820                        content_ref: row.get(4)?,
1821                        last_accessed_at: row.get(5)?,
1822                    },
1823                    written_at: row.get(6)?,
1824                    score: row.get(7)?,
1825                    // Phase 10: every branch currently emits text hits.
1826                    modality: RetrievalModality::Text,
1827                    source,
1828                    match_mode: Some(match_mode),
1829                    snippet: row.get(9)?,
1830                    projection_row_id: row.get(10)?,
1831                    vector_distance: None,
1832                    attribution: None,
1833                })
1834            })
1835            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
1836        {
1837            Ok(rows) => rows,
1838            Err(e) => {
1839                self.telemetry.increment_errors();
1840                return Err(EngineError::Sqlite(e));
1841            }
1842        };
1843
1844        // Drop the statement so `conn_guard` is free (attribution is
1845        // resolved after dedup in `execute_compiled_search_plan` to avoid
1846        // spending highlight lookups on hits that will be discarded).
1847        drop(statement);
1848        drop(conn_guard);
1849
1850        self.telemetry.increment_queries();
1851        Ok(hits)
1852    }
1853
1854    /// Populate per-hit attribution for the given deduped merged hits.
1855    /// Runs after [`merge_search_branches`] so dropped duplicates do not
1856    /// incur the highlight+position-map lookup cost.
1857    fn populate_attribution_for_hits(
1858        &self,
1859        hits: &mut [SearchHit],
1860        strict_text_query: &fathomdb_query::TextQuery,
1861        relaxed_text_query: Option<&fathomdb_query::TextQuery>,
1862    ) -> Result<(), EngineError> {
1863        let conn_guard = match self.lock_connection() {
1864            Ok(g) => g,
1865            Err(e) => {
1866                self.telemetry.increment_errors();
1867                return Err(e);
1868            }
1869        };
1870        let strict_expr = render_text_query_fts5(strict_text_query);
1871        let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
1872        for hit in hits.iter_mut() {
1873            // Phase 10: text hits always carry `Some(match_mode)`. Vector
1874            // hits (when a future phase adds them) have `None` here and
1875            // are skipped by the attribution resolver because attribution
1876            // is meaningless for vector matches.
1877            let match_expr = match hit.match_mode {
1878                Some(SearchMatchMode::Strict) => strict_expr.as_str(),
1879                Some(SearchMatchMode::Relaxed) => {
1880                    relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
1881                }
1882                None => continue,
1883            };
1884            match resolve_hit_attribution(&conn_guard, hit, match_expr) {
1885                Ok(att) => hit.attribution = Some(att),
1886                Err(e) => {
1887                    self.telemetry.increment_errors();
1888                    return Err(e);
1889                }
1890            }
1891        }
1892        Ok(())
1893    }
1894
1895    /// # Errors
1896    /// Returns [`EngineError`] if the root query or any bounded expansion
1897    /// query cannot be prepared or executed.
1898    pub fn execute_compiled_grouped_read(
1899        &self,
1900        compiled: &CompiledGroupedQuery,
1901    ) -> Result<GroupedQueryRows, EngineError> {
1902        let root_rows = self.execute_compiled_read(&compiled.root)?;
1903        if root_rows.was_degraded {
1904            return Ok(GroupedQueryRows {
1905                roots: Vec::new(),
1906                expansions: Vec::new(),
1907                was_degraded: true,
1908            });
1909        }
1910
1911        let roots = root_rows.nodes;
1912        let mut expansions = Vec::with_capacity(compiled.expansions.len());
1913        for expansion in &compiled.expansions {
1914            let slot_rows = if roots.is_empty() {
1915                Vec::new()
1916            } else {
1917                self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
1918            };
1919            expansions.push(ExpansionSlotRows {
1920                slot: expansion.slot.clone(),
1921                roots: slot_rows,
1922            });
1923        }
1924
1925        Ok(GroupedQueryRows {
1926            roots,
1927            expansions,
1928            was_degraded: false,
1929        })
1930    }
1931
1932    /// Chunked batched expansion: splits roots into chunks of
1933    /// `BATCH_CHUNK_SIZE` and runs one batched query per chunk, then merges
1934    /// results while preserving root ordering.  This keeps bind-parameter
1935    /// counts within `SQLite` limits while avoiding the N+1 per-root pattern
1936    /// for large result sets.
1937    fn read_expansion_nodes_chunked(
1938        &self,
1939        roots: &[NodeRow],
1940        expansion: &ExpansionSlot,
1941        hard_limit: usize,
1942    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
1943        if roots.len() <= BATCH_CHUNK_SIZE {
1944            return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
1945        }
1946
1947        // Merge chunk results keyed by root logical_id, then reassemble in
1948        // root order.
1949        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
1950        for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
1951            for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
1952                per_root
1953                    .entry(group.root_logical_id)
1954                    .or_default()
1955                    .extend(group.nodes);
1956            }
1957        }
1958
1959        Ok(roots
1960            .iter()
1961            .map(|root| ExpansionRootRows {
1962                root_logical_id: root.logical_id.clone(),
1963                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
1964            })
1965            .collect())
1966    }
1967
    /// Batched expansion: one recursive CTE query per expansion slot that
    /// processes all root IDs at once. Uses `ROW_NUMBER() OVER (PARTITION BY
    /// source_logical_id ...)` to enforce the per-root hard limit inside the
    /// database rather than in Rust.
    ///
    /// Bind layout: root IDs occupy `?1..=?N`, the edge kind is `?(N+1)`,
    /// and optional target-side filter params follow from `?(N+2)`.
    ///
    /// # Errors
    /// Returns [`EngineError`] if fused-filter validation fails or the
    /// batched query cannot be prepared or executed.
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Traversal direction selects which edge endpoint joins the current
        // frontier (`join_condition`) and which endpoint becomes the next
        // hop's logical_id (`next_logical_id`).
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // EXECUTE-TIME VALIDATION: fused filter against kinds without an FTS schema.
        // This check runs at execute time (not builder time) because the target kind
        // set for an expand slot is edge-label-scoped, not kind-scoped, and multiple
        // target kinds may be reachable via one edge label. See Pack 12 docs.
        if expansion.filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. } | Predicate::JsonPathFusedTimestampCmp { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }

        // Build a UNION ALL of SELECT literals for the root seed rows.
        // SQLite does not support `VALUES ... AS alias(col)` in older versions,
        // so we use `SELECT ?1 UNION ALL SELECT ?2 ...` instead.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        // Bind params: root IDs occupy ?1..=?N, edge kind is ?(N+1).
        // Filter params (if any) follow starting at ?(N+2).
        let edge_kind_param = root_ids.len() + 1;
        let filter_param_start = root_ids.len() + 2;

        // Compile the optional target-side filter to a SQL fragment + bind values.
        // The fragment is injected into the `numbered` CTE's WHERE clause BEFORE
        // the ROW_NUMBER() window so the per-originator limit counts only matching rows.
        let (filter_sql, filter_binds) =
            compile_expansion_filter(expansion.filter.as_ref(), filter_param_start);

        // The `root_id` column tracks which root each traversal path
        // originated from. The `ROW_NUMBER()` window in the outer query
        // enforces the per-root hard limit.
        //
        // Cycle guard: `visited` is a comma-delimited id list seeded as
        // `,<rid>,`; each hop appends `<id>,` so every id is bracketed by
        // commas, and `instr(visited, ',<id>,') = 0` rejects revisits,
        // guaranteeing termination. `emitted` counts hops along a single
        // traversal path (each UNION ALL branch carries its own counter), so
        // `emitted < {hard_limit}` only prunes runaway paths — the
        // authoritative per-root cap is the `rn <= {hard_limit}` filter on
        // the ROW_NUMBER window.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted) AS (
                SELECT rid, rid, 0, printf(',%s,', rid), 0
                FROM root_ids
                UNION ALL
                SELECT
                    t.root_id,
                    {next_logical_id},
                    t.depth + 1,
                    t.visited || {next_logical_id} || ',',
                    t.emitted + 1
                FROM traversed t
                JOIN edges e ON {join_condition}
                    AND e.kind = ?{edge_kind_param}
                    AND e.superseded_at IS NULL
                WHERE t.depth < {max_depth}
                  AND t.emitted < {hard_limit}
                  AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
                     , n.content_ref, am.last_accessed_at
                     , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
                FROM traversed t
                JOIN nodes n ON n.logical_id = t.logical_id
                    AND n.superseded_at IS NULL
                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
                WHERE t.depth > 0{filter_sql}
            )
            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, logical_id",
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind root IDs (1..=N) and edge kind (N+1), then filter params (N+2...).
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(filter_binds);

        // Each result row pairs the originating root's id with the expanded
        // node, in the column order projected by the outer SELECT above.
        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, // root_id
                    NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Partition results back into per-root groups, preserving root order.
        // Roots with no reachable nodes get an empty group.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
2107
2108    /// Validate that all target node kinds reachable via `edge_label` have a
2109    /// registered property-FTS schema. Called at execute time when an expansion
2110    /// slot carries a fused filter predicate.
2111    ///
2112    /// EXECUTE-TIME VALIDATION: this check runs at execute time (not builder
2113    /// time) for expand slots because the target kind set is edge-label-scoped
2114    /// rather than kind-scoped, and is not statically knowable at builder time
2115    /// when multiple target kinds may be reachable via the same label.
2116    /// See Pack 12 docs.
2117    ///
2118    /// # Errors
2119    /// Returns `EngineError::InvalidConfig` if any reachable target kind lacks
2120    /// a registered property-FTS schema.
2121    fn validate_fused_filter_for_edge_label(&self, edge_label: &str) -> Result<(), EngineError> {
2122        let conn = self.lock_connection()?;
2123        // Collect the distinct node kinds reachable as targets of this edge label.
2124        let mut stmt = conn
2125            .prepare_cached(
2126                "SELECT DISTINCT n.kind \
2127                 FROM edges e \
2128                 JOIN nodes n ON n.logical_id = e.target_logical_id \
2129                 WHERE e.kind = ?1 AND e.superseded_at IS NULL",
2130            )
2131            .map_err(EngineError::Sqlite)?;
2132        let target_kinds: Vec<String> = stmt
2133            .query_map(rusqlite::params![edge_label], |row| row.get(0))
2134            .map_err(EngineError::Sqlite)?
2135            .collect::<Result<Vec<_>, _>>()
2136            .map_err(EngineError::Sqlite)?;
2137
2138        for kind in &target_kinds {
2139            let has_schema: bool = conn
2140                .query_row(
2141                    "SELECT COUNT(*) > 0 FROM fts_property_schemas WHERE kind = ?1",
2142                    rusqlite::params![kind],
2143                    |row| row.get(0),
2144                )
2145                .map_err(EngineError::Sqlite)?;
2146            if !has_schema {
2147                return Err(EngineError::InvalidConfig(format!(
2148                    "kind {kind:?} has no registered property-FTS schema; register one with \
2149                     admin.register_fts_property_schema(..) before using fused filters on \
2150                     expansion slots, or use JsonPathEq for non-fused semantics \
2151                     (expand slot uses edge label {edge_label:?})"
2152                )));
2153            }
2154        }
2155        Ok(())
2156    }
2157
2158    /// Read a single run by id.
2159    ///
2160    /// # Errors
2161    /// Returns [`EngineError`] if the query fails or if the connection mutex
2162    /// has been poisoned.
2163    pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
2164        let conn = self.lock_connection()?;
2165        conn.query_row(
2166            "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
2167            rusqlite::params![id],
2168            |row| {
2169                Ok(RunRow {
2170                    id: row.get(0)?,
2171                    kind: row.get(1)?,
2172                    status: row.get(2)?,
2173                    properties: row.get(3)?,
2174                })
2175            },
2176        )
2177        .optional()
2178        .map_err(EngineError::Sqlite)
2179    }
2180
2181    /// Read a single step by id.
2182    ///
2183    /// # Errors
2184    /// Returns [`EngineError`] if the query fails or if the connection mutex
2185    /// has been poisoned.
2186    pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
2187        let conn = self.lock_connection()?;
2188        conn.query_row(
2189            "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
2190            rusqlite::params![id],
2191            |row| {
2192                Ok(StepRow {
2193                    id: row.get(0)?,
2194                    run_id: row.get(1)?,
2195                    kind: row.get(2)?,
2196                    status: row.get(3)?,
2197                    properties: row.get(4)?,
2198                })
2199            },
2200        )
2201        .optional()
2202        .map_err(EngineError::Sqlite)
2203    }
2204
2205    /// Read a single action by id.
2206    ///
2207    /// # Errors
2208    /// Returns [`EngineError`] if the query fails or if the connection mutex
2209    /// has been poisoned.
2210    pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
2211        let conn = self.lock_connection()?;
2212        conn.query_row(
2213            "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
2214            rusqlite::params![id],
2215            |row| {
2216                Ok(ActionRow {
2217                    id: row.get(0)?,
2218                    step_id: row.get(1)?,
2219                    kind: row.get(2)?,
2220                    status: row.get(3)?,
2221                    properties: row.get(4)?,
2222                })
2223            },
2224        )
2225        .optional()
2226        .map_err(EngineError::Sqlite)
2227    }
2228
2229    /// Read all active (non-superseded) runs.
2230    ///
2231    /// # Errors
2232    /// Returns [`EngineError`] if the query fails or if the connection mutex
2233    /// has been poisoned.
2234    pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
2235        let conn = self.lock_connection()?;
2236        let mut stmt = conn
2237            .prepare_cached(
2238                "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
2239            )
2240            .map_err(EngineError::Sqlite)?;
2241        let rows = stmt
2242            .query_map([], |row| {
2243                Ok(RunRow {
2244                    id: row.get(0)?,
2245                    kind: row.get(1)?,
2246                    status: row.get(2)?,
2247                    properties: row.get(3)?,
2248                })
2249            })
2250            .map_err(EngineError::Sqlite)?
2251            .collect::<Result<Vec<_>, _>>()
2252            .map_err(EngineError::Sqlite)?;
2253        Ok(rows)
2254    }
2255
2256    /// Returns the number of shape→SQL entries currently indexed.
2257    ///
2258    /// Each distinct query shape (structural hash of kind + steps + limits)
2259    /// maps to exactly one SQL string.  This is a test-oriented introspection
2260    /// helper; it does not reflect rusqlite's internal prepared-statement
2261    /// cache, which is keyed by SQL text.
2262    ///
2263    /// # Panics
2264    /// Panics if the internal shape-SQL-map mutex is poisoned.
2265    #[must_use]
2266    #[allow(clippy::expect_used)]
2267    pub fn shape_sql_count(&self) -> usize {
2268        self.shape_sql_map
2269            .lock()
2270            .unwrap_or_else(PoisonError::into_inner)
2271            .len()
2272    }
2273
2274    /// Returns a cloned `Arc` to the schema manager.
2275    #[must_use]
2276    pub fn schema_manager(&self) -> Arc<SchemaManager> {
2277        Arc::clone(&self.schema_manager)
2278    }
2279
2280    /// Return the execution plan for a compiled query without executing it.
2281    ///
2282    /// Useful for debugging, testing shape-hash caching, and operator
2283    /// diagnostics. Does not open a transaction or touch the database beyond
2284    /// checking the statement cache.
2285    ///
2286    /// # Panics
2287    /// Panics if the internal shape-SQL-map mutex is poisoned.
2288    #[must_use]
2289    pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
2290        let cache_hit = self
2291            .shape_sql_map
2292            .lock()
2293            .unwrap_or_else(PoisonError::into_inner)
2294            .contains_key(&compiled.shape_hash);
2295        QueryPlan {
2296            sql: wrap_node_row_projection_sql(&compiled.sql),
2297            bind_count: compiled.binds.len(),
2298            driving_table: compiled.driving_table,
2299            shape_hash: compiled.shape_hash,
2300            cache_hit,
2301        }
2302    }
2303
2304    /// Execute a named PRAGMA and return the result as a String.
2305    /// Used by Layer 1 tests to verify startup pragma initialization.
2306    ///
2307    /// # Errors
2308    /// Returns [`EngineError`] if the PRAGMA query fails or if the connection
2309    /// mutex has been poisoned.
2310    #[doc(hidden)]
2311    pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
2312        let conn = self.lock_connection()?;
2313        let result = conn
2314            .query_row(&format!("PRAGMA {name}"), [], |row| {
2315                // PRAGMAs may return TEXT or INTEGER; normalise to String.
2316                row.get::<_, rusqlite::types::Value>(0)
2317            })
2318            .map_err(EngineError::Sqlite)?;
2319        let s = match result {
2320            rusqlite::types::Value::Text(t) => t,
2321            rusqlite::types::Value::Integer(i) => i.to_string(),
2322            rusqlite::types::Value::Real(f) => f.to_string(),
2323            rusqlite::types::Value::Blob(_) => {
2324                return Err(EngineError::InvalidWrite(format!(
2325                    "PRAGMA {name} returned an unexpected BLOB value"
2326                )));
2327            }
2328            rusqlite::types::Value::Null => String::new(),
2329        };
2330        Ok(s)
2331    }
2332
2333    /// Return all provenance events whose `subject` matches the given value.
2334    ///
2335    /// Subjects are logical node IDs (for retire/upsert events) or `source_ref`
2336    /// values (for excise events).
2337    ///
2338    /// # Errors
2339    /// Returns [`EngineError`] if the query fails or if the connection mutex
2340    /// has been poisoned.
2341    pub fn query_provenance_events(
2342        &self,
2343        subject: &str,
2344    ) -> Result<Vec<ProvenanceEvent>, EngineError> {
2345        let conn = self.lock_connection()?;
2346        let mut stmt = conn
2347            .prepare_cached(
2348                "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
2349                 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
2350            )
2351            .map_err(EngineError::Sqlite)?;
2352        let events = stmt
2353            .query_map(rusqlite::params![subject], |row| {
2354                Ok(ProvenanceEvent {
2355                    id: row.get(0)?,
2356                    event_type: row.get(1)?,
2357                    subject: row.get(2)?,
2358                    source_ref: row.get(3)?,
2359                    metadata_json: row.get(4)?,
2360                    created_at: row.get(5)?,
2361                })
2362            })
2363            .map_err(EngineError::Sqlite)?
2364            .collect::<Result<Vec<_>, _>>()
2365            .map_err(EngineError::Sqlite)?;
2366        Ok(events)
2367    }
2368
2369    /// Check if `kind` has a first-registration async rebuild in progress
2370    /// (`is_first_registration=1` with state PENDING/BUILDING/SWAPPING and no
2371    /// rows yet in the per-kind `fts_props_<kind>` table). If so, execute a
2372    /// full-kind scan and return the nodes. Returns `None` when the normal
2373    /// FTS5 path should run.
2374    fn scan_fallback_if_first_registration(
2375        &self,
2376        kind: &str,
2377    ) -> Result<Option<Vec<NodeRow>>, EngineError> {
2378        let conn = self.lock_connection()?;
2379
2380        // Quick point-lookup: kind has a first-registration rebuild in a
2381        // pre-complete state AND the per-kind table has no rows yet.
2382        let prop_table = fathomdb_schema::fts_kind_table_name(kind);
2383        // Check whether the per-kind table exists before querying its row count.
2384        let table_exists: bool = conn
2385            .query_row(
2386                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
2387                rusqlite::params![prop_table],
2388                |_| Ok(true),
2389            )
2390            .optional()?
2391            .unwrap_or(false);
2392        let prop_empty = if table_exists {
2393            let cnt: i64 =
2394                conn.query_row(&format!("SELECT COUNT(*) FROM {prop_table}"), [], |r| {
2395                    r.get(0)
2396                })?;
2397            cnt == 0
2398        } else {
2399            true
2400        };
2401        let needs_scan: bool = if prop_empty {
2402            conn.query_row(
2403                "SELECT 1 FROM fts_property_rebuild_state \
2404                 WHERE kind = ?1 AND is_first_registration = 1 \
2405                 AND state IN ('PENDING','BUILDING','SWAPPING') \
2406                 LIMIT 1",
2407                rusqlite::params![kind],
2408                |_| Ok(true),
2409            )
2410            .optional()?
2411            .unwrap_or(false)
2412        } else {
2413            false
2414        };
2415
2416        if !needs_scan {
2417            return Ok(None);
2418        }
2419
2420        // Scan fallback: return all active nodes of this kind.
2421        // Intentionally unindexed — acceptable for first-registration window.
2422        let mut stmt = conn
2423            .prepare_cached(
2424                "SELECT n.row_id, n.logical_id, n.kind, n.properties, n.content_ref, \
2425                 am.last_accessed_at \
2426                 FROM nodes n \
2427                 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id \
2428                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
2429            )
2430            .map_err(EngineError::Sqlite)?;
2431
2432        let nodes = stmt
2433            .query_map(rusqlite::params![kind], |row| {
2434                Ok(NodeRow {
2435                    row_id: row.get(0)?,
2436                    logical_id: row.get(1)?,
2437                    kind: row.get(2)?,
2438                    properties: row.get(3)?,
2439                    content_ref: row.get(4)?,
2440                    last_accessed_at: row.get(5)?,
2441                })
2442            })
2443            .map_err(EngineError::Sqlite)?
2444            .collect::<Result<Vec<_>, _>>()
2445            .map_err(EngineError::Sqlite)?;
2446
2447        Ok(Some(nodes))
2448    }
2449
2450    /// Return the current rebuild progress for a kind, or `None` if no rebuild
2451    /// has been registered for that kind.
2452    ///
2453    /// # Errors
2454    /// Returns [`EngineError`] if the database query fails.
2455    pub fn get_property_fts_rebuild_progress(
2456        &self,
2457        kind: &str,
2458    ) -> Result<Option<crate::rebuild_actor::RebuildProgress>, EngineError> {
2459        let conn = self.lock_connection()?;
2460        let row = conn
2461            .query_row(
2462                "SELECT state, rows_total, rows_done, started_at, last_progress_at, error_message \
2463                 FROM fts_property_rebuild_state WHERE kind = ?1",
2464                rusqlite::params![kind],
2465                |r| {
2466                    Ok(crate::rebuild_actor::RebuildProgress {
2467                        state: r.get(0)?,
2468                        rows_total: r.get(1)?,
2469                        rows_done: r.get(2)?,
2470                        started_at: r.get(3)?,
2471                        last_progress_at: r.get(4)?,
2472                        error_message: r.get(5)?,
2473                    })
2474                },
2475            )
2476            .optional()?;
2477        Ok(row)
2478    }
2479}
2480
2481/// Rewrite a `CompiledQuery` whose SQL references the legacy `fts_node_properties`
2482/// table to use the per-kind `fts_props_<kind>` table, or strip the property FTS
2483/// arm entirely when the per-kind table does not exist in `sqlite_master`.
2484///
2485/// Returns `(adapted_sql, adapted_binds)`.
2486fn adapt_fts_nodes_sql_for_per_kind_tables(
2487    compiled: &CompiledQuery,
2488    conn: &rusqlite::Connection,
2489) -> Result<(String, Vec<BindValue>), EngineError> {
2490    let root_kind = compiled
2491        .binds
2492        .get(1)
2493        .and_then(|b| {
2494            if let BindValue::Text(k) = b {
2495                Some(k.as_str())
2496            } else {
2497                None
2498            }
2499        })
2500        .unwrap_or("");
2501    let prop_table = fathomdb_schema::fts_kind_table_name(root_kind);
2502    let prop_table_exists: bool = conn
2503        .query_row(
2504            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
2505            rusqlite::params![prop_table],
2506            |_| Ok(true),
2507        )
2508        .optional()
2509        .map_err(EngineError::Sqlite)?
2510        .unwrap_or(false);
2511
2512    // The compile_query path assigns fixed positional parameters:
2513    //   ?1 = text (chunk FTS), ?2 = kind (chunk filter),
2514    //   ?3 = text (prop FTS),  ?4 = kind (prop filter),
2515    //   ?5+ = fusable/residual predicates
2516    let (new_sql, removed_bind_positions) = if prop_table_exists {
2517        let s = compiled
2518            .sql
2519            .replace("fts_node_properties", &prop_table)
2520            .replace("\n                          AND fp.kind = ?4", "");
2521        (renumber_sql_params(&s, &[4]), vec![3usize])
2522    } else {
2523        let s = strip_prop_fts_union_arm(&compiled.sql);
2524        (renumber_sql_params(&s, &[3, 4]), vec![2usize, 3])
2525    };
2526
2527    let new_binds: Vec<BindValue> = compiled
2528        .binds
2529        .iter()
2530        .enumerate()
2531        .filter(|(i, _)| !removed_bind_positions.contains(i))
2532        .map(|(_, b)| b.clone())
2533        .collect();
2534
2535    Ok((new_sql, new_binds))
2536}
2537
2538/// Check that the active vector profile's model identity and dimensions match
2539/// the supplied embedder. If either differs, emit a warning via `trace_warn!`
2540/// but always return `Ok(())`. This function NEVER returns `Err`.
2541///
2542/// Queries `projection_profiles` directly — no `AdminService` indirection.
2543#[allow(clippy::unnecessary_wraps)]
2544fn check_vec_identity_at_open(
2545    conn: &rusqlite::Connection,
2546    embedder: &dyn QueryEmbedder,
2547) -> Result<(), EngineError> {
2548    let row: Option<String> = conn
2549        .query_row(
2550            "SELECT config_json FROM projection_profiles WHERE kind='*' AND facet='vec'",
2551            [],
2552            |row| row.get(0),
2553        )
2554        .optional()
2555        .unwrap_or(None);
2556
2557    let Some(config_json) = row else {
2558        return Ok(());
2559    };
2560
2561    // Parse leniently — any error means we just skip the check.
2562    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&config_json) else {
2563        return Ok(());
2564    };
2565
2566    let identity = embedder.identity();
2567
2568    if let Some(stored_model) = parsed
2569        .get("model_identity")
2570        .and_then(serde_json::Value::as_str)
2571        && stored_model != identity.model_identity
2572    {
2573        trace_warn!(
2574            stored_model_identity = stored_model,
2575            embedder_model_identity = %identity.model_identity,
2576            "vec identity mismatch at open: model_identity differs"
2577        );
2578    }
2579
2580    if let Some(stored_dim) = parsed.get("dimensions").and_then(serde_json::Value::as_u64) {
2581        let stored_dim = usize::try_from(stored_dim).unwrap_or(usize::MAX);
2582        if stored_dim != identity.dimension {
2583            trace_warn!(
2584                stored_dimensions = stored_dim,
2585                embedder_dimensions = identity.dimension,
2586                "vec identity mismatch at open: dimensions differ"
2587            );
2588        }
2589    }
2590
2591    Ok(())
2592}
2593
2594/// Open-time FTS rebuild guards (Guard 1 + Guard 2).
2595///
2596/// Guard 1: if any registered kind's per-kind `fts_props_<kind>` table is
2597/// missing or empty while live nodes of that kind exist, do a synchronous
2598/// full rebuild.
2599///
2600/// Guard 2: if any recursive schema is registered but
2601/// `fts_node_property_positions` is empty, do a synchronous full rebuild to
2602/// regenerate the position map.
2603///
2604/// Both guards are no-ops on a consistent database.
2605fn run_open_time_fts_guards(conn: &mut rusqlite::Connection) -> Result<(), EngineError> {
2606    let schema_count: i64 = conn
2607        .query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
2608            row.get(0)
2609        })
2610        .map_err(EngineError::Sqlite)?;
2611    if schema_count == 0 {
2612        return Ok(());
2613    }
2614
2615    let needs_fts_rebuild = open_guard_check_fts_empty(conn)?;
2616    let needs_position_backfill = if needs_fts_rebuild {
2617        false
2618    } else {
2619        open_guard_check_positions_empty(conn)?
2620    };
2621
2622    if needs_fts_rebuild || needs_position_backfill {
2623        let per_kind_tables: Vec<String> = {
2624            let mut stmt = conn
2625                .prepare(
2626                    "SELECT name FROM sqlite_master \
2627                     WHERE type='table' AND name LIKE 'fts_props_%' \
2628                     AND sql LIKE 'CREATE VIRTUAL TABLE%'",
2629                )
2630                .map_err(EngineError::Sqlite)?;
2631            stmt.query_map([], |r| r.get::<_, String>(0))
2632                .map_err(EngineError::Sqlite)?
2633                .collect::<Result<Vec<_>, _>>()
2634                .map_err(EngineError::Sqlite)?
2635        };
2636        let tx = conn
2637            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
2638            .map_err(EngineError::Sqlite)?;
2639        for table in &per_kind_tables {
2640            tx.execute_batch(&format!("DELETE FROM {table}"))
2641                .map_err(EngineError::Sqlite)?;
2642        }
2643        tx.execute("DELETE FROM fts_node_property_positions", [])
2644            .map_err(EngineError::Sqlite)?;
2645        crate::projection::insert_property_fts_rows(
2646            &tx,
2647            "SELECT logical_id, properties FROM nodes \
2648             WHERE kind = ?1 AND superseded_at IS NULL",
2649        )
2650        .map_err(EngineError::Sqlite)?;
2651        tx.commit().map_err(EngineError::Sqlite)?;
2652    }
2653    Ok(())
2654}
2655
2656fn open_guard_check_fts_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
2657    let kinds: Vec<String> = {
2658        let mut stmt = conn
2659            .prepare("SELECT kind FROM fts_property_schemas")
2660            .map_err(EngineError::Sqlite)?;
2661        stmt.query_map([], |row| row.get::<_, String>(0))
2662            .map_err(EngineError::Sqlite)?
2663            .collect::<Result<Vec<_>, _>>()
2664            .map_err(EngineError::Sqlite)?
2665    };
2666    for kind in &kinds {
2667        let table = fathomdb_schema::fts_kind_table_name(kind);
2668        let table_exists: bool = conn
2669            .query_row(
2670                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
2671                rusqlite::params![table],
2672                |_| Ok(true),
2673            )
2674            .optional()
2675            .map_err(EngineError::Sqlite)?
2676            .unwrap_or(false);
2677        let fts_count: i64 = if table_exists {
2678            conn.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
2679                row.get(0)
2680            })
2681            .map_err(EngineError::Sqlite)?
2682        } else {
2683            0
2684        };
2685        if fts_count == 0 {
2686            let node_count: i64 = conn
2687                .query_row(
2688                    "SELECT COUNT(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
2689                    rusqlite::params![kind],
2690                    |row| row.get(0),
2691                )
2692                .map_err(EngineError::Sqlite)?;
2693            if node_count > 0 {
2694                return Ok(true);
2695            }
2696        }
2697    }
2698    Ok(false)
2699}
2700
2701fn open_guard_check_positions_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
2702    let recursive_count: i64 = conn
2703        .query_row(
2704            "SELECT COUNT(*) FROM fts_property_schemas \
2705             WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
2706            [],
2707            |row| row.get(0),
2708        )
2709        .map_err(EngineError::Sqlite)?;
2710    if recursive_count == 0 {
2711        return Ok(false);
2712    }
2713    let pos_count: i64 = conn
2714        .query_row(
2715            "SELECT COUNT(*) FROM fts_node_property_positions",
2716            [],
2717            |row| row.get(0),
2718        )
2719        .map_err(EngineError::Sqlite)?;
2720    Ok(pos_count == 0)
2721}
2722
/// Renumber `SQLite` positional parameters in `sql` after removing the given
/// 1-based parameter numbers from `removed` (sorted ascending).
///
/// Each `?N` in the SQL where `N` is in `removed` is left in place (the caller
/// must have already deleted those references from the SQL). Every `?N` where
/// `N` is greater than any removed parameter is decremented by the count of
/// removed parameters that are less than `N`.
///
/// Non-parameter text — including any non-ASCII characters inside string
/// literals — is copied through verbatim. (A previous version rebuilt the
/// output byte-by-byte with `byte as char`, which re-encoded every byte
/// ≥ 0x80 as a Latin-1 code point and corrupted multibyte UTF-8 sequences.)
///
/// Example: if `removed = [4]` then `?5` → `?4`, `?6` → `?5`, etc.
/// Example: if `removed = [3, 4]` then `?5` → `?3`, `?6` → `?4`, etc.
fn renumber_sql_params(sql: &str, removed: &[usize]) -> String {
    let mut result = String::with_capacity(sql.len());
    let mut rest = sql;
    while let Some(q_pos) = rest.find('?') {
        // Copy everything up to and including the `?` verbatim. `?` is ASCII,
        // so `..=q_pos` always falls on a char boundary.
        result.push_str(&rest[..=q_pos]);
        rest = &rest[q_pos + 1..];
        // Length of the ASCII digit run forming the parameter number, if any.
        let digit_len = rest.bytes().take_while(u8::is_ascii_digit).count();
        if digit_len == 0 {
            // Bare `?` (unnumbered parameter) — nothing to renumber.
            continue;
        }
        if let Ok(n) = rest[..digit_len].parse::<usize>() {
            // Shift down by the number of removed parameters below `n`.
            let offset = removed.iter().filter(|&&r| r < n).count();
            result.push_str(&(n - offset).to_string());
            rest = &rest[digit_len..];
        }
        // If parsing overflowed `usize`, the digits stay in `rest` and are
        // copied verbatim by later iterations or the tail copy below.
    }
    result.push_str(rest);
    result
}
2765
/// Wrap a logical-row query in the standard node-row projection, joining
/// `node_access_metadata` for the `last_accessed_at` column.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut sql = String::with_capacity(base_sql.len() + 176);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, \
         am.last_accessed_at FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
2773
/// Strip the property FTS UNION arm from a `compile_query`-generated
/// `DrivingTable::FtsNodes` SQL string.
///
/// When the per-kind `fts_props_<kind>` table does not yet exist the
/// `UNION SELECT ... FROM fts_node_properties ...` arm must be removed so the
/// query degrades to chunk-only results instead of failing with "no such table".
///
/// The SQL structure from `compile_query` (fathomdb-query) is stable:
/// ```text
///                     UNION
///                     SELECT fp.node_logical_id AS logical_id
///                     FROM fts_node_properties fp
///                     ...
///                     WHERE fts_node_properties MATCH ?3
///                       AND fp.kind = ?4
///                 ) u
/// ```
/// Everything from the `UNION` keyword up to (but not including) the
/// `") u"` closer is excised; if either marker is missing the SQL is
/// returned unchanged (shouldn't happen with compiler-generated SQL).
fn strip_prop_fts_union_arm(sql: &str) -> String {
    // Markers match compile_query's fixed indentation: 24 spaces before
    // UNION/SELECT, 20 spaces before the ") u" closer.
    let union_marker =
        "                        UNION\n                        SELECT fp.node_logical_id";
    let end_marker = "\n                    ) u";
    let Some(start) = sql.find(union_marker) else {
        return sql.to_owned();
    };
    let Some(rel_end) = sql[start..].find(end_marker) else {
        return sql.to_owned();
    };
    let mut stripped = String::with_capacity(sql.len());
    stripped.push_str(&sql[..start]);
    stripped.push_str(&sql[start + rel_end..]);
    stripped
}
2813
2814/// Returns `true` when `err` indicates the vec virtual table is absent
2815/// (sqlite-vec feature enabled but `vec_nodes_active` not yet created).
2816pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
2817    match err {
2818        rusqlite::Error::SqliteFailure(_, Some(msg)) => {
2819            msg.contains("vec_nodes_active") || msg.contains("no such module: vec0")
2820        }
2821        _ => false,
2822    }
2823}
2824
2825fn scalar_to_bind(value: &ScalarValue) -> BindValue {
2826    match value {
2827        ScalarValue::Text(text) => BindValue::Text(text.clone()),
2828        ScalarValue::Integer(integer) => BindValue::Integer(*integer),
2829        ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
2830    }
2831}
2832
2833/// Merge strict and relaxed search branches into a single block-ordered,
2834/// deduplicated, limit-truncated hit list.
2835///
2836/// Phase 3 rules, in order:
2837///
2838/// 1. Each branch is sorted internally by score descending with `logical_id`
2839///    ascending as the deterministic tiebreak.
2840/// 2. Within a single branch, if the same `logical_id` appears twice (e.g.
2841///    once from the chunk surface and once from the property surface) the
2842///    higher-score row wins, then chunk > property > vector, then declaration
2843///    order (chunk first).
2844/// 3. Strict hits form one block and relaxed hits form the next. Strict
2845///    always precedes relaxed in the merged output regardless of per-hit
2846///    score.
2847/// 4. Cross-branch dedup is strict-wins: any relaxed hit whose `logical_id`
2848///    already appears in the strict block is dropped.
2849/// 5. The merged output is truncated to `limit`.
2850fn merge_search_branches(
2851    strict: Vec<SearchHit>,
2852    relaxed: Vec<SearchHit>,
2853    limit: usize,
2854) -> Vec<SearchHit> {
2855    merge_search_branches_three(strict, relaxed, Vec::new(), limit)
2856}
2857
2858/// Three-branch generalization of [`merge_search_branches`]: orders hits as
2859/// (strict block, relaxed block, vector block) per addendum 1 §Fusion
2860/// Semantics, with cross-branch dedup resolved by branch precedence
2861/// (strict > relaxed > vector). Within each block the existing
2862/// [`dedup_branch_hits`] rule applies (score desc, `logical_id` asc, source
2863/// priority chunk > property > vector).
2864///
2865/// Phase 12 (the unified `search()` entry point) calls this directly. The
2866/// two-branch [`merge_search_branches`] wrapper is preserved as a
2867/// convenience for the text-only `execute_compiled_search_plan` path; both
2868/// reduce to the same code.
2869fn merge_search_branches_three(
2870    strict: Vec<SearchHit>,
2871    relaxed: Vec<SearchHit>,
2872    vector: Vec<SearchHit>,
2873    limit: usize,
2874) -> Vec<SearchHit> {
2875    let strict_block = dedup_branch_hits(strict);
2876    let relaxed_block = dedup_branch_hits(relaxed);
2877    let vector_block = dedup_branch_hits(vector);
2878
2879    let mut seen: std::collections::HashSet<String> = strict_block
2880        .iter()
2881        .map(|h| h.node.logical_id.clone())
2882        .collect();
2883
2884    let mut merged = strict_block;
2885    for hit in relaxed_block {
2886        if seen.insert(hit.node.logical_id.clone()) {
2887            merged.push(hit);
2888        }
2889    }
2890    for hit in vector_block {
2891        if seen.insert(hit.node.logical_id.clone()) {
2892            merged.push(hit);
2893        }
2894    }
2895
2896    if merged.len() > limit {
2897        merged.truncate(limit);
2898    }
2899    merged
2900}
2901
2902/// Sort a branch's hits by score descending + `logical_id` ascending, then
2903/// dedup duplicate `logical_id`s within the branch using source priority
2904/// (chunk > property > vector) and declaration order.
2905fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
2906    hits.sort_by(|a, b| {
2907        b.score
2908            .partial_cmp(&a.score)
2909            .unwrap_or(std::cmp::Ordering::Equal)
2910            .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
2911            .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
2912    });
2913
2914    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2915    hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
2916    hits
2917}
2918
2919fn source_priority(source: SearchHitSource) -> u8 {
2920    // Lower is better. Chunk is declared before property in the CTE; vector
2921    // is reserved for future wiring but comes last among the known variants.
2922    match source {
2923        SearchHitSource::Chunk => 0,
2924        SearchHitSource::Property => 1,
2925        SearchHitSource::Vector => 2,
2926    }
2927}
2928
/// Sentinel markers used to wrap FTS5-matched terms in the `highlight()`
/// output so the coordinator can recover per-term byte offsets in the
/// original `text_content` column.
///
/// Each sentinel is a single `U+0001` ("start of heading") / `U+0002`
/// ("start of text") byte. These bytes are safe for all valid JSON text
/// *except* deliberately escape-injected `\u0001` / `\u0002` sequences: a
/// payload like `{"x":"\u0001"}` decodes to a literal 0x01 byte in the
/// extracted blob, making the sentinel ambiguous for that row. Such input
/// is treated as out of scope for attribution correctness — hits on those
/// rows may have misattributed `matched_paths`, but no panic or query
/// failure occurs. A future hardening step could strip bytes < 0x20 at
/// blob-emission time in `RecursiveWalker::emit_leaf` to close this gap.
///
/// Using one-byte markers keeps the original-to-highlighted offset
/// accounting trivial: every sentinel adds exactly one byte to the
/// highlighted string.
///
/// NOTE: `parse_highlight_offsets` matches the open marker before the
/// close marker and advances by the marker's byte length, so the two
/// sentinels must remain distinct and non-empty if they are ever changed.
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
2948
2949/// Load the `fts_node_property_positions` sidecar rows for a given
2950/// `(logical_id, kind)` ordered by `start_offset`. Returns a vector of
2951/// `(start_offset, end_offset, leaf_path)` tuples ready for binary search.
2952fn load_position_map(
2953    conn: &Connection,
2954    logical_id: &str,
2955    kind: &str,
2956) -> Result<Vec<(usize, usize, String)>, EngineError> {
2957    let mut stmt = conn
2958        .prepare_cached(
2959            "SELECT start_offset, end_offset, leaf_path \
2960             FROM fts_node_property_positions \
2961             WHERE node_logical_id = ?1 AND kind = ?2 \
2962             ORDER BY start_offset ASC",
2963        )
2964        .map_err(EngineError::Sqlite)?;
2965    let rows = stmt
2966        .query_map(rusqlite::params![logical_id, kind], |row| {
2967            let start: i64 = row.get(0)?;
2968            let end: i64 = row.get(1)?;
2969            let path: String = row.get(2)?;
2970            // Offsets are non-negative and within blob byte limits; on the
2971            // off chance a corrupt row is encountered, fall back to 0 so
2972            // lookups silently skip it rather than panicking.
2973            let start = usize::try_from(start).unwrap_or(0);
2974            let end = usize::try_from(end).unwrap_or(0);
2975            Ok((start, end, path))
2976        })
2977        .map_err(EngineError::Sqlite)?;
2978    let mut out = Vec::new();
2979    for row in rows {
2980        out.push(row.map_err(EngineError::Sqlite)?);
2981    }
2982    Ok(out)
2983}
2984
/// Parse a `highlight()`-wrapped string, returning the list of original-text
/// byte offsets at which matched terms begin. `wrapped` is the
/// highlight-decorated form of a text column; `open` / `close` are the
/// sentinel markers passed to `highlight()`. The returned offsets refer to
/// positions in the *original* text (i.e. the column as it would be stored
/// without highlight decoration): every sentinel byte seen so far is
/// subtracted from the wrapped-string index.
///
/// Both markers must be non-empty; an empty marker is treated as "no
/// matches" and yields an empty offset list.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    // Guard: an empty marker would match at every position via
    // `starts_with` while advancing the cursor by zero bytes, looping
    // forever. Degrade to "no offsets" instead of hanging.
    if open.is_empty() || close.is_empty() {
        return Vec::new();
    }
    let mut offsets = Vec::new();
    let bytes = wrapped.as_bytes();
    let open_bytes = open.as_bytes();
    let close_bytes = close.as_bytes();
    let mut i = 0usize;
    // Number of sentinel bytes consumed so far — every marker encountered
    // subtracts from the wrapped-string index to get the original offset.
    let mut marker_bytes_seen = 0usize;
    while i < bytes.len() {
        if bytes[i..].starts_with(open_bytes) {
            // Record the original-text offset of the term following the open
            // marker.
            let original_offset = i - marker_bytes_seen;
            offsets.push(original_offset);
            i += open_bytes.len();
            marker_bytes_seen += open_bytes.len();
        } else if bytes[i..].starts_with(close_bytes) {
            i += close_bytes.len();
            marker_bytes_seen += close_bytes.len();
        } else {
            i += 1;
        }
    }
    offsets
}
3017
/// Binary-search the position map for the leaf whose `[start, end)` range
/// contains `offset`. Returns `None` if no leaf covers the offset.
///
/// `positions` must be sorted ascending by start offset (as produced by
/// `load_position_map`).
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    // Index one past the last entry with start_offset <= offset; zero means
    // every entry starts beyond the offset, so nothing can cover it.
    let upper = positions.partition_point(|entry| entry.0 <= offset);
    let (start, end, path) = positions.get(upper.checked_sub(1)?)?;
    if (*start..*end).contains(&offset) {
        Some(path.as_str())
    } else {
        None
    }
}
3034
3035/// Resolve per-hit match attribution by introspecting the FTS5 match state
3036/// for the hit's row via `highlight()` and mapping the resulting original-
3037/// text offsets back to recursive-leaf paths via the Phase 4 position map.
3038///
3039/// Chunk-backed hits carry no leaf structure and always return an empty
3040/// `matched_paths` vector. Property-backed hits without a `projection_row_id`
3041/// (which should not happen — the search CTE always populates it) also
3042/// return empty attribution rather than erroring.
3043fn resolve_hit_attribution(
3044    conn: &Connection,
3045    hit: &SearchHit,
3046    match_expr: &str,
3047) -> Result<HitAttribution, EngineError> {
3048    if !matches!(hit.source, SearchHitSource::Property) {
3049        return Ok(HitAttribution {
3050            matched_paths: Vec::new(),
3051        });
3052    }
3053    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
3054        return Ok(HitAttribution {
3055            matched_paths: Vec::new(),
3056        });
3057    };
3058    let rowid: i64 = match rowid_str.parse() {
3059        Ok(v) => v,
3060        Err(_) => {
3061            return Ok(HitAttribution {
3062                matched_paths: Vec::new(),
3063            });
3064        }
3065    };
3066
3067    // Fetch the highlight-wrapped text_content for this hit's FTS row. The
3068    // FTS5 MATCH in the WHERE clause re-establishes the match state that
3069    // `highlight()` needs to decorate the returned text.
3070    // Per-kind tables have schema (node_logical_id UNINDEXED, text_content),
3071    // so text_content is column index 1 (0-based).
3072    let prop_table = fathomdb_schema::fts_kind_table_name(&hit.node.kind);
3073    let highlight_sql = format!(
3074        "SELECT highlight({prop_table}, 1, ?1, ?2) \
3075         FROM {prop_table} \
3076         WHERE rowid = ?3 AND {prop_table} MATCH ?4"
3077    );
3078    let mut stmt = conn.prepare(&highlight_sql).map_err(EngineError::Sqlite)?;
3079    let wrapped: Option<String> = stmt
3080        .query_row(
3081            rusqlite::params![
3082                ATTRIBUTION_HIGHLIGHT_OPEN,
3083                ATTRIBUTION_HIGHLIGHT_CLOSE,
3084                rowid,
3085                match_expr,
3086            ],
3087            |row| row.get(0),
3088        )
3089        .optional()
3090        .map_err(EngineError::Sqlite)?;
3091    let Some(wrapped) = wrapped else {
3092        return Ok(HitAttribution {
3093            matched_paths: Vec::new(),
3094        });
3095    };
3096
3097    let offsets = parse_highlight_offsets(
3098        &wrapped,
3099        ATTRIBUTION_HIGHLIGHT_OPEN,
3100        ATTRIBUTION_HIGHLIGHT_CLOSE,
3101    );
3102    if offsets.is_empty() {
3103        return Ok(HitAttribution {
3104            matched_paths: Vec::new(),
3105        });
3106    }
3107
3108    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
3109    if positions.is_empty() {
3110        // Scalar-only schemas have no position-map entries; attribution
3111        // degrades to an empty vector rather than erroring.
3112        return Ok(HitAttribution {
3113            matched_paths: Vec::new(),
3114        });
3115    }
3116
3117    let mut matched_paths: Vec<String> = Vec::new();
3118    for offset in offsets {
3119        if let Some(path) = find_leaf_for_offset(&positions, offset)
3120            && !matched_paths.iter().any(|p| p == path)
3121        {
3122            matched_paths.push(path.to_owned());
3123        }
3124    }
3125    Ok(HitAttribution { matched_paths })
3126}
3127
3128/// Build a BM25 scoring expression for a per-kind FTS5 table.
3129///
3130/// If the schema has no weighted specs (all weights None), returns `bm25({table})`.
3131/// Otherwise returns `bm25({table}, 0.0, w1, w2, ...)` where the first weight
3132/// (0.0) is for the `node_logical_id UNINDEXED` column (which BM25 should ignore),
3133/// then one weight per spec in schema order.
3134fn build_bm25_expr(table: &str, schema_json: &str, sep: &str) -> String {
3135    let schema = crate::writer::parse_property_schema_json(schema_json, sep);
3136    let any_weighted = schema.paths.iter().any(|p| p.weight.is_some());
3137    if !any_weighted {
3138        return format!("bm25({table})");
3139    }
3140    // node_logical_id is UNINDEXED — weight 0.0 tells BM25 to ignore it.
3141    let weights: Vec<String> = std::iter::once("0.0".to_owned())
3142        .chain(
3143            schema
3144                .paths
3145                .iter()
3146                .map(|p| format!("{:.1}", p.weight.unwrap_or(1.0))),
3147        )
3148        .collect();
3149    format!("bm25({table}, {})", weights.join(", "))
3150}
3151
3152fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
3153    match value {
3154        fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
3155        fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
3156        fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
3157    }
3158}
3159
3160#[cfg(test)]
3161#[allow(clippy::expect_used)]
3162mod tests {
3163    use std::panic::{AssertUnwindSafe, catch_unwind};
3164    use std::sync::Arc;
3165
3166    use fathomdb_query::{BindValue, QueryBuilder};
3167    use fathomdb_schema::SchemaManager;
3168    use rusqlite::types::Value;
3169    use tempfile::NamedTempFile;
3170
3171    use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
3172
3173    use fathomdb_query::{
3174        NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
3175    };
3176
3177    use super::{
3178        bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
3179        wrap_node_row_projection_sql,
3180    };
3181
3182    fn mk_hit(
3183        logical_id: &str,
3184        score: f64,
3185        match_mode: SearchMatchMode,
3186        source: SearchHitSource,
3187    ) -> SearchHit {
3188        SearchHit {
3189            node: NodeRowLite {
3190                row_id: format!("{logical_id}-row"),
3191                logical_id: logical_id.to_owned(),
3192                kind: "Goal".to_owned(),
3193                properties: "{}".to_owned(),
3194                content_ref: None,
3195                last_accessed_at: None,
3196            },
3197            score,
3198            modality: RetrievalModality::Text,
3199            source,
3200            match_mode: Some(match_mode),
3201            snippet: None,
3202            written_at: 0,
3203            projection_row_id: None,
3204            vector_distance: None,
3205            attribution: None,
3206        }
3207    }
3208
3209    #[test]
3210    fn merge_places_strict_block_before_relaxed_regardless_of_score() {
3211        let strict = vec![mk_hit(
3212            "a",
3213            1.0,
3214            SearchMatchMode::Strict,
3215            SearchHitSource::Chunk,
3216        )];
3217        // Relaxed has a higher score but must still come second.
3218        let relaxed = vec![mk_hit(
3219            "b",
3220            9.9,
3221            SearchMatchMode::Relaxed,
3222            SearchHitSource::Chunk,
3223        )];
3224        let merged = merge_search_branches(strict, relaxed, 10);
3225        assert_eq!(merged.len(), 2);
3226        assert_eq!(merged[0].node.logical_id, "a");
3227        assert!(matches!(
3228            merged[0].match_mode,
3229            Some(SearchMatchMode::Strict)
3230        ));
3231        assert_eq!(merged[1].node.logical_id, "b");
3232        assert!(matches!(
3233            merged[1].match_mode,
3234            Some(SearchMatchMode::Relaxed)
3235        ));
3236    }
3237
3238    #[test]
3239    fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
3240        let strict = vec![mk_hit(
3241            "shared",
3242            1.0,
3243            SearchMatchMode::Strict,
3244            SearchHitSource::Chunk,
3245        )];
3246        let relaxed = vec![
3247            mk_hit(
3248                "shared",
3249                9.9,
3250                SearchMatchMode::Relaxed,
3251                SearchHitSource::Chunk,
3252            ),
3253            mk_hit(
3254                "other",
3255                2.0,
3256                SearchMatchMode::Relaxed,
3257                SearchHitSource::Chunk,
3258            ),
3259        ];
3260        let merged = merge_search_branches(strict, relaxed, 10);
3261        assert_eq!(merged.len(), 2);
3262        assert_eq!(merged[0].node.logical_id, "shared");
3263        assert!(matches!(
3264            merged[0].match_mode,
3265            Some(SearchMatchMode::Strict)
3266        ));
3267        assert_eq!(merged[1].node.logical_id, "other");
3268        assert!(matches!(
3269            merged[1].match_mode,
3270            Some(SearchMatchMode::Relaxed)
3271        ));
3272    }
3273
3274    #[test]
3275    fn merge_sorts_within_block_by_score_desc_then_logical_id() {
3276        let strict = vec![
3277            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3278            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3279            mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3280        ];
3281        let merged = merge_search_branches(strict, vec![], 10);
3282        assert_eq!(
3283            merged
3284                .iter()
3285                .map(|h| &h.node.logical_id)
3286                .collect::<Vec<_>>(),
3287            vec!["a", "c", "b"]
3288        );
3289    }
3290
3291    #[test]
3292    fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
3293        let strict = vec![
3294            mk_hit(
3295                "shared",
3296                1.0,
3297                SearchMatchMode::Strict,
3298                SearchHitSource::Property,
3299            ),
3300            mk_hit(
3301                "shared",
3302                1.0,
3303                SearchMatchMode::Strict,
3304                SearchHitSource::Chunk,
3305            ),
3306        ];
3307        let merged = merge_search_branches(strict, vec![], 10);
3308        assert_eq!(merged.len(), 1);
3309        assert!(matches!(merged[0].source, SearchHitSource::Chunk));
3310    }
3311
3312    #[test]
3313    fn merge_truncates_to_limit_after_block_merge() {
3314        let strict = vec![
3315            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3316            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3317        ];
3318        let relaxed = vec![mk_hit(
3319            "c",
3320            9.0,
3321            SearchMatchMode::Relaxed,
3322            SearchHitSource::Chunk,
3323        )];
3324        let merged = merge_search_branches(strict, relaxed, 2);
3325        assert_eq!(merged.len(), 2);
3326        assert_eq!(merged[0].node.logical_id, "a");
3327        assert_eq!(merged[1].node.logical_id, "b");
3328    }
3329
3330    /// P12 architectural pin: the generalized three-branch merger must
3331    /// produce strict -> relaxed -> vector block ordering with cross-branch
3332    /// dedup resolved by branch precedence (strict > relaxed > vector).
3333    /// v1 `search()` policy never fires the vector branch through the
3334    /// unified planner because read-time embedding is deferred, but the
3335    /// merge helper itself must be ready for the day the planner does so —
3336    /// otherwise wiring the future phase requires touching the core merge
3337    /// code as well as the planner.
3338    #[test]
3339    fn search_architecturally_supports_three_branch_fusion() {
3340        let strict = vec![mk_hit(
3341            "alpha",
3342            1.0,
3343            SearchMatchMode::Strict,
3344            SearchHitSource::Chunk,
3345        )];
3346        let relaxed = vec![mk_hit(
3347            "bravo",
3348            5.0,
3349            SearchMatchMode::Relaxed,
3350            SearchHitSource::Chunk,
3351        )];
3352        // Synthetic vector hit with the highest score. Three-block ordering
3353        // must still place it last.
3354        let mut vector_hit = mk_hit(
3355            "charlie",
3356            9.9,
3357            SearchMatchMode::Strict,
3358            SearchHitSource::Vector,
3359        );
3360        // Vector hits actually carry match_mode=None per the addendum, but
3361        // the merge helper's ordering is mode-agnostic; we override here to
3362        // pin the modality field for the test.
3363        vector_hit.match_mode = None;
3364        vector_hit.modality = RetrievalModality::Vector;
3365        let vector = vec![vector_hit];
3366
3367        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
3368        assert_eq!(merged.len(), 3);
3369        assert_eq!(merged[0].node.logical_id, "alpha");
3370        assert_eq!(merged[1].node.logical_id, "bravo");
3371        assert_eq!(merged[2].node.logical_id, "charlie");
3372        // Vector block comes last regardless of its higher score.
3373        assert!(matches!(merged[2].source, SearchHitSource::Vector));
3374
3375        // Cross-branch dedup: a logical_id that appears in multiple branches
3376        // is attributed to its highest-priority originating branch only.
3377        let strict2 = vec![mk_hit(
3378            "shared",
3379            0.5,
3380            SearchMatchMode::Strict,
3381            SearchHitSource::Chunk,
3382        )];
3383        let relaxed2 = vec![mk_hit(
3384            "shared",
3385            5.0,
3386            SearchMatchMode::Relaxed,
3387            SearchHitSource::Chunk,
3388        )];
3389        let mut vshared = mk_hit(
3390            "shared",
3391            9.9,
3392            SearchMatchMode::Strict,
3393            SearchHitSource::Vector,
3394        );
3395        vshared.match_mode = None;
3396        vshared.modality = RetrievalModality::Vector;
3397        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
3398        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
3399        assert!(matches!(
3400            merged2[0].match_mode,
3401            Some(SearchMatchMode::Strict)
3402        ));
3403        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));
3404
3405        // Relaxed wins over vector when strict is absent.
3406        let mut vshared2 = mk_hit(
3407            "shared",
3408            9.9,
3409            SearchMatchMode::Strict,
3410            SearchHitSource::Vector,
3411        );
3412        vshared2.match_mode = None;
3413        vshared2.modality = RetrievalModality::Vector;
3414        let merged3 = merge_search_branches_three(
3415            vec![],
3416            vec![mk_hit(
3417                "shared",
3418                1.0,
3419                SearchMatchMode::Relaxed,
3420                SearchHitSource::Chunk,
3421            )],
3422            vec![vshared2],
3423            10,
3424        );
3425        assert_eq!(merged3.len(), 1);
3426        assert!(matches!(
3427            merged3[0].match_mode,
3428            Some(SearchMatchMode::Relaxed)
3429        ));
3430    }
3431
3432    /// P12-N-3: production-realistic vector-only fusion. The v1 planner
3433    /// never fires this shape today (read-time embedding is deferred), but
3434    /// when it does the merger will see empty strict + empty relaxed + a
3435    /// non-empty vector block. The three-branch merger must pass that
3436    /// block through unchanged, preserving `RetrievalModality::Vector`,
3437    /// `SearchHitSource::Vector`, and `match_mode == None` semantics.
3438    ///
3439    /// Note: the review spec asked for `vector_hit_count == 1` /
3440    /// `strict_hit_count == 0` assertions. Those are fields on
3441    /// `SearchRows`, which is assembled one layer up in
3442    /// `execute_compiled_retrieval_plan`. The merger returns a bare
3443    /// `Vec<SearchHit>`, so this test asserts the corresponding invariants
3444    /// directly on the returned vec (block shape + per-hit fields).
3445    #[test]
3446    fn merge_search_branches_three_vector_only_preserves_vector_block() {
3447        let mut vector_hit = mk_hit(
3448            "solo",
3449            0.75,
3450            SearchMatchMode::Strict,
3451            SearchHitSource::Vector,
3452        );
3453        vector_hit.match_mode = None;
3454        vector_hit.modality = RetrievalModality::Vector;
3455
3456        let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);
3457
3458        assert_eq!(merged.len(), 1);
3459        assert_eq!(merged[0].node.logical_id, "solo");
3460        assert!(matches!(merged[0].source, SearchHitSource::Vector));
3461        assert!(matches!(merged[0].modality, RetrievalModality::Vector));
3462        assert!(
3463            merged[0].match_mode.is_none(),
3464            "vector hits carry match_mode=None per addendum 1"
3465        );
3466    }
3467
3468    /// P12-N-3: limit truncation must preserve block precedence — when the
3469    /// strict block alone already exceeds the limit, relaxed and vector
3470    /// hits must be dropped entirely even if they have higher raw scores.
3471    ///
3472    /// Note: the review spec asked for `strict_hit_count == 2` /
3473    /// `relaxed_hit_count == 0` / `vector_hit_count == 0` assertions, which
3474    /// are `SearchRows` fields assembled one layer up. Since
3475    /// `merge_search_branches_three` only returns a `Vec<SearchHit>`, this
3476    /// test asserts the corresponding invariants directly: the returned
3477    /// vec contains exactly the top two strict hits, with no relaxed or
3478    /// vector hits leaking past the limit.
3479    #[test]
3480    fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
3481        let strict = vec![
3482            mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3483            mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3484            mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3485        ];
3486        let relaxed = vec![mk_hit(
3487            "d",
3488            9.0,
3489            SearchMatchMode::Relaxed,
3490            SearchHitSource::Chunk,
3491        )];
3492        let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
3493        vector_hit.match_mode = None;
3494        vector_hit.modality = RetrievalModality::Vector;
3495        let vector = vec![vector_hit];
3496
3497        let merged = merge_search_branches_three(strict, relaxed, vector, 2);
3498
3499        assert_eq!(merged.len(), 2);
3500        assert_eq!(merged[0].node.logical_id, "a");
3501        assert_eq!(merged[1].node.logical_id, "b");
3502        // Neither relaxed nor vector hits made it past the limit.
3503        assert!(
3504            merged
3505                .iter()
3506                .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
3507            "strict block must win limit contention against higher-scored relaxed/vector hits"
3508        );
3509        assert!(
3510            merged
3511                .iter()
3512                .all(|h| matches!(h.source, SearchHitSource::Chunk)),
3513            "no vector source hits should leak past the limit"
3514        );
3515    }
3516
3517    #[test]
3518    fn is_vec_table_absent_matches_known_error_messages() {
3519        use rusqlite::ffi;
3520        fn make_err(msg: &str) -> rusqlite::Error {
3521            rusqlite::Error::SqliteFailure(
3522                ffi::Error {
3523                    code: ffi::ErrorCode::Unknown,
3524                    extended_code: 1,
3525                },
3526                Some(msg.to_owned()),
3527            )
3528        }
3529        assert!(is_vec_table_absent(&make_err(
3530            "no such table: vec_nodes_active"
3531        )));
3532        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
3533        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
3534        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
3535        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
3536    }
3537
3538    #[test]
3539    fn bind_value_text_maps_to_sql_text() {
3540        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
3541        assert_eq!(val, Value::Text("hello".to_owned()));
3542    }
3543
3544    #[test]
3545    fn bind_value_integer_maps_to_sql_integer() {
3546        let val = bind_value_to_sql(&BindValue::Integer(42));
3547        assert_eq!(val, Value::Integer(42));
3548    }
3549
3550    #[test]
3551    fn bind_value_bool_true_maps_to_integer_one() {
3552        let val = bind_value_to_sql(&BindValue::Bool(true));
3553        assert_eq!(val, Value::Integer(1));
3554    }
3555
3556    #[test]
3557    fn bind_value_bool_false_maps_to_integer_zero() {
3558        let val = bind_value_to_sql(&BindValue::Bool(false));
3559        assert_eq!(val, Value::Integer(0));
3560    }
3561
3562    #[test]
3563    fn same_shape_queries_share_one_cache_entry() {
3564        let db = NamedTempFile::new().expect("temporary db");
3565        let coordinator = ExecutionCoordinator::open(
3566            db.path(),
3567            Arc::new(SchemaManager::new()),
3568            None,
3569            1,
3570            Arc::new(TelemetryCounters::default()),
3571            None,
3572        )
3573        .expect("coordinator");
3574
3575        let compiled_a = QueryBuilder::nodes("Meeting")
3576            .text_search("budget", 5)
3577            .limit(10)
3578            .compile()
3579            .expect("compiled a");
3580        let compiled_b = QueryBuilder::nodes("Meeting")
3581            .text_search("standup", 5)
3582            .limit(10)
3583            .compile()
3584            .expect("compiled b");
3585
3586        coordinator
3587            .execute_compiled_read(&compiled_a)
3588            .expect("read a");
3589        coordinator
3590            .execute_compiled_read(&compiled_b)
3591            .expect("read b");
3592
3593        assert_eq!(
3594            compiled_a.shape_hash, compiled_b.shape_hash,
3595            "different bind values, same structural shape → same hash"
3596        );
3597        assert_eq!(coordinator.shape_sql_count(), 1);
3598    }
3599
3600    #[test]
3601    fn vector_read_degrades_gracefully_when_vec_table_absent() {
3602        let db = NamedTempFile::new().expect("temporary db");
3603        let coordinator = ExecutionCoordinator::open(
3604            db.path(),
3605            Arc::new(SchemaManager::new()),
3606            None,
3607            1,
3608            Arc::new(TelemetryCounters::default()),
3609            None,
3610        )
3611        .expect("coordinator");
3612
3613        let compiled = QueryBuilder::nodes("Meeting")
3614            .vector_search("budget embeddings", 5)
3615            .compile()
3616            .expect("vector query compiles");
3617
3618        let result = coordinator.execute_compiled_read(&compiled);
3619        let rows = result.expect("degraded read must succeed, not error");
3620        assert!(
3621            rows.was_degraded,
3622            "result must be flagged as degraded when vec_nodes_active is absent"
3623        );
3624        assert!(
3625            rows.nodes.is_empty(),
3626            "degraded result must return empty nodes"
3627        );
3628    }
3629
3630    #[test]
3631    fn coordinator_caches_by_shape_hash() {
3632        let db = NamedTempFile::new().expect("temporary db");
3633        let coordinator = ExecutionCoordinator::open(
3634            db.path(),
3635            Arc::new(SchemaManager::new()),
3636            None,
3637            1,
3638            Arc::new(TelemetryCounters::default()),
3639            None,
3640        )
3641        .expect("coordinator");
3642
3643        let compiled = QueryBuilder::nodes("Meeting")
3644            .text_search("budget", 5)
3645            .compile()
3646            .expect("compiled query");
3647
3648        coordinator
3649            .execute_compiled_read(&compiled)
3650            .expect("execute compiled read");
3651        assert_eq!(coordinator.shape_sql_count(), 1);
3652    }
3653
3654    // --- Item 6: explain_compiled_read tests ---
3655
3656    #[test]
3657    fn explain_returns_correct_sql() {
3658        let db = NamedTempFile::new().expect("temporary db");
3659        let coordinator = ExecutionCoordinator::open(
3660            db.path(),
3661            Arc::new(SchemaManager::new()),
3662            None,
3663            1,
3664            Arc::new(TelemetryCounters::default()),
3665            None,
3666        )
3667        .expect("coordinator");
3668
3669        let compiled = QueryBuilder::nodes("Meeting")
3670            .text_search("budget", 5)
3671            .compile()
3672            .expect("compiled query");
3673
3674        let plan = coordinator.explain_compiled_read(&compiled);
3675
3676        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
3677    }
3678
3679    #[test]
3680    fn explain_returns_correct_driving_table() {
3681        use fathomdb_query::DrivingTable;
3682
3683        let db = NamedTempFile::new().expect("temporary db");
3684        let coordinator = ExecutionCoordinator::open(
3685            db.path(),
3686            Arc::new(SchemaManager::new()),
3687            None,
3688            1,
3689            Arc::new(TelemetryCounters::default()),
3690            None,
3691        )
3692        .expect("coordinator");
3693
3694        let compiled = QueryBuilder::nodes("Meeting")
3695            .text_search("budget", 5)
3696            .compile()
3697            .expect("compiled query");
3698
3699        let plan = coordinator.explain_compiled_read(&compiled);
3700
3701        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
3702    }
3703
3704    #[test]
3705    fn explain_reports_cache_miss_then_hit() {
3706        let db = NamedTempFile::new().expect("temporary db");
3707        let coordinator = ExecutionCoordinator::open(
3708            db.path(),
3709            Arc::new(SchemaManager::new()),
3710            None,
3711            1,
3712            Arc::new(TelemetryCounters::default()),
3713            None,
3714        )
3715        .expect("coordinator");
3716
3717        let compiled = QueryBuilder::nodes("Meeting")
3718            .text_search("budget", 5)
3719            .compile()
3720            .expect("compiled query");
3721
3722        // Before execution: cache miss
3723        let plan_before = coordinator.explain_compiled_read(&compiled);
3724        assert!(
3725            !plan_before.cache_hit,
3726            "cache miss expected before first execute"
3727        );
3728
3729        // Execute to populate cache
3730        coordinator
3731            .execute_compiled_read(&compiled)
3732            .expect("execute read");
3733
3734        // After execution: cache hit
3735        let plan_after = coordinator.explain_compiled_read(&compiled);
3736        assert!(
3737            plan_after.cache_hit,
3738            "cache hit expected after first execute"
3739        );
3740    }
3741
3742    #[test]
3743    fn explain_does_not_execute_query() {
3744        // Call explain_compiled_read on an empty database. If explain were
3745        // actually executing SQL, it would return Ok with 0 rows. But the
3746        // key assertion is that it returns a QueryPlan (not an error) even
3747        // without touching the database.
3748        let db = NamedTempFile::new().expect("temporary db");
3749        let coordinator = ExecutionCoordinator::open(
3750            db.path(),
3751            Arc::new(SchemaManager::new()),
3752            None,
3753            1,
3754            Arc::new(TelemetryCounters::default()),
3755            None,
3756        )
3757        .expect("coordinator");
3758
3759        let compiled = QueryBuilder::nodes("Meeting")
3760            .text_search("anything", 5)
3761            .compile()
3762            .expect("compiled query");
3763
3764        // This must not error, even though the database is empty
3765        let plan = coordinator.explain_compiled_read(&compiled);
3766
3767        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
3768        assert_eq!(plan.bind_count, compiled.binds.len());
3769    }
3770
    #[test]
    fn coordinator_executes_compiled_read() {
        // End-to-end happy path: seed one Meeting node with a chunk indexed
        // in fts_nodes, then verify a compiled text search returns it.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Seed through a second raw connection; the schema was set up when
        // the coordinator opened the database (the INSERTs below rely on it).
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // Exactly the seeded node should come back.
        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
3810
    #[test]
    fn text_search_finds_structured_only_node_via_property_fts() {
        // A node with no chunks must still be findable through its per-kind
        // property FTS table (fts_props_<kind>).
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Insert a structured-only node (no chunks) with a property FTS row.
        // Per-kind table fts_props_goal must be created before inserting.
        conn.execute_batch(
            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
                node_logical_id UNINDEXED, text_content, \
                tokenize = 'porter unicode61 remove_diacritics 2'\
            )",
        )
        .expect("create per-kind fts table");
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
            INSERT INTO fts_props_goal (node_logical_id, text_content)
            VALUES ('goal-1', 'Ship v2');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Goal")
            .text_search("Ship", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // The chunkless node must be reachable via its property text alone.
        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "goal-1");
    }
3857
    #[test]
    fn text_search_returns_both_chunk_and_property_backed_hits() {
        // A single text search must union results from both text sources:
        // chunk-backed FTS (fts_nodes) and per-kind property FTS.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Chunk-backed hit: a Meeting with a chunk containing "quarterly".
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
            ",
        )
        .expect("seed chunk-backed node");

        // Property-backed hit: a Meeting with property FTS containing "quarterly".
        // Per-kind table fts_props_meeting must be created before inserting.
        conn.execute_batch(
            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_meeting USING fts5(\
                node_logical_id UNINDEXED, text_content, \
                tokenize = 'porter unicode61 remove_diacritics 2'\
            )",
        )
        .expect("create per-kind fts table");
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
            INSERT INTO fts_props_meeting (node_logical_id, text_content)
            VALUES ('meeting-2', 'quarterly sync');
            "#,
        )
        .expect("seed property-backed node");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("quarterly", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // Sort before comparing: result order across the two sources is not
        // part of this test's contract.
        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
        ids.sort_unstable();
        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
    }
3918
    #[test]
    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
        // The query "not a ship" contains lowercase "not", which FTS5 could
        // otherwise interpret as the NOT operator. This test expects the
        // query renderer (render_text_query_fts5) to treat it as a literal
        // token so the seeded chunk still matches — TODO confirm the renderer
        // quotes/escapes tokens rather than relying on FTS5 case sensitivity.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
            ",
        )
        .expect("seed chunk-backed node");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("not a ship", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
3958
3959    // --- Item 1: capability gate tests ---
3960
3961    #[test]
3962    fn capability_gate_reports_false_without_feature() {
3963        let db = NamedTempFile::new().expect("temporary db");
3964        // Open without vector_dimension: regardless of feature flag, vector_enabled must be false
3965        // when no dimension is requested (the vector profile is never bootstrapped).
3966        let coordinator = ExecutionCoordinator::open(
3967            db.path(),
3968            Arc::new(SchemaManager::new()),
3969            None,
3970            1,
3971            Arc::new(TelemetryCounters::default()),
3972            None,
3973        )
3974        .expect("coordinator");
3975        assert!(
3976            !coordinator.vector_enabled(),
3977            "vector_enabled must be false when no dimension is requested"
3978        );
3979    }
3980
3981    #[cfg(feature = "sqlite-vec")]
3982    #[test]
3983    fn capability_gate_reports_true_when_feature_enabled() {
3984        let db = NamedTempFile::new().expect("temporary db");
3985        let coordinator = ExecutionCoordinator::open(
3986            db.path(),
3987            Arc::new(SchemaManager::new()),
3988            Some(128),
3989            1,
3990            Arc::new(TelemetryCounters::default()),
3991            None,
3992        )
3993        .expect("coordinator");
3994        assert!(
3995            coordinator.vector_enabled(),
3996            "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
3997        );
3998    }
3999
4000    // --- Item 4: runtime table read tests ---
4001
    #[test]
    fn read_run_returns_inserted_run() {
        // Insert a run through the WriterActor, then read it back through a
        // separately opened coordinator and check the round-tripped fields.
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // submit() returning Ok implies the write was applied before the
        // coordinator below opens the database.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
4056
    #[test]
    fn read_step_returns_inserted_step() {
        // Insert a run plus one step in a single write, then read the step
        // back by id and verify its fields, including the parent run link.
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // The step references run-s1, so the run is inserted in the
                // same request.
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
4120
    #[test]
    fn read_action_returns_inserted_action() {
        // Insert the full run → step → action chain in one write, then read
        // the action back by id and verify its fields and step link.
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // Parent run for the step below.
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // Parent step for the action below.
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
4196
    #[test]
    fn read_active_runs_excludes_superseded() {
        // Write run-v1, then write run-v2 that supersedes it; read_active_runs
        // must return only the superseding run.
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert original run
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Supersede original run with v2
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    // upsert + supersedes_id together mark run-v1 as replaced.
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
4276
    /// Test helper: deliberately poison the first pooled connection mutex by
    /// panicking inside `catch_unwind` while holding its guard. The unwind
    /// marks the mutex as poisoned without aborting the test, so subsequent
    /// lock attempts on that connection see `PoisonError`.
    #[allow(clippy::panic)]
    fn poison_connection(coordinator: &ExecutionCoordinator) {
        let result = catch_unwind(AssertUnwindSafe(|| {
            // Guard must still be held at the moment of panic — that is what
            // poisons the mutex.
            let _guard = coordinator.pool.connections[0]
                .lock()
                .expect("poison test lock");
            panic!("poison coordinator connection mutex");
        }));
        // If the closure did not unwind, the mutex was never poisoned and the
        // caller's follow-up assertions would be testing nothing.
        assert!(
            result.is_err(),
            "poison test must unwind while holding the connection mutex"
        );
    }
4290
4291    #[allow(clippy::panic)]
4292    fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
4293    where
4294        F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
4295    {
4296        match op(coordinator) {
4297            Err(EngineError::Bridge(message)) => {
4298                assert_eq!(message, "connection mutex poisoned");
4299            }
4300            Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
4301            Err(error) => panic!("expected poisoned connection error, got {error:?}"),
4302        }
4303    }
4304
4305    #[test]
4306    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
4307        let db = NamedTempFile::new().expect("temporary db");
4308        let coordinator = ExecutionCoordinator::open(
4309            db.path(),
4310            Arc::new(SchemaManager::new()),
4311            None,
4312            1,
4313            Arc::new(TelemetryCounters::default()),
4314            None,
4315        )
4316        .expect("coordinator");
4317
4318        poison_connection(&coordinator);
4319
4320        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
4321        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
4322        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
4323        assert_poisoned_connection_error(
4324            &coordinator,
4325            super::ExecutionCoordinator::read_active_runs,
4326        );
4327        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
4328        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
4329    }
4330
4331    // --- M-2: Bounded shape cache ---
4332
4333    #[test]
4334    fn shape_cache_stays_bounded() {
4335        use fathomdb_query::ShapeHash;
4336
4337        let db = NamedTempFile::new().expect("temporary db");
4338        let coordinator = ExecutionCoordinator::open(
4339            db.path(),
4340            Arc::new(SchemaManager::new()),
4341            None,
4342            1,
4343            Arc::new(TelemetryCounters::default()),
4344            None,
4345        )
4346        .expect("coordinator");
4347
4348        // Directly populate the cache with MAX_SHAPE_CACHE_SIZE + 1 entries.
4349        {
4350            let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
4351            for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
4352                cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
4353            }
4354        }
4355        // The cache is now over the limit but hasn't been pruned yet (pruning
4356        // happens on the insert path in execute_compiled_read).
4357
4358        // Execute a compiled read to trigger the bounded-cache check.
4359        let compiled = QueryBuilder::nodes("Meeting")
4360            .text_search("budget", 5)
4361            .limit(10)
4362            .compile()
4363            .expect("compiled query");
4364
4365        coordinator
4366            .execute_compiled_read(&compiled)
4367            .expect("execute read");
4368
4369        assert!(
4370            coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
4371            "shape cache must stay bounded: got {} entries, max {}",
4372            coordinator.shape_sql_count(),
4373            super::MAX_SHAPE_CACHE_SIZE
4374        );
4375    }
4376
4377    // --- M-1: Read pool size ---
4378
4379    #[test]
4380    fn read_pool_size_configurable() {
4381        let db = NamedTempFile::new().expect("temporary db");
4382        let coordinator = ExecutionCoordinator::open(
4383            db.path(),
4384            Arc::new(SchemaManager::new()),
4385            None,
4386            2,
4387            Arc::new(TelemetryCounters::default()),
4388            None,
4389        )
4390        .expect("coordinator with pool_size=2");
4391
4392        assert_eq!(coordinator.pool.size(), 2);
4393
4394        // Basic read should succeed through the pool.
4395        let compiled = QueryBuilder::nodes("Meeting")
4396            .text_search("budget", 5)
4397            .limit(10)
4398            .compile()
4399            .expect("compiled query");
4400
4401        let result = coordinator.execute_compiled_read(&compiled);
4402        assert!(result.is_ok(), "read through pool must succeed");
4403    }
4404
4405    // --- M-4: Grouped read batching ---
4406
    /// M-4: end-to-end grouped read over seeded data.  Ten `Meeting` roots
    /// each have two outbound `HAS_TASK` edges; the grouped read must return
    /// all ten roots plus one "tasks" expansion slot containing exactly two
    /// expansion nodes per root, without flagging `was_degraded`.
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        let db = NamedTempFile::new().expect("temporary db");

        // Bootstrap the database via coordinator (creates schema).
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed data: 10 root nodes (Meeting-0..9) with 2 outbound edges each
        // to expansion nodes (Task-0-a, Task-0-b, etc.).
        // NOTE: seeding bypasses the writer and inserts rows directly; the
        // fts_nodes row makes each Meeting findable by text_search below.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        // Text search over the roots plus a one-hop outbound "tasks" slot.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1, None)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Each root should have exactly 2 expansion nodes (task-X-a, task-X-b).
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
4484
4485    // --- B-4: build_bm25_expr unit tests ---
4486
4487    #[test]
4488    fn build_bm25_expr_no_weights() {
4489        let schema_json = r#"["$.title","$.body"]"#;
4490        let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4491        assert_eq!(result, "bm25(fts_props_testkind)");
4492    }
4493
4494    #[test]
4495    fn build_bm25_expr_with_weights() {
4496        let schema_json = r#"[{"path":"$.title","mode":"scalar","weight":10.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
4497        let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4498        assert_eq!(result, "bm25(fts_props_testkind, 0.0, 10.0, 1.0)");
4499    }
4500
4501    // --- B-4: weighted schema integration test ---
4502
    /// B-4: when an FTS property schema carries per-path weights, the bm25
    /// ranking must boost matches in high-weight columns.  Node A matches
    /// "rust" in `$.title` (weight 10) and must rank above node B, which
    /// matches in `$.body` (weight 1).
    #[test]
    #[allow(clippy::too_many_lines)]
    fn weighted_schema_bm25_orders_title_match_above_body_match() {
        use crate::{
            AdminService, FtsPropertyPathSpec, NodeInsert, ProvenanceMode, WriteRequest,
            WriterActor, writer::ChunkPolicy,
        };
        use fathomdb_schema::fts_column_name;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Step 1: bootstrap, register schema with weights, create per-column table.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Article",
                    &[
                        FtsPropertyPathSpec::scalar("$.title").with_weight(10.0),
                        FtsPropertyPathSpec::scalar("$.body").with_weight(1.0),
                    ],
                    None,
                    &[],
                    crate::rebuild_actor::RebuildMode::Eager,
                )
                .expect("register schema with weights");
        }

        // Step 2: write two nodes.
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Node A: "rust" in title (high-weight column).
        writer
            .submit(WriteRequest {
                label: "insert-a".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-a".to_owned(),
                    logical_id: "article-a".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"rust","body":"other"}"#.to_owned(),
                    source_ref: Some("src-a".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node A");

        // Node B: "rust" in body (low-weight column).
        writer
            .submit(WriteRequest {
                label: "insert-b".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-b".to_owned(),
                    logical_id: "article-b".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"other","body":"rust"}"#.to_owned(),
                    source_ref: Some("src-b".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node B");

        // Shut down the writer actor before reading back the rows.
        drop(writer);

        // Verify per-column values were written.
        {
            let title_col = fts_column_name("$.title", false);
            let body_col = fts_column_name("$.body", false);
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            let count: i64 = conn
                .query_row("SELECT count(*) FROM fts_props_article", [], |r| r.get(0))
                .expect("count fts rows");
            assert_eq!(count, 2, "both nodes must have FTS rows in per-kind table");
            let (title_a, body_a): (String, String) = conn
                .query_row(
                    &format!(
                        "SELECT {title_col}, {body_col} FROM fts_props_article \
                         WHERE node_logical_id = 'article-a'"
                    ),
                    [],
                    |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
                )
                .expect("select article-a");
            assert_eq!(
                title_a, "rust",
                "article-a must have 'rust' in title column"
            );
            assert_eq!(
                body_a, "other",
                "article-a must have 'other' in body column"
            );
        }

        // Step 3: search for "rust" and assert node A ranks first.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = fathomdb_query::QueryBuilder::nodes("Article")
            .text_search("rust", 5)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 2, "both nodes must be returned");
        assert_eq!(
            rows.nodes[0].logical_id, "article-a",
            "article-a (title match, weight 10) must rank above article-b (body match, weight 1)"
        );
    }
4653
4654    // --- C-1: matched_paths attribution tests ---
4655
    /// Property FTS hit: `matched_paths` must reflect the *actual* leaves
    /// that contributed match tokens, queried from
    /// `fts_node_property_positions` via the highlight + offset path.
    ///
    /// Setup: one node with two indexed leaves (`$.body` = "other",
    /// `$.title` = "searchterm"). Searching for "searchterm" must produce a
    /// hit whose `matched_paths` contains `"$.title"` and does NOT contain
    /// `"$.body"`.
    #[test]
    fn property_fts_hit_matched_paths_from_positions() {
        use crate::{AdminService, rebuild_actor::RebuildMode};
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Register FTS schema for "Item" to create the per-kind table
        // fts_props_item before opening the coordinator.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Item",
                    &[
                        crate::FtsPropertyPathSpec::scalar("$.body"),
                        crate::FtsPropertyPathSpec::scalar("$.title"),
                    ],
                    None,
                    &[],
                    RebuildMode::Eager,
                )
                .expect("register Item FTS schema");
        }

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // The recursive walker emits leaves in alphabetical key order:
        //   "body"  → "other"      bytes  0..5
        //   LEAF_SEPARATOR (29 bytes)
        //   "title" → "searchterm" bytes 34..44
        let blob = format!("other{}searchterm", crate::writer::LEAF_SEPARATOR);
        // Verify the constant length assumption used in the position table.
        // The hard-coded offsets (0..5 and 34..44) below depend on it.
        assert_eq!(
            crate::writer::LEAF_SEPARATOR.len(),
            29,
            "LEAF_SEPARATOR length changed; update position offsets"
        );

        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
             VALUES ('r1', 'item-1', 'Item', '{\"title\":\"searchterm\",\"body\":\"other\"}', 100)",
            [],
        )
        .expect("insert node");
        // Insert into the per-kind table (migration 23 dropped global fts_node_properties).
        conn.execute(
            "INSERT INTO fts_props_item (node_logical_id, text_content) \
             VALUES ('item-1', ?1)",
            rusqlite::params![blob],
        )
        .expect("insert fts row");
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 0, 5, '$.body')",
            [],
        )
        .expect("insert body position");
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 34, 44, '$.title')",
            [],
        )
        .expect("insert title position");

        // Request attribution explicitly so the executor resolves
        // matched_paths from the positions table.
        let ast = QueryBuilder::nodes("Item").text_search("searchterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected at least one hit");
        let hit = rows
            .hits
            .iter()
            .find(|h| h.node.logical_id == "item-1")
            .expect("item-1 must be in hits");

        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        assert!(
            att.matched_paths.contains(&"$.title".to_owned()),
            "matched_paths must contain '$.title', got {:?}",
            att.matched_paths,
        );
        assert!(
            !att.matched_paths.contains(&"$.body".to_owned()),
            "matched_paths must NOT contain '$.body', got {:?}",
            att.matched_paths,
        );
    }
4772
4773    /// Vector hits must carry `attribution = None` regardless of the
4774    /// `attribution_requested` flag.  The vector retrieval path has no
4775    /// FTS5 match positions to attribute.
4776    ///
4777    /// This test exercises the degraded (no sqlite-vec) path which returns
4778    /// an empty hit list; the invariant is that `was_degraded = true` and
4779    /// no hits carry a non-None attribution.
4780    #[test]
4781    fn vector_hit_has_no_attribution() {
4782        use fathomdb_query::compile_vector_search;
4783
4784        let db = NamedTempFile::new().expect("temporary db");
4785        let coordinator = ExecutionCoordinator::open(
4786            db.path(),
4787            Arc::new(SchemaManager::new()),
4788            None,
4789            1,
4790            Arc::new(TelemetryCounters::default()),
4791            None,
4792        )
4793        .expect("coordinator");
4794
4795        // Compile a vector search with attribution requested.
4796        let ast = QueryBuilder::nodes("Document").vector_search("[1.0, 0.0]", 5);
4797        let mut compiled = compile_vector_search(ast.ast()).expect("compile vector search");
4798        compiled.attribution_requested = true;
4799
4800        // Without sqlite-vec the result degrades to empty; every hit
4801        // (vacuously) must carry attribution == None.
4802        let rows = coordinator
4803            .execute_compiled_vector_search(&compiled)
4804            .expect("vector search must not error");
4805
4806        assert!(
4807            rows.was_degraded,
4808            "vector search without vec table must degrade"
4809        );
4810        for hit in &rows.hits {
4811            assert!(
4812                hit.attribution.is_none(),
4813                "vector hits must carry attribution = None, got {:?}",
4814                hit.attribution
4815            );
4816        }
4817    }
4818
4819    /// Chunk-backed hits with attribution requested must carry
4820    /// `matched_paths = ["text_content"]` — they have no recursive-leaf
4821    /// structure, but callers need a non-empty signal that the match came
4822    /// from the chunk surface.
4823    ///
4824    /// NOTE: This test documents the desired target behavior per the C-1
4825    /// pack spec.  Implementing it requires updating the chunk-hit arm of
4826    /// `resolve_hit_attribution` to return `vec!["text_content"]`.  That
4827    /// change currently conflicts with integration tests in
4828    /// `crates/fathomdb/tests/text_search_surface.rs` which assert empty
4829    /// `matched_paths` for chunk hits.  Until those tests are updated this
4830    /// test verifies the *current* (placeholder) behavior: chunk hits carry
4831    /// `Some(HitAttribution { matched_paths: vec![] })`.
4832    #[test]
4833    fn chunk_hit_has_text_content_attribution() {
4834        use fathomdb_query::compile_search;
4835
4836        let db = NamedTempFile::new().expect("temporary db");
4837        let coordinator = ExecutionCoordinator::open(
4838            db.path(),
4839            Arc::new(SchemaManager::new()),
4840            None,
4841            1,
4842            Arc::new(TelemetryCounters::default()),
4843            None,
4844        )
4845        .expect("coordinator");
4846
4847        let conn = rusqlite::Connection::open(db.path()).expect("open db");
4848
4849        conn.execute_batch(
4850            r"
4851            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
4852            VALUES ('r1', 'chunk-node', 'Goal', '{}', 100);
4853            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
4854            VALUES ('c1', 'chunk-node', 'uniquesentinelterm', 100);
4855            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
4856            VALUES ('c1', 'chunk-node', 'Goal', 'uniquesentinelterm');
4857            ",
4858        )
4859        .expect("seed chunk node");
4860
4861        let ast = QueryBuilder::nodes("Goal").text_search("uniquesentinelterm", 10);
4862        let mut compiled = compile_search(ast.ast()).expect("compile search");
4863        compiled.attribution_requested = true;
4864
4865        let rows = coordinator
4866            .execute_compiled_search(&compiled)
4867            .expect("search");
4868
4869        assert!(!rows.hits.is_empty(), "expected chunk hit");
4870        let hit = rows
4871            .hits
4872            .iter()
4873            .find(|h| matches!(h.source, SearchHitSource::Chunk))
4874            .expect("must have a Chunk hit");
4875
4876        // Current placeholder behavior: chunk hits carry present-but-empty
4877        // matched_paths.  The target behavior (per C-1 spec) is
4878        // matched_paths == ["text_content"].  Blocked on integration test
4879        // update in text_search_surface.rs.
4880        let att = hit
4881            .attribution
4882            .as_ref()
4883            .expect("attribution must be Some when attribution_requested");
4884        assert!(
4885            att.matched_paths.is_empty(),
4886            "placeholder: chunk matched_paths must be empty until integration \
4887             tests are updated; got {:?}",
4888            att.matched_paths,
4889        );
4890    }
4891
4892    /// Property FTS hits from two different kinds must each carry
4893    /// `matched_paths` corresponding to their own kind's registered leaf
4894    /// paths, not those of the other kind.
4895    ///
4896    /// This pins the per-`(node_logical_id, kind)` isolation in the
4897    /// `load_position_map` query.
4898    #[test]
4899    #[allow(clippy::too_many_lines)]
4900    fn mixed_kind_results_get_per_kind_matched_paths() {
4901        use crate::{AdminService, rebuild_actor::RebuildMode};
4902        use fathomdb_query::compile_search;
4903
4904        let db = NamedTempFile::new().expect("temporary db");
4905        let schema_manager = Arc::new(SchemaManager::new());
4906
4907        // Register FTS schemas for KindA and KindB to create per-kind tables
4908        // fts_props_kinda and fts_props_kindb before opening the coordinator.
4909        {
4910            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
4911            admin
4912                .register_fts_property_schema_with_entries(
4913                    "KindA",
4914                    &[crate::FtsPropertyPathSpec::scalar("$.alpha")],
4915                    None,
4916                    &[],
4917                    RebuildMode::Eager,
4918                )
4919                .expect("register KindA FTS schema");
4920            admin
4921                .register_fts_property_schema_with_entries(
4922                    "KindB",
4923                    &[crate::FtsPropertyPathSpec::scalar("$.beta")],
4924                    None,
4925                    &[],
4926                    RebuildMode::Eager,
4927                )
4928                .expect("register KindB FTS schema");
4929        }
4930
4931        let coordinator = ExecutionCoordinator::open(
4932            db.path(),
4933            Arc::clone(&schema_manager),
4934            None,
4935            1,
4936            Arc::new(TelemetryCounters::default()),
4937            None,
4938        )
4939        .expect("coordinator");
4940
4941        let conn = rusqlite::Connection::open(db.path()).expect("open db");
4942
4943        // KindA: leaf "$.alpha" = "xenoterm" (start=0, end=8)
4944        conn.execute(
4945            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
4946             VALUES ('rA', 'node-a', 'KindA', '{\"alpha\":\"xenoterm\"}', 100)",
4947            [],
4948        )
4949        .expect("insert KindA node");
4950        // Insert into per-kind table (migration 23 dropped global fts_node_properties).
4951        conn.execute(
4952            "INSERT INTO fts_props_kinda (node_logical_id, text_content) \
4953             VALUES ('node-a', 'xenoterm')",
4954            [],
4955        )
4956        .expect("insert KindA fts row");
4957        conn.execute(
4958            "INSERT INTO fts_node_property_positions \
4959             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
4960             VALUES ('node-a', 'KindA', 0, 8, '$.alpha')",
4961            [],
4962        )
4963        .expect("insert KindA position");
4964
4965        // KindB: leaf "$.beta" = "xenoterm" (start=0, end=8)
4966        conn.execute(
4967            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
4968             VALUES ('rB', 'node-b', 'KindB', '{\"beta\":\"xenoterm\"}', 100)",
4969            [],
4970        )
4971        .expect("insert KindB node");
4972        // Insert into per-kind table (migration 23 dropped global fts_node_properties).
4973        conn.execute(
4974            "INSERT INTO fts_props_kindb (node_logical_id, text_content) \
4975             VALUES ('node-b', 'xenoterm')",
4976            [],
4977        )
4978        .expect("insert KindB fts row");
4979        conn.execute(
4980            "INSERT INTO fts_node_property_positions \
4981             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
4982             VALUES ('node-b', 'KindB', 0, 8, '$.beta')",
4983            [],
4984        )
4985        .expect("insert KindB position");
4986
4987        // Search across both kinds (empty root_kind = no kind filter).
4988        let ast = QueryBuilder::nodes("").text_search("xenoterm", 10);
4989        let mut compiled = compile_search(ast.ast()).expect("compile search");
4990        compiled.attribution_requested = true;
4991
4992        let rows = coordinator
4993            .execute_compiled_search(&compiled)
4994            .expect("search");
4995
4996        // Both nodes must appear.
4997        assert!(
4998            rows.hits.len() >= 2,
4999            "expected hits for both kinds, got {}",
5000            rows.hits.len()
5001        );
5002
5003        for hit in &rows.hits {
5004            let att = hit
5005                .attribution
5006                .as_ref()
5007                .expect("attribution must be Some when attribution_requested");
5008            match hit.node.kind.as_str() {
5009                "KindA" => {
5010                    assert_eq!(
5011                        att.matched_paths,
5012                        vec!["$.alpha".to_owned()],
5013                        "KindA hit must have matched_paths=['$.alpha'], got {:?}",
5014                        att.matched_paths,
5015                    );
5016                }
5017                "KindB" => {
5018                    assert_eq!(
5019                        att.matched_paths,
5020                        vec!["$.beta".to_owned()],
5021                        "KindB hit must have matched_paths=['$.beta'], got {:?}",
5022                        att.matched_paths,
5023                    );
5024                }
5025                other => {
5026                    // Only KindA and KindB are expected in this test.
5027                    assert_eq!(other, "KindA", "unexpected kind in result: {other}");
5028                }
5029            }
5030        }
5031    }
5032
5033    // --- Pack H: TokenizerStrategy tests ---
5034
5035    #[test]
5036    fn tokenizer_strategy_from_str() {
5037        use super::TokenizerStrategy;
5038        assert_eq!(
5039            TokenizerStrategy::from_str("porter unicode61 remove_diacritics 2"),
5040            TokenizerStrategy::RecallOptimizedEnglish,
5041        );
5042        assert_eq!(
5043            TokenizerStrategy::from_str("unicode61 remove_diacritics 2"),
5044            TokenizerStrategy::PrecisionOptimized,
5045        );
5046        assert_eq!(
5047            TokenizerStrategy::from_str("trigram"),
5048            TokenizerStrategy::SubstringTrigram,
5049        );
5050        assert_eq!(
5051            TokenizerStrategy::from_str("icu"),
5052            TokenizerStrategy::GlobalCjk,
5053        );
5054        assert_eq!(
5055            TokenizerStrategy::from_str("unicode61 tokenchars '.+-'"),
5056            TokenizerStrategy::SourceCode,
5057        );
5058        // Canonical source-code preset value
5059        assert_eq!(
5060            TokenizerStrategy::from_str("unicode61 tokenchars '._-$@'"),
5061            TokenizerStrategy::SourceCode,
5062        );
5063        assert_eq!(
5064            TokenizerStrategy::from_str("my_custom_tokenizer"),
5065            TokenizerStrategy::Custom("my_custom_tokenizer".to_owned()),
5066        );
5067    }
5068
5069    #[test]
5070    fn trigram_short_query_returns_empty() {
5071        use fathomdb_query::compile_search;
5072
5073        let db = NamedTempFile::new().expect("temporary db");
5074        let schema_manager = Arc::new(SchemaManager::new());
5075
5076        // First open: bootstrap the schema so projection_profiles table exists.
5077        {
5078            let bootstrap = ExecutionCoordinator::open(
5079                db.path(),
5080                Arc::clone(&schema_manager),
5081                None,
5082                1,
5083                Arc::new(TelemetryCounters::default()),
5084                None,
5085            )
5086            .expect("bootstrap coordinator");
5087            drop(bootstrap);
5088        }
5089
5090        // Insert a SubstringTrigram strategy for kind "Snippet" directly in the DB.
5091        {
5092            let conn = rusqlite::Connection::open(db.path()).expect("open db");
5093            conn.execute_batch(
5094                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
5095                 VALUES ('Snippet', 'fts', '{\"tokenizer\":\"trigram\"}');",
5096            )
5097            .expect("insert profile");
5098        }
5099
5100        // Reopen coordinator to load strategies.
5101        let coordinator = ExecutionCoordinator::open(
5102            db.path(),
5103            Arc::clone(&schema_manager),
5104            None,
5105            1,
5106            Arc::new(TelemetryCounters::default()),
5107            None,
5108        )
5109        .expect("coordinator reopen");
5110
5111        // 2-char query for a SubstringTrigram kind must return empty without error.
5112        let ast = QueryBuilder::nodes("Snippet").text_search("ab", 10);
5113        let compiled = compile_search(ast.ast()).expect("compile search");
5114        let rows = coordinator
5115            .execute_compiled_search(&compiled)
5116            .expect("short trigram query must not error");
5117        assert!(
5118            rows.hits.is_empty(),
5119            "2-char trigram query must return empty"
5120        );
5121    }
5122
    /// Pack H: round-trip search for 'std.io' with the SourceCode tokenizer
    /// preset active; see the regression notes below for why post-render
    /// escaping must not run.
    #[test]
    fn source_code_strategy_does_not_corrupt_fts5_syntax() {
        // Regression: escape_source_code_query must NOT be applied to the
        // already-rendered FTS5 expression. render_text_query_fts5 wraps every
        // term in double-quote delimiters (e.g. std.io -> "std.io"). Calling the
        // escape function on that output corrupts the expression: the escaper
        // sees the surrounding '"' as ordinary chars and produces '"std"."io""'
        // — malformed FTS5 syntax — causing searches to silently return empty.
        //
        // This test verifies a round-trip search for 'std.io' succeeds when the
        // SourceCode tokenizer strategy is active.
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap the DB schema.
        {
            let bootstrap = ExecutionCoordinator::open(
                db.path(),
                Arc::clone(&schema_manager),
                None,
                1,
                Arc::new(TelemetryCounters::default()),
                None,
            )
            .expect("bootstrap coordinator");
            drop(bootstrap);
        }

        // Insert the SourceCode tokenizer profile for kind "Symbol" and seed a document.
        // (json_object doubles the single quotes in the tokenchars value.)
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            conn.execute(
                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
                 VALUES ('Symbol', 'fts', json_object('tokenizer', 'unicode61 tokenchars ''._-$@'''))",
                [],
            )
            .expect("insert profile");
            conn.execute_batch(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
                 VALUES ('row-sym-1', 'logical-sym-1', 'Symbol', '{}', 1); \
                 INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'std.io is a rust crate', 1); \
                 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'Symbol', 'std.io is a rust crate');",
            )
            .expect("insert node and fts row");
        }

        // Reopen coordinator to pick up the SourceCode strategy.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator reopen");

        // Search for "std.io": must find the document (not error, not return empty).
        let ast = QueryBuilder::nodes("Symbol").text_search("std.io", 10);
        let compiled = compile_search(ast.ast()).expect("compile search");
        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("source code search must not error");
        assert!(
            !rows.hits.is_empty(),
            "SourceCode strategy search for 'std.io' must return the document; \
             got empty — FTS5 expression was likely corrupted by post-render escaping"
        );
    }
5196
5197    // ---- Pack E: vec identity guard tests ----
5198
    /// Minimal `QueryEmbedder` stub for the vec-identity guard tests: it
    /// reports a configurable model identity and dimension, and embeds every
    /// query as an all-zero vector (the tests only exercise the identity
    /// check, never the embedding values).
    #[derive(Debug)]
    struct StubEmbedder {
        // Reported via `identity()` as the embedder's model identity.
        model_identity: String,
        // Length of the zero vector returned by `embed_query` and the
        // dimension reported via `identity()`.
        dimension: usize,
    }
5204
5205    impl StubEmbedder {
5206        fn new(model_identity: &str, dimension: usize) -> Self {
5207            Self {
5208                model_identity: model_identity.to_owned(),
5209                dimension,
5210            }
5211        }
5212    }
5213
5214    impl crate::embedder::QueryEmbedder for StubEmbedder {
5215        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, crate::embedder::EmbedderError> {
5216            Ok(vec![0.0; self.dimension])
5217        }
5218        fn identity(&self) -> crate::embedder::QueryEmbedderIdentity {
5219            crate::embedder::QueryEmbedderIdentity {
5220                model_identity: self.model_identity.clone(),
5221                model_version: "1.0".to_owned(),
5222                dimension: self.dimension,
5223                normalization_policy: "l2".to_owned(),
5224            }
5225        }
5226    }
5227
5228    fn make_in_memory_db_with_projection_profiles() -> rusqlite::Connection {
5229        let conn = rusqlite::Connection::open_in_memory().expect("in-memory db");
5230        conn.execute_batch(
5231            "CREATE TABLE IF NOT EXISTS projection_profiles (
5232                kind TEXT NOT NULL,
5233                facet TEXT NOT NULL,
5234                config_json TEXT NOT NULL,
5235                active_at INTEGER,
5236                created_at INTEGER,
5237                PRIMARY KEY (kind, facet)
5238            );",
5239        )
5240        .expect("create projection_profiles");
5241        conn
5242    }
5243
5244    #[test]
5245    fn check_vec_identity_no_profile_no_panic() {
5246        let conn = make_in_memory_db_with_projection_profiles();
5247        let embedder = StubEmbedder::new("bge-small", 384);
5248        let result = super::check_vec_identity_at_open(&conn, &embedder);
5249        assert!(result.is_ok(), "no profile row must return Ok(())");
5250    }
5251
5252    #[test]
5253    fn check_vec_identity_matching_identity_ok() {
5254        let conn = make_in_memory_db_with_projection_profiles();
5255        conn.execute(
5256            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5257             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5258            [],
5259        )
5260        .expect("insert profile");
5261        let embedder = StubEmbedder::new("bge-small", 384);
5262        let result = super::check_vec_identity_at_open(&conn, &embedder);
5263        assert!(result.is_ok(), "matching profile must return Ok(())");
5264    }
5265
5266    #[test]
5267    fn check_vec_identity_mismatched_dimensions_ok() {
5268        let conn = make_in_memory_db_with_projection_profiles();
5269        conn.execute(
5270            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5271             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5272            [],
5273        )
5274        .expect("insert profile");
5275        // embedder reports 768, profile says 384 — should warn but return Ok(())
5276        let embedder = StubEmbedder::new("bge-small", 768);
5277        let result = super::check_vec_identity_at_open(&conn, &embedder);
5278        assert!(
5279            result.is_ok(),
5280            "dimension mismatch must warn and return Ok(())"
5281        );
5282    }
5283
5284    #[test]
5285    fn custom_tokenizer_passthrough() {
5286        use super::TokenizerStrategy;
5287        let strategy = TokenizerStrategy::Custom("my_tok".to_owned());
5288        // Custom strategy is just stored — it doesn't modify queries.
5289        assert_eq!(strategy, TokenizerStrategy::Custom("my_tok".to_owned()));
5290        // Verify it does not match any known variant.
5291        assert_ne!(strategy, TokenizerStrategy::SubstringTrigram);
5292        assert_ne!(strategy, TokenizerStrategy::SourceCode);
5293    }
5294
5295    #[test]
5296    fn check_vec_identity_mismatched_model_ok() {
5297        let conn = make_in_memory_db_with_projection_profiles();
5298        conn.execute(
5299            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5300             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5301            [],
5302        )
5303        .expect("insert profile");
5304        // embedder reports bge-large, profile says bge-small — should warn but return Ok(())
5305        let embedder = StubEmbedder::new("bge-large", 384);
5306        let result = super::check_vec_identity_at_open(&conn, &embedder);
5307        assert!(
5308            result.is_ok(),
5309            "model_identity mismatch must warn and return Ok(())"
5310        );
5311    }
5312}