// fathomdb_engine/coordinator.rs
1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8    BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9    CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, ExpansionSlot,
10    FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch,
11    SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
12};
13use fathomdb_schema::SchemaManager;
14use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
15
16use crate::embedder::QueryEmbedder;
17use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
18use crate::{EngineError, sqlite};
19
/// Maximum number of cached shape-hash to SQL mappings before the cache is
/// cleared entirely.  A clear-all strategy is simpler than partial eviction
/// and the cost of re-compiling on a miss is negligible.
///
/// Enforced in `execute_compiled_read` when inserting into `shape_sql_map`.
const MAX_SHAPE_CACHE_SIZE: usize = 4096;
24
/// Maximum number of root IDs per batched expansion query.  Kept well below
/// `SQLITE_MAX_VARIABLE_NUMBER` (default 999) because each batch also binds
/// the edge-kind parameter.  Larger root sets are chunked into multiple
/// batches of this size rather than falling back to per-root queries.
/// (Consumed by the batched-expansion path; not referenced in this chunk.)
const BATCH_CHUNK_SIZE: usize = 200;
30
31/// Compile an optional expansion-slot target-side filter predicate into a SQL
32/// fragment and bind values for injection into the `numbered` CTE's WHERE clause.
33///
34/// Returns `("", vec![])` when `filter` is `None` — preserving byte-for-byte
35/// identical SQL to pre-Pack-3 behavior. When `Some(predicate)`, returns an
36/// `AND …` fragment and the corresponding bind values starting at `first_param`.
37///
38/// Only `JsonPathEq`, `JsonPathCompare`, `JsonPathFusedEq`, and
39/// `JsonPathFusedTimestampCmp` are supported here; each variant targets the
40/// `n.properties` column already present in the `numbered` CTE join.
41/// Column-direct predicates (`KindEq`, `LogicalIdEq`, etc.) reference `n.kind`
42/// and similar columns that are also available in the `numbered` CTE.
43fn compile_expansion_filter(
44    filter: Option<&Predicate>,
45    first_param: usize,
46) -> (String, Vec<Value>) {
47    let Some(predicate) = filter else {
48        return (String::new(), vec![]);
49    };
50    let p = first_param;
51    match predicate {
52        Predicate::JsonPathEq { path, value } => {
53            let val = match value {
54                ScalarValue::Text(t) => Value::Text(t.clone()),
55                ScalarValue::Integer(i) => Value::Integer(*i),
56                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
57            };
58            (
59                format!(
60                    "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
61                    p + 1
62                ),
63                vec![Value::Text(path.clone()), val],
64            )
65        }
66        Predicate::JsonPathCompare { path, op, value } => {
67            let val = match value {
68                ScalarValue::Text(t) => Value::Text(t.clone()),
69                ScalarValue::Integer(i) => Value::Integer(*i),
70                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
71            };
72            let operator = match op {
73                ComparisonOp::Gt => ">",
74                ComparisonOp::Gte => ">=",
75                ComparisonOp::Lt => "<",
76                ComparisonOp::Lte => "<=",
77            };
78            (
79                format!(
80                    "\n                  AND json_extract(n.properties, ?{p}) {operator} ?{}",
81                    p + 1
82                ),
83                vec![Value::Text(path.clone()), val],
84            )
85        }
86        Predicate::JsonPathFusedEq { path, value } => (
87            format!(
88                "\n                  AND json_extract(n.properties, ?{p}) = ?{}",
89                p + 1
90            ),
91            vec![Value::Text(path.clone()), Value::Text(value.clone())],
92        ),
93        Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
94            let operator = match op {
95                ComparisonOp::Gt => ">",
96                ComparisonOp::Gte => ">=",
97                ComparisonOp::Lt => "<",
98                ComparisonOp::Lte => "<=",
99            };
100            (
101                format!(
102                    "\n                  AND json_extract(n.properties, ?{p}) {operator} ?{}",
103                    p + 1
104                ),
105                vec![Value::Text(path.clone()), Value::Integer(*value)],
106            )
107        }
108        Predicate::KindEq(kind) => (
109            format!("\n                  AND n.kind = ?{p}"),
110            vec![Value::Text(kind.clone())],
111        ),
112        Predicate::LogicalIdEq(logical_id) => (
113            format!("\n                  AND n.logical_id = ?{p}"),
114            vec![Value::Text(logical_id.clone())],
115        ),
116        Predicate::SourceRefEq(source_ref) => (
117            format!("\n                  AND n.source_ref = ?{p}"),
118            vec![Value::Text(source_ref.clone())],
119        ),
120        Predicate::ContentRefEq(uri) => (
121            format!("\n                  AND n.content_ref = ?{p}"),
122            vec![Value::Text(uri.clone())],
123        ),
124        Predicate::ContentRefNotNull => (
125            "\n                  AND n.content_ref IS NOT NULL".to_owned(),
126            vec![],
127        ),
128    }
129}
130
/// FTS tokenizer strategy for a given node kind.
///
/// Loaded at coordinator open time from `projection_profiles` and used
/// during query dispatch to apply per-kind query adaptations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TokenizerStrategy {
    /// Porter stemming + unicode61, optimized for English recall.
    RecallOptimizedEnglish,
    /// Unicode61 without stemming, optimized for precision.
    PrecisionOptimized,
    /// Trigram tokenizer — queries shorter than 3 chars are skipped.
    SubstringTrigram,
    /// ICU tokenizer for global / CJK text.
    GlobalCjk,
    /// Unicode61 with custom token chars — query special chars are escaped.
    SourceCode,
    /// Any other tokenizer string.
    Custom(String),
}

impl TokenizerStrategy {
    /// Classify a raw tokenizer config string (as stored in
    /// `projection_profiles`) into its strategy variant.
    ///
    /// The known exact-match configurations are checked first; any string
    /// beginning with `unicode61 tokenchars` maps to [`Self::SourceCode`];
    /// everything else is preserved verbatim inside [`Self::Custom`].
    pub fn from_str(s: &str) -> Self {
        if s == "porter unicode61 remove_diacritics 2" {
            return Self::RecallOptimizedEnglish;
        }
        if s == "unicode61 remove_diacritics 2" {
            return Self::PrecisionOptimized;
        }
        if s == "trigram" {
            return Self::SubstringTrigram;
        }
        if s == "icu" {
            return Self::GlobalCjk;
        }
        if s.starts_with("unicode61 tokenchars") {
            return Self::SourceCode;
        }
        Self::Custom(s.to_string())
    }
}
165
/// A pool of read-only `SQLite` connections for concurrent read access.
///
/// Each connection is wrapped in its own [`Mutex`] so multiple readers can
/// proceed in parallel when they happen to grab different slots.
struct ReadPool {
    /// One independently lockable slot per pooled connection. Slot order
    /// matters only for the blocking fallback in `acquire`, which waits on
    /// the first slot when every `try_lock` fails.
    connections: Vec<Mutex<Connection>>,
}
173
174impl fmt::Debug for ReadPool {
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        f.debug_struct("ReadPool")
177            .field("size", &self.connections.len())
178            .finish()
179    }
180}
181
182impl ReadPool {
183    /// Open `pool_size` read-only connections to the database at `path`.
184    ///
185    /// Each connection has PRAGMAs initialized via
186    /// [`SchemaManager::initialize_connection`] and, when the `sqlite-vec`
187    /// feature is enabled and `vector_enabled` is true, the vec extension
188    /// auto-loaded.
189    ///
190    /// # Errors
191    ///
192    /// Returns [`EngineError`] if any connection fails to open or initialize.
193    fn new(
194        db_path: &Path,
195        pool_size: usize,
196        schema_manager: &SchemaManager,
197        vector_enabled: bool,
198    ) -> Result<Self, EngineError> {
199        let mut connections = Vec::with_capacity(pool_size);
200        for _ in 0..pool_size {
201            let conn = if vector_enabled {
202                #[cfg(feature = "sqlite-vec")]
203                {
204                    sqlite::open_readonly_connection_with_vec(db_path)?
205                }
206                #[cfg(not(feature = "sqlite-vec"))]
207                {
208                    sqlite::open_readonly_connection(db_path)?
209                }
210            } else {
211                sqlite::open_readonly_connection(db_path)?
212            };
213            schema_manager
214                .initialize_reader_connection(&conn)
215                .map_err(EngineError::Schema)?;
216            connections.push(Mutex::new(conn));
217        }
218        Ok(Self { connections })
219    }
220
221    /// Acquire a connection from the pool.
222    ///
223    /// Tries [`Mutex::try_lock`] on each slot first (fast non-blocking path).
224    /// If every slot is held, falls back to a blocking lock on the first slot.
225    ///
226    /// # Errors
227    ///
228    /// Returns [`EngineError::Bridge`] if the underlying mutex is poisoned.
229    fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
230        // Fast path: try each connection without blocking.
231        for conn in &self.connections {
232            if let Ok(guard) = conn.try_lock() {
233                return Ok(guard);
234            }
235        }
236        // Fallback: block on the first connection.
237        self.connections[0].lock().map_err(|_| {
238            trace_error!("read pool: connection mutex poisoned");
239            EngineError::Bridge("connection mutex poisoned".to_owned())
240        })
241    }
242
243    /// Return the number of connections in the pool.
244    #[cfg(test)]
245    fn size(&self) -> usize {
246        self.connections.len()
247    }
248}
249
/// Execution plan returned by [`ExecutionCoordinator::explain_compiled_read`].
///
/// This is a read-only introspection struct. It does not execute SQL.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// The SQL text that would be executed.
    pub sql: String,
    /// Number of bind parameters the SQL expects.
    pub bind_count: usize,
    /// The table driving the compiled query (e.g. `FtsNodes`, `VecNodes`).
    pub driving_table: DrivingTable,
    /// Shape hash identifying the query's compiled SQL shape.
    pub shape_hash: ShapeHash,
    /// `true` when the SQL was served from the shape-hash cache.
    pub cache_hit: bool,
}
261
/// A single node row returned from a query.
///
/// Column order matches the row projection read in
/// [`ExecutionCoordinator::execute_compiled_read`] (columns 0–5).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row ID.
    pub row_id: String,
    /// Logical ID of the node.
    pub logical_id: String,
    /// Node kind.
    pub kind: String,
    /// JSON-encoded node properties.
    pub properties: String,
    /// Optional URI referencing external content.
    pub content_ref: Option<String>,
    /// Unix timestamp of last access, if tracked.
    pub last_accessed_at: Option<i64>,
}
278
/// A single run row returned from a query.
///
/// Surfaced via [`QueryRows::runs`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Unique run ID.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded run properties.
    pub properties: String,
}
291
/// A single step row returned from a query.
///
/// Surfaced via [`QueryRows::steps`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Unique step ID.
    pub id: String,
    /// ID of the parent run.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded step properties.
    pub properties: String,
}
306
/// A single action row returned from a query.
///
/// Surfaced via [`QueryRows::actions`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Unique action ID.
    pub id: String,
    /// ID of the parent step.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded action properties.
    pub properties: String,
}
321
/// A single row from the `provenance_events` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Unique event ID.
    pub id: String,
    /// Event type discriminator.
    pub event_type: String,
    /// The entity the event describes.
    pub subject: String,
    /// Optional reference to the event's source.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation timestamp — presumably Unix seconds; TODO confirm against the writer.
    pub created_at: i64,
}
332
/// Result set from executing a flat (non-grouped) compiled query.
///
/// Note: the return sites visible in [`ExecutionCoordinator::execute_compiled_read`]
/// populate only `nodes`; `runs`, `steps`, and `actions` are returned empty there.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Matched node rows.
    pub nodes: Vec<NodeRow>,
    /// Runs associated with the matched nodes.
    pub runs: Vec<RunRow>,
    /// Steps associated with the matched runs.
    pub steps: Vec<StepRow>,
    /// Actions associated with the matched steps.
    pub actions: Vec<ActionRow>,
    /// `true` when a capability miss (e.g. missing sqlite-vec) caused the query
    /// to degrade to an empty result instead of propagating an error.
    pub was_degraded: bool,
}
348
/// Expansion results for a single root node within a grouped query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical ID of the root node that seeded this expansion.
    pub root_logical_id: String,
    /// Nodes reached by traversing from the root.
    pub nodes: Vec<NodeRow>,
}
357
/// All expansion results for a single named slot across all roots.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot.
    pub slot: String,
    /// Per-root expansion results, one entry per expanded root.
    pub roots: Vec<ExpansionRootRows>,
}
366
/// Result set from executing a grouped compiled query.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// Root node rows matched by the base query.
    pub roots: Vec<NodeRow>,
    /// Per-slot expansion results, one entry per expansion slot.
    pub expansions: Vec<ExpansionSlotRows>,
    /// `true` when a capability miss caused the query to degrade to an empty result.
    pub was_degraded: bool,
}
377
/// Manages a pool of read-only `SQLite` connections and executes compiled queries.
pub struct ExecutionCoordinator {
    /// Filesystem path of the backing `SQLite` database.
    database_path: PathBuf,
    /// Shared schema manager; used at open time to bootstrap the database
    /// and to initialize each pooled reader connection.
    schema_manager: Arc<SchemaManager>,
    /// Read-only connection pool all query execution goes through.
    pool: ReadPool,
    /// Shape-hash → generated SQL cache; cleared wholesale once it reaches
    /// [`MAX_SHAPE_CACHE_SIZE`] entries.
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// `true` when sqlite-vec was loaded and a vector profile is active.
    vector_enabled: bool,
    /// Latch so the "vector table absent" degradation warning is logged only
    /// once per coordinator lifetime.
    vec_degradation_warned: AtomicBool,
    /// Shared counters (queries, errors) incremented on execution paths.
    telemetry: Arc<TelemetryCounters>,
    /// Phase 12.5a: optional read-time query embedder. When present,
    /// [`Self::execute_retrieval_plan`] invokes it via
    /// [`Self::fill_vector_branch`] after compile to populate
    /// `plan.vector`. When `None`, the Phase 12 v1 vector-dormancy
    /// invariant on `search()` is preserved: the vector slot stays empty
    /// and the coordinator's stage-gating check skips the vector branch.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
    /// Per-kind FTS tokenizer strategies loaded from `projection_profiles`
    /// at open time. Used during query dispatch for query-side adaptations
    /// (e.g. short-query skip for trigram, token escaping for source-code).
    ///
    /// **Stale-state note**: This map is populated once at `open()` and is
    /// never refreshed during the lifetime of the coordinator.  If
    /// `AdminService::set_fts_profile` is called on a running engine, the
    /// in-memory strategy map will not reflect the change until the engine is
    /// reopened.  The DB-side profile row is always up-to-date; only the
    /// query-side adaptation (e.g. the trigram short-query guard) will be stale.
    fts_strategies: HashMap<String, TokenizerStrategy>,
}
406
impl fmt::Debug for ExecutionCoordinator {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the database path is printed; all other fields (pool, caches,
        // telemetry, embedder) are elided via `finish_non_exhaustive`.
        f.debug_struct("ExecutionCoordinator")
            .field("database_path", &self.database_path)
            .finish_non_exhaustive()
    }
}
414
415impl ExecutionCoordinator {
    /// Open an [`ExecutionCoordinator`] over the database at `path`.
    ///
    /// Open sequence: bootstrap the schema on a temporary writable
    /// connection, run open-time FTS rebuild guards, verify vec identity
    /// against the configured embedder, load per-kind FTS tokenizer
    /// strategies from `projection_profiles`, then drop the bootstrap
    /// connection and build the read-only pool.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database connection cannot be opened or schema bootstrap fails.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap uses a writable connection; with the sqlite-vec feature
        // the vec extension is preloaded only when a dimension is configured.
        #[cfg(feature = "sqlite-vec")]
        let mut conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let mut conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        // ----- Open-time rebuild guards for derived FTS state -----
        //
        // Property FTS data is derived state. Per-kind `fts_props_<kind>`
        // virtual tables are NOT source of truth — they must be rebuildable
        // from canonical `nodes.properties` + `fts_property_schemas` at any
        // time. After migration 23 the global `fts_node_properties` table no
        // longer exists; each registered kind has its own `fts_props_<kind>`
        // table created at first write or first rebuild.
        //
        // Guard 1: if any registered kind's per-kind table is missing or empty
        // while live nodes of that kind exist, do a synchronous full rebuild of
        // all per-kind FTS tables and position map rows.
        //
        // Guard 2: if any recursive schema has a populated per-kind table but
        // `fts_node_property_positions` is empty, do a synchronous full rebuild
        // to regenerate the position map from canonical state.
        //
        // Both guards are no-ops on a consistent database.
        run_open_time_fts_guards(&mut conn)?;

        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            let _ = &report;
            false
        };

        // 0.5.0: per-kind vec tables replace the global vec_nodes_active.
        // The vector_dimension parameter now only sets the vector_enabled flag —
        // no global table is created. Per-kind vec tables are created by
        // `regenerate_vector_embeddings` at regen time.
        if vector_dimension.is_some() {
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        // Vec identity guard: warn (never error) if the stored profile's
        // model_identity or dimensions differ from the configured embedder.
        if let Some(ref emb) = query_embedder {
            check_vec_identity_at_open(&conn, emb.as_ref())?;
        }

        // Load FTS tokenizer strategies from projection_profiles.
        // Rows whose config JSON is unreadable or lacks a `tokenizer` key are
        // skipped; `rows.flatten()` also drops row-level read errors silently.
        let fts_strategies: HashMap<String, TokenizerStrategy> = {
            let mut map = HashMap::new();
            let mut stmt = conn
                .prepare("SELECT kind, config_json FROM projection_profiles WHERE facet='fts'")?;
            let rows = stmt.query_map([], |row| {
                let kind: String = row.get(0)?;
                let config_json: String = row.get(1)?;
                Ok((kind, config_json))
            })?;
            for row in rows.flatten() {
                let (kind, config_json) = row;
                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&config_json)
                    && let Some(tok) = v["tokenizer"].as_str()
                {
                    map.insert(kind, TokenizerStrategy::from_str(tok));
                }
            }
            map
        };

        // Drop the bootstrap connection — pool connections are used for reads.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
            fts_strategies,
        })
    }
521
522    /// Returns the filesystem path to the `SQLite` database.
523    pub fn database_path(&self) -> &Path {
524        &self.database_path
525    }
526
    /// Returns `true` when sqlite-vec was loaded and a vector profile is active.
    ///
    /// This flag also decided (at open time) whether pool connections
    /// auto-loaded the vec extension.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
532
    /// Returns the configured read-time query embedder, if any.
    ///
    /// The 0.4.0 write-time parity work reuses this same embedder for
    /// vector regeneration via [`Engine::regenerate_vector_embeddings`],
    /// so there is always exactly one source of truth for vector
    /// identity per [`Engine`] instance.
    #[must_use]
    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
        self.query_embedder.as_ref()
    }
543
    /// Acquire a pooled read connection; thin wrapper over `ReadPool::acquire`.
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
547
548    /// Aggregate `SQLite` page-cache counters across all pool connections.
549    ///
550    /// Uses `try_lock` to avoid blocking reads for telemetry reporting.
551    /// Connections that are currently locked by a query are skipped — this
552    /// is acceptable for statistical counters.
553    #[must_use]
554    pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
555        let mut total = SqliteCacheStatus::default();
556        for conn_mutex in &self.pool.connections {
557            if let Ok(conn) = conn_mutex.try_lock() {
558                total.add(&read_db_cache_status(&conn));
559            }
560        }
561        total
562    }
563
    /// Execute a flat (non-grouped) compiled query and return matching node rows.
    ///
    /// Before execution, the compiled SQL is adapted in two cases (see the
    /// in-body comments): FtsNodes queries are rewritten from the dropped
    /// global `fts_node_properties` table to per-kind `fts_props_<kind>`
    /// tables, and VecNodes queries have `vec_nodes_active` rewritten to the
    /// per-kind `vec_<root_kind>` table. Only `nodes` is populated in the
    /// result; `runs`/`steps`/`actions` come back empty on all paths here.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
    #[allow(clippy::expect_used, clippy::too_many_lines)]
    pub fn execute_compiled_read(
        &self,
        compiled: &CompiledQuery,
    ) -> Result<QueryRows, EngineError> {
        // Scan fallback for first-registration async rebuild: if the query uses the
        // FtsNodes driving table and the root kind has is_first_registration=1 with
        // state IN ('PENDING','BUILDING'), the per-kind table has no rows yet.
        // Route to a full-kind node scan so callers get results instead of empty.
        if compiled.driving_table == DrivingTable::FtsNodes
            && let Some(BindValue::Text(root_kind)) = compiled.binds.get(1)
            && let Some(nodes) = self.scan_fallback_if_first_registration(root_kind)?
        {
            self.telemetry.increment_queries();
            return Ok(QueryRows {
                nodes,
                runs: Vec::new(),
                steps: Vec::new(),
                actions: Vec::new(),
                was_degraded: false,
            });
        }

        // For FtsNodes queries the fathomdb-query compile path generates SQL that
        // references the old global `fts_node_properties` table.  Since migration 23
        // dropped that table, we rewrite the SQL and binds here to use the per-kind
        // `fts_props_<kind>` table (or omit the property UNION arm entirely when the
        // per-kind table does not yet exist).
        //
        // For VecNodes queries the fathomdb-query compile path generates SQL that
        // hardcodes `vec_nodes_active`.  Since 0.5.0 uses per-kind tables, we rewrite
        // `vec_nodes_active` to `vec_<root_kind>` here.
        let (adapted_sql, adapted_binds) = if compiled.driving_table == DrivingTable::FtsNodes {
            // The adaptation needs a connection to probe table existence; the
            // guard is dropped before execution so only one slot is held at a time.
            let conn_check = match self.lock_connection() {
                Ok(g) => g,
                Err(e) => {
                    self.telemetry.increment_errors();
                    return Err(e);
                }
            };
            let result = adapt_fts_nodes_sql_for_per_kind_tables(compiled, &conn_check);
            drop(conn_check);
            result?
        } else if compiled.driving_table == DrivingTable::VecNodes {
            // Root kind comes from bind slot 1; a missing/non-text bind yields a
            // deliberately-nonexistent table name rather than a panic.
            let root_kind = compiled
                .binds
                .get(1)
                .and_then(|b| {
                    if let BindValue::Text(k) = b {
                        Some(k.as_str())
                    } else {
                        None
                    }
                })
                .unwrap_or("");
            let vec_table = if root_kind.is_empty() {
                "vec__unknown".to_owned()
            } else {
                fathomdb_schema::vec_kind_table_name(root_kind)
            };
            let new_sql = compiled.sql.replace("vec_nodes_active", &vec_table);
            (new_sql, compiled.binds.clone())
        } else {
            (compiled.sql.clone(), compiled.binds.clone())
        };

        let row_sql = wrap_node_row_projection_sql(&adapted_sql);
        // FIX(review): was .expect() — panics on mutex poisoning, cascading failure.
        // Options: (A) into_inner() for all, (B) EngineError for all, (C) mixed.
        // Chose (C): shape_sql_map is a pure cache — into_inner() is safe to recover.
        // conn wraps a SQLite connection whose state may be corrupt after a thread panic,
        // so we propagate EngineError there instead.
        {
            let mut cache = self
                .shape_sql_map
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
                cache.clear();
            }
            cache.insert(compiled.shape_hash, row_sql.clone());
        }

        let bind_values = adapted_binds
            .iter()
            .map(bind_value_to_sql)
            .collect::<Vec<_>>();

        // FIX(review) + Security fix M-8: was .expect() — panics on mutex poisoning.
        // shape_sql_map uses into_inner() (pure cache, safe to recover).
        // conn uses map_err → EngineError (connection state may be corrupt after panic;
        // into_inner() would risk using a connection with partial transaction state).
        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&row_sql) {
            Ok(stmt) => stmt,
            // A missing vec table degrades to an empty result (warned once per
            // coordinator via the `vec_degradation_warned` latch), not an error.
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading to non-vector query");
                }
                return Ok(QueryRows {
                    was_degraded: true,
                    ..Default::default()
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        // Collect all rows eagerly; `.and_then(Iterator::collect)` folds
        // per-row errors into a single Result.
        let nodes = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .and_then(Iterator::collect)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        self.telemetry.increment_queries();
        Ok(QueryRows {
            nodes,
            runs: Vec::new(),
            steps: Vec::new(),
            actions: Vec::new(),
            was_degraded: false,
        })
    }
711
712    /// Execute a compiled adaptive search and return matching hits.
713    ///
714    /// Phase 2 splits filters: fusable predicates (`KindEq`, `LogicalIdEq`,
715    /// `SourceRefEq`, `ContentRefEq`, `ContentRefNotNull`) are injected into
716    /// the `search_hits` CTE so the CTE `LIMIT` applies after filtering,
717    /// while residual predicates (JSON path filters) stay in the outer
718    /// `WHERE`. The chunk and property FTS
719    /// indexes are `UNION ALL`-ed, BM25-scored (flipped so larger values mean
720    /// better matches), ordered, and limited. All hits return
721    /// `match_mode = Strict` — the relaxed branch and fallback arrive in
722    /// later phases.
723    ///
724    /// # Errors
725    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
726    pub fn execute_compiled_search(
727        &self,
728        compiled: &CompiledSearch,
729    ) -> Result<SearchRows, EngineError> {
730        // Build the two-branch plan from the strict text query and delegate
731        // to the shared plan-execution routine. The relaxed branch is derived
732        // via `derive_relaxed` and only fires when strict returned fewer than
733        // `min(FALLBACK_TRIGGER_K, limit)` hits. With K = 1 this collapses to
734        // "relaxed iff strict is empty," but the routine spells the rule out
735        // explicitly so raising K later is a one-line constant bump.
736        let (relaxed_query, was_degraded_at_plan_time) =
737            fathomdb_query::derive_relaxed(&compiled.text_query);
738        let relaxed = relaxed_query.map(|q| CompiledSearch {
739            root_kind: compiled.root_kind.clone(),
740            text_query: q,
741            limit: compiled.limit,
742            fusable_filters: compiled.fusable_filters.clone(),
743            residual_filters: compiled.residual_filters.clone(),
744            attribution_requested: compiled.attribution_requested,
745        });
746        let plan = CompiledSearchPlan {
747            strict: compiled.clone(),
748            relaxed,
749            was_degraded_at_plan_time,
750        };
751        self.execute_compiled_search_plan(&plan)
752    }
753
754    /// Execute a two-branch [`CompiledSearchPlan`] and return the merged,
755    /// deduped result rows.
756    ///
757    /// This is the shared retrieval/merge routine that both
758    /// [`Self::execute_compiled_search`] (adaptive path) and
759    /// `Engine::fallback_search` (narrow two-shape path) call into. Strict
760    /// runs first; the relaxed branch only fires when it is present AND the
761    /// strict branch returned fewer than `min(FALLBACK_TRIGGER_K, limit)`
762    /// hits. Merge and dedup semantics are identical to the adaptive path
763    /// regardless of how the plan was constructed.
764    ///
765    /// Error contract: if the relaxed branch errors, the error propagates;
766    /// strict hits are not returned. This matches the rest of the engine's
767    /// fail-hard posture.
768    ///
769    /// # Errors
770    /// Returns [`EngineError`] if either branch's SQL cannot be prepared or
771    /// executed.
772    pub fn execute_compiled_search_plan(
773        &self,
774        plan: &CompiledSearchPlan,
775    ) -> Result<SearchRows, EngineError> {
776        let strict = &plan.strict;
777        let limit = strict.limit;
778        let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;
779
780        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
781        let strict_underfilled = strict_hits.len() < fallback_threshold;
782
783        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
784        let mut fallback_used = false;
785        let mut was_degraded = false;
786        if let Some(relaxed) = plan.relaxed.as_ref()
787            && strict_underfilled
788        {
789            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
790            fallback_used = true;
791            was_degraded = plan.was_degraded_at_plan_time;
792        }
793
794        let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
795        // Attribution runs AFTER dedup so that duplicate hits dropped by
796        // `merge_search_branches` do not waste a highlight+position-map
797        // lookup.
798        if strict.attribution_requested {
799            let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
800            self.populate_attribution_for_hits(
801                &mut merged,
802                &strict.text_query,
803                relaxed_text_query,
804            )?;
805        }
806        let strict_hit_count = merged
807            .iter()
808            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
809            .count();
810        let relaxed_hit_count = merged
811            .iter()
812            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
813            .count();
814        // Phase 10: no vector execution path yet, so vector_hit_count is
815        // always zero. Future phases that wire a vector branch will
816        // contribute here.
817        let vector_hit_count = 0;
818
819        Ok(SearchRows {
820            hits: merged,
821            strict_hit_count,
822            relaxed_hit_count,
823            vector_hit_count,
824            fallback_used,
825            was_degraded,
826        })
827    }
828
    /// Execute a compiled vector-only search and return matching hits.
    ///
    /// Phase 11 delivers the standalone vector retrieval path. The emitted
    /// SQL performs a vec0 KNN scan over `vec_nodes_active`, joins to
    /// `chunks` and `nodes` (active rows only), and pushes fusable filters
    /// into the candidate CTE. The outer `SELECT` applies residual JSON
    /// predicates and orders by score descending, where `score = -distance`
    /// (higher is better) per addendum 1 §Vector-Specific Behavior.
    ///
    /// ## Capability-miss handling
    ///
    /// If the `sqlite-vec` capability is absent (feature disabled or the
    /// `vec_nodes_active` virtual table has not been created because the
    /// engine was not opened with a `vector_dimension`), this method returns
    /// an empty [`SearchRows`] with `was_degraded = true`. This is
    /// **non-fatal** — the error does not propagate — matching the addendum's
    /// §Vector-Specific Behavior / Degradation.
    ///
    /// ## Attribution
    ///
    /// When `compiled.attribution_requested == true`, every returned hit
    /// carries `attribution: Some(HitAttribution { matched_paths: vec![] })`
    /// per addendum 1 §Attribution on vector hits (Phase 5 chunk-hit rule
    /// extended uniformly).
    ///
    /// # Errors
    /// Returns [`EngineError`] if the SQL statement cannot be prepared or
    /// executed for reasons other than a vec-table capability miss.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // Short-circuit zero-limit: callers that pass `limit == 0` expect an
        // empty result rather than a SQL error from `LIMIT 0` semantics in
        // the inner vec0 scan.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        // Bind numbering is positional and order-of-push: ?1 is always the
        // query vector literal; ?2 is the root kind when present. All
        // subsequent clauses compute their indices from `binds.len()`, so
        // push order here must stay in lockstep with clause emission below.
        let filter_by_kind = !compiled.root_kind.is_empty();
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // Build fusable-filter clauses, aliased against `src` inside the
        // candidate CTE. Same predicate set the text path fuses.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    // No bind value: pure SQL fragment.
                    fused_clauses
                        .push_str("\n                      AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n                      AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
                    // JSON predicates are residual; compile_vector_search
                    // guarantees they never appear here, but stay defensive.
                }
            }
        }

        // Build residual JSON clauses, aliased against `h` in the outer SELECT.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. } => {
                    // Fusable predicates live in fused_clauses above.
                }
            }
        }

        // Bind the outer LIMIT as the trailing positional parameter. Note:
        // the inner vec0 scan interpolates `base_limit` directly into the
        // SQL text below, so the cached-statement key still varies per
        // limit value; this bind only keeps the outer clause parameterized.
        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        // sqlite-vec requires the LIMIT/k constraint to be visible directly
        // on the vec0 KNN scan, so we isolate it in a sub-select. The vec0
        // LIMIT overfetches `base_limit` = limit (Phase 11 keeps it simple;
        // Phase 12's planner may raise this to compensate for fusion
        // narrowing the candidate pool).
        let base_limit = limit;
        let kind_clause = if filter_by_kind {
            "\n                      AND src.kind = ?2"
        } else {
            ""
        };

        // Derive the per-kind vec table name from the query's root kind.
        // An empty root_kind defaults to a sentinel that will not exist, causing
        // graceful degradation (was_degraded=true).
        let vec_table = if compiled.root_kind.is_empty() {
            "vec__unknown".to_owned()
        } else {
            fathomdb_schema::vec_kind_table_name(&compiled.root_kind)
        };

        let sql = format!(
            "WITH vector_hits AS (
                SELECT
                    src.row_id AS row_id,
                    src.logical_id AS logical_id,
                    src.kind AS kind,
                    src.properties AS properties,
                    src.source_ref AS source_ref,
                    src.content_ref AS content_ref,
                    src.created_at AS created_at,
                    vc.distance AS distance,
                    vc.chunk_id AS chunk_id
                FROM (
                    SELECT chunk_id, distance
                    FROM {vec_table}
                    WHERE embedding MATCH ?1
                    LIMIT {base_limit}
                ) vc
                JOIN chunks c ON c.id = vc.chunk_id
                JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
                WHERE 1 = 1{kind_clause}{fused_clauses}
            )
            SELECT
                h.row_id,
                h.logical_id,
                h.kind,
                h.properties,
                h.content_ref,
                am.last_accessed_at,
                h.created_at,
                h.distance,
                h.chunk_id
            FROM vector_hits h
            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
            WHERE 1 = 1{filter_clauses}
            ORDER BY h.distance ASC
            LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            Err(e) if is_vec_table_absent(&e) => {
                // Capability miss: non-fatal — surface as was_degraded.
                // The warning is emitted at most once per coordinator
                // lifetime (swap returns the previous flag value).
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                // Column order matches the outer SELECT above: 0=row_id,
                // 1=logical_id, 2=kind, 3=properties, 4=content_ref,
                // 5=last_accessed_at, 6=created_at, 7=distance, 8=chunk_id.
                let distance: f64 = row.get(7)?;
                // Score is the negated distance per addendum 1
                // §Vector-Specific Behavior / Score and distance. For
                // distance metrics (sqlite-vec's default) lower distance =
                // better match, so negating yields the higher-is-better
                // convention that dedup_branch_hits and the unified result
                // surface rely on.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    // Vector hits have no strict/relaxed notion.
                    match_mode: None,
                    // Vector hits have no snippet.
                    snippet: None,
                    // Column 8 is `h.chunk_id` from the candidate CTE.
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // Some SQLite errors surface during row iteration (e.g. when
                // the vec0 extension is not loaded but the table exists as a
                // stub). Classify as capability-miss when the shape matches.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        // Release the statement and connection guard before touching
        // telemetry; nothing below needs the lock.
        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
1173
1174    /// Execute a unified [`CompiledRetrievalPlan`] (Phase 12 `search()`
1175    /// entry point) and return deterministically ranked, block-ordered
1176    /// [`SearchRows`].
1177    ///
1178    /// Stages, per addendum 1 §Retrieval Planner Model:
1179    ///
1180    /// 1. **Text strict.** Always runs (empty query short-circuits to an
1181    ///    empty branch result inside `run_search_branch`).
1182    /// 2. **Text relaxed.** Runs iff the plan carries a relaxed branch AND
1183    ///    the strict branch returned fewer than `min(FALLBACK_TRIGGER_K,
1184    ///    limit)` hits — same v1 (`K = 1`) zero-hits-only trigger as the
1185    ///    Phase 6 text-only path.
1186    /// 3. **Vector.** Runs iff text retrieval (strict + relaxed combined)
1187    ///    returned zero hits AND `plan.vector` is `Some`. **In v1 the
1188    ///    planner never wires a vector branch through `search()`, so this
1189    ///    code path is structurally present but dormant.** A future phase
1190    ///    that wires read-time embedding into `compile_retrieval_plan` will
1191    ///    immediately light it up.
1192    /// 4. **Fusion.** All collected hits are merged via
1193    ///    [`merge_search_branches_three`], which produces strict ->
1194    ///    relaxed -> vector block ordering with cross-branch dedup
1195    ///    resolved by branch precedence.
1196    ///
1197    /// `was_degraded` covers only the relaxed-branch cap miss in v1. The
1198    /// addendum's "vector capability miss => `was_degraded`" semantics
1199    /// applies to `search()` only when the unified planner actually fires
1200    /// the vector branch, which v1 never does.
1201    ///
1202    /// # Errors
1203    /// Returns [`EngineError`] if any stage's SQL cannot be prepared or
1204    /// executed for a non-capability-miss reason.
1205    pub fn execute_retrieval_plan(
1206        &self,
1207        plan: &CompiledRetrievalPlan,
1208        raw_query: &str,
1209    ) -> Result<SearchRows, EngineError> {
1210        // Phase 12.5a: we work against a local owned copy so
1211        // `fill_vector_branch` can mutate `plan.vector` and
1212        // `plan.was_degraded_at_plan_time`. Cloning is cheap (the plan is
1213        // a bounded set of predicates + text AST nodes) and avoids
1214        // forcing callers to hand us `&mut` access.
1215        let mut plan = plan.clone();
1216        let limit = plan.text.strict.limit;
1217
1218        // Stage 1: text strict.
1219        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;
1220
1221        // Stage 2: text relaxed. Same K=1 zero-hits-only trigger the Phase 6
1222        // path uses.
1223        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
1224        let strict_underfilled = strict_hits.len() < fallback_threshold;
1225        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
1226        let mut fallback_used = false;
1227        let mut was_degraded = false;
1228        if let Some(relaxed) = plan.text.relaxed.as_ref()
1229            && strict_underfilled
1230        {
1231            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
1232            fallback_used = true;
1233            was_degraded = plan.was_degraded_at_plan_time;
1234        }
1235
1236        // Phase 12.5a: fill the vector branch from the configured
1237        // read-time query embedder, if any. Option (b) from the spec:
1238        // only pay the embedding cost when the text branches returned
1239        // nothing, because the three-branch stage gate below only runs
1240        // the vector stage under exactly that condition. This keeps the
1241        // hot path (strict text matched) embedder-free.
1242        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
1243        if text_branches_empty && self.query_embedder.is_some() {
1244            self.fill_vector_branch(&mut plan, raw_query);
1245        }
1246
1247        // Stage 3: vector. Runs only when text retrieval is empty AND a
1248        // vector branch is present. When no embedder is configured (Phase
1249        // 12.5a default) `plan.vector` stays `None` and this stage is a
1250        // no-op, preserving the Phase 12 v1 dormancy invariant.
1251        let mut vector_hits: Vec<SearchHit> = Vec::new();
1252        if let Some(vector) = plan.vector.as_ref()
1253            && strict_hits.is_empty()
1254            && relaxed_hits.is_empty()
1255        {
1256            let vector_rows = self.execute_compiled_vector_search(vector)?;
1257            // `execute_compiled_vector_search` returns a fully populated
1258            // `SearchRows`. Promote its hits into the merge stage and lift
1259            // its capability-miss `was_degraded` flag onto the unified
1260            // result, per addendum §Vector-Specific Behavior.
1261            vector_hits = vector_rows.hits;
1262            if vector_rows.was_degraded {
1263                was_degraded = true;
1264            }
1265        }
1266        // Phase 12.5a: an embedder-reported capability miss surfaces as
1267        // `plan.was_degraded_at_plan_time = true` set inside
1268        // `fill_vector_branch`. Lift it onto the response so callers see
1269        // the graceful degradation even when the vector stage-gate never
1270        // had the chance to fire (the embedder call itself failed and
1271        // the vector slot stayed `None`).
1272        if text_branches_empty
1273            && plan.was_degraded_at_plan_time
1274            && plan.vector.is_none()
1275            && self.query_embedder.is_some()
1276        {
1277            was_degraded = true;
1278        }
1279
1280        // Stage 4: fusion.
1281        let strict = &plan.text.strict;
1282        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
1283        if strict.attribution_requested {
1284            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
1285            self.populate_attribution_for_hits(
1286                &mut merged,
1287                &strict.text_query,
1288                relaxed_text_query,
1289            )?;
1290        }
1291
1292        let strict_hit_count = merged
1293            .iter()
1294            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
1295            .count();
1296        let relaxed_hit_count = merged
1297            .iter()
1298            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
1299            .count();
1300        let vector_hit_count = merged
1301            .iter()
1302            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
1303            .count();
1304
1305        Ok(SearchRows {
1306            hits: merged,
1307            strict_hit_count,
1308            relaxed_hit_count,
1309            vector_hit_count,
1310            fallback_used,
1311            was_degraded,
1312        })
1313    }
1314
1315    /// Phase 12.5a: populate `plan.vector` from the configured read-time
1316    /// query embedder, if any.
1317    ///
1318    /// Preconditions (enforced by the caller in `execute_retrieval_plan`):
1319    /// - `self.query_embedder.is_some()` — no point calling otherwise.
1320    /// - Both the strict and relaxed text branches already ran and
1321    ///   returned zero hits, so the existing three-branch stage gate
1322    ///   will actually fire the vector stage once the slot is populated.
1323    ///   This is option (b) from the Phase 12.5a spec: skip the embedding
1324    ///   cost entirely when text retrieval already won.
1325    ///
1326    /// Contract: never panics, never returns an error. On embedder error
1327    /// it sets `plan.was_degraded_at_plan_time = true` and leaves
1328    /// `plan.vector` as `None`; the coordinator's normal error-free
1329    /// degradation path then reports `was_degraded` on the result.
1330    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
1331        let Some(embedder) = self.query_embedder.as_ref() else {
1332            return;
1333        };
1334        match embedder.embed_query(raw_query) {
1335            Ok(vec) => {
1336                // `CompiledVectorSearch::query_text` is a JSON float-array
1337                // literal at the time the coordinator binds it (see the
1338                // `CompiledVectorSearch` docs). `serde_json::to_string`
1339                // on a `Vec<f32>` produces exactly that shape — no
1340                // wire-format change required.
1341                let literal = match serde_json::to_string(&vec) {
1342                    Ok(s) => s,
1343                    Err(err) => {
1344                        trace_warn!(
1345                            error = %err,
1346                            "query embedder vector serialization failed; skipping vector branch"
1347                        );
1348                        let _ = err; // Used by trace_warn! when tracing feature is active
1349                        plan.was_degraded_at_plan_time = true;
1350                        return;
1351                    }
1352                };
1353                let strict = &plan.text.strict;
1354                plan.vector = Some(CompiledVectorSearch {
1355                    root_kind: strict.root_kind.clone(),
1356                    query_text: literal,
1357                    limit: strict.limit,
1358                    fusable_filters: strict.fusable_filters.clone(),
1359                    residual_filters: strict.residual_filters.clone(),
1360                    attribution_requested: strict.attribution_requested,
1361                });
1362            }
1363            Err(err) => {
1364                trace_warn!(
1365                    error = %err,
1366                    "query embedder unavailable, skipping vector branch"
1367                );
1368                let _ = err; // Used by trace_warn! when tracing feature is active
1369                plan.was_degraded_at_plan_time = true;
1370            }
1371        }
1372    }
1373
1374    /// Execute a single search branch against the underlying FTS surfaces.
1375    ///
1376    /// This is the shared SQL emission path used by
1377    /// [`Self::execute_compiled_search_plan`] to run strict and (when
1378    /// present) relaxed branches of a [`CompiledSearchPlan`] in sequence.
1379    /// The returned hits are tagged with `branch`'s corresponding
1380    /// [`SearchMatchMode`] and are **not** yet deduped or truncated — the
1381    /// caller is responsible for merging multiple branches.
1382    #[allow(clippy::too_many_lines)]
1383    fn run_search_branch(
1384        &self,
1385        compiled: &CompiledSearch,
1386        branch: SearchBranch,
1387    ) -> Result<Vec<SearchHit>, EngineError> {
1388        use std::fmt::Write as _;
1389        // Short-circuit an empty/whitespace-only query: rendering it would
1390        // yield `MATCH ""`, which FTS5 rejects as a syntax error. Callers
1391        // (including the adaptive path when strict is Empty and derive_relaxed
1392        // returns None) must see an empty result, not an error. Each branch
1393        // is short-circuited independently so a strict-Empty + relaxed-Some
1394        // plan still exercises the relaxed branch.
1395        // A top-level `TextQuery::Not` renders to an FTS5 expression that
1396        // matches "every row not containing X" — a complement-of-corpus scan
1397        // that no caller would intentionally want. Short-circuit to empty at
1398        // the root only; a `Not` nested inside an `And` is a legitimate
1399        // exclusion and must still run.
1400        if matches!(
1401            compiled.text_query,
1402            fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
1403        ) {
1404            return Ok(Vec::new());
1405        }
1406        let rendered_base = render_text_query_fts5(&compiled.text_query);
1407        // Apply per-kind query-side tokenizer adaptations.
1408        // SubstringTrigram: queries shorter than 3 chars produce no results
1409        // (trigram index cannot match them). Return empty instead of an FTS5
1410        // error or a full-table scan.
1411        // SourceCode: render_text_query_fts5 already wraps every term in FTS5
1412        // double-quote delimiters (e.g. "std.io"). Applying escape_source_code_query
1413        // on that already-rendered output corrupts the expression — the escape
1414        // function sees the surrounding '"' characters and '.' separator and
1415        // produces malformed syntax like '"std"."io""'. The phrase quoting from
1416        // render_text_query_fts5 is sufficient: FTS5 applies the tokenizer
1417        // (including custom tokenchars) inside double-quoted phrase expressions,
1418        // so "std.io" correctly matches the single token 'std.io'.
1419        let strategy = self.fts_strategies.get(compiled.root_kind.as_str());
1420        if matches!(strategy, Some(TokenizerStrategy::SubstringTrigram))
1421            && rendered_base
1422                .chars()
1423                .filter(|c| c.is_alphanumeric())
1424                .count()
1425                < 3
1426        {
1427            return Ok(Vec::new());
1428        }
1429        let rendered = rendered_base;
1430        // An empty `root_kind` means "unkind-filtered" — the fallback_search
1431        // helper uses this when the caller did not add `.filter_kind_eq(...)`.
1432        // The adaptive `text_search()` path never produces an empty root_kind
1433        // because `QueryBuilder::nodes(kind)` requires a non-empty string at
1434        // the entry point.
1435        let filter_by_kind = !compiled.root_kind.is_empty();
1436
1437        // Acquire the connection early so we can check per-kind table existence
1438        // before building the bind array and SQL. The bind numbering depends on
1439        // whether the property FTS UNION arm is included.
1440        let conn_guard = match self.lock_connection() {
1441            Ok(g) => g,
1442            Err(e) => {
1443                self.telemetry.increment_errors();
1444                return Err(e);
1445            }
1446        };
1447
1448        // Determine which per-kind property FTS tables to include in the UNION arm.
1449        //
1450        // filter_by_kind = true (root_kind set): include the single per-kind table
1451        //   for root_kind if it exists. Bind order: ?1=chunk_text, ?2=kind, ?3=prop_text.
1452        //
1453        // filter_by_kind = false (fallback search, root_kind empty): include per-kind tables
1454        //   based on fusable KindEq predicates or all registered tables from sqlite_master.
1455        //   A single KindEq in fusable_filters lets us use one specific table. With no KindEq
1456        //   we UNION all per-kind tables so kind-less fallback searches still find property
1457        //   hits (matching the behaviour of the former global fts_node_properties table).
1458        //   Bind order: ?1=chunk_text, ?2=prop_text (shared across all prop UNION arms).
1459        //
1460        // In both cases, per-kind tables are already kind-specific so no `fp.kind = ?` filter
1461        // is needed inside the inner arm; the outer `search_hits` CTE WHERE clause handles
1462        // any further kind narrowing via fused KindEq predicates.
1463        // prop_fts_tables is Vec<(kind, table_name)> so we can later look up per-kind
1464        // BM25 weights from fts_property_schemas when building the scoring expression.
1465        let prop_fts_tables: Vec<(String, String)> = if filter_by_kind {
1466            let kind = compiled.root_kind.clone();
1467            let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1468            let exists: bool = conn_guard
1469                .query_row(
1470                    "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1471                    rusqlite::params![prop_table],
1472                    |_| Ok(true),
1473                )
1474                .optional()
1475                .map_err(EngineError::Sqlite)?
1476                .unwrap_or(false);
1477            if exists {
1478                vec![(kind, prop_table)]
1479            } else {
1480                vec![]
1481            }
1482        } else {
1483            // Fallback / kind-less search: find the right per-kind tables.
1484            // If there is exactly one KindEq in fusable_filters, use that kind's table.
1485            // Otherwise, include all registered per-kind tables from sqlite_master so
1486            // that kind-less fallback searches can still return property FTS hits.
1487            let kind_eq_values: Vec<String> = compiled
1488                .fusable_filters
1489                .iter()
1490                .filter_map(|p| match p {
1491                    Predicate::KindEq(k) => Some(k.clone()),
1492                    _ => None,
1493                })
1494                .collect();
1495            if kind_eq_values.len() == 1 {
1496                let kind = kind_eq_values[0].clone();
1497                let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1498                let exists: bool = conn_guard
1499                    .query_row(
1500                        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1501                        rusqlite::params![prop_table],
1502                        |_| Ok(true),
1503                    )
1504                    .optional()
1505                    .map_err(EngineError::Sqlite)?
1506                    .unwrap_or(false);
1507                if exists {
1508                    vec![(kind, prop_table)]
1509                } else {
1510                    vec![]
1511                }
1512            } else {
1513                // No single KindEq: UNION all per-kind tables so kind-less fallback
1514                // searches behave like the former global fts_node_properties table.
1515                // Fetch registered kinds and compute/verify their per-kind table names.
1516                let mut stmt = conn_guard
1517                    .prepare("SELECT kind FROM fts_property_schemas")
1518                    .map_err(EngineError::Sqlite)?;
1519                let all_kinds: Vec<String> = stmt
1520                    .query_map([], |r| r.get::<_, String>(0))
1521                    .map_err(EngineError::Sqlite)?
1522                    .collect::<Result<Vec<_>, _>>()
1523                    .map_err(EngineError::Sqlite)?;
1524                drop(stmt);
1525                let mut result = Vec::new();
1526                for kind in all_kinds {
1527                    let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1528                    let exists: bool = conn_guard
1529                        .query_row(
1530                            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1531                            rusqlite::params![prop_table],
1532                            |_| Ok(true),
1533                        )
1534                        .optional()
1535                        .map_err(EngineError::Sqlite)?
1536                        .unwrap_or(false);
1537                    if exists {
1538                        result.push((kind, prop_table));
1539                    }
1540                }
1541                result
1542            }
1543        };
1544        let use_prop_fts = !prop_fts_tables.is_empty();
1545
1546        // Bind layout (before fused/residual predicates and limit):
1547        //   filter_by_kind = true,  use_prop_fts = true:  ?1=chunk_text, ?2=kind, ?3=prop_text
1548        //   filter_by_kind = true,  use_prop_fts = false: ?1=chunk_text, ?2=kind
1549        //   filter_by_kind = false, use_prop_fts = true:  ?1=chunk_text, ?2=prop_text
1550        //   filter_by_kind = false, use_prop_fts = false: ?1=chunk_text
1551        let mut binds: Vec<BindValue> = if filter_by_kind {
1552            if use_prop_fts {
1553                vec![
1554                    BindValue::Text(rendered.clone()),
1555                    BindValue::Text(compiled.root_kind.clone()),
1556                    BindValue::Text(rendered),
1557                ]
1558            } else {
1559                vec![
1560                    BindValue::Text(rendered.clone()),
1561                    BindValue::Text(compiled.root_kind.clone()),
1562                ]
1563            }
1564        } else if use_prop_fts {
1565            // fallback search with property FTS: ?1=chunk, ?2=prop (same query value)
1566            vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
1567        } else {
1568            vec![BindValue::Text(rendered)]
1569        };
1570
1571        // P2-5: both fusable and residual predicates now match against the
1572        // CTE's projected columns (`u.kind`, `u.logical_id`, `u.source_ref`,
1573        // `u.content_ref`, `u.properties`) because the inner UNION arms
1574        // project the full active-row column set through the
1575        // `JOIN nodes src` already present in each arm. The previous
1576        // implementation re-joined `nodes hn` at the CTE level and
1577        // `nodes n` again at the outer SELECT, which was triple work on
1578        // the hot search path.
1579        let mut fused_clauses = String::new();
1580        for predicate in &compiled.fusable_filters {
1581            match predicate {
1582                Predicate::KindEq(kind) => {
1583                    binds.push(BindValue::Text(kind.clone()));
1584                    let idx = binds.len();
1585                    let _ = write!(fused_clauses, "\n                  AND u.kind = ?{idx}");
1586                }
1587                Predicate::LogicalIdEq(logical_id) => {
1588                    binds.push(BindValue::Text(logical_id.clone()));
1589                    let idx = binds.len();
1590                    let _ = write!(
1591                        fused_clauses,
1592                        "\n                  AND u.logical_id = ?{idx}"
1593                    );
1594                }
1595                Predicate::SourceRefEq(source_ref) => {
1596                    binds.push(BindValue::Text(source_ref.clone()));
1597                    let idx = binds.len();
1598                    let _ = write!(
1599                        fused_clauses,
1600                        "\n                  AND u.source_ref = ?{idx}"
1601                    );
1602                }
1603                Predicate::ContentRefEq(uri) => {
1604                    binds.push(BindValue::Text(uri.clone()));
1605                    let idx = binds.len();
1606                    let _ = write!(
1607                        fused_clauses,
1608                        "\n                  AND u.content_ref = ?{idx}"
1609                    );
1610                }
1611                Predicate::ContentRefNotNull => {
1612                    fused_clauses.push_str("\n                  AND u.content_ref IS NOT NULL");
1613                }
1614                Predicate::JsonPathFusedEq { path, value } => {
1615                    binds.push(BindValue::Text(path.clone()));
1616                    let path_idx = binds.len();
1617                    binds.push(BindValue::Text(value.clone()));
1618                    let value_idx = binds.len();
1619                    let _ = write!(
1620                        fused_clauses,
1621                        "\n                  AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
1622                    );
1623                }
1624                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
1625                    binds.push(BindValue::Text(path.clone()));
1626                    let path_idx = binds.len();
1627                    binds.push(BindValue::Integer(*value));
1628                    let value_idx = binds.len();
1629                    let operator = match op {
1630                        ComparisonOp::Gt => ">",
1631                        ComparisonOp::Gte => ">=",
1632                        ComparisonOp::Lt => "<",
1633                        ComparisonOp::Lte => "<=",
1634                    };
1635                    let _ = write!(
1636                        fused_clauses,
1637                        "\n                  AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
1638                    );
1639                }
1640                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
1641                    // Should be in residual_filters; compile_search guarantees
1642                    // this, but stay defensive.
1643                }
1644            }
1645        }
1646
1647        let mut filter_clauses = String::new();
1648        for predicate in &compiled.residual_filters {
1649            match predicate {
1650                Predicate::JsonPathEq { path, value } => {
1651                    binds.push(BindValue::Text(path.clone()));
1652                    let path_idx = binds.len();
1653                    binds.push(scalar_to_bind(value));
1654                    let value_idx = binds.len();
1655                    let _ = write!(
1656                        filter_clauses,
1657                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
1658                    );
1659                }
1660                Predicate::JsonPathCompare { path, op, value } => {
1661                    binds.push(BindValue::Text(path.clone()));
1662                    let path_idx = binds.len();
1663                    binds.push(scalar_to_bind(value));
1664                    let value_idx = binds.len();
1665                    let operator = match op {
1666                        ComparisonOp::Gt => ">",
1667                        ComparisonOp::Gte => ">=",
1668                        ComparisonOp::Lt => "<",
1669                        ComparisonOp::Lte => "<=",
1670                    };
1671                    let _ = write!(
1672                        filter_clauses,
1673                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
1674                    );
1675                }
1676                Predicate::KindEq(_)
1677                | Predicate::LogicalIdEq(_)
1678                | Predicate::SourceRefEq(_)
1679                | Predicate::ContentRefEq(_)
1680                | Predicate::ContentRefNotNull
1681                | Predicate::JsonPathFusedEq { .. }
1682                | Predicate::JsonPathFusedTimestampCmp { .. } => {
1683                    // Fusable predicates live in fused_clauses; compile_search
1684                    // partitions them out of residual_filters.
1685                }
1686            }
1687        }
1688
1689        // Bind `limit` as an integer parameter rather than formatting it into
1690        // the SQL string. Interpolating the limit made the prepared-statement
1691        // SQL vary by limit value, so rusqlite's default 16-slot
1692        // `prepare_cached` cache thrashed for paginated callers that varied
1693        // limits per call. With the bind the SQL is structurally stable for
1694        // a given filter shape regardless of `limit` value.
1695        let limit = compiled.limit;
1696        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
1697        let limit_idx = binds.len();
1698        // P2-5: the inner UNION arms project the full active-row column
1699        // set through `JOIN nodes src` (kind, row_id, source_ref,
1700        // content_ref, content_hash, created_at, properties). Both the
1701        // CTE's outer WHERE and the final SELECT consume those columns
1702        // directly, which eliminates the previous `JOIN nodes hn` at the
1703        // CTE level and `JOIN nodes n` at the outer SELECT — saving two
1704        // redundant joins on the hot search path. `src.superseded_at IS
1705        // NULL` in each arm already filters retired rows, which is what
1706        // the dropped outer joins used to do.
1707        //
1708        // Property FTS uses per-kind tables (fts_props_<kind>). One UNION arm
1709        // is generated per table in prop_fts_tables. The prop text bind index
1710        // is ?3 when filter_by_kind=true (after chunk_text and kind binds) or
1711        // ?2 when filter_by_kind=false (after chunk_text only). The same bind
1712        // position is reused by every prop arm, which is valid in SQLite.
1713        let prop_bind_idx: usize = if filter_by_kind { 3 } else { 2 };
1714        let prop_arm_sql: String = if use_prop_fts {
1715            prop_fts_tables.iter().fold(String::new(), |mut acc, (kind, prop_table)| {
1716                // Load schema for this kind to compute BM25 weights.
1717                let bm25_expr = conn_guard
1718                    .query_row(
1719                        "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
1720                        rusqlite::params![kind],
1721                        |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1722                    )
1723                    .ok()
1724                    .map_or_else(
1725                        || format!("bm25({prop_table})"),
1726                        |(json, sep)| build_bm25_expr(prop_table, &json, &sep),
1727                    );
1728                // For weighted (per-column) schemas text_content does not exist;
1729                // use an empty snippet rather than a column reference that would fail.
1730                let is_weighted = bm25_expr != format!("bm25({prop_table})");
1731                let snippet_expr = if is_weighted {
1732                    "'' AS snippet".to_owned()
1733                } else {
1734                    "substr(fp.text_content, 1, 200) AS snippet".to_owned()
1735                };
1736                let _ = write!(
1737                    acc,
1738                    "
1739                    UNION ALL
1740                    SELECT
1741                        src.row_id AS row_id,
1742                        fp.node_logical_id AS logical_id,
1743                        src.kind AS kind,
1744                        src.properties AS properties,
1745                        src.source_ref AS source_ref,
1746                        src.content_ref AS content_ref,
1747                        src.created_at AS created_at,
1748                        -{bm25_expr} AS score,
1749                        'property' AS source,
1750                        {snippet_expr},
1751                        CAST(fp.rowid AS TEXT) AS projection_row_id
1752                    FROM {prop_table} fp
1753                    JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
1754                    WHERE {prop_table} MATCH ?{prop_bind_idx}"
1755                );
1756                acc
1757            })
1758        } else {
1759            String::new()
1760        };
1761        let (chunk_fts_bind, chunk_kind_clause) = if filter_by_kind {
1762            ("?1", "\n                      AND src.kind = ?2")
1763        } else {
1764            ("?1", "")
1765        };
1766        let sql = format!(
1767            "WITH search_hits AS (
1768                SELECT
1769                    u.row_id AS row_id,
1770                    u.logical_id AS logical_id,
1771                    u.kind AS kind,
1772                    u.properties AS properties,
1773                    u.source_ref AS source_ref,
1774                    u.content_ref AS content_ref,
1775                    u.created_at AS created_at,
1776                    u.score AS score,
1777                    u.source AS source,
1778                    u.snippet AS snippet,
1779                    u.projection_row_id AS projection_row_id
1780                FROM (
1781                    SELECT
1782                        src.row_id AS row_id,
1783                        c.node_logical_id AS logical_id,
1784                        src.kind AS kind,
1785                        src.properties AS properties,
1786                        src.source_ref AS source_ref,
1787                        src.content_ref AS content_ref,
1788                        src.created_at AS created_at,
1789                        -bm25(fts_nodes) AS score,
1790                        'chunk' AS source,
1791                        snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
1792                        f.chunk_id AS projection_row_id
1793                    FROM fts_nodes f
1794                    JOIN chunks c ON c.id = f.chunk_id
1795                    JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
1796                    WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}{prop_arm_sql}
1797                ) u
1798                WHERE 1 = 1{fused_clauses}
1799                ORDER BY u.score DESC
1800                LIMIT ?{limit_idx}
1801            )
1802            SELECT
1803                h.row_id,
1804                h.logical_id,
1805                h.kind,
1806                h.properties,
1807                h.content_ref,
1808                am.last_accessed_at,
1809                h.created_at,
1810                h.score,
1811                h.source,
1812                h.snippet,
1813                h.projection_row_id
1814            FROM search_hits h
1815            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
1816            WHERE 1 = 1{filter_clauses}
1817            ORDER BY h.score DESC"
1818        );
1819
1820        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
1821
1822        let mut statement = match conn_guard.prepare_cached(&sql) {
1823            Ok(stmt) => stmt,
1824            Err(e) => {
1825                self.telemetry.increment_errors();
1826                return Err(EngineError::Sqlite(e));
1827            }
1828        };
1829
1830        let hits = match statement
1831            .query_map(params_from_iter(bind_values.iter()), |row| {
1832                let source_str: String = row.get(8)?;
1833                // The CTE emits only two literal values here: `'chunk'` and
1834                // `'property'`. Default to `Chunk` on anything unexpected so a
1835                // schema drift surfaces as a mislabelled hit rather than a
1836                // row-level error.
1837                let source = if source_str == "property" {
1838                    SearchHitSource::Property
1839                } else {
1840                    SearchHitSource::Chunk
1841                };
1842                let match_mode = match branch {
1843                    SearchBranch::Strict => SearchMatchMode::Strict,
1844                    SearchBranch::Relaxed => SearchMatchMode::Relaxed,
1845                };
1846                Ok(SearchHit {
1847                    node: fathomdb_query::NodeRowLite {
1848                        row_id: row.get(0)?,
1849                        logical_id: row.get(1)?,
1850                        kind: row.get(2)?,
1851                        properties: row.get(3)?,
1852                        content_ref: row.get(4)?,
1853                        last_accessed_at: row.get(5)?,
1854                    },
1855                    written_at: row.get(6)?,
1856                    score: row.get(7)?,
1857                    // Phase 10: every branch currently emits text hits.
1858                    modality: RetrievalModality::Text,
1859                    source,
1860                    match_mode: Some(match_mode),
1861                    snippet: row.get(9)?,
1862                    projection_row_id: row.get(10)?,
1863                    vector_distance: None,
1864                    attribution: None,
1865                })
1866            })
1867            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
1868        {
1869            Ok(rows) => rows,
1870            Err(e) => {
1871                self.telemetry.increment_errors();
1872                return Err(EngineError::Sqlite(e));
1873            }
1874        };
1875
1876        // Drop the statement so `conn_guard` is free (attribution is
1877        // resolved after dedup in `execute_compiled_search_plan` to avoid
1878        // spending highlight lookups on hits that will be discarded).
1879        drop(statement);
1880        drop(conn_guard);
1881
1882        self.telemetry.increment_queries();
1883        Ok(hits)
1884    }
1885
1886    /// Populate per-hit attribution for the given deduped merged hits.
1887    /// Runs after [`merge_search_branches`] so dropped duplicates do not
1888    /// incur the highlight+position-map lookup cost.
1889    fn populate_attribution_for_hits(
1890        &self,
1891        hits: &mut [SearchHit],
1892        strict_text_query: &fathomdb_query::TextQuery,
1893        relaxed_text_query: Option<&fathomdb_query::TextQuery>,
1894    ) -> Result<(), EngineError> {
1895        let conn_guard = match self.lock_connection() {
1896            Ok(g) => g,
1897            Err(e) => {
1898                self.telemetry.increment_errors();
1899                return Err(e);
1900            }
1901        };
1902        let strict_expr = render_text_query_fts5(strict_text_query);
1903        let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
1904        for hit in hits.iter_mut() {
1905            // Phase 10: text hits always carry `Some(match_mode)`. Vector
1906            // hits (when a future phase adds them) have `None` here and
1907            // are skipped by the attribution resolver because attribution
1908            // is meaningless for vector matches.
1909            let match_expr = match hit.match_mode {
1910                Some(SearchMatchMode::Strict) => strict_expr.as_str(),
1911                Some(SearchMatchMode::Relaxed) => {
1912                    relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
1913                }
1914                None => continue,
1915            };
1916            match resolve_hit_attribution(&conn_guard, hit, match_expr) {
1917                Ok(att) => hit.attribution = Some(att),
1918                Err(e) => {
1919                    self.telemetry.increment_errors();
1920                    return Err(e);
1921                }
1922            }
1923        }
1924        Ok(())
1925    }
1926
1927    /// # Errors
1928    /// Returns [`EngineError`] if the root query or any bounded expansion
1929    /// query cannot be prepared or executed.
1930    pub fn execute_compiled_grouped_read(
1931        &self,
1932        compiled: &CompiledGroupedQuery,
1933    ) -> Result<GroupedQueryRows, EngineError> {
1934        let root_rows = self.execute_compiled_read(&compiled.root)?;
1935        if root_rows.was_degraded {
1936            return Ok(GroupedQueryRows {
1937                roots: Vec::new(),
1938                expansions: Vec::new(),
1939                was_degraded: true,
1940            });
1941        }
1942
1943        let roots = root_rows.nodes;
1944        let mut expansions = Vec::with_capacity(compiled.expansions.len());
1945        for expansion in &compiled.expansions {
1946            let slot_rows = if roots.is_empty() {
1947                Vec::new()
1948            } else {
1949                self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
1950            };
1951            expansions.push(ExpansionSlotRows {
1952                slot: expansion.slot.clone(),
1953                roots: slot_rows,
1954            });
1955        }
1956
1957        Ok(GroupedQueryRows {
1958            roots,
1959            expansions,
1960            was_degraded: false,
1961        })
1962    }
1963
1964    /// Chunked batched expansion: splits roots into chunks of
1965    /// `BATCH_CHUNK_SIZE` and runs one batched query per chunk, then merges
1966    /// results while preserving root ordering.  This keeps bind-parameter
1967    /// counts within `SQLite` limits while avoiding the N+1 per-root pattern
1968    /// for large result sets.
1969    fn read_expansion_nodes_chunked(
1970        &self,
1971        roots: &[NodeRow],
1972        expansion: &ExpansionSlot,
1973        hard_limit: usize,
1974    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
1975        if roots.len() <= BATCH_CHUNK_SIZE {
1976            return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
1977        }
1978
1979        // Merge chunk results keyed by root logical_id, then reassemble in
1980        // root order.
1981        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
1982        for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
1983            for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
1984                per_root
1985                    .entry(group.root_logical_id)
1986                    .or_default()
1987                    .extend(group.nodes);
1988            }
1989        }
1990
1991        Ok(roots
1992            .iter()
1993            .map(|root| ExpansionRootRows {
1994                root_logical_id: root.logical_id.clone(),
1995                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
1996            })
1997            .collect())
1998    }
1999
2000    /// Batched expansion: one recursive CTE query per expansion slot that
2001    /// processes all root IDs at once. Uses `ROW_NUMBER() OVER (PARTITION BY
2002    /// source_logical_id ...)` to enforce the per-root hard limit inside the
2003    /// database rather than in Rust.
2004    fn read_expansion_nodes_batched(
2005        &self,
2006        roots: &[NodeRow],
2007        expansion: &ExpansionSlot,
2008        hard_limit: usize,
2009    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
2010        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
2011        let (join_condition, next_logical_id) = match expansion.direction {
2012            fathomdb_query::TraverseDirection::Out => {
2013                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
2014            }
2015            fathomdb_query::TraverseDirection::In => {
2016                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
2017            }
2018        };
2019
2020        // EXECUTE-TIME VALIDATION: fused filter against kinds without an FTS schema.
2021        // This check runs at execute time (not builder time) because the target kind
2022        // set for an expand slot is edge-label-scoped, not kind-scoped, and multiple
2023        // target kinds may be reachable via one edge label. See Pack 12 docs.
2024        if expansion.filter.as_ref().is_some_and(|f| {
2025            matches!(
2026                f,
2027                Predicate::JsonPathFusedEq { .. } | Predicate::JsonPathFusedTimestampCmp { .. }
2028            )
2029        }) {
2030            self.validate_fused_filter_for_edge_label(&expansion.label)?;
2031        }
2032
2033        // Build a UNION ALL of SELECT literals for the root seed rows.
2034        // SQLite does not support `VALUES ... AS alias(col)` in older versions,
2035        // so we use `SELECT ?1 UNION ALL SELECT ?2 ...` instead.
2036        let root_seed_union: String = (1..=root_ids.len())
2037            .map(|i| format!("SELECT ?{i}"))
2038            .collect::<Vec<_>>()
2039            .join(" UNION ALL ");
2040
2041        // Bind params: root IDs occupy ?1..=?N, edge kind is ?(N+1).
2042        // Filter params (if any) follow starting at ?(N+2).
2043        let edge_kind_param = root_ids.len() + 1;
2044        let filter_param_start = root_ids.len() + 2;
2045
2046        // Compile the optional target-side filter to a SQL fragment + bind values.
2047        // The fragment is injected into the `numbered` CTE's WHERE clause BEFORE
2048        // the ROW_NUMBER() window so the per-originator limit counts only matching rows.
2049        let (filter_sql, filter_binds) =
2050            compile_expansion_filter(expansion.filter.as_ref(), filter_param_start);
2051
2052        // The `root_id` column tracks which root each traversal path
2053        // originated from. The `ROW_NUMBER()` window in the outer query
2054        // enforces the per-root hard limit.
2055        let sql = format!(
2056            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
2057            traversed(root_id, logical_id, depth, visited, emitted) AS (
2058                SELECT rid, rid, 0, printf(',%s,', rid), 0
2059                FROM root_ids
2060                UNION ALL
2061                SELECT
2062                    t.root_id,
2063                    {next_logical_id},
2064                    t.depth + 1,
2065                    t.visited || {next_logical_id} || ',',
2066                    t.emitted + 1
2067                FROM traversed t
2068                JOIN edges e ON {join_condition}
2069                    AND e.kind = ?{edge_kind_param}
2070                    AND e.superseded_at IS NULL
2071                WHERE t.depth < {max_depth}
2072                  AND t.emitted < {hard_limit}
2073                  AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
2074            ),
2075            numbered AS (
2076                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
2077                     , n.content_ref, am.last_accessed_at
2078                     , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
2079                FROM traversed t
2080                JOIN nodes n ON n.logical_id = t.logical_id
2081                    AND n.superseded_at IS NULL
2082                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
2083                WHERE t.depth > 0{filter_sql}
2084            )
2085            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
2086            FROM numbered
2087            WHERE rn <= {hard_limit}
2088            ORDER BY root_id, logical_id",
2089            max_depth = expansion.max_depth,
2090        );
2091
2092        let conn_guard = self.lock_connection()?;
2093        let mut statement = conn_guard
2094            .prepare_cached(&sql)
2095            .map_err(EngineError::Sqlite)?;
2096
2097        // Bind root IDs (1..=N) and edge kind (N+1), then filter params (N+2...).
2098        let mut bind_values: Vec<Value> = root_ids
2099            .iter()
2100            .map(|id| Value::Text((*id).to_owned()))
2101            .collect();
2102        bind_values.push(Value::Text(expansion.label.clone()));
2103        bind_values.extend(filter_binds);
2104
2105        let rows = statement
2106            .query_map(params_from_iter(bind_values.iter()), |row| {
2107                Ok((
2108                    row.get::<_, String>(0)?, // root_id
2109                    NodeRow {
2110                        row_id: row.get(1)?,
2111                        logical_id: row.get(2)?,
2112                        kind: row.get(3)?,
2113                        properties: row.get(4)?,
2114                        content_ref: row.get(5)?,
2115                        last_accessed_at: row.get(6)?,
2116                    },
2117                ))
2118            })
2119            .map_err(EngineError::Sqlite)?
2120            .collect::<Result<Vec<_>, _>>()
2121            .map_err(EngineError::Sqlite)?;
2122
2123        // Partition results back into per-root groups, preserving root order.
2124        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
2125        for (root_id, node) in rows {
2126            per_root.entry(root_id).or_default().push(node);
2127        }
2128
2129        let root_groups = roots
2130            .iter()
2131            .map(|root| ExpansionRootRows {
2132                root_logical_id: root.logical_id.clone(),
2133                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
2134            })
2135            .collect();
2136
2137        Ok(root_groups)
2138    }
2139
2140    /// Validate that all target node kinds reachable via `edge_label` have a
2141    /// registered property-FTS schema. Called at execute time when an expansion
2142    /// slot carries a fused filter predicate.
2143    ///
2144    /// EXECUTE-TIME VALIDATION: this check runs at execute time (not builder
2145    /// time) for expand slots because the target kind set is edge-label-scoped
2146    /// rather than kind-scoped, and is not statically knowable at builder time
2147    /// when multiple target kinds may be reachable via the same label.
2148    /// See Pack 12 docs.
2149    ///
2150    /// # Errors
2151    /// Returns `EngineError::InvalidConfig` if any reachable target kind lacks
2152    /// a registered property-FTS schema.
2153    fn validate_fused_filter_for_edge_label(&self, edge_label: &str) -> Result<(), EngineError> {
2154        let conn = self.lock_connection()?;
2155        // Collect the distinct node kinds reachable as targets of this edge label.
2156        let mut stmt = conn
2157            .prepare_cached(
2158                "SELECT DISTINCT n.kind \
2159                 FROM edges e \
2160                 JOIN nodes n ON n.logical_id = e.target_logical_id \
2161                 WHERE e.kind = ?1 AND e.superseded_at IS NULL",
2162            )
2163            .map_err(EngineError::Sqlite)?;
2164        let target_kinds: Vec<String> = stmt
2165            .query_map(rusqlite::params![edge_label], |row| row.get(0))
2166            .map_err(EngineError::Sqlite)?
2167            .collect::<Result<Vec<_>, _>>()
2168            .map_err(EngineError::Sqlite)?;
2169
2170        for kind in &target_kinds {
2171            let has_schema: bool = conn
2172                .query_row(
2173                    "SELECT COUNT(*) > 0 FROM fts_property_schemas WHERE kind = ?1",
2174                    rusqlite::params![kind],
2175                    |row| row.get(0),
2176                )
2177                .map_err(EngineError::Sqlite)?;
2178            if !has_schema {
2179                return Err(EngineError::InvalidConfig(format!(
2180                    "kind {kind:?} has no registered property-FTS schema; register one with \
2181                     admin.register_fts_property_schema(..) before using fused filters on \
2182                     expansion slots, or use JsonPathEq for non-fused semantics \
2183                     (expand slot uses edge label {edge_label:?})"
2184                )));
2185            }
2186        }
2187        Ok(())
2188    }
2189
2190    /// Read a single run by id.
2191    ///
2192    /// # Errors
2193    /// Returns [`EngineError`] if the query fails or if the connection mutex
2194    /// has been poisoned.
2195    pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
2196        let conn = self.lock_connection()?;
2197        conn.query_row(
2198            "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
2199            rusqlite::params![id],
2200            |row| {
2201                Ok(RunRow {
2202                    id: row.get(0)?,
2203                    kind: row.get(1)?,
2204                    status: row.get(2)?,
2205                    properties: row.get(3)?,
2206                })
2207            },
2208        )
2209        .optional()
2210        .map_err(EngineError::Sqlite)
2211    }
2212
2213    /// Read a single step by id.
2214    ///
2215    /// # Errors
2216    /// Returns [`EngineError`] if the query fails or if the connection mutex
2217    /// has been poisoned.
2218    pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
2219        let conn = self.lock_connection()?;
2220        conn.query_row(
2221            "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
2222            rusqlite::params![id],
2223            |row| {
2224                Ok(StepRow {
2225                    id: row.get(0)?,
2226                    run_id: row.get(1)?,
2227                    kind: row.get(2)?,
2228                    status: row.get(3)?,
2229                    properties: row.get(4)?,
2230                })
2231            },
2232        )
2233        .optional()
2234        .map_err(EngineError::Sqlite)
2235    }
2236
2237    /// Read a single action by id.
2238    ///
2239    /// # Errors
2240    /// Returns [`EngineError`] if the query fails or if the connection mutex
2241    /// has been poisoned.
2242    pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
2243        let conn = self.lock_connection()?;
2244        conn.query_row(
2245            "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
2246            rusqlite::params![id],
2247            |row| {
2248                Ok(ActionRow {
2249                    id: row.get(0)?,
2250                    step_id: row.get(1)?,
2251                    kind: row.get(2)?,
2252                    status: row.get(3)?,
2253                    properties: row.get(4)?,
2254                })
2255            },
2256        )
2257        .optional()
2258        .map_err(EngineError::Sqlite)
2259    }
2260
2261    /// Read all active (non-superseded) runs.
2262    ///
2263    /// # Errors
2264    /// Returns [`EngineError`] if the query fails or if the connection mutex
2265    /// has been poisoned.
2266    pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
2267        let conn = self.lock_connection()?;
2268        let mut stmt = conn
2269            .prepare_cached(
2270                "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
2271            )
2272            .map_err(EngineError::Sqlite)?;
2273        let rows = stmt
2274            .query_map([], |row| {
2275                Ok(RunRow {
2276                    id: row.get(0)?,
2277                    kind: row.get(1)?,
2278                    status: row.get(2)?,
2279                    properties: row.get(3)?,
2280                })
2281            })
2282            .map_err(EngineError::Sqlite)?
2283            .collect::<Result<Vec<_>, _>>()
2284            .map_err(EngineError::Sqlite)?;
2285        Ok(rows)
2286    }
2287
2288    /// Returns the number of shape→SQL entries currently indexed.
2289    ///
2290    /// Each distinct query shape (structural hash of kind + steps + limits)
2291    /// maps to exactly one SQL string.  This is a test-oriented introspection
2292    /// helper; it does not reflect rusqlite's internal prepared-statement
2293    /// cache, which is keyed by SQL text.
2294    ///
2295    /// # Panics
2296    /// Panics if the internal shape-SQL-map mutex is poisoned.
2297    #[must_use]
2298    #[allow(clippy::expect_used)]
2299    pub fn shape_sql_count(&self) -> usize {
2300        self.shape_sql_map
2301            .lock()
2302            .unwrap_or_else(PoisonError::into_inner)
2303            .len()
2304    }
2305
2306    /// Returns a cloned `Arc` to the schema manager.
2307    #[must_use]
2308    pub fn schema_manager(&self) -> Arc<SchemaManager> {
2309        Arc::clone(&self.schema_manager)
2310    }
2311
2312    /// Return the execution plan for a compiled query without executing it.
2313    ///
2314    /// Useful for debugging, testing shape-hash caching, and operator
2315    /// diagnostics. Does not open a transaction or touch the database beyond
2316    /// checking the statement cache.
2317    ///
2318    /// # Panics
2319    /// Panics if the internal shape-SQL-map mutex is poisoned.
2320    #[must_use]
2321    pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
2322        let cache_hit = self
2323            .shape_sql_map
2324            .lock()
2325            .unwrap_or_else(PoisonError::into_inner)
2326            .contains_key(&compiled.shape_hash);
2327        QueryPlan {
2328            sql: wrap_node_row_projection_sql(&compiled.sql),
2329            bind_count: compiled.binds.len(),
2330            driving_table: compiled.driving_table,
2331            shape_hash: compiled.shape_hash,
2332            cache_hit,
2333        }
2334    }
2335
2336    /// Execute a named PRAGMA and return the result as a String.
2337    /// Used by Layer 1 tests to verify startup pragma initialization.
2338    ///
2339    /// # Errors
2340    /// Returns [`EngineError`] if the PRAGMA query fails or if the connection
2341    /// mutex has been poisoned.
2342    #[doc(hidden)]
2343    pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
2344        let conn = self.lock_connection()?;
2345        let result = conn
2346            .query_row(&format!("PRAGMA {name}"), [], |row| {
2347                // PRAGMAs may return TEXT or INTEGER; normalise to String.
2348                row.get::<_, rusqlite::types::Value>(0)
2349            })
2350            .map_err(EngineError::Sqlite)?;
2351        let s = match result {
2352            rusqlite::types::Value::Text(t) => t,
2353            rusqlite::types::Value::Integer(i) => i.to_string(),
2354            rusqlite::types::Value::Real(f) => f.to_string(),
2355            rusqlite::types::Value::Blob(_) => {
2356                return Err(EngineError::InvalidWrite(format!(
2357                    "PRAGMA {name} returned an unexpected BLOB value"
2358                )));
2359            }
2360            rusqlite::types::Value::Null => String::new(),
2361        };
2362        Ok(s)
2363    }
2364
2365    /// Return all provenance events whose `subject` matches the given value.
2366    ///
2367    /// Subjects are logical node IDs (for retire/upsert events) or `source_ref`
2368    /// values (for excise events).
2369    ///
2370    /// # Errors
2371    /// Returns [`EngineError`] if the query fails or if the connection mutex
2372    /// has been poisoned.
2373    pub fn query_provenance_events(
2374        &self,
2375        subject: &str,
2376    ) -> Result<Vec<ProvenanceEvent>, EngineError> {
2377        let conn = self.lock_connection()?;
2378        let mut stmt = conn
2379            .prepare_cached(
2380                "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
2381                 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
2382            )
2383            .map_err(EngineError::Sqlite)?;
2384        let events = stmt
2385            .query_map(rusqlite::params![subject], |row| {
2386                Ok(ProvenanceEvent {
2387                    id: row.get(0)?,
2388                    event_type: row.get(1)?,
2389                    subject: row.get(2)?,
2390                    source_ref: row.get(3)?,
2391                    metadata_json: row.get(4)?,
2392                    created_at: row.get(5)?,
2393                })
2394            })
2395            .map_err(EngineError::Sqlite)?
2396            .collect::<Result<Vec<_>, _>>()
2397            .map_err(EngineError::Sqlite)?;
2398        Ok(events)
2399    }
2400
2401    /// Check if `kind` has a first-registration async rebuild in progress
2402    /// (`is_first_registration=1` with state PENDING/BUILDING/SWAPPING and no
2403    /// rows yet in the per-kind `fts_props_<kind>` table). If so, execute a
2404    /// full-kind scan and return the nodes. Returns `None` when the normal
2405    /// FTS5 path should run.
2406    fn scan_fallback_if_first_registration(
2407        &self,
2408        kind: &str,
2409    ) -> Result<Option<Vec<NodeRow>>, EngineError> {
2410        let conn = self.lock_connection()?;
2411
2412        // Quick point-lookup: kind has a first-registration rebuild in a
2413        // pre-complete state AND the per-kind table has no rows yet.
2414        let prop_table = fathomdb_schema::fts_kind_table_name(kind);
2415        // Check whether the per-kind table exists before querying its row count.
2416        let table_exists: bool = conn
2417            .query_row(
2418                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
2419                rusqlite::params![prop_table],
2420                |_| Ok(true),
2421            )
2422            .optional()?
2423            .unwrap_or(false);
2424        let prop_empty = if table_exists {
2425            let cnt: i64 =
2426                conn.query_row(&format!("SELECT COUNT(*) FROM {prop_table}"), [], |r| {
2427                    r.get(0)
2428                })?;
2429            cnt == 0
2430        } else {
2431            true
2432        };
2433        let needs_scan: bool = if prop_empty {
2434            conn.query_row(
2435                "SELECT 1 FROM fts_property_rebuild_state \
2436                 WHERE kind = ?1 AND is_first_registration = 1 \
2437                 AND state IN ('PENDING','BUILDING','SWAPPING') \
2438                 LIMIT 1",
2439                rusqlite::params![kind],
2440                |_| Ok(true),
2441            )
2442            .optional()?
2443            .unwrap_or(false)
2444        } else {
2445            false
2446        };
2447
2448        if !needs_scan {
2449            return Ok(None);
2450        }
2451
2452        // Scan fallback: return all active nodes of this kind.
2453        // Intentionally unindexed — acceptable for first-registration window.
2454        let mut stmt = conn
2455            .prepare_cached(
2456                "SELECT n.row_id, n.logical_id, n.kind, n.properties, n.content_ref, \
2457                 am.last_accessed_at \
2458                 FROM nodes n \
2459                 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id \
2460                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
2461            )
2462            .map_err(EngineError::Sqlite)?;
2463
2464        let nodes = stmt
2465            .query_map(rusqlite::params![kind], |row| {
2466                Ok(NodeRow {
2467                    row_id: row.get(0)?,
2468                    logical_id: row.get(1)?,
2469                    kind: row.get(2)?,
2470                    properties: row.get(3)?,
2471                    content_ref: row.get(4)?,
2472                    last_accessed_at: row.get(5)?,
2473                })
2474            })
2475            .map_err(EngineError::Sqlite)?
2476            .collect::<Result<Vec<_>, _>>()
2477            .map_err(EngineError::Sqlite)?;
2478
2479        Ok(Some(nodes))
2480    }
2481
2482    /// Return the current rebuild progress for a kind, or `None` if no rebuild
2483    /// has been registered for that kind.
2484    ///
2485    /// # Errors
2486    /// Returns [`EngineError`] if the database query fails.
2487    pub fn get_property_fts_rebuild_progress(
2488        &self,
2489        kind: &str,
2490    ) -> Result<Option<crate::rebuild_actor::RebuildProgress>, EngineError> {
2491        let conn = self.lock_connection()?;
2492        let row = conn
2493            .query_row(
2494                "SELECT state, rows_total, rows_done, started_at, last_progress_at, error_message \
2495                 FROM fts_property_rebuild_state WHERE kind = ?1",
2496                rusqlite::params![kind],
2497                |r| {
2498                    Ok(crate::rebuild_actor::RebuildProgress {
2499                        state: r.get(0)?,
2500                        rows_total: r.get(1)?,
2501                        rows_done: r.get(2)?,
2502                        started_at: r.get(3)?,
2503                        last_progress_at: r.get(4)?,
2504                        error_message: r.get(5)?,
2505                    })
2506                },
2507            )
2508            .optional()?;
2509        Ok(row)
2510    }
2511}
2512
2513/// Rewrite a `CompiledQuery` whose SQL references the legacy `fts_node_properties`
2514/// table to use the per-kind `fts_props_<kind>` table, or strip the property FTS
2515/// arm entirely when the per-kind table does not exist in `sqlite_master`.
2516///
2517/// Returns `(adapted_sql, adapted_binds)`.
2518fn adapt_fts_nodes_sql_for_per_kind_tables(
2519    compiled: &CompiledQuery,
2520    conn: &rusqlite::Connection,
2521) -> Result<(String, Vec<BindValue>), EngineError> {
2522    let root_kind = compiled
2523        .binds
2524        .get(1)
2525        .and_then(|b| {
2526            if let BindValue::Text(k) = b {
2527                Some(k.as_str())
2528            } else {
2529                None
2530            }
2531        })
2532        .unwrap_or("");
2533    let prop_table = fathomdb_schema::fts_kind_table_name(root_kind);
2534    let prop_table_exists: bool = conn
2535        .query_row(
2536            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
2537            rusqlite::params![prop_table],
2538            |_| Ok(true),
2539        )
2540        .optional()
2541        .map_err(EngineError::Sqlite)?
2542        .unwrap_or(false);
2543
2544    // The compile_query path assigns fixed positional parameters:
2545    //   ?1 = text (chunk FTS), ?2 = kind (chunk filter),
2546    //   ?3 = text (prop FTS),  ?4 = kind (prop filter),
2547    //   ?5+ = fusable/residual predicates
2548    let (new_sql, removed_bind_positions) = if prop_table_exists {
2549        let s = compiled
2550            .sql
2551            .replace("fts_node_properties", &prop_table)
2552            .replace("\n                          AND fp.kind = ?4", "");
2553        (renumber_sql_params(&s, &[4]), vec![3usize])
2554    } else {
2555        let s = strip_prop_fts_union_arm(&compiled.sql);
2556        (renumber_sql_params(&s, &[3, 4]), vec![2usize, 3])
2557    };
2558
2559    let new_binds: Vec<BindValue> = compiled
2560        .binds
2561        .iter()
2562        .enumerate()
2563        .filter(|(i, _)| !removed_bind_positions.contains(i))
2564        .map(|(_, b)| b.clone())
2565        .collect();
2566
2567    Ok((new_sql, new_binds))
2568}
2569
2570/// Check that the active vector profile's model identity and dimensions match
2571/// the supplied embedder. If either differs, emit a warning via `trace_warn!`
2572/// but always return `Ok(())`. This function NEVER returns `Err`.
2573///
2574/// Queries `projection_profiles` directly — no `AdminService` indirection.
2575#[allow(clippy::unnecessary_wraps)]
2576fn check_vec_identity_at_open(
2577    conn: &rusqlite::Connection,
2578    embedder: &dyn QueryEmbedder,
2579) -> Result<(), EngineError> {
2580    let row: Option<String> = conn
2581        .query_row(
2582            "SELECT config_json FROM projection_profiles WHERE kind='*' AND facet='vec'",
2583            [],
2584            |row| row.get(0),
2585        )
2586        .optional()
2587        .unwrap_or(None);
2588
2589    let Some(config_json) = row else {
2590        return Ok(());
2591    };
2592
2593    // Parse leniently — any error means we just skip the check.
2594    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&config_json) else {
2595        return Ok(());
2596    };
2597
2598    let identity = embedder.identity();
2599
2600    if let Some(stored_model) = parsed
2601        .get("model_identity")
2602        .and_then(serde_json::Value::as_str)
2603        && stored_model != identity.model_identity
2604    {
2605        trace_warn!(
2606            stored_model_identity = stored_model,
2607            embedder_model_identity = %identity.model_identity,
2608            "vec identity mismatch at open: model_identity differs"
2609        );
2610    }
2611
2612    if let Some(stored_dim) = parsed.get("dimensions").and_then(serde_json::Value::as_u64) {
2613        let stored_dim = usize::try_from(stored_dim).unwrap_or(usize::MAX);
2614        if stored_dim != identity.dimension {
2615            trace_warn!(
2616                stored_dimensions = stored_dim,
2617                embedder_dimensions = identity.dimension,
2618                "vec identity mismatch at open: dimensions differ"
2619            );
2620        }
2621    }
2622
2623    Ok(())
2624}
2625
2626/// Open-time FTS rebuild guards (Guard 1 + Guard 2).
2627///
2628/// Guard 1: if any registered kind's per-kind `fts_props_<kind>` table is
2629/// missing or empty while live nodes of that kind exist, do a synchronous
2630/// full rebuild.
2631///
2632/// Guard 2: if any recursive schema is registered but
2633/// `fts_node_property_positions` is empty, do a synchronous full rebuild to
2634/// regenerate the position map.
2635///
2636/// Both guards are no-ops on a consistent database.
2637fn run_open_time_fts_guards(conn: &mut rusqlite::Connection) -> Result<(), EngineError> {
2638    let schema_count: i64 = conn
2639        .query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
2640            row.get(0)
2641        })
2642        .map_err(EngineError::Sqlite)?;
2643    if schema_count == 0 {
2644        return Ok(());
2645    }
2646
2647    let needs_fts_rebuild = open_guard_check_fts_empty(conn)?;
2648    let needs_position_backfill = if needs_fts_rebuild {
2649        false
2650    } else {
2651        open_guard_check_positions_empty(conn)?
2652    };
2653
2654    if needs_fts_rebuild || needs_position_backfill {
2655        let per_kind_tables: Vec<String> = {
2656            let mut stmt = conn
2657                .prepare(
2658                    "SELECT name FROM sqlite_master \
2659                     WHERE type='table' AND name LIKE 'fts_props_%' \
2660                     AND sql LIKE 'CREATE VIRTUAL TABLE%'",
2661                )
2662                .map_err(EngineError::Sqlite)?;
2663            stmt.query_map([], |r| r.get::<_, String>(0))
2664                .map_err(EngineError::Sqlite)?
2665                .collect::<Result<Vec<_>, _>>()
2666                .map_err(EngineError::Sqlite)?
2667        };
2668        let tx = conn
2669            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
2670            .map_err(EngineError::Sqlite)?;
2671        for table in &per_kind_tables {
2672            tx.execute_batch(&format!("DELETE FROM {table}"))
2673                .map_err(EngineError::Sqlite)?;
2674        }
2675        tx.execute("DELETE FROM fts_node_property_positions", [])
2676            .map_err(EngineError::Sqlite)?;
2677        crate::projection::insert_property_fts_rows(
2678            &tx,
2679            "SELECT logical_id, properties FROM nodes \
2680             WHERE kind = ?1 AND superseded_at IS NULL",
2681        )
2682        .map_err(EngineError::Sqlite)?;
2683        tx.commit().map_err(EngineError::Sqlite)?;
2684    }
2685    Ok(())
2686}
2687
2688fn open_guard_check_fts_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
2689    let kinds: Vec<String> = {
2690        let mut stmt = conn
2691            .prepare("SELECT kind FROM fts_property_schemas")
2692            .map_err(EngineError::Sqlite)?;
2693        stmt.query_map([], |row| row.get::<_, String>(0))
2694            .map_err(EngineError::Sqlite)?
2695            .collect::<Result<Vec<_>, _>>()
2696            .map_err(EngineError::Sqlite)?
2697    };
2698    for kind in &kinds {
2699        let table = fathomdb_schema::fts_kind_table_name(kind);
2700        let table_exists: bool = conn
2701            .query_row(
2702                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
2703                rusqlite::params![table],
2704                |_| Ok(true),
2705            )
2706            .optional()
2707            .map_err(EngineError::Sqlite)?
2708            .unwrap_or(false);
2709        let fts_count: i64 = if table_exists {
2710            conn.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
2711                row.get(0)
2712            })
2713            .map_err(EngineError::Sqlite)?
2714        } else {
2715            0
2716        };
2717        if fts_count == 0 {
2718            let node_count: i64 = conn
2719                .query_row(
2720                    "SELECT COUNT(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
2721                    rusqlite::params![kind],
2722                    |row| row.get(0),
2723                )
2724                .map_err(EngineError::Sqlite)?;
2725            if node_count > 0 {
2726                return Ok(true);
2727            }
2728        }
2729    }
2730    Ok(false)
2731}
2732
2733fn open_guard_check_positions_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
2734    let recursive_count: i64 = conn
2735        .query_row(
2736            "SELECT COUNT(*) FROM fts_property_schemas \
2737             WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
2738            [],
2739            |row| row.get(0),
2740        )
2741        .map_err(EngineError::Sqlite)?;
2742    if recursive_count == 0 {
2743        return Ok(false);
2744    }
2745    let pos_count: i64 = conn
2746        .query_row(
2747            "SELECT COUNT(*) FROM fts_node_property_positions",
2748            [],
2749            |row| row.get(0),
2750        )
2751        .map_err(EngineError::Sqlite)?;
2752    Ok(pos_count == 0)
2753}
2754
/// Renumber `SQLite` positional parameters in `sql` after removing the given
/// 1-based parameter numbers from `removed` (sorted ascending).
///
/// Each `?N` in the SQL where `N` is in `removed` is left in place (the caller
/// must have already deleted those references from the SQL). Every `?N` where
/// `N` is greater than any removed parameter is decremented by the count of
/// removed parameters that are less than `N`.
///
/// Example: if `removed = [4]` then `?5` → `?4`, `?6` → `?5`, etc.
/// Example: if `removed = [3, 4]` then `?5` → `?3`, `?6` → `?4`, etc.
fn renumber_sql_params(sql: &str, removed: &[usize]) -> String {
    let mut result = String::with_capacity(sql.len());
    let bytes = sql.as_bytes();
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'?' {
            // Scan the run of decimal digits following the '?'.
            let num_start = i + 1;
            let mut j = num_start;
            while j < bytes.len() && bytes[j].is_ascii_digit() {
                j += 1;
            }
            if j > num_start {
                if let Ok(n) = sql[num_start..j].parse::<usize>() {
                    // Shift the parameter number down by how many removed
                    // parameters precede it.
                    let offset = removed.iter().filter(|&&r| r < n).count();
                    result.push('?');
                    result.push_str(&(n - offset).to_string());
                    i = j;
                    continue;
                }
            }
            // Bare '?' (no digits, or unparseable number): pass through as-is.
            result.push('?');
            i += 1;
        } else {
            // Copy everything up to the next '?' as one str slice.  (The
            // previous implementation pushed `bytes[i] as char` byte-by-byte,
            // which mangled multi-byte UTF-8 sequences; '?' is ASCII, so
            // slicing at its byte positions always lands on char boundaries.)
            let run_end = bytes[i..]
                .iter()
                .position(|&b| b == b'?')
                .map_or(bytes.len(), |p| i + p);
            result.push_str(&sql[i..run_end]);
            i = run_end;
        }
    }
    result
}
2797
/// Wrap a node-id-producing subquery so it projects full `NodeRow` columns,
/// left-joining access metadata for `last_accessed_at`.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut sql = String::with_capacity(base_sql.len() + 180);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, \
         am.last_accessed_at FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
2805
/// Strip the property FTS UNION arm from a `compile_query`-generated
/// `DrivingTable::FtsNodes` SQL string.
///
/// When the per-kind `fts_props_<kind>` table does not yet exist the
/// `UNION SELECT ... FROM fts_node_properties ...` arm must be removed so the
/// query degrades to chunk-only results instead of failing with "no such table".
///
/// The SQL structure from `compile_query` (fathomdb-query) is stable:
/// ```text
///                     UNION
///                     SELECT fp.node_logical_id AS logical_id
///                     FROM fts_node_properties fp
///                     ...
///                     WHERE fts_node_properties MATCH ?3
///                       AND fp.kind = ?4
///                 ) u
/// ```
/// We locate the `UNION` that precedes `fts_node_properties` and cut
/// everything from it up to (but not including) the closing `) u`.
fn strip_prop_fts_union_arm(sql: &str) -> String {
    // Marker for the UNION immediately followed by the property arm:
    // 24-space indented UNION, then the 24-space indented property SELECT.
    const UNION_MARKER: &str =
        "                        UNION\n                        SELECT fp.node_logical_id";
    // The arm ends right before "\n" + 20 spaces + ") u".
    const END_MARKER: &str = "\n                    ) u";

    let Some(start) = sql.find(UNION_MARKER) else {
        // Pattern absent (shouldn't happen) — return the SQL unchanged.
        return sql.to_owned();
    };
    match sql[start..].find(END_MARKER) {
        Some(rel_end) => {
            let end = start + rel_end;
            let mut out = String::with_capacity(sql.len());
            out.push_str(&sql[..start]);
            out.push_str(&sql[end..]);
            out
        }
        // Closing marker absent (shouldn't happen) — return unchanged.
        None => sql.to_owned(),
    }
}
2845
2846/// Returns `true` when `err` indicates the vec virtual table is absent.
2847///
2848/// Matches:
2849/// - "no such table: vec_<kind>" (per-kind table not yet created by regeneration)
2850/// - "no such module: vec0" (sqlite-vec extension not available)
2851/// - Legacy `"no such table: vec_nodes_active"` (for backward compatibility with
2852///   any remaining tests or DB files referencing the old global table)
2853pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
2854    match err {
2855        rusqlite::Error::SqliteFailure(_, Some(msg)) => {
2856            // Per-kind tables start with "vec_"
2857            (msg.contains("no such table: vec_") && !msg.contains("vec_embedding"))
2858                || msg.contains("no such module: vec0")
2859        }
2860        _ => false,
2861    }
2862}
2863
2864fn scalar_to_bind(value: &ScalarValue) -> BindValue {
2865    match value {
2866        ScalarValue::Text(text) => BindValue::Text(text.clone()),
2867        ScalarValue::Integer(integer) => BindValue::Integer(*integer),
2868        ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
2869    }
2870}
2871
2872/// Merge strict and relaxed search branches into a single block-ordered,
2873/// deduplicated, limit-truncated hit list.
2874///
2875/// Phase 3 rules, in order:
2876///
2877/// 1. Each branch is sorted internally by score descending with `logical_id`
2878///    ascending as the deterministic tiebreak.
2879/// 2. Within a single branch, if the same `logical_id` appears twice (e.g.
2880///    once from the chunk surface and once from the property surface) the
2881///    higher-score row wins, then chunk > property > vector, then declaration
2882///    order (chunk first).
2883/// 3. Strict hits form one block and relaxed hits form the next. Strict
2884///    always precedes relaxed in the merged output regardless of per-hit
2885///    score.
2886/// 4. Cross-branch dedup is strict-wins: any relaxed hit whose `logical_id`
2887///    already appears in the strict block is dropped.
2888/// 5. The merged output is truncated to `limit`.
2889fn merge_search_branches(
2890    strict: Vec<SearchHit>,
2891    relaxed: Vec<SearchHit>,
2892    limit: usize,
2893) -> Vec<SearchHit> {
2894    merge_search_branches_three(strict, relaxed, Vec::new(), limit)
2895}
2896
2897/// Three-branch generalization of [`merge_search_branches`]: orders hits as
2898/// (strict block, relaxed block, vector block) per addendum 1 §Fusion
2899/// Semantics, with cross-branch dedup resolved by branch precedence
2900/// (strict > relaxed > vector). Within each block the existing
2901/// [`dedup_branch_hits`] rule applies (score desc, `logical_id` asc, source
2902/// priority chunk > property > vector).
2903///
2904/// Phase 12 (the unified `search()` entry point) calls this directly. The
2905/// two-branch [`merge_search_branches`] wrapper is preserved as a
2906/// convenience for the text-only `execute_compiled_search_plan` path; both
2907/// reduce to the same code.
2908fn merge_search_branches_three(
2909    strict: Vec<SearchHit>,
2910    relaxed: Vec<SearchHit>,
2911    vector: Vec<SearchHit>,
2912    limit: usize,
2913) -> Vec<SearchHit> {
2914    let strict_block = dedup_branch_hits(strict);
2915    let relaxed_block = dedup_branch_hits(relaxed);
2916    let vector_block = dedup_branch_hits(vector);
2917
2918    let mut seen: std::collections::HashSet<String> = strict_block
2919        .iter()
2920        .map(|h| h.node.logical_id.clone())
2921        .collect();
2922
2923    let mut merged = strict_block;
2924    for hit in relaxed_block {
2925        if seen.insert(hit.node.logical_id.clone()) {
2926            merged.push(hit);
2927        }
2928    }
2929    for hit in vector_block {
2930        if seen.insert(hit.node.logical_id.clone()) {
2931            merged.push(hit);
2932        }
2933    }
2934
2935    if merged.len() > limit {
2936        merged.truncate(limit);
2937    }
2938    merged
2939}
2940
2941/// Sort a branch's hits by score descending + `logical_id` ascending, then
2942/// dedup duplicate `logical_id`s within the branch using source priority
2943/// (chunk > property > vector) and declaration order.
2944fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
2945    hits.sort_by(|a, b| {
2946        b.score
2947            .partial_cmp(&a.score)
2948            .unwrap_or(std::cmp::Ordering::Equal)
2949            .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
2950            .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
2951    });
2952
2953    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2954    hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
2955    hits
2956}
2957
2958fn source_priority(source: SearchHitSource) -> u8 {
2959    // Lower is better. Chunk is declared before property in the CTE; vector
2960    // is reserved for future wiring but comes last among the known variants.
2961    match source {
2962        SearchHitSource::Chunk => 0,
2963        SearchHitSource::Property => 1,
2964        SearchHitSource::Vector => 2,
2965    }
2966}
2967
/// Sentinel markers used to wrap FTS5-matched terms in the `highlight()`
/// output so the coordinator can recover per-term byte offsets in the
/// original `text_content` column.
///
/// Each sentinel is a single `U+0001` ("start of heading") / `U+0002`
/// ("start of text") byte. These bytes are safe for all valid JSON text
/// *except* deliberately escape-injected `\u0001` / `\u0002` sequences: a
/// payload like `{"x":"\u0001"}` decodes to a literal 0x01 byte in the
/// extracted blob, making the sentinel ambiguous for that row. Such input
/// is treated as out of scope for attribution correctness — hits on those
/// rows may have misattributed `matched_paths`, but no panic or query
/// failure occurs. A future hardening step could strip bytes < 0x20 at
/// blob-emission time in `RecursiveWalker::emit_leaf` to close this gap.
///
/// Using one-byte markers keeps the original-to-highlighted offset
/// accounting trivial: every sentinel adds exactly one byte to the
/// highlighted string (see `parse_highlight_offsets`).
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
/// Closing counterpart to [`ATTRIBUTION_HIGHLIGHT_OPEN`]; see its docs.
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
2987
2988/// Load the `fts_node_property_positions` sidecar rows for a given
2989/// `(logical_id, kind)` ordered by `start_offset`. Returns a vector of
2990/// `(start_offset, end_offset, leaf_path)` tuples ready for binary search.
2991fn load_position_map(
2992    conn: &Connection,
2993    logical_id: &str,
2994    kind: &str,
2995) -> Result<Vec<(usize, usize, String)>, EngineError> {
2996    let mut stmt = conn
2997        .prepare_cached(
2998            "SELECT start_offset, end_offset, leaf_path \
2999             FROM fts_node_property_positions \
3000             WHERE node_logical_id = ?1 AND kind = ?2 \
3001             ORDER BY start_offset ASC",
3002        )
3003        .map_err(EngineError::Sqlite)?;
3004    let rows = stmt
3005        .query_map(rusqlite::params![logical_id, kind], |row| {
3006            let start: i64 = row.get(0)?;
3007            let end: i64 = row.get(1)?;
3008            let path: String = row.get(2)?;
3009            // Offsets are non-negative and within blob byte limits; on the
3010            // off chance a corrupt row is encountered, fall back to 0 so
3011            // lookups silently skip it rather than panicking.
3012            let start = usize::try_from(start).unwrap_or(0);
3013            let end = usize::try_from(end).unwrap_or(0);
3014            Ok((start, end, path))
3015        })
3016        .map_err(EngineError::Sqlite)?;
3017    let mut out = Vec::new();
3018    for row in rows {
3019        out.push(row.map_err(EngineError::Sqlite)?);
3020    }
3021    Ok(out)
3022}
3023
/// Parse a `highlight()`-wrapped string, returning the list of original-text
/// byte offsets at which matched terms begin. `wrapped` is the
/// highlight-decorated form of a text column; `open` / `close` are the
/// sentinel markers passed to `highlight()`. The returned offsets refer to
/// positions in the *original* text (i.e. the column as it would be stored
/// without highlight decoration): every sentinel byte seen so far is
/// subtracted from the wrapped-string index.
///
/// Returns an empty vector when either marker is empty. An empty marker
/// would match at every position while advancing the cursor by zero bytes,
/// which previously looped forever; production callers always pass the
/// one-byte `ATTRIBUTION_HIGHLIGHT_*` sentinels, so the guard only hardens
/// against misuse.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    // Guard: zero-length markers cannot make progress (see doc above).
    if open.is_empty() || close.is_empty() {
        return Vec::new();
    }
    let mut offsets = Vec::new();
    let bytes = wrapped.as_bytes();
    let open_bytes = open.as_bytes();
    let close_bytes = close.as_bytes();
    let mut i = 0usize;
    // Number of sentinel bytes consumed so far — every marker encountered
    // subtracts from the wrapped-string index to get the original offset.
    let mut marker_bytes_seen = 0usize;
    while i < bytes.len() {
        if bytes[i..].starts_with(open_bytes) {
            // Record the original-text offset of the term following the open
            // marker.
            let original_offset = i - marker_bytes_seen;
            offsets.push(original_offset);
            i += open_bytes.len();
            marker_bytes_seen += open_bytes.len();
        } else if bytes[i..].starts_with(close_bytes) {
            i += close_bytes.len();
            marker_bytes_seen += close_bytes.len();
        } else {
            // Plain content byte; byte-wise advance is correct because the
            // returned values are byte offsets, not char indices.
            i += 1;
        }
    }
    offsets
}
3056
/// Binary-search the position map for the leaf whose `[start, end)` range
/// contains `offset`; `None` when no leaf covers it. `positions` must be
/// sorted ascending by start offset (as `load_position_map` guarantees).
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    // Candidate = entry with the greatest start_offset <= offset.
    let candidate = match positions.binary_search_by(|&(start, _, _)| start.cmp(&offset)) {
        Ok(exact) => exact,
        // Err(0) means every start exceeds offset: checked_sub yields None.
        Err(insertion_point) => insertion_point.checked_sub(1)?,
    };
    let (start, end, leaf_path) = &positions[candidate];
    // Half-open containment check: the end offset is exclusive.
    (offset >= *start && offset < *end).then(|| leaf_path.as_str())
}
3073
/// Resolve per-hit match attribution by introspecting the FTS5 match state
/// for the hit's row via `highlight()` and mapping the resulting original-
/// text offsets back to recursive-leaf paths via the Phase 4 position map.
///
/// `match_expr` is the FTS5 MATCH expression that produced the hit; it is
/// re-run against the hit's rowid below so `highlight()` can decorate the
/// matched text.
///
/// Chunk-backed hits carry no leaf structure and always return an empty
/// `matched_paths` vector. Property-backed hits without a `projection_row_id`
/// (which should not happen — the search CTE always populates it) also
/// return empty attribution rather than erroring.
fn resolve_hit_attribution(
    conn: &Connection,
    hit: &SearchHit,
    match_expr: &str,
) -> Result<HitAttribution, EngineError> {
    // Only property-surface hits have leaf attribution.
    if !matches!(hit.source, SearchHitSource::Property) {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }
    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };
    // A non-numeric projection_row_id is unexpected; degrade to empty
    // attribution rather than surfacing a parse error.
    let rowid: i64 = match rowid_str.parse() {
        Ok(v) => v,
        Err(_) => {
            return Ok(HitAttribution {
                matched_paths: Vec::new(),
            });
        }
    };

    // Fetch the highlight-wrapped text_content for this hit's FTS row. The
    // FTS5 MATCH in the WHERE clause re-establishes the match state that
    // `highlight()` needs to decorate the returned text.
    // Per-kind tables have schema (node_logical_id UNINDEXED, text_content),
    // so text_content is column index 1 (0-based).
    let prop_table = fathomdb_schema::fts_kind_table_name(&hit.node.kind);
    let highlight_sql = format!(
        "SELECT highlight({prop_table}, 1, ?1, ?2) \
         FROM {prop_table} \
         WHERE rowid = ?3 AND {prop_table} MATCH ?4"
    );
    let mut stmt = conn.prepare(&highlight_sql).map_err(EngineError::Sqlite)?;
    let wrapped: Option<String> = stmt
        .query_row(
            rusqlite::params![
                ATTRIBUTION_HIGHLIGHT_OPEN,
                ATTRIBUTION_HIGHLIGHT_CLOSE,
                rowid,
                match_expr,
            ],
            |row| row.get(0),
        )
        .optional()
        .map_err(EngineError::Sqlite)?;
    // No row back means the row vanished or no longer MATCHes; degrade.
    let Some(wrapped) = wrapped else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };

    let offsets = parse_highlight_offsets(
        &wrapped,
        ATTRIBUTION_HIGHLIGHT_OPEN,
        ATTRIBUTION_HIGHLIGHT_CLOSE,
    );
    // No sentinels in the output: highlight() matched nothing in this column.
    if offsets.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
    if positions.is_empty() {
        // Scalar-only schemas have no position-map entries; attribution
        // degrades to an empty vector rather than erroring.
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    // Map each matched-term offset to its covering leaf path, deduping
    // while preserving first-seen order.
    let mut matched_paths: Vec<String> = Vec::new();
    for offset in offsets {
        if let Some(path) = find_leaf_for_offset(&positions, offset)
            && !matched_paths.iter().any(|p| p == path)
        {
            matched_paths.push(path.to_owned());
        }
    }
    Ok(HitAttribution { matched_paths })
}
3166
3167/// Build a BM25 scoring expression for a per-kind FTS5 table.
3168///
3169/// If the schema has no weighted specs (all weights None), returns `bm25({table})`.
3170/// Otherwise returns `bm25({table}, 0.0, w1, w2, ...)` where the first weight
3171/// (0.0) is for the `node_logical_id UNINDEXED` column (which BM25 should ignore),
3172/// then one weight per spec in schema order.
3173fn build_bm25_expr(table: &str, schema_json: &str, sep: &str) -> String {
3174    let schema = crate::writer::parse_property_schema_json(schema_json, sep);
3175    let any_weighted = schema.paths.iter().any(|p| p.weight.is_some());
3176    if !any_weighted {
3177        return format!("bm25({table})");
3178    }
3179    // node_logical_id is UNINDEXED — weight 0.0 tells BM25 to ignore it.
3180    let weights: Vec<String> = std::iter::once("0.0".to_owned())
3181        .chain(
3182            schema
3183                .paths
3184                .iter()
3185                .map(|p| format!("{:.1}", p.weight.unwrap_or(1.0))),
3186        )
3187        .collect();
3188    format!("bm25({table}, {})", weights.join(", "))
3189}
3190
3191fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
3192    match value {
3193        fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
3194        fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
3195        fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
3196    }
3197}
3198
3199#[cfg(test)]
3200#[allow(clippy::expect_used)]
3201mod tests {
3202    use std::panic::{AssertUnwindSafe, catch_unwind};
3203    use std::sync::Arc;
3204
3205    use fathomdb_query::{BindValue, QueryBuilder};
3206    use fathomdb_schema::SchemaManager;
3207    use rusqlite::types::Value;
3208    use tempfile::NamedTempFile;
3209
3210    use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
3211
3212    use fathomdb_query::{
3213        NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
3214    };
3215
3216    use super::{
3217        bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
3218        wrap_node_row_projection_sql,
3219    };
3220
    /// Build a minimal text-modality `SearchHit` fixture for the merge
    /// tests; only the fields the merge/dedup helpers inspect
    /// (logical_id, score, match_mode, source) vary per call.
    fn mk_hit(
        logical_id: &str,
        score: f64,
        match_mode: SearchMatchMode,
        source: SearchHitSource,
    ) -> SearchHit {
        SearchHit {
            node: NodeRowLite {
                row_id: format!("{logical_id}-row"),
                logical_id: logical_id.to_owned(),
                kind: "Goal".to_owned(),
                properties: "{}".to_owned(),
                content_ref: None,
                last_accessed_at: None,
            },
            score,
            modality: RetrievalModality::Text,
            source,
            match_mode: Some(match_mode),
            snippet: None,
            written_at: 0,
            projection_row_id: None,
            vector_distance: None,
            attribution: None,
        }
    }
3247
    // Merge rule 3: the strict block precedes the relaxed block even when a
    // relaxed hit outscores every strict hit.
    #[test]
    fn merge_places_strict_block_before_relaxed_regardless_of_score() {
        let strict = vec![mk_hit(
            "a",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        // Relaxed has a higher score but must still come second.
        let relaxed = vec![mk_hit(
            "b",
            9.9,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "b");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
3276
    // Merge rule 4: cross-branch dedup is strict-wins — a relaxed duplicate
    // of a strict logical_id is dropped even with a higher score.
    #[test]
    fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
        let strict = vec![mk_hit(
            "shared",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![
            mk_hit(
                "shared",
                9.9,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
            mk_hit(
                "other",
                2.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "shared");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "other");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
3312
    // Merge rule 1: each block sorts by score descending with logical_id
    // ascending as the tiebreak ("a" and "c" tie at 2.0, so "a" leads).
    #[test]
    fn merge_sorts_within_block_by_score_desc_then_logical_id() {
        let strict = vec![
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(
            merged
                .iter()
                .map(|h| &h.node.logical_id)
                .collect::<Vec<_>>(),
            vec!["a", "c", "b"]
        );
    }
3329
    // Merge rule 2: within one branch, equal-score duplicates resolve by
    // source priority — the chunk surface beats the property surface.
    #[test]
    fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
        let strict = vec![
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Property,
            ),
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(merged.len(), 1);
        assert!(matches!(merged[0].source, SearchHitSource::Chunk));
    }
3350
    // Merge rule 5: truncation to `limit` happens after the block merge, so
    // strict hits win limit contention over a higher-scored relaxed hit.
    #[test]
    fn merge_truncates_to_limit_after_block_merge() {
        let strict = vec![
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let relaxed = vec![mk_hit(
            "c",
            9.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 2);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert_eq!(merged[1].node.logical_id, "b");
    }
3368
    /// P12 architectural pin: the generalized three-branch merger must
    /// produce strict -> relaxed -> vector block ordering with cross-branch
    /// dedup resolved by branch precedence (strict > relaxed > vector).
    /// v1 `search()` policy never fires the vector branch through the
    /// unified planner because read-time embedding is deferred, but the
    /// merge helper itself must be ready for the day the planner does so —
    /// otherwise wiring the future phase requires touching the core merge
    /// code as well as the planner.
    ///
    /// Covers three scenarios: block ordering, shared-id dedup with all
    /// three branches present, and relaxed-over-vector when strict is empty.
    #[test]
    fn search_architecturally_supports_three_branch_fusion() {
        let strict = vec![mk_hit(
            "alpha",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![mk_hit(
            "bravo",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        // Synthetic vector hit with the highest score. Three-block ordering
        // must still place it last.
        let mut vector_hit = mk_hit(
            "charlie",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        // Vector hits actually carry match_mode=None per the addendum, but
        // the merge helper's ordering is mode-agnostic; we override here to
        // pin the modality field for the test.
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;
        let vector = vec![vector_hit];

        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
        assert_eq!(merged.len(), 3);
        assert_eq!(merged[0].node.logical_id, "alpha");
        assert_eq!(merged[1].node.logical_id, "bravo");
        assert_eq!(merged[2].node.logical_id, "charlie");
        // Vector block comes last regardless of its higher score.
        assert!(matches!(merged[2].source, SearchHitSource::Vector));

        // Cross-branch dedup: a logical_id that appears in multiple branches
        // is attributed to its highest-priority originating branch only.
        let strict2 = vec![mk_hit(
            "shared",
            0.5,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed2 = vec![mk_hit(
            "shared",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vshared = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared.match_mode = None;
        vshared.modality = RetrievalModality::Vector;
        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
        assert!(matches!(
            merged2[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));

        // Relaxed wins over vector when strict is absent.
        let mut vshared2 = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared2.match_mode = None;
        vshared2.modality = RetrievalModality::Vector;
        let merged3 = merge_search_branches_three(
            vec![],
            vec![mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            )],
            vec![vshared2],
            10,
        );
        assert_eq!(merged3.len(), 1);
        assert!(matches!(
            merged3[0].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
3470
    /// P12-N-3: production-realistic vector-only fusion. The v1 planner
    /// never fires this shape today (read-time embedding is deferred), but
    /// when it does the merger will see empty strict + empty relaxed + a
    /// non-empty vector block. The three-branch merger must pass that
    /// block through unchanged, preserving `RetrievalModality::Vector`,
    /// `SearchHitSource::Vector`, and `match_mode == None` semantics.
    ///
    /// Note: the review spec asked for `vector_hit_count == 1` /
    /// `strict_hit_count == 0` assertions. Those are fields on
    /// `SearchRows`, which is assembled one layer up in
    /// `execute_compiled_retrieval_plan`. The merger returns a bare
    /// `Vec<SearchHit>`, so this test asserts the corresponding invariants
    /// directly on the returned vec (block shape + per-hit fields).
    #[test]
    fn merge_search_branches_three_vector_only_preserves_vector_block() {
        let mut vector_hit = mk_hit(
            "solo",
            0.75,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;

        // Empty strict + empty relaxed: the vector block passes through as-is.
        let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);

        assert_eq!(merged.len(), 1);
        assert_eq!(merged[0].node.logical_id, "solo");
        assert!(matches!(merged[0].source, SearchHitSource::Vector));
        assert!(matches!(merged[0].modality, RetrievalModality::Vector));
        assert!(
            merged[0].match_mode.is_none(),
            "vector hits carry match_mode=None per addendum 1"
        );
    }
3506
    /// P12-N-3: limit truncation must preserve block precedence — when the
    /// strict block alone already exceeds the limit, relaxed and vector
    /// hits must be dropped entirely even if they have higher raw scores.
    ///
    /// Note: the review spec asked for `strict_hit_count == 2` /
    /// `relaxed_hit_count == 0` / `vector_hit_count == 0` assertions, which
    /// are `SearchRows` fields assembled one layer up. Since
    /// `merge_search_branches_three` only returns a `Vec<SearchHit>`, this
    /// test asserts the corresponding invariants directly: the returned
    /// vec contains exactly the top two strict hits, with no relaxed or
    /// vector hits leaking past the limit.
    #[test]
    fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
        let strict = vec![
            mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let relaxed = vec![mk_hit(
            "d",
            9.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;
        let vector = vec![vector_hit];

        // Limit 2 < strict block size 3: only the top two strict hits survive.
        let merged = merge_search_branches_three(strict, relaxed, vector, 2);

        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert_eq!(merged[1].node.logical_id, "b");
        // Neither relaxed nor vector hits made it past the limit.
        assert!(
            merged
                .iter()
                .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
            "strict block must win limit contention against higher-scored relaxed/vector hits"
        );
        assert!(
            merged
                .iter()
                .all(|h| matches!(h.source, SearchHitSource::Chunk)),
            "no vector source hits should leak past the limit"
        );
    }
3555
    // Pins the exact message substrings `is_vec_table_absent` recognizes
    // (and the near-misses it must reject).
    #[test]
    fn is_vec_table_absent_matches_known_error_messages() {
        use rusqlite::ffi;
        // Helper: fabricate a SqliteFailure carrying the given message text.
        fn make_err(msg: &str) -> rusqlite::Error {
            rusqlite::Error::SqliteFailure(
                ffi::Error {
                    code: ffi::ErrorCode::Unknown,
                    extended_code: 1,
                },
                Some(msg.to_owned()),
            )
        }
        assert!(is_vec_table_absent(&make_err(
            "no such table: vec_nodes_active"
        )));
        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
    }
3576
3577    // --- 0.5.0 item 6: per-kind vec table in coordinator ---
3578
    #[test]
    fn vector_search_uses_per_kind_table_and_degrades_when_table_absent() {
        // Without sqlite-vec feature or without creating vec_mykind, querying
        // for kind "MyKind" should degrade gracefully (was_degraded=true).
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("MyKind")
            .vector_search("some query", 5)
            .compile()
            .expect("vector query compiles");

        // The missing per-kind vec table must surface as a degraded (empty)
        // result, never as an execution error.
        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("degraded read must succeed");
        assert!(
            rows.was_degraded,
            "must degrade when vec_mykind table does not exist"
        );
        assert!(
            rows.nodes.is_empty(),
            "degraded result must return empty nodes"
        );
    }
3611
3612    #[test]
3613    fn bind_value_text_maps_to_sql_text() {
3614        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
3615        assert_eq!(val, Value::Text("hello".to_owned()));
3616    }
3617
3618    #[test]
3619    fn bind_value_integer_maps_to_sql_integer() {
3620        let val = bind_value_to_sql(&BindValue::Integer(42));
3621        assert_eq!(val, Value::Integer(42));
3622    }
3623
3624    #[test]
3625    fn bind_value_bool_true_maps_to_integer_one() {
3626        let val = bind_value_to_sql(&BindValue::Bool(true));
3627        assert_eq!(val, Value::Integer(1));
3628    }
3629
3630    #[test]
3631    fn bind_value_bool_false_maps_to_integer_zero() {
3632        let val = bind_value_to_sql(&BindValue::Bool(false));
3633        assert_eq!(val, Value::Integer(0));
3634    }
3635
3636    #[test]
3637    fn same_shape_queries_share_one_cache_entry() {
3638        let db = NamedTempFile::new().expect("temporary db");
3639        let coordinator = ExecutionCoordinator::open(
3640            db.path(),
3641            Arc::new(SchemaManager::new()),
3642            None,
3643            1,
3644            Arc::new(TelemetryCounters::default()),
3645            None,
3646        )
3647        .expect("coordinator");
3648
3649        let compiled_a = QueryBuilder::nodes("Meeting")
3650            .text_search("budget", 5)
3651            .limit(10)
3652            .compile()
3653            .expect("compiled a");
3654        let compiled_b = QueryBuilder::nodes("Meeting")
3655            .text_search("standup", 5)
3656            .limit(10)
3657            .compile()
3658            .expect("compiled b");
3659
3660        coordinator
3661            .execute_compiled_read(&compiled_a)
3662            .expect("read a");
3663        coordinator
3664            .execute_compiled_read(&compiled_b)
3665            .expect("read b");
3666
3667        assert_eq!(
3668            compiled_a.shape_hash, compiled_b.shape_hash,
3669            "different bind values, same structural shape → same hash"
3670        );
3671        assert_eq!(coordinator.shape_sql_count(), 1);
3672    }
3673
3674    #[test]
3675    fn vector_read_degrades_gracefully_when_vec_table_absent() {
3676        let db = NamedTempFile::new().expect("temporary db");
3677        let coordinator = ExecutionCoordinator::open(
3678            db.path(),
3679            Arc::new(SchemaManager::new()),
3680            None,
3681            1,
3682            Arc::new(TelemetryCounters::default()),
3683            None,
3684        )
3685        .expect("coordinator");
3686
3687        let compiled = QueryBuilder::nodes("Meeting")
3688            .vector_search("budget embeddings", 5)
3689            .compile()
3690            .expect("vector query compiles");
3691
3692        let result = coordinator.execute_compiled_read(&compiled);
3693        let rows = result.expect("degraded read must succeed, not error");
3694        assert!(
3695            rows.was_degraded,
3696            "result must be flagged as degraded when vec_nodes_active is absent"
3697        );
3698        assert!(
3699            rows.nodes.is_empty(),
3700            "degraded result must return empty nodes"
3701        );
3702    }
3703
3704    #[test]
3705    fn coordinator_caches_by_shape_hash() {
3706        let db = NamedTempFile::new().expect("temporary db");
3707        let coordinator = ExecutionCoordinator::open(
3708            db.path(),
3709            Arc::new(SchemaManager::new()),
3710            None,
3711            1,
3712            Arc::new(TelemetryCounters::default()),
3713            None,
3714        )
3715        .expect("coordinator");
3716
3717        let compiled = QueryBuilder::nodes("Meeting")
3718            .text_search("budget", 5)
3719            .compile()
3720            .expect("compiled query");
3721
3722        coordinator
3723            .execute_compiled_read(&compiled)
3724            .expect("execute compiled read");
3725        assert_eq!(coordinator.shape_sql_count(), 1);
3726    }
3727
3728    // --- Item 6: explain_compiled_read tests ---
3729
3730    #[test]
3731    fn explain_returns_correct_sql() {
3732        let db = NamedTempFile::new().expect("temporary db");
3733        let coordinator = ExecutionCoordinator::open(
3734            db.path(),
3735            Arc::new(SchemaManager::new()),
3736            None,
3737            1,
3738            Arc::new(TelemetryCounters::default()),
3739            None,
3740        )
3741        .expect("coordinator");
3742
3743        let compiled = QueryBuilder::nodes("Meeting")
3744            .text_search("budget", 5)
3745            .compile()
3746            .expect("compiled query");
3747
3748        let plan = coordinator.explain_compiled_read(&compiled);
3749
3750        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
3751    }
3752
3753    #[test]
3754    fn explain_returns_correct_driving_table() {
3755        use fathomdb_query::DrivingTable;
3756
3757        let db = NamedTempFile::new().expect("temporary db");
3758        let coordinator = ExecutionCoordinator::open(
3759            db.path(),
3760            Arc::new(SchemaManager::new()),
3761            None,
3762            1,
3763            Arc::new(TelemetryCounters::default()),
3764            None,
3765        )
3766        .expect("coordinator");
3767
3768        let compiled = QueryBuilder::nodes("Meeting")
3769            .text_search("budget", 5)
3770            .compile()
3771            .expect("compiled query");
3772
3773        let plan = coordinator.explain_compiled_read(&compiled);
3774
3775        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
3776    }
3777
3778    #[test]
3779    fn explain_reports_cache_miss_then_hit() {
3780        let db = NamedTempFile::new().expect("temporary db");
3781        let coordinator = ExecutionCoordinator::open(
3782            db.path(),
3783            Arc::new(SchemaManager::new()),
3784            None,
3785            1,
3786            Arc::new(TelemetryCounters::default()),
3787            None,
3788        )
3789        .expect("coordinator");
3790
3791        let compiled = QueryBuilder::nodes("Meeting")
3792            .text_search("budget", 5)
3793            .compile()
3794            .expect("compiled query");
3795
3796        // Before execution: cache miss
3797        let plan_before = coordinator.explain_compiled_read(&compiled);
3798        assert!(
3799            !plan_before.cache_hit,
3800            "cache miss expected before first execute"
3801        );
3802
3803        // Execute to populate cache
3804        coordinator
3805            .execute_compiled_read(&compiled)
3806            .expect("execute read");
3807
3808        // After execution: cache hit
3809        let plan_after = coordinator.explain_compiled_read(&compiled);
3810        assert!(
3811            plan_after.cache_hit,
3812            "cache hit expected after first execute"
3813        );
3814    }
3815
3816    #[test]
3817    fn explain_does_not_execute_query() {
3818        // Call explain_compiled_read on an empty database. If explain were
3819        // actually executing SQL, it would return Ok with 0 rows. But the
3820        // key assertion is that it returns a QueryPlan (not an error) even
3821        // without touching the database.
3822        let db = NamedTempFile::new().expect("temporary db");
3823        let coordinator = ExecutionCoordinator::open(
3824            db.path(),
3825            Arc::new(SchemaManager::new()),
3826            None,
3827            1,
3828            Arc::new(TelemetryCounters::default()),
3829            None,
3830        )
3831        .expect("coordinator");
3832
3833        let compiled = QueryBuilder::nodes("Meeting")
3834            .text_search("anything", 5)
3835            .compile()
3836            .expect("compiled query");
3837
3838        // This must not error, even though the database is empty
3839        let plan = coordinator.explain_compiled_read(&compiled);
3840
3841        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
3842        assert_eq!(plan.bind_count, compiled.binds.len());
3843    }
3844
3845    #[test]
3846    fn coordinator_executes_compiled_read() {
3847        let db = NamedTempFile::new().expect("temporary db");
3848        let coordinator = ExecutionCoordinator::open(
3849            db.path(),
3850            Arc::new(SchemaManager::new()),
3851            None,
3852            1,
3853            Arc::new(TelemetryCounters::default()),
3854            None,
3855        )
3856        .expect("coordinator");
3857        let conn = rusqlite::Connection::open(db.path()).expect("open db");
3858
3859        conn.execute_batch(
3860            r#"
3861            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
3862            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
3863            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
3864            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
3865            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
3866            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
3867            "#,
3868        )
3869        .expect("seed data");
3870
3871        let compiled = QueryBuilder::nodes("Meeting")
3872            .text_search("budget", 5)
3873            .limit(5)
3874            .compile()
3875            .expect("compiled query");
3876
3877        let rows = coordinator
3878            .execute_compiled_read(&compiled)
3879            .expect("execute read");
3880
3881        assert_eq!(rows.nodes.len(), 1);
3882        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
3883    }
3884
3885    #[test]
3886    fn text_search_finds_structured_only_node_via_property_fts() {
3887        let db = NamedTempFile::new().expect("temporary db");
3888        let coordinator = ExecutionCoordinator::open(
3889            db.path(),
3890            Arc::new(SchemaManager::new()),
3891            None,
3892            1,
3893            Arc::new(TelemetryCounters::default()),
3894            None,
3895        )
3896        .expect("coordinator");
3897        let conn = rusqlite::Connection::open(db.path()).expect("open db");
3898
3899        // Insert a structured-only node (no chunks) with a property FTS row.
3900        // Per-kind table fts_props_goal must be created before inserting.
3901        conn.execute_batch(
3902            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
3903                node_logical_id UNINDEXED, text_content, \
3904                tokenize = 'porter unicode61 remove_diacritics 2'\
3905            )",
3906        )
3907        .expect("create per-kind fts table");
3908        conn.execute_batch(
3909            r#"
3910            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
3911            VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
3912            INSERT INTO fts_props_goal (node_logical_id, text_content)
3913            VALUES ('goal-1', 'Ship v2');
3914            "#,
3915        )
3916        .expect("seed data");
3917
3918        let compiled = QueryBuilder::nodes("Goal")
3919            .text_search("Ship", 5)
3920            .limit(5)
3921            .compile()
3922            .expect("compiled query");
3923
3924        let rows = coordinator
3925            .execute_compiled_read(&compiled)
3926            .expect("execute read");
3927
3928        assert_eq!(rows.nodes.len(), 1);
3929        assert_eq!(rows.nodes[0].logical_id, "goal-1");
3930    }
3931
3932    #[test]
3933    fn text_search_returns_both_chunk_and_property_backed_hits() {
3934        let db = NamedTempFile::new().expect("temporary db");
3935        let coordinator = ExecutionCoordinator::open(
3936            db.path(),
3937            Arc::new(SchemaManager::new()),
3938            None,
3939            1,
3940            Arc::new(TelemetryCounters::default()),
3941            None,
3942        )
3943        .expect("coordinator");
3944        let conn = rusqlite::Connection::open(db.path()).expect("open db");
3945
3946        // Chunk-backed hit: a Meeting with a chunk containing "quarterly".
3947        conn.execute_batch(
3948            r"
3949            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
3950            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
3951            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
3952            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
3953            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
3954            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
3955            ",
3956        )
3957        .expect("seed chunk-backed node");
3958
3959        // Property-backed hit: a Meeting with property FTS containing "quarterly".
3960        // Per-kind table fts_props_meeting must be created before inserting.
3961        conn.execute_batch(
3962            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_meeting USING fts5(\
3963                node_logical_id UNINDEXED, text_content, \
3964                tokenize = 'porter unicode61 remove_diacritics 2'\
3965            )",
3966        )
3967        .expect("create per-kind fts table");
3968        conn.execute_batch(
3969            r#"
3970            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
3971            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
3972            INSERT INTO fts_props_meeting (node_logical_id, text_content)
3973            VALUES ('meeting-2', 'quarterly sync');
3974            "#,
3975        )
3976        .expect("seed property-backed node");
3977
3978        let compiled = QueryBuilder::nodes("Meeting")
3979            .text_search("quarterly", 10)
3980            .limit(10)
3981            .compile()
3982            .expect("compiled query");
3983
3984        let rows = coordinator
3985            .execute_compiled_read(&compiled)
3986            .expect("execute read");
3987
3988        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
3989        ids.sort_unstable();
3990        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
3991    }
3992
3993    #[test]
3994    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
3995        let db = NamedTempFile::new().expect("temporary db");
3996        let coordinator = ExecutionCoordinator::open(
3997            db.path(),
3998            Arc::new(SchemaManager::new()),
3999            None,
4000            1,
4001            Arc::new(TelemetryCounters::default()),
4002            None,
4003        )
4004        .expect("coordinator");
4005        let conn = rusqlite::Connection::open(db.path()).expect("open db");
4006
4007        conn.execute_batch(
4008            r"
4009            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
4010            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
4011            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
4012            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
4013            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
4014            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
4015            ",
4016        )
4017        .expect("seed chunk-backed node");
4018
4019        let compiled = QueryBuilder::nodes("Meeting")
4020            .text_search("not a ship", 10)
4021            .limit(10)
4022            .compile()
4023            .expect("compiled query");
4024
4025        let rows = coordinator
4026            .execute_compiled_read(&compiled)
4027            .expect("execute read");
4028
4029        assert_eq!(rows.nodes.len(), 1);
4030        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
4031    }
4032
4033    // --- Item 1: capability gate tests ---
4034
4035    #[test]
4036    fn capability_gate_reports_false_without_feature() {
4037        let db = NamedTempFile::new().expect("temporary db");
4038        // Open without vector_dimension: regardless of feature flag, vector_enabled must be false
4039        // when no dimension is requested (the vector profile is never bootstrapped).
4040        let coordinator = ExecutionCoordinator::open(
4041            db.path(),
4042            Arc::new(SchemaManager::new()),
4043            None,
4044            1,
4045            Arc::new(TelemetryCounters::default()),
4046            None,
4047        )
4048        .expect("coordinator");
4049        assert!(
4050            !coordinator.vector_enabled(),
4051            "vector_enabled must be false when no dimension is requested"
4052        );
4053    }
4054
4055    #[cfg(feature = "sqlite-vec")]
4056    #[test]
4057    fn capability_gate_reports_true_when_feature_enabled() {
4058        let db = NamedTempFile::new().expect("temporary db");
4059        let coordinator = ExecutionCoordinator::open(
4060            db.path(),
4061            Arc::new(SchemaManager::new()),
4062            Some(128),
4063            1,
4064            Arc::new(TelemetryCounters::default()),
4065            None,
4066        )
4067        .expect("coordinator");
4068        assert!(
4069            coordinator.vector_enabled(),
4070            "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
4071        );
4072    }
4073
4074    // --- Item 4: runtime table read tests ---
4075
    #[test]
    fn read_run_returns_inserted_run() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        // Insert the run through the writer actor so the subsequent read
        // exercises the same database file production writes would produce.
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // Sole payload: one fresh (non-upsert) run row.
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        // A separate coordinator opened over the same file must see the row.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // read_run returns Result<Option<_>>: unwrap both layers.
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
4130
    #[test]
    fn read_step_returns_inserted_step() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        let db = NamedTempFile::new().expect("temporary db");
        // Write a run and a step belonging to it via the writer actor; the
        // step references the run through run_id.
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // Parent run for the step below.
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // The step under test, linked to run-s1.
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        // Read the step back through a fresh coordinator on the same file.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
4194
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        let db = NamedTempFile::new().expect("temporary db");
        // Write the full run → step → action chain in one request; the
        // action references its step through step_id.
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // Top of the chain: the owning run.
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // Middle of the chain: the step inside run-a1.
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // The action under test, linked to step-a1.
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        // Read the action back through a fresh coordinator on the same file.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
4270
    #[test]
    fn read_active_runs_excludes_superseded() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert original run
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Supersede original run with v2
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // upsert + supersedes_id marks run-v1 as superseded by run-v2.
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        // read_active_runs must filter out the superseded run-v1 and return
        // only the surviving run-v2.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
4350
4351    #[allow(clippy::panic)]
4352    fn poison_connection(coordinator: &ExecutionCoordinator) {
4353        let result = catch_unwind(AssertUnwindSafe(|| {
4354            let _guard = coordinator.pool.connections[0]
4355                .lock()
4356                .expect("poison test lock");
4357            panic!("poison coordinator connection mutex");
4358        }));
4359        assert!(
4360            result.is_err(),
4361            "poison test must unwind while holding the connection mutex"
4362        );
4363    }
4364
4365    #[allow(clippy::panic)]
4366    fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
4367    where
4368        F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
4369    {
4370        match op(coordinator) {
4371            Err(EngineError::Bridge(message)) => {
4372                assert_eq!(message, "connection mutex poisoned");
4373            }
4374            Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
4375            Err(error) => panic!("expected poisoned connection error, got {error:?}"),
4376        }
4377    }
4378
4379    #[test]
4380    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
4381        let db = NamedTempFile::new().expect("temporary db");
4382        let coordinator = ExecutionCoordinator::open(
4383            db.path(),
4384            Arc::new(SchemaManager::new()),
4385            None,
4386            1,
4387            Arc::new(TelemetryCounters::default()),
4388            None,
4389        )
4390        .expect("coordinator");
4391
4392        poison_connection(&coordinator);
4393
4394        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
4395        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
4396        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
4397        assert_poisoned_connection_error(
4398            &coordinator,
4399            super::ExecutionCoordinator::read_active_runs,
4400        );
4401        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
4402        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
4403    }
4404
4405    // --- M-2: Bounded shape cache ---
4406
4407    #[test]
4408    fn shape_cache_stays_bounded() {
4409        use fathomdb_query::ShapeHash;
4410
4411        let db = NamedTempFile::new().expect("temporary db");
4412        let coordinator = ExecutionCoordinator::open(
4413            db.path(),
4414            Arc::new(SchemaManager::new()),
4415            None,
4416            1,
4417            Arc::new(TelemetryCounters::default()),
4418            None,
4419        )
4420        .expect("coordinator");
4421
4422        // Directly populate the cache with MAX_SHAPE_CACHE_SIZE + 1 entries.
4423        {
4424            let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
4425            for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
4426                cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
4427            }
4428        }
4429        // The cache is now over the limit but hasn't been pruned yet (pruning
4430        // happens on the insert path in execute_compiled_read).
4431
4432        // Execute a compiled read to trigger the bounded-cache check.
4433        let compiled = QueryBuilder::nodes("Meeting")
4434            .text_search("budget", 5)
4435            .limit(10)
4436            .compile()
4437            .expect("compiled query");
4438
4439        coordinator
4440            .execute_compiled_read(&compiled)
4441            .expect("execute read");
4442
4443        assert!(
4444            coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
4445            "shape cache must stay bounded: got {} entries, max {}",
4446            coordinator.shape_sql_count(),
4447            super::MAX_SHAPE_CACHE_SIZE
4448        );
4449    }
4450
4451    // --- M-1: Read pool size ---
4452
4453    #[test]
4454    fn read_pool_size_configurable() {
4455        let db = NamedTempFile::new().expect("temporary db");
4456        let coordinator = ExecutionCoordinator::open(
4457            db.path(),
4458            Arc::new(SchemaManager::new()),
4459            None,
4460            2,
4461            Arc::new(TelemetryCounters::default()),
4462            None,
4463        )
4464        .expect("coordinator with pool_size=2");
4465
4466        assert_eq!(coordinator.pool.size(), 2);
4467
4468        // Basic read should succeed through the pool.
4469        let compiled = QueryBuilder::nodes("Meeting")
4470            .text_search("budget", 5)
4471            .limit(10)
4472            .compile()
4473            .expect("compiled query");
4474
4475        let result = coordinator.execute_compiled_read(&compiled);
4476        assert!(result.is_ok(), "read through pool must succeed");
4477    }
4478
4479    // --- M-4: Grouped read batching ---
4480
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        let db = NamedTempFile::new().expect("temporary db");

        // Bootstrap the database via coordinator (creates schema).
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed data: 10 root nodes (Meeting-0..9) with 2 outbound edges each
        // to expansion nodes (Task-0-a, Task-0-b, etc.).
        // Seeding uses a direct rusqlite connection (not the writer actor) so
        // the test controls row ids and table contents exactly.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        // Grouped query: text-search the Meeting roots, then expand one hop
        // out over HAS_TASK edges into the "tasks" slot.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1, None)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        // Shape assertions: all 10 roots come back, the single "tasks" slot
        // exists, and the slot carries one entry per root.
        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Each root should have exactly 2 expansion nodes (task-X-a, task-X-b).
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
4558
4559    // --- B-4: build_bm25_expr unit tests ---
4560
4561    #[test]
4562    fn build_bm25_expr_no_weights() {
4563        let schema_json = r#"["$.title","$.body"]"#;
4564        let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4565        assert_eq!(result, "bm25(fts_props_testkind)");
4566    }
4567
4568    #[test]
4569    fn build_bm25_expr_with_weights() {
4570        let schema_json = r#"[{"path":"$.title","mode":"scalar","weight":10.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
4571        let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4572        assert_eq!(result, "bm25(fts_props_testkind, 0.0, 10.0, 1.0)");
4573    }
4574
4575    // --- B-4: weighted schema integration test ---
4576
4577    #[test]
4578    #[allow(clippy::too_many_lines)]
4579    fn weighted_schema_bm25_orders_title_match_above_body_match() {
4580        use crate::{
4581            AdminService, FtsPropertyPathSpec, NodeInsert, ProvenanceMode, WriteRequest,
4582            WriterActor, writer::ChunkPolicy,
4583        };
4584        use fathomdb_schema::fts_column_name;
4585
4586        let db = NamedTempFile::new().expect("temporary db");
4587        let schema_manager = Arc::new(SchemaManager::new());
4588
4589        // Step 1: bootstrap, register schema with weights, create per-column table.
4590        {
4591            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
4592            admin
4593                .register_fts_property_schema_with_entries(
4594                    "Article",
4595                    &[
4596                        FtsPropertyPathSpec::scalar("$.title").with_weight(10.0),
4597                        FtsPropertyPathSpec::scalar("$.body").with_weight(1.0),
4598                    ],
4599                    None,
4600                    &[],
4601                    crate::rebuild_actor::RebuildMode::Eager,
4602                )
4603                .expect("register schema with weights");
4604        }
4605
4606        // Step 2: write two nodes.
4607        let writer = WriterActor::start(
4608            db.path(),
4609            Arc::clone(&schema_manager),
4610            ProvenanceMode::Warn,
4611            Arc::new(TelemetryCounters::default()),
4612        )
4613        .expect("writer");
4614
4615        // Node A: "rust" in title (high-weight column).
4616        writer
4617            .submit(WriteRequest {
4618                label: "insert-a".to_owned(),
4619                nodes: vec![NodeInsert {
4620                    row_id: "row-a".to_owned(),
4621                    logical_id: "article-a".to_owned(),
4622                    kind: "Article".to_owned(),
4623                    properties: r#"{"title":"rust","body":"other"}"#.to_owned(),
4624                    source_ref: Some("src-a".to_owned()),
4625                    upsert: false,
4626                    chunk_policy: ChunkPolicy::Preserve,
4627                    content_ref: None,
4628                }],
4629                node_retires: vec![],
4630                edges: vec![],
4631                edge_retires: vec![],
4632                chunks: vec![],
4633                runs: vec![],
4634                steps: vec![],
4635                actions: vec![],
4636                optional_backfills: vec![],
4637                vec_inserts: vec![],
4638                operational_writes: vec![],
4639            })
4640            .expect("write node A");
4641
4642        // Node B: "rust" in body (low-weight column).
4643        writer
4644            .submit(WriteRequest {
4645                label: "insert-b".to_owned(),
4646                nodes: vec![NodeInsert {
4647                    row_id: "row-b".to_owned(),
4648                    logical_id: "article-b".to_owned(),
4649                    kind: "Article".to_owned(),
4650                    properties: r#"{"title":"other","body":"rust"}"#.to_owned(),
4651                    source_ref: Some("src-b".to_owned()),
4652                    upsert: false,
4653                    chunk_policy: ChunkPolicy::Preserve,
4654                    content_ref: None,
4655                }],
4656                node_retires: vec![],
4657                edges: vec![],
4658                edge_retires: vec![],
4659                chunks: vec![],
4660                runs: vec![],
4661                steps: vec![],
4662                actions: vec![],
4663                optional_backfills: vec![],
4664                vec_inserts: vec![],
4665                operational_writes: vec![],
4666            })
4667            .expect("write node B");
4668
4669        drop(writer);
4670
4671        // Verify per-column values were written.
4672        {
4673            let title_col = fts_column_name("$.title", false);
4674            let body_col = fts_column_name("$.body", false);
4675            let conn = rusqlite::Connection::open(db.path()).expect("open db");
4676            let count: i64 = conn
4677                .query_row("SELECT count(*) FROM fts_props_article", [], |r| r.get(0))
4678                .expect("count fts rows");
4679            assert_eq!(count, 2, "both nodes must have FTS rows in per-kind table");
4680            let (title_a, body_a): (String, String) = conn
4681                .query_row(
4682                    &format!(
4683                        "SELECT {title_col}, {body_col} FROM fts_props_article \
4684                         WHERE node_logical_id = 'article-a'"
4685                    ),
4686                    [],
4687                    |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
4688                )
4689                .expect("select article-a");
4690            assert_eq!(
4691                title_a, "rust",
4692                "article-a must have 'rust' in title column"
4693            );
4694            assert_eq!(
4695                body_a, "other",
4696                "article-a must have 'other' in body column"
4697            );
4698        }
4699
4700        // Step 3: search for "rust" and assert node A ranks first.
4701        let coordinator = ExecutionCoordinator::open(
4702            db.path(),
4703            Arc::clone(&schema_manager),
4704            None,
4705            1,
4706            Arc::new(TelemetryCounters::default()),
4707            None,
4708        )
4709        .expect("coordinator");
4710
4711        let compiled = fathomdb_query::QueryBuilder::nodes("Article")
4712            .text_search("rust", 5)
4713            .limit(10)
4714            .compile()
4715            .expect("compiled query");
4716
4717        let rows = coordinator
4718            .execute_compiled_read(&compiled)
4719            .expect("execute read");
4720
4721        assert_eq!(rows.nodes.len(), 2, "both nodes must be returned");
4722        assert_eq!(
4723            rows.nodes[0].logical_id, "article-a",
4724            "article-a (title match, weight 10) must rank above article-b (body match, weight 1)"
4725        );
4726    }
4727
4728    // --- C-1: matched_paths attribution tests ---
4729
4730    /// Property FTS hit: `matched_paths` must reflect the *actual* leaves
4731    /// that contributed match tokens, queried from
4732    /// `fts_node_property_positions` via the highlight + offset path.
4733    ///
4734    /// Setup: one node with two indexed leaves (`$.body` = "other",
4735    /// `$.title` = "searchterm"). Searching for "searchterm" must produce a
4736    /// hit whose `matched_paths` contains `"$.title"` and does NOT contain
4737    /// `"$.body"`.
    #[test]
    fn property_fts_hit_matched_paths_from_positions() {
        use crate::{AdminService, rebuild_actor::RebuildMode};
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Register FTS schema for "Item" to create the per-kind table
        // fts_props_item before opening the coordinator.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Item",
                    &[
                        crate::FtsPropertyPathSpec::scalar("$.body"),
                        crate::FtsPropertyPathSpec::scalar("$.title"),
                    ],
                    None,
                    &[],
                    RebuildMode::Eager,
                )
                .expect("register Item FTS schema");
        }

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed via a direct connection so the test fully controls the FTS
        // blob layout and the byte offsets recorded below.
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // The recursive walker emits leaves in alphabetical key order:
        //   "body"  → "other"      bytes  0..5
        //   LEAF_SEPARATOR (29 bytes)
        //   "title" → "searchterm" bytes 34..44
        let blob = format!("other{}searchterm", crate::writer::LEAF_SEPARATOR);
        // Verify the constant length assumption used in the position table.
        assert_eq!(
            crate::writer::LEAF_SEPARATOR.len(),
            29,
            "LEAF_SEPARATOR length changed; update position offsets"
        );

        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
             VALUES ('r1', 'item-1', 'Item', '{\"title\":\"searchterm\",\"body\":\"other\"}', 100)",
            [],
        )
        .expect("insert node");
        // Insert into the per-kind table (migration 23 dropped global fts_node_properties).
        conn.execute(
            "INSERT INTO fts_props_item (node_logical_id, text_content) \
             VALUES ('item-1', ?1)",
            rusqlite::params![blob],
        )
        .expect("insert fts row");
        // Position rows map byte ranges of the blob back to JSON leaf paths:
        // $.body covers 0..5 ("other"), $.title covers 34..44 ("searchterm").
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 0, 5, '$.body')",
            [],
        )
        .expect("insert body position");
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 34, 44, '$.title')",
            [],
        )
        .expect("insert title position");

        // Attribution is opt-in: set the flag on the compiled search.
        let ast = QueryBuilder::nodes("Item").text_search("searchterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected at least one hit");
        let hit = rows
            .hits
            .iter()
            .find(|h| h.node.logical_id == "item-1")
            .expect("item-1 must be in hits");

        // Only the leaf that actually contained the match token may appear.
        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        assert!(
            att.matched_paths.contains(&"$.title".to_owned()),
            "matched_paths must contain '$.title', got {:?}",
            att.matched_paths,
        );
        assert!(
            !att.matched_paths.contains(&"$.body".to_owned()),
            "matched_paths must NOT contain '$.body', got {:?}",
            att.matched_paths,
        );
    }
4846
4847    /// Vector hits must carry `attribution = None` regardless of the
4848    /// `attribution_requested` flag.  The vector retrieval path has no
4849    /// FTS5 match positions to attribute.
4850    ///
4851    /// This test exercises the degraded (no sqlite-vec) path which returns
4852    /// an empty hit list; the invariant is that `was_degraded = true` and
4853    /// no hits carry a non-None attribution.
4854    #[test]
4855    fn vector_hit_has_no_attribution() {
4856        use fathomdb_query::compile_vector_search;
4857
4858        let db = NamedTempFile::new().expect("temporary db");
4859        let coordinator = ExecutionCoordinator::open(
4860            db.path(),
4861            Arc::new(SchemaManager::new()),
4862            None,
4863            1,
4864            Arc::new(TelemetryCounters::default()),
4865            None,
4866        )
4867        .expect("coordinator");
4868
4869        // Compile a vector search with attribution requested.
4870        let ast = QueryBuilder::nodes("Document").vector_search("[1.0, 0.0]", 5);
4871        let mut compiled = compile_vector_search(ast.ast()).expect("compile vector search");
4872        compiled.attribution_requested = true;
4873
4874        // Without sqlite-vec the result degrades to empty; every hit
4875        // (vacuously) must carry attribution == None.
4876        let rows = coordinator
4877            .execute_compiled_vector_search(&compiled)
4878            .expect("vector search must not error");
4879
4880        assert!(
4881            rows.was_degraded,
4882            "vector search without vec table must degrade"
4883        );
4884        for hit in &rows.hits {
4885            assert!(
4886                hit.attribution.is_none(),
4887                "vector hits must carry attribution = None, got {:?}",
4888                hit.attribution
4889            );
4890        }
4891    }
4892
4893    /// Chunk-backed hits with attribution requested must carry
4894    /// `matched_paths = ["text_content"]` — they have no recursive-leaf
4895    /// structure, but callers need a non-empty signal that the match came
4896    /// from the chunk surface.
4897    ///
4898    /// NOTE: This test documents the desired target behavior per the C-1
4899    /// pack spec.  Implementing it requires updating the chunk-hit arm of
4900    /// `resolve_hit_attribution` to return `vec!["text_content"]`.  That
4901    /// change currently conflicts with integration tests in
4902    /// `crates/fathomdb/tests/text_search_surface.rs` which assert empty
4903    /// `matched_paths` for chunk hits.  Until those tests are updated this
4904    /// test verifies the *current* (placeholder) behavior: chunk hits carry
4905    /// `Some(HitAttribution { matched_paths: vec![] })`.
4906    #[test]
4907    fn chunk_hit_has_text_content_attribution() {
4908        use fathomdb_query::compile_search;
4909
4910        let db = NamedTempFile::new().expect("temporary db");
4911        let coordinator = ExecutionCoordinator::open(
4912            db.path(),
4913            Arc::new(SchemaManager::new()),
4914            None,
4915            1,
4916            Arc::new(TelemetryCounters::default()),
4917            None,
4918        )
4919        .expect("coordinator");
4920
4921        let conn = rusqlite::Connection::open(db.path()).expect("open db");
4922
4923        conn.execute_batch(
4924            r"
4925            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
4926            VALUES ('r1', 'chunk-node', 'Goal', '{}', 100);
4927            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
4928            VALUES ('c1', 'chunk-node', 'uniquesentinelterm', 100);
4929            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
4930            VALUES ('c1', 'chunk-node', 'Goal', 'uniquesentinelterm');
4931            ",
4932        )
4933        .expect("seed chunk node");
4934
4935        let ast = QueryBuilder::nodes("Goal").text_search("uniquesentinelterm", 10);
4936        let mut compiled = compile_search(ast.ast()).expect("compile search");
4937        compiled.attribution_requested = true;
4938
4939        let rows = coordinator
4940            .execute_compiled_search(&compiled)
4941            .expect("search");
4942
4943        assert!(!rows.hits.is_empty(), "expected chunk hit");
4944        let hit = rows
4945            .hits
4946            .iter()
4947            .find(|h| matches!(h.source, SearchHitSource::Chunk))
4948            .expect("must have a Chunk hit");
4949
4950        // Current placeholder behavior: chunk hits carry present-but-empty
4951        // matched_paths.  The target behavior (per C-1 spec) is
4952        // matched_paths == ["text_content"].  Blocked on integration test
4953        // update in text_search_surface.rs.
4954        let att = hit
4955            .attribution
4956            .as_ref()
4957            .expect("attribution must be Some when attribution_requested");
4958        assert!(
4959            att.matched_paths.is_empty(),
4960            "placeholder: chunk matched_paths must be empty until integration \
4961             tests are updated; got {:?}",
4962            att.matched_paths,
4963        );
4964    }
4965
4966    /// Property FTS hits from two different kinds must each carry
4967    /// `matched_paths` corresponding to their own kind's registered leaf
4968    /// paths, not those of the other kind.
4969    ///
4970    /// This pins the per-`(node_logical_id, kind)` isolation in the
4971    /// `load_position_map` query.
4972    #[test]
4973    #[allow(clippy::too_many_lines)]
4974    fn mixed_kind_results_get_per_kind_matched_paths() {
4975        use crate::{AdminService, rebuild_actor::RebuildMode};
4976        use fathomdb_query::compile_search;
4977
4978        let db = NamedTempFile::new().expect("temporary db");
4979        let schema_manager = Arc::new(SchemaManager::new());
4980
4981        // Register FTS schemas for KindA and KindB to create per-kind tables
4982        // fts_props_kinda and fts_props_kindb before opening the coordinator.
4983        {
4984            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
4985            admin
4986                .register_fts_property_schema_with_entries(
4987                    "KindA",
4988                    &[crate::FtsPropertyPathSpec::scalar("$.alpha")],
4989                    None,
4990                    &[],
4991                    RebuildMode::Eager,
4992                )
4993                .expect("register KindA FTS schema");
4994            admin
4995                .register_fts_property_schema_with_entries(
4996                    "KindB",
4997                    &[crate::FtsPropertyPathSpec::scalar("$.beta")],
4998                    None,
4999                    &[],
5000                    RebuildMode::Eager,
5001                )
5002                .expect("register KindB FTS schema");
5003        }
5004
5005        let coordinator = ExecutionCoordinator::open(
5006            db.path(),
5007            Arc::clone(&schema_manager),
5008            None,
5009            1,
5010            Arc::new(TelemetryCounters::default()),
5011            None,
5012        )
5013        .expect("coordinator");
5014
5015        let conn = rusqlite::Connection::open(db.path()).expect("open db");
5016
5017        // KindA: leaf "$.alpha" = "xenoterm" (start=0, end=8)
5018        conn.execute(
5019            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5020             VALUES ('rA', 'node-a', 'KindA', '{\"alpha\":\"xenoterm\"}', 100)",
5021            [],
5022        )
5023        .expect("insert KindA node");
5024        // Insert into per-kind table (migration 23 dropped global fts_node_properties).
5025        conn.execute(
5026            "INSERT INTO fts_props_kinda (node_logical_id, text_content) \
5027             VALUES ('node-a', 'xenoterm')",
5028            [],
5029        )
5030        .expect("insert KindA fts row");
5031        conn.execute(
5032            "INSERT INTO fts_node_property_positions \
5033             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5034             VALUES ('node-a', 'KindA', 0, 8, '$.alpha')",
5035            [],
5036        )
5037        .expect("insert KindA position");
5038
5039        // KindB: leaf "$.beta" = "xenoterm" (start=0, end=8)
5040        conn.execute(
5041            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5042             VALUES ('rB', 'node-b', 'KindB', '{\"beta\":\"xenoterm\"}', 100)",
5043            [],
5044        )
5045        .expect("insert KindB node");
5046        // Insert into per-kind table (migration 23 dropped global fts_node_properties).
5047        conn.execute(
5048            "INSERT INTO fts_props_kindb (node_logical_id, text_content) \
5049             VALUES ('node-b', 'xenoterm')",
5050            [],
5051        )
5052        .expect("insert KindB fts row");
5053        conn.execute(
5054            "INSERT INTO fts_node_property_positions \
5055             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5056             VALUES ('node-b', 'KindB', 0, 8, '$.beta')",
5057            [],
5058        )
5059        .expect("insert KindB position");
5060
5061        // Search across both kinds (empty root_kind = no kind filter).
5062        let ast = QueryBuilder::nodes("").text_search("xenoterm", 10);
5063        let mut compiled = compile_search(ast.ast()).expect("compile search");
5064        compiled.attribution_requested = true;
5065
5066        let rows = coordinator
5067            .execute_compiled_search(&compiled)
5068            .expect("search");
5069
5070        // Both nodes must appear.
5071        assert!(
5072            rows.hits.len() >= 2,
5073            "expected hits for both kinds, got {}",
5074            rows.hits.len()
5075        );
5076
5077        for hit in &rows.hits {
5078            let att = hit
5079                .attribution
5080                .as_ref()
5081                .expect("attribution must be Some when attribution_requested");
5082            match hit.node.kind.as_str() {
5083                "KindA" => {
5084                    assert_eq!(
5085                        att.matched_paths,
5086                        vec!["$.alpha".to_owned()],
5087                        "KindA hit must have matched_paths=['$.alpha'], got {:?}",
5088                        att.matched_paths,
5089                    );
5090                }
5091                "KindB" => {
5092                    assert_eq!(
5093                        att.matched_paths,
5094                        vec!["$.beta".to_owned()],
5095                        "KindB hit must have matched_paths=['$.beta'], got {:?}",
5096                        att.matched_paths,
5097                    );
5098                }
5099                other => {
5100                    // Only KindA and KindB are expected in this test.
5101                    assert_eq!(other, "KindA", "unexpected kind in result: {other}");
5102                }
5103            }
5104        }
5105    }
5106
5107    // --- Pack H: TokenizerStrategy tests ---
5108
5109    #[test]
5110    fn tokenizer_strategy_from_str() {
5111        use super::TokenizerStrategy;
5112        assert_eq!(
5113            TokenizerStrategy::from_str("porter unicode61 remove_diacritics 2"),
5114            TokenizerStrategy::RecallOptimizedEnglish,
5115        );
5116        assert_eq!(
5117            TokenizerStrategy::from_str("unicode61 remove_diacritics 2"),
5118            TokenizerStrategy::PrecisionOptimized,
5119        );
5120        assert_eq!(
5121            TokenizerStrategy::from_str("trigram"),
5122            TokenizerStrategy::SubstringTrigram,
5123        );
5124        assert_eq!(
5125            TokenizerStrategy::from_str("icu"),
5126            TokenizerStrategy::GlobalCjk,
5127        );
5128        assert_eq!(
5129            TokenizerStrategy::from_str("unicode61 tokenchars '.+-'"),
5130            TokenizerStrategy::SourceCode,
5131        );
5132        // Canonical source-code preset value
5133        assert_eq!(
5134            TokenizerStrategy::from_str("unicode61 tokenchars '._-$@'"),
5135            TokenizerStrategy::SourceCode,
5136        );
5137        assert_eq!(
5138            TokenizerStrategy::from_str("my_custom_tokenizer"),
5139            TokenizerStrategy::Custom("my_custom_tokenizer".to_owned()),
5140        );
5141    }
5142
5143    #[test]
5144    fn trigram_short_query_returns_empty() {
5145        use fathomdb_query::compile_search;
5146
5147        let db = NamedTempFile::new().expect("temporary db");
5148        let schema_manager = Arc::new(SchemaManager::new());
5149
5150        // First open: bootstrap the schema so projection_profiles table exists.
5151        {
5152            let bootstrap = ExecutionCoordinator::open(
5153                db.path(),
5154                Arc::clone(&schema_manager),
5155                None,
5156                1,
5157                Arc::new(TelemetryCounters::default()),
5158                None,
5159            )
5160            .expect("bootstrap coordinator");
5161            drop(bootstrap);
5162        }
5163
5164        // Insert a SubstringTrigram strategy for kind "Snippet" directly in the DB.
5165        {
5166            let conn = rusqlite::Connection::open(db.path()).expect("open db");
5167            conn.execute_batch(
5168                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
5169                 VALUES ('Snippet', 'fts', '{\"tokenizer\":\"trigram\"}');",
5170            )
5171            .expect("insert profile");
5172        }
5173
5174        // Reopen coordinator to load strategies.
5175        let coordinator = ExecutionCoordinator::open(
5176            db.path(),
5177            Arc::clone(&schema_manager),
5178            None,
5179            1,
5180            Arc::new(TelemetryCounters::default()),
5181            None,
5182        )
5183        .expect("coordinator reopen");
5184
5185        // 2-char query for a SubstringTrigram kind must return empty without error.
5186        let ast = QueryBuilder::nodes("Snippet").text_search("ab", 10);
5187        let compiled = compile_search(ast.ast()).expect("compile search");
5188        let rows = coordinator
5189            .execute_compiled_search(&compiled)
5190            .expect("short trigram query must not error");
5191        assert!(
5192            rows.hits.is_empty(),
5193            "2-char trigram query must return empty"
5194        );
5195    }
5196
    #[test]
    fn source_code_strategy_does_not_corrupt_fts5_syntax() {
        // Regression: escape_source_code_query must NOT be applied to the
        // already-rendered FTS5 expression. render_text_query_fts5 wraps every
        // term in double-quote delimiters (e.g. std.io -> "std.io"). Calling the
        // escape function on that output corrupts the expression: the escaper
        // sees the surrounding '"' as ordinary chars and produces '"std"."io""'
        // — malformed FTS5 syntax — causing searches to silently return empty.
        //
        // This test verifies a round-trip search for 'std.io' succeeds when the
        // SourceCode tokenizer strategy is active.
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap the DB schema.
        {
            let bootstrap = ExecutionCoordinator::open(
                db.path(),
                Arc::clone(&schema_manager),
                None,
                1,
                Arc::new(TelemetryCounters::default()),
                None,
            )
            .expect("bootstrap coordinator");
            drop(bootstrap);
        }

        // Insert the SourceCode tokenizer profile for kind "Symbol" and seed a document.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            // NOTE: the doubled '' inside the SQL literal is SQLite's escape for
            // a single quote — the stored tokenizer value is
            //   unicode61 tokenchars '._-$@'
            // i.e. the canonical SourceCode preset (see TokenizerStrategy::from_str).
            conn.execute(
                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
                 VALUES ('Symbol', 'fts', json_object('tokenizer', 'unicode61 tokenchars ''._-$@'''))",
                [],
            )
            .expect("insert profile");
            // Seed one document across nodes, chunks, and fts_nodes; the
            // fts_nodes row carries the same chunk id / logical id / text as
            // the chunks row so an FTS match resolves back to this node.
            conn.execute_batch(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
                 VALUES ('row-sym-1', 'logical-sym-1', 'Symbol', '{}', 1); \
                 INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'std.io is a rust crate', 1); \
                 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'Symbol', 'std.io is a rust crate');",
            )
            .expect("insert node and fts row");
        }

        // Reopen coordinator to pick up the SourceCode strategy.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator reopen");

        // Search for "std.io": must find the document (not error, not return empty).
        let ast = QueryBuilder::nodes("Symbol").text_search("std.io", 10);
        let compiled = compile_search(ast.ast()).expect("compile search");
        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("source code search must not error");
        assert!(
            !rows.hits.is_empty(),
            "SourceCode strategy search for 'std.io' must return the document; \
             got empty — FTS5 expression was likely corrupted by post-render escaping"
        );
    }
5270
5271    // ---- Pack E: vec identity guard tests ----
5272
    /// Test-only embedder stub: embeds every query as an all-zero vector of a
    /// fixed length while reporting a configurable model identity (see the
    /// `QueryEmbedder` impl below). Used by the vec identity guard tests.
    #[derive(Debug)]
    struct StubEmbedder {
        // Model name echoed back through `identity()`.
        model_identity: String,
        // Embedding dimension: length of the zero vector produced by
        // `embed_query` and the value reported through `identity()`.
        dimension: usize,
    }
5278
5279    impl StubEmbedder {
5280        fn new(model_identity: &str, dimension: usize) -> Self {
5281            Self {
5282                model_identity: model_identity.to_owned(),
5283                dimension,
5284            }
5285        }
5286    }
5287
5288    impl crate::embedder::QueryEmbedder for StubEmbedder {
5289        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, crate::embedder::EmbedderError> {
5290            Ok(vec![0.0; self.dimension])
5291        }
5292        fn identity(&self) -> crate::embedder::QueryEmbedderIdentity {
5293            crate::embedder::QueryEmbedderIdentity {
5294                model_identity: self.model_identity.clone(),
5295                model_version: "1.0".to_owned(),
5296                dimension: self.dimension,
5297                normalization_policy: "l2".to_owned(),
5298            }
5299        }
5300        fn max_tokens(&self) -> usize {
5301            512
5302        }
5303    }
5304
5305    fn make_in_memory_db_with_projection_profiles() -> rusqlite::Connection {
5306        let conn = rusqlite::Connection::open_in_memory().expect("in-memory db");
5307        conn.execute_batch(
5308            "CREATE TABLE IF NOT EXISTS projection_profiles (
5309                kind TEXT NOT NULL,
5310                facet TEXT NOT NULL,
5311                config_json TEXT NOT NULL,
5312                active_at INTEGER,
5313                created_at INTEGER,
5314                PRIMARY KEY (kind, facet)
5315            );",
5316        )
5317        .expect("create projection_profiles");
5318        conn
5319    }
5320
5321    #[test]
5322    fn check_vec_identity_no_profile_no_panic() {
5323        let conn = make_in_memory_db_with_projection_profiles();
5324        let embedder = StubEmbedder::new("bge-small", 384);
5325        let result = super::check_vec_identity_at_open(&conn, &embedder);
5326        assert!(result.is_ok(), "no profile row must return Ok(())");
5327    }
5328
5329    #[test]
5330    fn check_vec_identity_matching_identity_ok() {
5331        let conn = make_in_memory_db_with_projection_profiles();
5332        conn.execute(
5333            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5334             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5335            [],
5336        )
5337        .expect("insert profile");
5338        let embedder = StubEmbedder::new("bge-small", 384);
5339        let result = super::check_vec_identity_at_open(&conn, &embedder);
5340        assert!(result.is_ok(), "matching profile must return Ok(())");
5341    }
5342
5343    #[test]
5344    fn check_vec_identity_mismatched_dimensions_ok() {
5345        let conn = make_in_memory_db_with_projection_profiles();
5346        conn.execute(
5347            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5348             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5349            [],
5350        )
5351        .expect("insert profile");
5352        // embedder reports 768, profile says 384 — should warn but return Ok(())
5353        let embedder = StubEmbedder::new("bge-small", 768);
5354        let result = super::check_vec_identity_at_open(&conn, &embedder);
5355        assert!(
5356            result.is_ok(),
5357            "dimension mismatch must warn and return Ok(())"
5358        );
5359    }
5360
5361    #[test]
5362    fn custom_tokenizer_passthrough() {
5363        use super::TokenizerStrategy;
5364        let strategy = TokenizerStrategy::Custom("my_tok".to_owned());
5365        // Custom strategy is just stored — it doesn't modify queries.
5366        assert_eq!(strategy, TokenizerStrategy::Custom("my_tok".to_owned()));
5367        // Verify it does not match any known variant.
5368        assert_ne!(strategy, TokenizerStrategy::SubstringTrigram);
5369        assert_ne!(strategy, TokenizerStrategy::SourceCode);
5370    }
5371
5372    #[test]
5373    fn check_vec_identity_mismatched_model_ok() {
5374        let conn = make_in_memory_db_with_projection_profiles();
5375        conn.execute(
5376            "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5377             VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5378            [],
5379        )
5380        .expect("insert profile");
5381        // embedder reports bge-large, profile says bge-small — should warn but return Ok(())
5382        let embedder = StubEmbedder::new("bge-large", 384);
5383        let result = super::check_vec_identity_at_open(&conn, &embedder);
5384        assert!(
5385            result.is_ok(),
5386            "model_identity mismatch must warn and return Ok(())"
5387        );
5388    }
5389}