Skip to main content

fathomdb_engine/
coordinator.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8    BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9    CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, ExpansionSlot,
10    FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch,
11    SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
12};
13use fathomdb_schema::SchemaManager;
14use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
15
16use crate::embedder::QueryEmbedder;
17use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
18use crate::{EngineError, sqlite};
19
20/// Maximum number of cached shape-hash to SQL mappings before the cache is
21/// cleared entirely.  A clear-all strategy is simpler than partial eviction
22/// and the cost of re-compiling on a miss is negligible.
23const MAX_SHAPE_CACHE_SIZE: usize = 4096;
24
25/// Maximum number of root IDs per batched expansion query.  Kept well below
26/// `SQLITE_MAX_VARIABLE_NUMBER` (default 999) because each batch also binds
27/// the edge-kind parameter.  Larger root sets are chunked into multiple
28/// batches of this size rather than falling back to per-root queries.
29const BATCH_CHUNK_SIZE: usize = 200;
30
31/// A pool of read-only `SQLite` connections for concurrent read access.
32///
33/// Each connection is wrapped in its own [`Mutex`] so multiple readers can
34/// proceed in parallel when they happen to grab different slots.
35struct ReadPool {
36    connections: Vec<Mutex<Connection>>,
37}
38
39impl fmt::Debug for ReadPool {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        f.debug_struct("ReadPool")
42            .field("size", &self.connections.len())
43            .finish()
44    }
45}
46
47impl ReadPool {
48    /// Open `pool_size` read-only connections to the database at `path`.
49    ///
50    /// Each connection has PRAGMAs initialized via
51    /// [`SchemaManager::initialize_connection`] and, when the `sqlite-vec`
52    /// feature is enabled and `vector_enabled` is true, the vec extension
53    /// auto-loaded.
54    ///
55    /// # Errors
56    ///
57    /// Returns [`EngineError`] if any connection fails to open or initialize.
58    fn new(
59        db_path: &Path,
60        pool_size: usize,
61        schema_manager: &SchemaManager,
62        vector_enabled: bool,
63    ) -> Result<Self, EngineError> {
64        let mut connections = Vec::with_capacity(pool_size);
65        for _ in 0..pool_size {
66            let conn = if vector_enabled {
67                #[cfg(feature = "sqlite-vec")]
68                {
69                    sqlite::open_readonly_connection_with_vec(db_path)?
70                }
71                #[cfg(not(feature = "sqlite-vec"))]
72                {
73                    sqlite::open_readonly_connection(db_path)?
74                }
75            } else {
76                sqlite::open_readonly_connection(db_path)?
77            };
78            schema_manager
79                .initialize_reader_connection(&conn)
80                .map_err(EngineError::Schema)?;
81            connections.push(Mutex::new(conn));
82        }
83        Ok(Self { connections })
84    }
85
86    /// Acquire a connection from the pool.
87    ///
88    /// Tries [`Mutex::try_lock`] on each slot first (fast non-blocking path).
89    /// If every slot is held, falls back to a blocking lock on the first slot.
90    ///
91    /// # Errors
92    ///
93    /// Returns [`EngineError::Bridge`] if the underlying mutex is poisoned.
94    fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
95        // Fast path: try each connection without blocking.
96        for conn in &self.connections {
97            if let Ok(guard) = conn.try_lock() {
98                return Ok(guard);
99            }
100        }
101        // Fallback: block on the first connection.
102        self.connections[0].lock().map_err(|_| {
103            trace_error!("read pool: connection mutex poisoned");
104            EngineError::Bridge("connection mutex poisoned".to_owned())
105        })
106    }
107
108    /// Return the number of connections in the pool.
109    #[cfg(test)]
110    fn size(&self) -> usize {
111        self.connections.len()
112    }
113}
114
115/// Execution plan returned by [`ExecutionCoordinator::explain_compiled_read`].
116///
117/// This is a read-only introspection struct. It does not execute SQL.
118#[derive(Clone, Debug, PartialEq, Eq)]
119pub struct QueryPlan {
120    pub sql: String,
121    pub bind_count: usize,
122    pub driving_table: DrivingTable,
123    pub shape_hash: ShapeHash,
124    pub cache_hit: bool,
125}
126
127/// A single node row returned from a query.
128#[derive(Clone, Debug, PartialEq, Eq)]
129pub struct NodeRow {
130    /// Physical row ID.
131    pub row_id: String,
132    /// Logical ID of the node.
133    pub logical_id: String,
134    /// Node kind.
135    pub kind: String,
136    /// JSON-encoded node properties.
137    pub properties: String,
138    /// Optional URI referencing external content.
139    pub content_ref: Option<String>,
140    /// Unix timestamp of last access, if tracked.
141    pub last_accessed_at: Option<i64>,
142}
143
144/// A single run row returned from a query.
145#[derive(Clone, Debug, PartialEq, Eq)]
146pub struct RunRow {
147    /// Unique run ID.
148    pub id: String,
149    /// Run kind.
150    pub kind: String,
151    /// Current status.
152    pub status: String,
153    /// JSON-encoded run properties.
154    pub properties: String,
155}
156
157/// A single step row returned from a query.
158#[derive(Clone, Debug, PartialEq, Eq)]
159pub struct StepRow {
160    /// Unique step ID.
161    pub id: String,
162    /// ID of the parent run.
163    pub run_id: String,
164    /// Step kind.
165    pub kind: String,
166    /// Current status.
167    pub status: String,
168    /// JSON-encoded step properties.
169    pub properties: String,
170}
171
172/// A single action row returned from a query.
173#[derive(Clone, Debug, PartialEq, Eq)]
174pub struct ActionRow {
175    /// Unique action ID.
176    pub id: String,
177    /// ID of the parent step.
178    pub step_id: String,
179    /// Action kind.
180    pub kind: String,
181    /// Current status.
182    pub status: String,
183    /// JSON-encoded action properties.
184    pub properties: String,
185}
186
187/// A single row from the `provenance_events` table.
188#[derive(Clone, Debug, PartialEq, Eq)]
189pub struct ProvenanceEvent {
190    pub id: String,
191    pub event_type: String,
192    pub subject: String,
193    pub source_ref: Option<String>,
194    pub metadata_json: String,
195    pub created_at: i64,
196}
197
198/// Result set from executing a flat (non-grouped) compiled query.
199#[derive(Clone, Debug, Default, PartialEq, Eq)]
200pub struct QueryRows {
201    /// Matched node rows.
202    pub nodes: Vec<NodeRow>,
203    /// Runs associated with the matched nodes.
204    pub runs: Vec<RunRow>,
205    /// Steps associated with the matched runs.
206    pub steps: Vec<StepRow>,
207    /// Actions associated with the matched steps.
208    pub actions: Vec<ActionRow>,
209    /// `true` when a capability miss (e.g. missing sqlite-vec) caused the query
210    /// to degrade to an empty result instead of propagating an error.
211    pub was_degraded: bool,
212}
213
214/// Expansion results for a single root node within a grouped query.
215#[derive(Clone, Debug, PartialEq, Eq)]
216pub struct ExpansionRootRows {
217    /// Logical ID of the root node that seeded this expansion.
218    pub root_logical_id: String,
219    /// Nodes reached by traversing from the root.
220    pub nodes: Vec<NodeRow>,
221}
222
223/// All expansion results for a single named slot across all roots.
224#[derive(Clone, Debug, PartialEq, Eq)]
225pub struct ExpansionSlotRows {
226    /// Name of the expansion slot.
227    pub slot: String,
228    /// Per-root expansion results.
229    pub roots: Vec<ExpansionRootRows>,
230}
231
232/// Result set from executing a grouped compiled query.
233#[derive(Clone, Debug, Default, PartialEq, Eq)]
234pub struct GroupedQueryRows {
235    /// Root node rows matched by the base query.
236    pub roots: Vec<NodeRow>,
237    /// Per-slot expansion results.
238    pub expansions: Vec<ExpansionSlotRows>,
239    /// `true` when a capability miss caused the query to degrade to an empty result.
240    pub was_degraded: bool,
241}
242
243/// Manages a pool of read-only `SQLite` connections and executes compiled queries.
244pub struct ExecutionCoordinator {
245    database_path: PathBuf,
246    schema_manager: Arc<SchemaManager>,
247    pool: ReadPool,
248    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
249    vector_enabled: bool,
250    vec_degradation_warned: AtomicBool,
251    telemetry: Arc<TelemetryCounters>,
252    /// Phase 12.5a: optional read-time query embedder. When present,
253    /// [`Self::execute_retrieval_plan`] invokes it via
254    /// [`Self::fill_vector_branch`] after compile to populate
255    /// `plan.vector`. When `None`, the Phase 12 v1 vector-dormancy
256    /// invariant on `search()` is preserved: the vector slot stays empty
257    /// and the coordinator's stage-gating check skips the vector branch.
258    query_embedder: Option<Arc<dyn QueryEmbedder>>,
259}
260
261impl fmt::Debug for ExecutionCoordinator {
262    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263        f.debug_struct("ExecutionCoordinator")
264            .field("database_path", &self.database_path)
265            .finish_non_exhaustive()
266    }
267}
268
269impl ExecutionCoordinator {
270    /// # Errors
271    /// Returns [`EngineError`] if the database connection cannot be opened or schema bootstrap fails.
272    pub fn open(
273        path: impl AsRef<Path>,
274        schema_manager: Arc<SchemaManager>,
275        vector_dimension: Option<usize>,
276        pool_size: usize,
277        telemetry: Arc<TelemetryCounters>,
278        query_embedder: Option<Arc<dyn QueryEmbedder>>,
279    ) -> Result<Self, EngineError> {
280        let path = path.as_ref().to_path_buf();
281        #[cfg(feature = "sqlite-vec")]
282        let conn = if vector_dimension.is_some() {
283            sqlite::open_connection_with_vec(&path)?
284        } else {
285            sqlite::open_connection(&path)?
286        };
287        #[cfg(not(feature = "sqlite-vec"))]
288        let conn = sqlite::open_connection(&path)?;
289
290        let report = schema_manager.bootstrap(&conn)?;
291
292        // ----- Open-time rebuild guards for derived FTS state -----
293        //
294        // `fts_node_properties` and `fts_node_property_positions` are both
295        // derived from the canonical `nodes.properties` blobs and the set
296        // of registered `fts_property_schemas`. They are NOT source of
297        // truth, so whenever a schema migration or crash leaves either
298        // table out of sync with the canonical state we repopulate them
299        // from scratch at open() time.
300        //
301        // The derived-state invariant is: if `fts_property_schemas` is
302        // non-empty, then `fts_node_properties` must also be non-empty,
303        // and if any schema is recursive-mode then
304        // `fts_node_property_positions` must also be non-empty. If either
305        // guard trips, both tables are cleared and rebuilt together — it
306        // is never correct to rebuild only one half.
307        //
308        // Guard 1 (P2-1, FX pack, v15→v16 migration): detects an empty
309        // `fts_node_properties` while schemas exist. This covers the
310        // Phase 2 tokenizer swap (migration 16 drops the FTS5 virtual
311        // table before recreating it under `porter unicode61`) and the
312        // crash-recovery case where a prior open applied migration 16 but
313        // crashed before the subsequent property-FTS rebuild committed.
314        //
315        // Guard 2 (P4-P2-3, FX2 pack, v16→v17 migration, extended to
316        // v17→v18 by P4-P2-4): detects an empty
317        // `fts_node_property_positions` while any recursive-mode schema
318        // exists. This covers migration 17 (which added the sidecar table
319        // but left it empty) and migration 18 (which drops and recreates
320        // the sidecar to add the UNIQUE constraint, also leaving it
321        // empty). Both migrations land on the same guard because both
322        // leave the positions table empty while recursive schemas remain
323        // registered.
324        //
325        // Both guards are no-ops on an already-consistent database, so
326        // running them on every open is safe and cheap.
327        let needs_property_fts_rebuild = {
328            let schema_count: i64 =
329                conn.query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
330                    row.get(0)
331                })?;
332            if schema_count == 0 {
333                false
334            } else {
335                let fts_count: i64 =
336                    conn.query_row("SELECT COUNT(*) FROM fts_node_properties", [], |row| {
337                        row.get(0)
338                    })?;
339                fts_count == 0
340            }
341        };
342        // Guard 2 (see block comment above): recursive schemas registered
343        // but `fts_node_property_positions` empty. Rebuild regenerates
344        // both the FTS blob and the position map from canonical state, so
345        // it is safe to trigger even when `fts_node_properties` is
346        // already populated.
347        let needs_position_backfill = {
348            // NOTE: This LIKE pattern assumes `property_paths_json` is
349            // serialized with compact formatting (no whitespace around
350            // `:`). All current writers go through `serde_json`'s compact
351            // output so this holds. If a future writer emits pretty-
352            // printed JSON (`"mode": "recursive"` with a space), this
353            // guard would silently fail. A more robust check would use
354            // `json_extract(property_paths_json, '$[*].mode')` or a
355            // parsed scan, at the cost of a per-row JSON walk.
356            let recursive_schema_count: i64 = conn.query_row(
357                "SELECT COUNT(*) FROM fts_property_schemas \
358                 WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
359                [],
360                |row| row.get(0),
361            )?;
362            if recursive_schema_count == 0 {
363                false
364            } else {
365                let position_count: i64 = conn.query_row(
366                    "SELECT COUNT(*) FROM fts_node_property_positions",
367                    [],
368                    |row| row.get(0),
369                )?;
370                position_count == 0
371            }
372        };
373        if needs_property_fts_rebuild || needs_position_backfill {
374            let tx = conn.unchecked_transaction()?;
375            tx.execute("DELETE FROM fts_node_properties", [])?;
376            tx.execute("DELETE FROM fts_node_property_positions", [])?;
377            crate::projection::insert_property_fts_rows(
378                &tx,
379                "SELECT logical_id, properties FROM nodes \
380                 WHERE kind = ?1 AND superseded_at IS NULL",
381            )?;
382            tx.commit()?;
383        }
384
385        #[cfg(feature = "sqlite-vec")]
386        let mut vector_enabled = report.vector_profile_enabled;
387        #[cfg(not(feature = "sqlite-vec"))]
388        let vector_enabled = {
389            let _ = &report;
390            false
391        };
392
393        if let Some(dim) = vector_dimension {
394            schema_manager
395                .ensure_vector_profile(&conn, "default", "vec_nodes_active", dim)
396                .map_err(EngineError::Schema)?;
397            // Profile was just created or updated — mark as enabled.
398            #[cfg(feature = "sqlite-vec")]
399            {
400                vector_enabled = true;
401            }
402        }
403
404        // Drop the bootstrap connection — pool connections are used for reads.
405        drop(conn);
406
407        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;
408
409        Ok(Self {
410            database_path: path,
411            schema_manager,
412            pool,
413            shape_sql_map: Mutex::new(HashMap::new()),
414            vector_enabled,
415            vec_degradation_warned: AtomicBool::new(false),
416            telemetry,
417            query_embedder,
418        })
419    }
420
421    /// Returns the filesystem path to the `SQLite` database.
422    pub fn database_path(&self) -> &Path {
423        &self.database_path
424    }
425
426    /// Returns `true` when sqlite-vec was loaded and a vector profile is active.
427    #[must_use]
428    pub fn vector_enabled(&self) -> bool {
429        self.vector_enabled
430    }
431
432    /// Returns the configured read-time query embedder, if any.
433    ///
434    /// The 0.4.0 write-time parity work reuses this same embedder for
435    /// vector regeneration via [`Engine::regenerate_vector_embeddings`],
436    /// so there is always exactly one source of truth for vector
437    /// identity per [`Engine`] instance.
438    #[must_use]
439    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
440        self.query_embedder.as_ref()
441    }
442
443    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
444        self.pool.acquire()
445    }
446
447    /// Aggregate `SQLite` page-cache counters across all pool connections.
448    ///
449    /// Uses `try_lock` to avoid blocking reads for telemetry reporting.
450    /// Connections that are currently locked by a query are skipped — this
451    /// is acceptable for statistical counters.
452    #[must_use]
453    pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
454        let mut total = SqliteCacheStatus::default();
455        for conn_mutex in &self.pool.connections {
456            if let Ok(conn) = conn_mutex.try_lock() {
457                total.add(&read_db_cache_status(&conn));
458            }
459        }
460        total
461    }
462
463    /// # Errors
464    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
465    #[allow(clippy::expect_used)]
466    pub fn execute_compiled_read(
467        &self,
468        compiled: &CompiledQuery,
469    ) -> Result<QueryRows, EngineError> {
470        let row_sql = wrap_node_row_projection_sql(&compiled.sql);
471        // FIX(review): was .expect() — panics on mutex poisoning, cascading failure.
472        // Options: (A) into_inner() for all, (B) EngineError for all, (C) mixed.
473        // Chose (C): shape_sql_map is a pure cache — into_inner() is safe to recover.
474        // conn wraps a SQLite connection whose state may be corrupt after a thread panic,
475        // so we propagate EngineError there instead.
476        {
477            let mut cache = self
478                .shape_sql_map
479                .lock()
480                .unwrap_or_else(PoisonError::into_inner);
481            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
482                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
483                cache.clear();
484            }
485            cache.insert(compiled.shape_hash, row_sql.clone());
486        }
487
488        let bind_values = compiled
489            .binds
490            .iter()
491            .map(bind_value_to_sql)
492            .collect::<Vec<_>>();
493
494        // FIX(review) + Security fix M-8: was .expect() — panics on mutex poisoning.
495        // shape_sql_map uses into_inner() (pure cache, safe to recover).
496        // conn uses map_err → EngineError (connection state may be corrupt after panic;
497        // into_inner() would risk using a connection with partial transaction state).
498        let conn_guard = match self.lock_connection() {
499            Ok(g) => g,
500            Err(e) => {
501                self.telemetry.increment_errors();
502                return Err(e);
503            }
504        };
505        let mut statement = match conn_guard.prepare_cached(&row_sql) {
506            Ok(stmt) => stmt,
507            Err(e) if is_vec_table_absent(&e) => {
508                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
509                    trace_warn!("vector table absent, degrading to non-vector query");
510                }
511                return Ok(QueryRows {
512                    was_degraded: true,
513                    ..Default::default()
514                });
515            }
516            Err(e) => {
517                self.telemetry.increment_errors();
518                return Err(EngineError::Sqlite(e));
519            }
520        };
521        let nodes = match statement
522            .query_map(params_from_iter(bind_values.iter()), |row| {
523                Ok(NodeRow {
524                    row_id: row.get(0)?,
525                    logical_id: row.get(1)?,
526                    kind: row.get(2)?,
527                    properties: row.get(3)?,
528                    content_ref: row.get(4)?,
529                    last_accessed_at: row.get(5)?,
530                })
531            })
532            .and_then(Iterator::collect)
533        {
534            Ok(rows) => rows,
535            Err(e) => {
536                self.telemetry.increment_errors();
537                return Err(EngineError::Sqlite(e));
538            }
539        };
540
541        self.telemetry.increment_queries();
542        Ok(QueryRows {
543            nodes,
544            runs: Vec::new(),
545            steps: Vec::new(),
546            actions: Vec::new(),
547            was_degraded: false,
548        })
549    }
550
551    /// Execute a compiled adaptive search and return matching hits.
552    ///
553    /// Phase 2 splits filters: fusable predicates (`KindEq`, `LogicalIdEq`,
554    /// `SourceRefEq`, `ContentRefEq`, `ContentRefNotNull`) are injected into
555    /// the `search_hits` CTE so the CTE `LIMIT` applies after filtering,
556    /// while residual predicates (JSON path filters) stay in the outer
557    /// `WHERE`. The chunk and property FTS
558    /// indexes are `UNION ALL`-ed, BM25-scored (flipped so larger values mean
559    /// better matches), ordered, and limited. All hits return
560    /// `match_mode = Strict` — the relaxed branch and fallback arrive in
561    /// later phases.
562    ///
563    /// # Errors
564    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
565    pub fn execute_compiled_search(
566        &self,
567        compiled: &CompiledSearch,
568    ) -> Result<SearchRows, EngineError> {
569        // Build the two-branch plan from the strict text query and delegate
570        // to the shared plan-execution routine. The relaxed branch is derived
571        // via `derive_relaxed` and only fires when strict returned fewer than
572        // `min(FALLBACK_TRIGGER_K, limit)` hits. With K = 1 this collapses to
573        // "relaxed iff strict is empty," but the routine spells the rule out
574        // explicitly so raising K later is a one-line constant bump.
575        let (relaxed_query, was_degraded_at_plan_time) =
576            fathomdb_query::derive_relaxed(&compiled.text_query);
577        let relaxed = relaxed_query.map(|q| CompiledSearch {
578            root_kind: compiled.root_kind.clone(),
579            text_query: q,
580            limit: compiled.limit,
581            fusable_filters: compiled.fusable_filters.clone(),
582            residual_filters: compiled.residual_filters.clone(),
583            attribution_requested: compiled.attribution_requested,
584        });
585        let plan = CompiledSearchPlan {
586            strict: compiled.clone(),
587            relaxed,
588            was_degraded_at_plan_time,
589        };
590        self.execute_compiled_search_plan(&plan)
591    }
592
593    /// Execute a two-branch [`CompiledSearchPlan`] and return the merged,
594    /// deduped result rows.
595    ///
596    /// This is the shared retrieval/merge routine that both
597    /// [`Self::execute_compiled_search`] (adaptive path) and
598    /// `Engine::fallback_search` (narrow two-shape path) call into. Strict
599    /// runs first; the relaxed branch only fires when it is present AND the
600    /// strict branch returned fewer than `min(FALLBACK_TRIGGER_K, limit)`
601    /// hits. Merge and dedup semantics are identical to the adaptive path
602    /// regardless of how the plan was constructed.
603    ///
604    /// Error contract: if the relaxed branch errors, the error propagates;
605    /// strict hits are not returned. This matches the rest of the engine's
606    /// fail-hard posture.
607    ///
608    /// # Errors
609    /// Returns [`EngineError`] if either branch's SQL cannot be prepared or
610    /// executed.
611    pub fn execute_compiled_search_plan(
612        &self,
613        plan: &CompiledSearchPlan,
614    ) -> Result<SearchRows, EngineError> {
615        let strict = &plan.strict;
616        let limit = strict.limit;
617        let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;
618
619        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
620        let strict_underfilled = strict_hits.len() < fallback_threshold;
621
622        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
623        let mut fallback_used = false;
624        let mut was_degraded = false;
625        if let Some(relaxed) = plan.relaxed.as_ref()
626            && strict_underfilled
627        {
628            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
629            fallback_used = true;
630            was_degraded = plan.was_degraded_at_plan_time;
631        }
632
633        let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
634        // Attribution runs AFTER dedup so that duplicate hits dropped by
635        // `merge_search_branches` do not waste a highlight+position-map
636        // lookup.
637        if strict.attribution_requested {
638            let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
639            self.populate_attribution_for_hits(
640                &mut merged,
641                &strict.text_query,
642                relaxed_text_query,
643            )?;
644        }
645        let strict_hit_count = merged
646            .iter()
647            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
648            .count();
649        let relaxed_hit_count = merged
650            .iter()
651            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
652            .count();
653        // Phase 10: no vector execution path yet, so vector_hit_count is
654        // always zero. Future phases that wire a vector branch will
655        // contribute here.
656        let vector_hit_count = 0;
657
658        Ok(SearchRows {
659            hits: merged,
660            strict_hit_count,
661            relaxed_hit_count,
662            vector_hit_count,
663            fallback_used,
664            was_degraded,
665        })
666    }
667
668    /// Execute a compiled vector-only search and return matching hits.
669    ///
670    /// Phase 11 delivers the standalone vector retrieval path. The emitted
671    /// SQL performs a vec0 KNN scan over `vec_nodes_active`, joins to
672    /// `chunks` and `nodes` (active rows only), and pushes fusable filters
673    /// into the candidate CTE. The outer `SELECT` applies residual JSON
674    /// predicates and orders by score descending, where `score = -distance`
675    /// (higher is better) per addendum 1 §Vector-Specific Behavior.
676    ///
677    /// ## Capability-miss handling
678    ///
679    /// If the `sqlite-vec` capability is absent (feature disabled or the
680    /// `vec_nodes_active` virtual table has not been created because the
681    /// engine was not opened with a `vector_dimension`), this method returns
682    /// an empty [`SearchRows`] with `was_degraded = true`. This is
683    /// **non-fatal** — the error does not propagate — matching the addendum's
684    /// §Vector-Specific Behavior / Degradation.
685    ///
686    /// ## Attribution
687    ///
688    /// When `compiled.attribution_requested == true`, every returned hit
689    /// carries `attribution: Some(HitAttribution { matched_paths: vec![] })`
690    /// per addendum 1 §Attribution on vector hits (Phase 5 chunk-hit rule
691    /// extended uniformly).
692    ///
693    /// # Errors
694    /// Returns [`EngineError`] if the SQL statement cannot be prepared or
695    /// executed for reasons other than a vec-table capability miss.
696    #[allow(clippy::too_many_lines)]
697    pub fn execute_compiled_vector_search(
698        &self,
699        compiled: &CompiledVectorSearch,
700    ) -> Result<SearchRows, EngineError> {
701        use std::fmt::Write as _;
702
703        // Short-circuit zero-limit: callers that pass `limit == 0` expect an
704        // empty result rather than a SQL error from `LIMIT 0` semantics in
705        // the inner vec0 scan.
706        if compiled.limit == 0 {
707            return Ok(SearchRows::default());
708        }
709
710        let filter_by_kind = !compiled.root_kind.is_empty();
711        let mut binds: Vec<BindValue> = Vec::new();
712        binds.push(BindValue::Text(compiled.query_text.clone()));
713        if filter_by_kind {
714            binds.push(BindValue::Text(compiled.root_kind.clone()));
715        }
716
717        // Build fusable-filter clauses, aliased against `src` inside the
718        // candidate CTE. Same predicate set the text path fuses.
719        let mut fused_clauses = String::new();
720        for predicate in &compiled.fusable_filters {
721            match predicate {
722                Predicate::KindEq(kind) => {
723                    binds.push(BindValue::Text(kind.clone()));
724                    let idx = binds.len();
725                    let _ = write!(
726                        fused_clauses,
727                        "\n                      AND src.kind = ?{idx}"
728                    );
729                }
730                Predicate::LogicalIdEq(logical_id) => {
731                    binds.push(BindValue::Text(logical_id.clone()));
732                    let idx = binds.len();
733                    let _ = write!(
734                        fused_clauses,
735                        "\n                      AND src.logical_id = ?{idx}"
736                    );
737                }
738                Predicate::SourceRefEq(source_ref) => {
739                    binds.push(BindValue::Text(source_ref.clone()));
740                    let idx = binds.len();
741                    let _ = write!(
742                        fused_clauses,
743                        "\n                      AND src.source_ref = ?{idx}"
744                    );
745                }
746                Predicate::ContentRefEq(uri) => {
747                    binds.push(BindValue::Text(uri.clone()));
748                    let idx = binds.len();
749                    let _ = write!(
750                        fused_clauses,
751                        "\n                      AND src.content_ref = ?{idx}"
752                    );
753                }
754                Predicate::ContentRefNotNull => {
755                    fused_clauses
756                        .push_str("\n                      AND src.content_ref IS NOT NULL");
757                }
758                Predicate::JsonPathFusedEq { path, value } => {
759                    binds.push(BindValue::Text(path.clone()));
760                    let path_idx = binds.len();
761                    binds.push(BindValue::Text(value.clone()));
762                    let value_idx = binds.len();
763                    let _ = write!(
764                        fused_clauses,
765                        "\n                      AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
766                    );
767                }
768                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
769                    binds.push(BindValue::Text(path.clone()));
770                    let path_idx = binds.len();
771                    binds.push(BindValue::Integer(*value));
772                    let value_idx = binds.len();
773                    let operator = match op {
774                        ComparisonOp::Gt => ">",
775                        ComparisonOp::Gte => ">=",
776                        ComparisonOp::Lt => "<",
777                        ComparisonOp::Lte => "<=",
778                    };
779                    let _ = write!(
780                        fused_clauses,
781                        "\n                      AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
782                    );
783                }
784                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
785                    // JSON predicates are residual; compile_vector_search
786                    // guarantees they never appear here, but stay defensive.
787                }
788            }
789        }
790
791        // Build residual JSON clauses, aliased against `h` in the outer SELECT.
792        let mut filter_clauses = String::new();
793        for predicate in &compiled.residual_filters {
794            match predicate {
795                Predicate::JsonPathEq { path, value } => {
796                    binds.push(BindValue::Text(path.clone()));
797                    let path_idx = binds.len();
798                    binds.push(scalar_to_bind(value));
799                    let value_idx = binds.len();
800                    let _ = write!(
801                        filter_clauses,
802                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
803                    );
804                }
805                Predicate::JsonPathCompare { path, op, value } => {
806                    binds.push(BindValue::Text(path.clone()));
807                    let path_idx = binds.len();
808                    binds.push(scalar_to_bind(value));
809                    let value_idx = binds.len();
810                    let operator = match op {
811                        ComparisonOp::Gt => ">",
812                        ComparisonOp::Gte => ">=",
813                        ComparisonOp::Lt => "<",
814                        ComparisonOp::Lte => "<=",
815                    };
816                    let _ = write!(
817                        filter_clauses,
818                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
819                    );
820                }
821                Predicate::KindEq(_)
822                | Predicate::LogicalIdEq(_)
823                | Predicate::SourceRefEq(_)
824                | Predicate::ContentRefEq(_)
825                | Predicate::ContentRefNotNull
826                | Predicate::JsonPathFusedEq { .. }
827                | Predicate::JsonPathFusedTimestampCmp { .. } => {
828                    // Fusable predicates live in fused_clauses above.
829                }
830            }
831        }
832
833        // Bind the outer limit as a named parameter for prepare_cached
834        // stability across calls that vary only by limit value.
835        let limit = compiled.limit;
836        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
837        let limit_idx = binds.len();
838
839        // sqlite-vec requires the LIMIT/k constraint to be visible directly
840        // on the vec0 KNN scan, so we isolate it in a sub-select. The vec0
841        // LIMIT overfetches `base_limit` = limit (Phase 11 keeps it simple;
842        // Phase 12's planner may raise this to compensate for fusion
843        // narrowing the candidate pool).
844        let base_limit = limit;
845        let kind_clause = if filter_by_kind {
846            "\n                      AND src.kind = ?2"
847        } else {
848            ""
849        };
850
851        let sql = format!(
852            "WITH vector_hits AS (
853                SELECT
854                    src.row_id AS row_id,
855                    src.logical_id AS logical_id,
856                    src.kind AS kind,
857                    src.properties AS properties,
858                    src.source_ref AS source_ref,
859                    src.content_ref AS content_ref,
860                    src.created_at AS created_at,
861                    vc.distance AS distance,
862                    vc.chunk_id AS chunk_id
863                FROM (
864                    SELECT chunk_id, distance
865                    FROM vec_nodes_active
866                    WHERE embedding MATCH ?1
867                    LIMIT {base_limit}
868                ) vc
869                JOIN chunks c ON c.id = vc.chunk_id
870                JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
871                WHERE 1 = 1{kind_clause}{fused_clauses}
872            )
873            SELECT
874                h.row_id,
875                h.logical_id,
876                h.kind,
877                h.properties,
878                h.content_ref,
879                am.last_accessed_at,
880                h.created_at,
881                h.distance,
882                h.chunk_id
883            FROM vector_hits h
884            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
885            WHERE 1 = 1{filter_clauses}
886            ORDER BY h.distance ASC
887            LIMIT ?{limit_idx}"
888        );
889
890        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
891
892        let conn_guard = match self.lock_connection() {
893            Ok(g) => g,
894            Err(e) => {
895                self.telemetry.increment_errors();
896                return Err(e);
897            }
898        };
899        let mut statement = match conn_guard.prepare_cached(&sql) {
900            Ok(stmt) => stmt,
901            Err(e) if is_vec_table_absent(&e) => {
902                // Capability miss: non-fatal — surface as was_degraded.
903                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
904                    trace_warn!("vector table absent, degrading vector_search to empty result");
905                }
906                return Ok(SearchRows {
907                    hits: Vec::new(),
908                    strict_hit_count: 0,
909                    relaxed_hit_count: 0,
910                    vector_hit_count: 0,
911                    fallback_used: false,
912                    was_degraded: true,
913                });
914            }
915            Err(e) => {
916                self.telemetry.increment_errors();
917                return Err(EngineError::Sqlite(e));
918            }
919        };
920
921        let attribution_requested = compiled.attribution_requested;
922        let hits = match statement
923            .query_map(params_from_iter(bind_values.iter()), |row| {
924                let distance: f64 = row.get(7)?;
925                // Score is the negated distance per addendum 1
926                // §Vector-Specific Behavior / Score and distance. For
927                // distance metrics (sqlite-vec's default) lower distance =
928                // better match, so negating yields the higher-is-better
929                // convention that dedup_branch_hits and the unified result
930                // surface rely on.
931                let score = -distance;
932                Ok(SearchHit {
933                    node: fathomdb_query::NodeRowLite {
934                        row_id: row.get(0)?,
935                        logical_id: row.get(1)?,
936                        kind: row.get(2)?,
937                        properties: row.get(3)?,
938                        content_ref: row.get(4)?,
939                        last_accessed_at: row.get(5)?,
940                    },
941                    written_at: row.get(6)?,
942                    score,
943                    modality: RetrievalModality::Vector,
944                    source: SearchHitSource::Vector,
945                    // Vector hits have no strict/relaxed notion.
946                    match_mode: None,
947                    // Vector hits have no snippet.
948                    snippet: None,
949                    projection_row_id: row.get::<_, Option<String>>(8)?,
950                    vector_distance: Some(distance),
951                    attribution: if attribution_requested {
952                        Some(HitAttribution {
953                            matched_paths: Vec::new(),
954                        })
955                    } else {
956                        None
957                    },
958                })
959            })
960            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
961        {
962            Ok(rows) => rows,
963            Err(e) => {
964                // Some SQLite errors surface during row iteration (e.g. when
965                // the vec0 extension is not loaded but the table exists as a
966                // stub). Classify as capability-miss when the shape matches.
967                if is_vec_table_absent(&e) {
968                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
969                        trace_warn!(
970                            "vector table absent at query time, degrading vector_search to empty result"
971                        );
972                    }
973                    drop(statement);
974                    drop(conn_guard);
975                    return Ok(SearchRows {
976                        hits: Vec::new(),
977                        strict_hit_count: 0,
978                        relaxed_hit_count: 0,
979                        vector_hit_count: 0,
980                        fallback_used: false,
981                        was_degraded: true,
982                    });
983                }
984                self.telemetry.increment_errors();
985                return Err(EngineError::Sqlite(e));
986            }
987        };
988
989        drop(statement);
990        drop(conn_guard);
991
992        self.telemetry.increment_queries();
993        let vector_hit_count = hits.len();
994        Ok(SearchRows {
995            hits,
996            strict_hit_count: 0,
997            relaxed_hit_count: 0,
998            vector_hit_count,
999            fallback_used: false,
1000            was_degraded: false,
1001        })
1002    }
1003
1004    /// Execute a unified [`CompiledRetrievalPlan`] (Phase 12 `search()`
1005    /// entry point) and return deterministically ranked, block-ordered
1006    /// [`SearchRows`].
1007    ///
1008    /// Stages, per addendum 1 §Retrieval Planner Model:
1009    ///
1010    /// 1. **Text strict.** Always runs (empty query short-circuits to an
1011    ///    empty branch result inside `run_search_branch`).
1012    /// 2. **Text relaxed.** Runs iff the plan carries a relaxed branch AND
1013    ///    the strict branch returned fewer than `min(FALLBACK_TRIGGER_K,
1014    ///    limit)` hits — same v1 (`K = 1`) zero-hits-only trigger as the
1015    ///    Phase 6 text-only path.
1016    /// 3. **Vector.** Runs iff text retrieval (strict + relaxed combined)
1017    ///    returned zero hits AND `plan.vector` is `Some`. **In v1 the
1018    ///    planner never wires a vector branch through `search()`, so this
1019    ///    code path is structurally present but dormant.** A future phase
1020    ///    that wires read-time embedding into `compile_retrieval_plan` will
1021    ///    immediately light it up.
1022    /// 4. **Fusion.** All collected hits are merged via
1023    ///    [`merge_search_branches_three`], which produces strict ->
1024    ///    relaxed -> vector block ordering with cross-branch dedup
1025    ///    resolved by branch precedence.
1026    ///
1027    /// `was_degraded` covers only the relaxed-branch cap miss in v1. The
1028    /// addendum's "vector capability miss => `was_degraded`" semantics
1029    /// applies to `search()` only when the unified planner actually fires
1030    /// the vector branch, which v1 never does.
1031    ///
1032    /// # Errors
1033    /// Returns [`EngineError`] if any stage's SQL cannot be prepared or
1034    /// executed for a non-capability-miss reason.
1035    pub fn execute_retrieval_plan(
1036        &self,
1037        plan: &CompiledRetrievalPlan,
1038        raw_query: &str,
1039    ) -> Result<SearchRows, EngineError> {
1040        // Phase 12.5a: we work against a local owned copy so
1041        // `fill_vector_branch` can mutate `plan.vector` and
1042        // `plan.was_degraded_at_plan_time`. Cloning is cheap (the plan is
1043        // a bounded set of predicates + text AST nodes) and avoids
1044        // forcing callers to hand us `&mut` access.
1045        let mut plan = plan.clone();
1046        let limit = plan.text.strict.limit;
1047
1048        // Stage 1: text strict.
1049        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;
1050
1051        // Stage 2: text relaxed. Same K=1 zero-hits-only trigger the Phase 6
1052        // path uses.
1053        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
1054        let strict_underfilled = strict_hits.len() < fallback_threshold;
1055        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
1056        let mut fallback_used = false;
1057        let mut was_degraded = false;
1058        if let Some(relaxed) = plan.text.relaxed.as_ref()
1059            && strict_underfilled
1060        {
1061            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
1062            fallback_used = true;
1063            was_degraded = plan.was_degraded_at_plan_time;
1064        }
1065
1066        // Phase 12.5a: fill the vector branch from the configured
1067        // read-time query embedder, if any. Option (b) from the spec:
1068        // only pay the embedding cost when the text branches returned
1069        // nothing, because the three-branch stage gate below only runs
1070        // the vector stage under exactly that condition. This keeps the
1071        // hot path (strict text matched) embedder-free.
1072        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
1073        if text_branches_empty && self.query_embedder.is_some() {
1074            self.fill_vector_branch(&mut plan, raw_query);
1075        }
1076
1077        // Stage 3: vector. Runs only when text retrieval is empty AND a
1078        // vector branch is present. When no embedder is configured (Phase
1079        // 12.5a default) `plan.vector` stays `None` and this stage is a
1080        // no-op, preserving the Phase 12 v1 dormancy invariant.
1081        let mut vector_hits: Vec<SearchHit> = Vec::new();
1082        if let Some(vector) = plan.vector.as_ref()
1083            && strict_hits.is_empty()
1084            && relaxed_hits.is_empty()
1085        {
1086            let vector_rows = self.execute_compiled_vector_search(vector)?;
1087            // `execute_compiled_vector_search` returns a fully populated
1088            // `SearchRows`. Promote its hits into the merge stage and lift
1089            // its capability-miss `was_degraded` flag onto the unified
1090            // result, per addendum §Vector-Specific Behavior.
1091            vector_hits = vector_rows.hits;
1092            if vector_rows.was_degraded {
1093                was_degraded = true;
1094            }
1095        }
1096        // Phase 12.5a: an embedder-reported capability miss surfaces as
1097        // `plan.was_degraded_at_plan_time = true` set inside
1098        // `fill_vector_branch`. Lift it onto the response so callers see
1099        // the graceful degradation even when the vector stage-gate never
1100        // had the chance to fire (the embedder call itself failed and
1101        // the vector slot stayed `None`).
1102        if text_branches_empty
1103            && plan.was_degraded_at_plan_time
1104            && plan.vector.is_none()
1105            && self.query_embedder.is_some()
1106        {
1107            was_degraded = true;
1108        }
1109
1110        // Stage 4: fusion.
1111        let strict = &plan.text.strict;
1112        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
1113        if strict.attribution_requested {
1114            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
1115            self.populate_attribution_for_hits(
1116                &mut merged,
1117                &strict.text_query,
1118                relaxed_text_query,
1119            )?;
1120        }
1121
1122        let strict_hit_count = merged
1123            .iter()
1124            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
1125            .count();
1126        let relaxed_hit_count = merged
1127            .iter()
1128            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
1129            .count();
1130        let vector_hit_count = merged
1131            .iter()
1132            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
1133            .count();
1134
1135        Ok(SearchRows {
1136            hits: merged,
1137            strict_hit_count,
1138            relaxed_hit_count,
1139            vector_hit_count,
1140            fallback_used,
1141            was_degraded,
1142        })
1143    }
1144
1145    /// Phase 12.5a: populate `plan.vector` from the configured read-time
1146    /// query embedder, if any.
1147    ///
1148    /// Preconditions (enforced by the caller in `execute_retrieval_plan`):
1149    /// - `self.query_embedder.is_some()` — no point calling otherwise.
1150    /// - Both the strict and relaxed text branches already ran and
1151    ///   returned zero hits, so the existing three-branch stage gate
1152    ///   will actually fire the vector stage once the slot is populated.
1153    ///   This is option (b) from the Phase 12.5a spec: skip the embedding
1154    ///   cost entirely when text retrieval already won.
1155    ///
1156    /// Contract: never panics, never returns an error. On embedder error
1157    /// it sets `plan.was_degraded_at_plan_time = true` and leaves
1158    /// `plan.vector` as `None`; the coordinator's normal error-free
1159    /// degradation path then reports `was_degraded` on the result.
1160    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
1161        let Some(embedder) = self.query_embedder.as_ref() else {
1162            return;
1163        };
1164        match embedder.embed_query(raw_query) {
1165            Ok(vec) => {
1166                // `CompiledVectorSearch::query_text` is a JSON float-array
1167                // literal at the time the coordinator binds it (see the
1168                // `CompiledVectorSearch` docs). `serde_json::to_string`
1169                // on a `Vec<f32>` produces exactly that shape — no
1170                // wire-format change required.
1171                let literal = match serde_json::to_string(&vec) {
1172                    Ok(s) => s,
1173                    Err(err) => {
1174                        trace_warn!(
1175                            error = %err,
1176                            "query embedder vector serialization failed; skipping vector branch"
1177                        );
1178                        let _ = err; // Used by trace_warn! when tracing feature is active
1179                        plan.was_degraded_at_plan_time = true;
1180                        return;
1181                    }
1182                };
1183                let strict = &plan.text.strict;
1184                plan.vector = Some(CompiledVectorSearch {
1185                    root_kind: strict.root_kind.clone(),
1186                    query_text: literal,
1187                    limit: strict.limit,
1188                    fusable_filters: strict.fusable_filters.clone(),
1189                    residual_filters: strict.residual_filters.clone(),
1190                    attribution_requested: strict.attribution_requested,
1191                });
1192            }
1193            Err(err) => {
1194                trace_warn!(
1195                    error = %err,
1196                    "query embedder unavailable, skipping vector branch"
1197                );
1198                let _ = err; // Used by trace_warn! when tracing feature is active
1199                plan.was_degraded_at_plan_time = true;
1200            }
1201        }
1202    }
1203
1204    /// Execute a single search branch against the underlying FTS surfaces.
1205    ///
1206    /// This is the shared SQL emission path used by
1207    /// [`Self::execute_compiled_search_plan`] to run strict and (when
1208    /// present) relaxed branches of a [`CompiledSearchPlan`] in sequence.
1209    /// The returned hits are tagged with `branch`'s corresponding
1210    /// [`SearchMatchMode`] and are **not** yet deduped or truncated — the
1211    /// caller is responsible for merging multiple branches.
1212    #[allow(clippy::too_many_lines)]
1213    fn run_search_branch(
1214        &self,
1215        compiled: &CompiledSearch,
1216        branch: SearchBranch,
1217    ) -> Result<Vec<SearchHit>, EngineError> {
1218        use std::fmt::Write as _;
1219        // Short-circuit an empty/whitespace-only query: rendering it would
1220        // yield `MATCH ""`, which FTS5 rejects as a syntax error. Callers
1221        // (including the adaptive path when strict is Empty and derive_relaxed
1222        // returns None) must see an empty result, not an error. Each branch
1223        // is short-circuited independently so a strict-Empty + relaxed-Some
1224        // plan still exercises the relaxed branch.
1225        // A top-level `TextQuery::Not` renders to an FTS5 expression that
1226        // matches "every row not containing X" — a complement-of-corpus scan
1227        // that no caller would intentionally want. Short-circuit to empty at
1228        // the root only; a `Not` nested inside an `And` is a legitimate
1229        // exclusion and must still run.
1230        if matches!(
1231            compiled.text_query,
1232            fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
1233        ) {
1234            return Ok(Vec::new());
1235        }
1236        let rendered = render_text_query_fts5(&compiled.text_query);
1237        // An empty `root_kind` means "unkind-filtered" — the fallback_search
1238        // helper uses this when the caller did not add `.filter_kind_eq(...)`.
1239        // The adaptive `text_search()` path never produces an empty root_kind
1240        // because `QueryBuilder::nodes(kind)` requires a non-empty string at
1241        // the entry point.
1242        let filter_by_kind = !compiled.root_kind.is_empty();
1243        let mut binds: Vec<BindValue> = if filter_by_kind {
1244            vec![
1245                BindValue::Text(rendered.clone()),
1246                BindValue::Text(compiled.root_kind.clone()),
1247                BindValue::Text(rendered),
1248                BindValue::Text(compiled.root_kind.clone()),
1249            ]
1250        } else {
1251            vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
1252        };
1253
1254        // P2-5: both fusable and residual predicates now match against the
1255        // CTE's projected columns (`u.kind`, `u.logical_id`, `u.source_ref`,
1256        // `u.content_ref`, `u.properties`) because the inner UNION arms
1257        // project the full active-row column set through the
1258        // `JOIN nodes src` already present in each arm. The previous
1259        // implementation re-joined `nodes hn` at the CTE level and
1260        // `nodes n` again at the outer SELECT, which was triple work on
1261        // the hot search path.
1262        let mut fused_clauses = String::new();
1263        for predicate in &compiled.fusable_filters {
1264            match predicate {
1265                Predicate::KindEq(kind) => {
1266                    binds.push(BindValue::Text(kind.clone()));
1267                    let idx = binds.len();
1268                    let _ = write!(fused_clauses, "\n                  AND u.kind = ?{idx}");
1269                }
1270                Predicate::LogicalIdEq(logical_id) => {
1271                    binds.push(BindValue::Text(logical_id.clone()));
1272                    let idx = binds.len();
1273                    let _ = write!(
1274                        fused_clauses,
1275                        "\n                  AND u.logical_id = ?{idx}"
1276                    );
1277                }
1278                Predicate::SourceRefEq(source_ref) => {
1279                    binds.push(BindValue::Text(source_ref.clone()));
1280                    let idx = binds.len();
1281                    let _ = write!(
1282                        fused_clauses,
1283                        "\n                  AND u.source_ref = ?{idx}"
1284                    );
1285                }
1286                Predicate::ContentRefEq(uri) => {
1287                    binds.push(BindValue::Text(uri.clone()));
1288                    let idx = binds.len();
1289                    let _ = write!(
1290                        fused_clauses,
1291                        "\n                  AND u.content_ref = ?{idx}"
1292                    );
1293                }
1294                Predicate::ContentRefNotNull => {
1295                    fused_clauses.push_str("\n                  AND u.content_ref IS NOT NULL");
1296                }
1297                Predicate::JsonPathFusedEq { path, value } => {
1298                    binds.push(BindValue::Text(path.clone()));
1299                    let path_idx = binds.len();
1300                    binds.push(BindValue::Text(value.clone()));
1301                    let value_idx = binds.len();
1302                    let _ = write!(
1303                        fused_clauses,
1304                        "\n                  AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
1305                    );
1306                }
1307                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
1308                    binds.push(BindValue::Text(path.clone()));
1309                    let path_idx = binds.len();
1310                    binds.push(BindValue::Integer(*value));
1311                    let value_idx = binds.len();
1312                    let operator = match op {
1313                        ComparisonOp::Gt => ">",
1314                        ComparisonOp::Gte => ">=",
1315                        ComparisonOp::Lt => "<",
1316                        ComparisonOp::Lte => "<=",
1317                    };
1318                    let _ = write!(
1319                        fused_clauses,
1320                        "\n                  AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
1321                    );
1322                }
1323                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
1324                    // Should be in residual_filters; compile_search guarantees
1325                    // this, but stay defensive.
1326                }
1327            }
1328        }
1329
1330        let mut filter_clauses = String::new();
1331        for predicate in &compiled.residual_filters {
1332            match predicate {
1333                Predicate::JsonPathEq { path, value } => {
1334                    binds.push(BindValue::Text(path.clone()));
1335                    let path_idx = binds.len();
1336                    binds.push(scalar_to_bind(value));
1337                    let value_idx = binds.len();
1338                    let _ = write!(
1339                        filter_clauses,
1340                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
1341                    );
1342                }
1343                Predicate::JsonPathCompare { path, op, value } => {
1344                    binds.push(BindValue::Text(path.clone()));
1345                    let path_idx = binds.len();
1346                    binds.push(scalar_to_bind(value));
1347                    let value_idx = binds.len();
1348                    let operator = match op {
1349                        ComparisonOp::Gt => ">",
1350                        ComparisonOp::Gte => ">=",
1351                        ComparisonOp::Lt => "<",
1352                        ComparisonOp::Lte => "<=",
1353                    };
1354                    let _ = write!(
1355                        filter_clauses,
1356                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
1357                    );
1358                }
1359                Predicate::KindEq(_)
1360                | Predicate::LogicalIdEq(_)
1361                | Predicate::SourceRefEq(_)
1362                | Predicate::ContentRefEq(_)
1363                | Predicate::ContentRefNotNull
1364                | Predicate::JsonPathFusedEq { .. }
1365                | Predicate::JsonPathFusedTimestampCmp { .. } => {
1366                    // Fusable predicates live in fused_clauses; compile_search
1367                    // partitions them out of residual_filters.
1368                }
1369            }
1370        }
1371
1372        // Bind `limit` as an integer parameter rather than formatting it into
1373        // the SQL string. Interpolating the limit made the prepared-statement
1374        // SQL vary by limit value, so rusqlite's default 16-slot
1375        // `prepare_cached` cache thrashed for paginated callers that varied
1376        // limits per call. With the bind the SQL is structurally stable for
1377        // a given filter shape regardless of `limit` value.
1378        let limit = compiled.limit;
1379        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
1380        let limit_idx = binds.len();
1381        // P2-5: the inner UNION arms project the full active-row column
1382        // set through `JOIN nodes src` (kind, row_id, source_ref,
1383        // content_ref, content_hash, created_at, properties). Both the
1384        // CTE's outer WHERE and the final SELECT consume those columns
1385        // directly, which eliminates the previous `JOIN nodes hn` at the
1386        // CTE level and `JOIN nodes n` at the outer SELECT — saving two
1387        // redundant joins on the hot search path. `src.superseded_at IS
1388        // NULL` in each arm already filters retired rows, which is what
1389        // the dropped outer joins used to do.
1390        let (chunk_fts_bind, chunk_kind_clause, prop_fts_bind, prop_kind_clause) = if filter_by_kind
1391        {
1392            (
1393                "?1",
1394                "\n                      AND src.kind = ?2",
1395                "?3",
1396                "\n                      AND fp.kind = ?4",
1397            )
1398        } else {
1399            ("?1", "", "?2", "")
1400        };
1401        let sql = format!(
1402            "WITH search_hits AS (
1403                SELECT
1404                    u.row_id AS row_id,
1405                    u.logical_id AS logical_id,
1406                    u.kind AS kind,
1407                    u.properties AS properties,
1408                    u.source_ref AS source_ref,
1409                    u.content_ref AS content_ref,
1410                    u.created_at AS created_at,
1411                    u.score AS score,
1412                    u.source AS source,
1413                    u.snippet AS snippet,
1414                    u.projection_row_id AS projection_row_id
1415                FROM (
1416                    SELECT
1417                        src.row_id AS row_id,
1418                        c.node_logical_id AS logical_id,
1419                        src.kind AS kind,
1420                        src.properties AS properties,
1421                        src.source_ref AS source_ref,
1422                        src.content_ref AS content_ref,
1423                        src.created_at AS created_at,
1424                        -bm25(fts_nodes) AS score,
1425                        'chunk' AS source,
1426                        snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
1427                        f.chunk_id AS projection_row_id
1428                    FROM fts_nodes f
1429                    JOIN chunks c ON c.id = f.chunk_id
1430                    JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
1431                    WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}
1432                    UNION ALL
1433                    SELECT
1434                        src.row_id AS row_id,
1435                        fp.node_logical_id AS logical_id,
1436                        src.kind AS kind,
1437                        src.properties AS properties,
1438                        src.source_ref AS source_ref,
1439                        src.content_ref AS content_ref,
1440                        src.created_at AS created_at,
1441                        -bm25(fts_node_properties) AS score,
1442                        'property' AS source,
1443                        substr(fp.text_content, 1, 200) AS snippet,
1444                        CAST(fp.rowid AS TEXT) AS projection_row_id
1445                    FROM fts_node_properties fp
1446                    JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
1447                    WHERE fts_node_properties MATCH {prop_fts_bind}{prop_kind_clause}
1448                ) u
1449                WHERE 1 = 1{fused_clauses}
1450                ORDER BY u.score DESC
1451                LIMIT ?{limit_idx}
1452            )
1453            SELECT
1454                h.row_id,
1455                h.logical_id,
1456                h.kind,
1457                h.properties,
1458                h.content_ref,
1459                am.last_accessed_at,
1460                h.created_at,
1461                h.score,
1462                h.source,
1463                h.snippet,
1464                h.projection_row_id
1465            FROM search_hits h
1466            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
1467            WHERE 1 = 1{filter_clauses}
1468            ORDER BY h.score DESC"
1469        );
1470
1471        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
1472
1473        let conn_guard = match self.lock_connection() {
1474            Ok(g) => g,
1475            Err(e) => {
1476                self.telemetry.increment_errors();
1477                return Err(e);
1478            }
1479        };
1480        let mut statement = match conn_guard.prepare_cached(&sql) {
1481            Ok(stmt) => stmt,
1482            Err(e) => {
1483                self.telemetry.increment_errors();
1484                return Err(EngineError::Sqlite(e));
1485            }
1486        };
1487
1488        let hits = match statement
1489            .query_map(params_from_iter(bind_values.iter()), |row| {
1490                let source_str: String = row.get(8)?;
1491                // The CTE emits only two literal values here: `'chunk'` and
1492                // `'property'`. Default to `Chunk` on anything unexpected so a
1493                // schema drift surfaces as a mislabelled hit rather than a
1494                // row-level error.
1495                let source = if source_str == "property" {
1496                    SearchHitSource::Property
1497                } else {
1498                    SearchHitSource::Chunk
1499                };
1500                let match_mode = match branch {
1501                    SearchBranch::Strict => SearchMatchMode::Strict,
1502                    SearchBranch::Relaxed => SearchMatchMode::Relaxed,
1503                };
1504                Ok(SearchHit {
1505                    node: fathomdb_query::NodeRowLite {
1506                        row_id: row.get(0)?,
1507                        logical_id: row.get(1)?,
1508                        kind: row.get(2)?,
1509                        properties: row.get(3)?,
1510                        content_ref: row.get(4)?,
1511                        last_accessed_at: row.get(5)?,
1512                    },
1513                    written_at: row.get(6)?,
1514                    score: row.get(7)?,
1515                    // Phase 10: every branch currently emits text hits.
1516                    modality: RetrievalModality::Text,
1517                    source,
1518                    match_mode: Some(match_mode),
1519                    snippet: row.get(9)?,
1520                    projection_row_id: row.get(10)?,
1521                    vector_distance: None,
1522                    attribution: None,
1523                })
1524            })
1525            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
1526        {
1527            Ok(rows) => rows,
1528            Err(e) => {
1529                self.telemetry.increment_errors();
1530                return Err(EngineError::Sqlite(e));
1531            }
1532        };
1533
1534        // Drop the statement so `conn_guard` is free (attribution is
1535        // resolved after dedup in `execute_compiled_search_plan` to avoid
1536        // spending highlight lookups on hits that will be discarded).
1537        drop(statement);
1538        drop(conn_guard);
1539
1540        self.telemetry.increment_queries();
1541        Ok(hits)
1542    }
1543
1544    /// Populate per-hit attribution for the given deduped merged hits.
1545    /// Runs after [`merge_search_branches`] so dropped duplicates do not
1546    /// incur the highlight+position-map lookup cost.
1547    fn populate_attribution_for_hits(
1548        &self,
1549        hits: &mut [SearchHit],
1550        strict_text_query: &fathomdb_query::TextQuery,
1551        relaxed_text_query: Option<&fathomdb_query::TextQuery>,
1552    ) -> Result<(), EngineError> {
1553        let conn_guard = match self.lock_connection() {
1554            Ok(g) => g,
1555            Err(e) => {
1556                self.telemetry.increment_errors();
1557                return Err(e);
1558            }
1559        };
1560        let strict_expr = render_text_query_fts5(strict_text_query);
1561        let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
1562        for hit in hits.iter_mut() {
1563            // Phase 10: text hits always carry `Some(match_mode)`. Vector
1564            // hits (when a future phase adds them) have `None` here and
1565            // are skipped by the attribution resolver because attribution
1566            // is meaningless for vector matches.
1567            let match_expr = match hit.match_mode {
1568                Some(SearchMatchMode::Strict) => strict_expr.as_str(),
1569                Some(SearchMatchMode::Relaxed) => {
1570                    relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
1571                }
1572                None => continue,
1573            };
1574            match resolve_hit_attribution(&conn_guard, hit, match_expr) {
1575                Ok(att) => hit.attribution = Some(att),
1576                Err(e) => {
1577                    self.telemetry.increment_errors();
1578                    return Err(e);
1579                }
1580            }
1581        }
1582        Ok(())
1583    }
1584
1585    /// # Errors
1586    /// Returns [`EngineError`] if the root query or any bounded expansion
1587    /// query cannot be prepared or executed.
1588    pub fn execute_compiled_grouped_read(
1589        &self,
1590        compiled: &CompiledGroupedQuery,
1591    ) -> Result<GroupedQueryRows, EngineError> {
1592        let root_rows = self.execute_compiled_read(&compiled.root)?;
1593        if root_rows.was_degraded {
1594            return Ok(GroupedQueryRows {
1595                roots: Vec::new(),
1596                expansions: Vec::new(),
1597                was_degraded: true,
1598            });
1599        }
1600
1601        let roots = root_rows.nodes;
1602        let mut expansions = Vec::with_capacity(compiled.expansions.len());
1603        for expansion in &compiled.expansions {
1604            let slot_rows = if roots.is_empty() {
1605                Vec::new()
1606            } else {
1607                self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
1608            };
1609            expansions.push(ExpansionSlotRows {
1610                slot: expansion.slot.clone(),
1611                roots: slot_rows,
1612            });
1613        }
1614
1615        Ok(GroupedQueryRows {
1616            roots,
1617            expansions,
1618            was_degraded: false,
1619        })
1620    }
1621
1622    /// Chunked batched expansion: splits roots into chunks of
1623    /// `BATCH_CHUNK_SIZE` and runs one batched query per chunk, then merges
1624    /// results while preserving root ordering.  This keeps bind-parameter
1625    /// counts within `SQLite` limits while avoiding the N+1 per-root pattern
1626    /// for large result sets.
1627    fn read_expansion_nodes_chunked(
1628        &self,
1629        roots: &[NodeRow],
1630        expansion: &ExpansionSlot,
1631        hard_limit: usize,
1632    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
1633        if roots.len() <= BATCH_CHUNK_SIZE {
1634            return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
1635        }
1636
1637        // Merge chunk results keyed by root logical_id, then reassemble in
1638        // root order.
1639        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
1640        for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
1641            for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
1642                per_root
1643                    .entry(group.root_logical_id)
1644                    .or_default()
1645                    .extend(group.nodes);
1646            }
1647        }
1648
1649        Ok(roots
1650            .iter()
1651            .map(|root| ExpansionRootRows {
1652                root_logical_id: root.logical_id.clone(),
1653                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
1654            })
1655            .collect())
1656    }
1657
1658    /// Batched expansion: one recursive CTE query per expansion slot that
1659    /// processes all root IDs at once. Uses `ROW_NUMBER() OVER (PARTITION BY
1660    /// source_logical_id ...)` to enforce the per-root hard limit inside the
1661    /// database rather than in Rust.
1662    fn read_expansion_nodes_batched(
1663        &self,
1664        roots: &[NodeRow],
1665        expansion: &ExpansionSlot,
1666        hard_limit: usize,
1667    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
1668        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
1669        let (join_condition, next_logical_id) = match expansion.direction {
1670            fathomdb_query::TraverseDirection::Out => {
1671                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
1672            }
1673            fathomdb_query::TraverseDirection::In => {
1674                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
1675            }
1676        };
1677
1678        // Build a UNION ALL of SELECT literals for the root seed rows.
1679        // SQLite does not support `VALUES ... AS alias(col)` in older versions,
1680        // so we use `SELECT ?1 UNION ALL SELECT ?2 ...` instead.
1681        let root_seed_union: String = (1..=root_ids.len())
1682            .map(|i| format!("SELECT ?{i}"))
1683            .collect::<Vec<_>>()
1684            .join(" UNION ALL ");
1685
1686        // The `root_id` column tracks which root each traversal path
1687        // originated from. The `ROW_NUMBER()` window in the outer query
1688        // enforces the per-root hard limit.
1689        let sql = format!(
1690            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
1691            traversed(root_id, logical_id, depth, visited, emitted) AS (
1692                SELECT rid, rid, 0, printf(',%s,', rid), 0
1693                FROM root_ids
1694                UNION ALL
1695                SELECT
1696                    t.root_id,
1697                    {next_logical_id},
1698                    t.depth + 1,
1699                    t.visited || {next_logical_id} || ',',
1700                    t.emitted + 1
1701                FROM traversed t
1702                JOIN edges e ON {join_condition}
1703                    AND e.kind = ?{edge_kind_param}
1704                    AND e.superseded_at IS NULL
1705                WHERE t.depth < {max_depth}
1706                  AND t.emitted < {hard_limit}
1707                  AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
1708            ),
1709            numbered AS (
1710                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
1711                     , n.content_ref, am.last_accessed_at
1712                     , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
1713                FROM traversed t
1714                JOIN nodes n ON n.logical_id = t.logical_id
1715                    AND n.superseded_at IS NULL
1716                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
1717                WHERE t.depth > 0
1718            )
1719            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
1720            FROM numbered
1721            WHERE rn <= {hard_limit}
1722            ORDER BY root_id, logical_id",
1723            edge_kind_param = root_ids.len() + 1,
1724            max_depth = expansion.max_depth,
1725        );
1726
1727        let conn_guard = self.lock_connection()?;
1728        let mut statement = conn_guard
1729            .prepare_cached(&sql)
1730            .map_err(EngineError::Sqlite)?;
1731
1732        // Bind root IDs (1..=N) and edge kind (N+1).
1733        let mut bind_values: Vec<Value> = root_ids
1734            .iter()
1735            .map(|id| Value::Text((*id).to_owned()))
1736            .collect();
1737        bind_values.push(Value::Text(expansion.label.clone()));
1738
1739        let rows = statement
1740            .query_map(params_from_iter(bind_values.iter()), |row| {
1741                Ok((
1742                    row.get::<_, String>(0)?, // root_id
1743                    NodeRow {
1744                        row_id: row.get(1)?,
1745                        logical_id: row.get(2)?,
1746                        kind: row.get(3)?,
1747                        properties: row.get(4)?,
1748                        content_ref: row.get(5)?,
1749                        last_accessed_at: row.get(6)?,
1750                    },
1751                ))
1752            })
1753            .map_err(EngineError::Sqlite)?
1754            .collect::<Result<Vec<_>, _>>()
1755            .map_err(EngineError::Sqlite)?;
1756
1757        // Partition results back into per-root groups, preserving root order.
1758        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
1759        for (root_id, node) in rows {
1760            per_root.entry(root_id).or_default().push(node);
1761        }
1762
1763        let root_groups = roots
1764            .iter()
1765            .map(|root| ExpansionRootRows {
1766                root_logical_id: root.logical_id.clone(),
1767                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
1768            })
1769            .collect();
1770
1771        Ok(root_groups)
1772    }
1773
1774    /// Read a single run by id.
1775    ///
1776    /// # Errors
1777    /// Returns [`EngineError`] if the query fails or if the connection mutex
1778    /// has been poisoned.
1779    pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
1780        let conn = self.lock_connection()?;
1781        conn.query_row(
1782            "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
1783            rusqlite::params![id],
1784            |row| {
1785                Ok(RunRow {
1786                    id: row.get(0)?,
1787                    kind: row.get(1)?,
1788                    status: row.get(2)?,
1789                    properties: row.get(3)?,
1790                })
1791            },
1792        )
1793        .optional()
1794        .map_err(EngineError::Sqlite)
1795    }
1796
1797    /// Read a single step by id.
1798    ///
1799    /// # Errors
1800    /// Returns [`EngineError`] if the query fails or if the connection mutex
1801    /// has been poisoned.
1802    pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
1803        let conn = self.lock_connection()?;
1804        conn.query_row(
1805            "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
1806            rusqlite::params![id],
1807            |row| {
1808                Ok(StepRow {
1809                    id: row.get(0)?,
1810                    run_id: row.get(1)?,
1811                    kind: row.get(2)?,
1812                    status: row.get(3)?,
1813                    properties: row.get(4)?,
1814                })
1815            },
1816        )
1817        .optional()
1818        .map_err(EngineError::Sqlite)
1819    }
1820
1821    /// Read a single action by id.
1822    ///
1823    /// # Errors
1824    /// Returns [`EngineError`] if the query fails or if the connection mutex
1825    /// has been poisoned.
1826    pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
1827        let conn = self.lock_connection()?;
1828        conn.query_row(
1829            "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
1830            rusqlite::params![id],
1831            |row| {
1832                Ok(ActionRow {
1833                    id: row.get(0)?,
1834                    step_id: row.get(1)?,
1835                    kind: row.get(2)?,
1836                    status: row.get(3)?,
1837                    properties: row.get(4)?,
1838                })
1839            },
1840        )
1841        .optional()
1842        .map_err(EngineError::Sqlite)
1843    }
1844
1845    /// Read all active (non-superseded) runs.
1846    ///
1847    /// # Errors
1848    /// Returns [`EngineError`] if the query fails or if the connection mutex
1849    /// has been poisoned.
1850    pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
1851        let conn = self.lock_connection()?;
1852        let mut stmt = conn
1853            .prepare_cached(
1854                "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
1855            )
1856            .map_err(EngineError::Sqlite)?;
1857        let rows = stmt
1858            .query_map([], |row| {
1859                Ok(RunRow {
1860                    id: row.get(0)?,
1861                    kind: row.get(1)?,
1862                    status: row.get(2)?,
1863                    properties: row.get(3)?,
1864                })
1865            })
1866            .map_err(EngineError::Sqlite)?
1867            .collect::<Result<Vec<_>, _>>()
1868            .map_err(EngineError::Sqlite)?;
1869        Ok(rows)
1870    }
1871
1872    /// Returns the number of shape→SQL entries currently indexed.
1873    ///
1874    /// Each distinct query shape (structural hash of kind + steps + limits)
1875    /// maps to exactly one SQL string.  This is a test-oriented introspection
1876    /// helper; it does not reflect rusqlite's internal prepared-statement
1877    /// cache, which is keyed by SQL text.
1878    ///
1879    /// # Panics
1880    /// Panics if the internal shape-SQL-map mutex is poisoned.
1881    #[must_use]
1882    #[allow(clippy::expect_used)]
1883    pub fn shape_sql_count(&self) -> usize {
1884        self.shape_sql_map
1885            .lock()
1886            .unwrap_or_else(PoisonError::into_inner)
1887            .len()
1888    }
1889
1890    /// Returns a cloned `Arc` to the schema manager.
1891    #[must_use]
1892    pub fn schema_manager(&self) -> Arc<SchemaManager> {
1893        Arc::clone(&self.schema_manager)
1894    }
1895
1896    /// Return the execution plan for a compiled query without executing it.
1897    ///
1898    /// Useful for debugging, testing shape-hash caching, and operator
1899    /// diagnostics. Does not open a transaction or touch the database beyond
1900    /// checking the statement cache.
1901    ///
1902    /// # Panics
1903    /// Panics if the internal shape-SQL-map mutex is poisoned.
1904    #[must_use]
1905    pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
1906        let cache_hit = self
1907            .shape_sql_map
1908            .lock()
1909            .unwrap_or_else(PoisonError::into_inner)
1910            .contains_key(&compiled.shape_hash);
1911        QueryPlan {
1912            sql: wrap_node_row_projection_sql(&compiled.sql),
1913            bind_count: compiled.binds.len(),
1914            driving_table: compiled.driving_table,
1915            shape_hash: compiled.shape_hash,
1916            cache_hit,
1917        }
1918    }
1919
1920    /// Execute a named PRAGMA and return the result as a String.
1921    /// Used by Layer 1 tests to verify startup pragma initialization.
1922    ///
1923    /// # Errors
1924    /// Returns [`EngineError`] if the PRAGMA query fails or if the connection
1925    /// mutex has been poisoned.
1926    #[doc(hidden)]
1927    pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
1928        let conn = self.lock_connection()?;
1929        let result = conn
1930            .query_row(&format!("PRAGMA {name}"), [], |row| {
1931                // PRAGMAs may return TEXT or INTEGER; normalise to String.
1932                row.get::<_, rusqlite::types::Value>(0)
1933            })
1934            .map_err(EngineError::Sqlite)?;
1935        let s = match result {
1936            rusqlite::types::Value::Text(t) => t,
1937            rusqlite::types::Value::Integer(i) => i.to_string(),
1938            rusqlite::types::Value::Real(f) => f.to_string(),
1939            rusqlite::types::Value::Blob(_) => {
1940                return Err(EngineError::InvalidWrite(format!(
1941                    "PRAGMA {name} returned an unexpected BLOB value"
1942                )));
1943            }
1944            rusqlite::types::Value::Null => String::new(),
1945        };
1946        Ok(s)
1947    }
1948
1949    /// Return all provenance events whose `subject` matches the given value.
1950    ///
1951    /// Subjects are logical node IDs (for retire/upsert events) or `source_ref`
1952    /// values (for excise events).
1953    ///
1954    /// # Errors
1955    /// Returns [`EngineError`] if the query fails or if the connection mutex
1956    /// has been poisoned.
1957    pub fn query_provenance_events(
1958        &self,
1959        subject: &str,
1960    ) -> Result<Vec<ProvenanceEvent>, EngineError> {
1961        let conn = self.lock_connection()?;
1962        let mut stmt = conn
1963            .prepare_cached(
1964                "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
1965                 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
1966            )
1967            .map_err(EngineError::Sqlite)?;
1968        let events = stmt
1969            .query_map(rusqlite::params![subject], |row| {
1970                Ok(ProvenanceEvent {
1971                    id: row.get(0)?,
1972                    event_type: row.get(1)?,
1973                    subject: row.get(2)?,
1974                    source_ref: row.get(3)?,
1975                    metadata_json: row.get(4)?,
1976                    created_at: row.get(5)?,
1977                })
1978            })
1979            .map_err(EngineError::Sqlite)?
1980            .collect::<Result<Vec<_>, _>>()
1981            .map_err(EngineError::Sqlite)?;
1982        Ok(events)
1983    }
1984}
1985
1986fn wrap_node_row_projection_sql(base_sql: &str) -> String {
1987    format!(
1988        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, am.last_accessed_at \
1989         FROM ({base_sql}) q \
1990         LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id"
1991    )
1992}
1993
1994/// Returns `true` when `err` indicates the vec virtual table is absent
1995/// (sqlite-vec feature enabled but `vec_nodes_active` not yet created).
1996pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
1997    match err {
1998        rusqlite::Error::SqliteFailure(_, Some(msg)) => {
1999            msg.contains("vec_nodes_active") || msg.contains("no such module: vec0")
2000        }
2001        _ => false,
2002    }
2003}
2004
2005fn scalar_to_bind(value: &ScalarValue) -> BindValue {
2006    match value {
2007        ScalarValue::Text(text) => BindValue::Text(text.clone()),
2008        ScalarValue::Integer(integer) => BindValue::Integer(*integer),
2009        ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
2010    }
2011}
2012
2013/// Merge strict and relaxed search branches into a single block-ordered,
2014/// deduplicated, limit-truncated hit list.
2015///
2016/// Phase 3 rules, in order:
2017///
2018/// 1. Each branch is sorted internally by score descending with `logical_id`
2019///    ascending as the deterministic tiebreak.
2020/// 2. Within a single branch, if the same `logical_id` appears twice (e.g.
2021///    once from the chunk surface and once from the property surface) the
2022///    higher-score row wins, then chunk > property > vector, then declaration
2023///    order (chunk first).
2024/// 3. Strict hits form one block and relaxed hits form the next. Strict
2025///    always precedes relaxed in the merged output regardless of per-hit
2026///    score.
2027/// 4. Cross-branch dedup is strict-wins: any relaxed hit whose `logical_id`
2028///    already appears in the strict block is dropped.
2029/// 5. The merged output is truncated to `limit`.
2030fn merge_search_branches(
2031    strict: Vec<SearchHit>,
2032    relaxed: Vec<SearchHit>,
2033    limit: usize,
2034) -> Vec<SearchHit> {
2035    merge_search_branches_three(strict, relaxed, Vec::new(), limit)
2036}
2037
2038/// Three-branch generalization of [`merge_search_branches`]: orders hits as
2039/// (strict block, relaxed block, vector block) per addendum 1 §Fusion
2040/// Semantics, with cross-branch dedup resolved by branch precedence
2041/// (strict > relaxed > vector). Within each block the existing
2042/// [`dedup_branch_hits`] rule applies (score desc, `logical_id` asc, source
2043/// priority chunk > property > vector).
2044///
2045/// Phase 12 (the unified `search()` entry point) calls this directly. The
2046/// two-branch [`merge_search_branches`] wrapper is preserved as a
2047/// convenience for the text-only `execute_compiled_search_plan` path; both
2048/// reduce to the same code.
2049fn merge_search_branches_three(
2050    strict: Vec<SearchHit>,
2051    relaxed: Vec<SearchHit>,
2052    vector: Vec<SearchHit>,
2053    limit: usize,
2054) -> Vec<SearchHit> {
2055    let strict_block = dedup_branch_hits(strict);
2056    let relaxed_block = dedup_branch_hits(relaxed);
2057    let vector_block = dedup_branch_hits(vector);
2058
2059    let mut seen: std::collections::HashSet<String> = strict_block
2060        .iter()
2061        .map(|h| h.node.logical_id.clone())
2062        .collect();
2063
2064    let mut merged = strict_block;
2065    for hit in relaxed_block {
2066        if seen.insert(hit.node.logical_id.clone()) {
2067            merged.push(hit);
2068        }
2069    }
2070    for hit in vector_block {
2071        if seen.insert(hit.node.logical_id.clone()) {
2072            merged.push(hit);
2073        }
2074    }
2075
2076    if merged.len() > limit {
2077        merged.truncate(limit);
2078    }
2079    merged
2080}
2081
2082/// Sort a branch's hits by score descending + `logical_id` ascending, then
2083/// dedup duplicate `logical_id`s within the branch using source priority
2084/// (chunk > property > vector) and declaration order.
2085fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
2086    hits.sort_by(|a, b| {
2087        b.score
2088            .partial_cmp(&a.score)
2089            .unwrap_or(std::cmp::Ordering::Equal)
2090            .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
2091            .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
2092    });
2093
2094    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2095    hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
2096    hits
2097}
2098
2099fn source_priority(source: SearchHitSource) -> u8 {
2100    // Lower is better. Chunk is declared before property in the CTE; vector
2101    // is reserved for future wiring but comes last among the known variants.
2102    match source {
2103        SearchHitSource::Chunk => 0,
2104        SearchHitSource::Property => 1,
2105        SearchHitSource::Vector => 2,
2106    }
2107}
2108
2109/// Sentinel markers used to wrap FTS5-matched terms in the `highlight()`
2110/// output so the coordinator can recover per-term byte offsets in the
2111/// original `text_content` column.
2112///
2113/// Each sentinel is a single `U+0001` ("start of heading") / `U+0002`
2114/// ("start of text") byte. These bytes are safe for all valid JSON text
2115/// *except* deliberately escape-injected `\u0001` / `\u0002` sequences: a
2116/// payload like `{"x":"\u0001"}` decodes to a literal 0x01 byte in the
2117/// extracted blob, making the sentinel ambiguous for that row. Such input
2118/// is treated as out of scope for attribution correctness — hits on those
2119/// rows may have misattributed `matched_paths`, but no panic or query
2120/// failure occurs. A future hardening step could strip bytes < 0x20 at
2121/// blob-emission time in `RecursiveWalker::emit_leaf` to close this gap.
2122///
2123/// Using one-byte markers keeps the original-to-highlighted offset
2124/// accounting trivial: every sentinel adds exactly one byte to the
2125/// highlighted string.
2126const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
2127const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
2128
2129/// Load the `fts_node_property_positions` sidecar rows for a given
2130/// `(logical_id, kind)` ordered by `start_offset`. Returns a vector of
2131/// `(start_offset, end_offset, leaf_path)` tuples ready for binary search.
2132fn load_position_map(
2133    conn: &Connection,
2134    logical_id: &str,
2135    kind: &str,
2136) -> Result<Vec<(usize, usize, String)>, EngineError> {
2137    let mut stmt = conn
2138        .prepare_cached(
2139            "SELECT start_offset, end_offset, leaf_path \
2140             FROM fts_node_property_positions \
2141             WHERE node_logical_id = ?1 AND kind = ?2 \
2142             ORDER BY start_offset ASC",
2143        )
2144        .map_err(EngineError::Sqlite)?;
2145    let rows = stmt
2146        .query_map(rusqlite::params![logical_id, kind], |row| {
2147            let start: i64 = row.get(0)?;
2148            let end: i64 = row.get(1)?;
2149            let path: String = row.get(2)?;
2150            // Offsets are non-negative and within blob byte limits; on the
2151            // off chance a corrupt row is encountered, fall back to 0 so
2152            // lookups silently skip it rather than panicking.
2153            let start = usize::try_from(start).unwrap_or(0);
2154            let end = usize::try_from(end).unwrap_or(0);
2155            Ok((start, end, path))
2156        })
2157        .map_err(EngineError::Sqlite)?;
2158    let mut out = Vec::new();
2159    for row in rows {
2160        out.push(row.map_err(EngineError::Sqlite)?);
2161    }
2162    Ok(out)
2163}
2164
2165/// Parse a `highlight()`-wrapped string, returning the list of original-text
2166/// byte offsets at which matched terms begin. `wrapped` is the
2167/// highlight-decorated form of a text column; `open` / `close` are the
2168/// sentinel markers passed to `highlight()`. The returned offsets refer to
2169/// positions in the *original* text (i.e. the column as it would be stored
2170/// without highlight decoration).
2171fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
2172    let mut offsets = Vec::new();
2173    let bytes = wrapped.as_bytes();
2174    let open_bytes = open.as_bytes();
2175    let close_bytes = close.as_bytes();
2176    let mut i = 0usize;
2177    // Number of sentinel bytes consumed so far — every marker encountered
2178    // subtracts from the wrapped-string index to get the original offset.
2179    let mut marker_bytes_seen = 0usize;
2180    while i < bytes.len() {
2181        if bytes[i..].starts_with(open_bytes) {
2182            // Record the original-text offset of the term following the open
2183            // marker.
2184            let original_offset = i - marker_bytes_seen;
2185            offsets.push(original_offset);
2186            i += open_bytes.len();
2187            marker_bytes_seen += open_bytes.len();
2188        } else if bytes[i..].starts_with(close_bytes) {
2189            i += close_bytes.len();
2190            marker_bytes_seen += close_bytes.len();
2191        } else {
2192            i += 1;
2193        }
2194    }
2195    offsets
2196}
2197
2198/// Binary-search the position map for the leaf whose `[start, end)` range
2199/// contains `offset`. Returns `None` if no leaf covers the offset.
2200fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
2201    // Binary search for the greatest start_offset <= offset.
2202    let idx = match positions.binary_search_by(|entry| entry.0.cmp(&offset)) {
2203        Ok(i) => i,
2204        Err(0) => return None,
2205        Err(i) => i - 1,
2206    };
2207    let (start, end, path) = &positions[idx];
2208    if offset >= *start && offset < *end {
2209        Some(path.as_str())
2210    } else {
2211        None
2212    }
2213}
2214
2215/// Resolve per-hit match attribution by introspecting the FTS5 match state
2216/// for the hit's row via `highlight()` and mapping the resulting original-
2217/// text offsets back to recursive-leaf paths via the Phase 4 position map.
2218///
2219/// Chunk-backed hits carry no leaf structure and always return an empty
2220/// `matched_paths` vector. Property-backed hits without a `projection_row_id`
2221/// (which should not happen — the search CTE always populates it) also
2222/// return empty attribution rather than erroring.
2223fn resolve_hit_attribution(
2224    conn: &Connection,
2225    hit: &SearchHit,
2226    match_expr: &str,
2227) -> Result<HitAttribution, EngineError> {
2228    if !matches!(hit.source, SearchHitSource::Property) {
2229        return Ok(HitAttribution {
2230            matched_paths: Vec::new(),
2231        });
2232    }
2233    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
2234        return Ok(HitAttribution {
2235            matched_paths: Vec::new(),
2236        });
2237    };
2238    let rowid: i64 = match rowid_str.parse() {
2239        Ok(v) => v,
2240        Err(_) => {
2241            return Ok(HitAttribution {
2242                matched_paths: Vec::new(),
2243            });
2244        }
2245    };
2246
2247    // Fetch the highlight-wrapped text_content for this hit's FTS row. The
2248    // FTS5 MATCH in the WHERE clause re-establishes the match state that
2249    // `highlight()` needs to decorate the returned text.
2250    let mut stmt = conn
2251        .prepare_cached(
2252            "SELECT highlight(fts_node_properties, 2, ?1, ?2) \
2253             FROM fts_node_properties \
2254             WHERE rowid = ?3 AND fts_node_properties MATCH ?4",
2255        )
2256        .map_err(EngineError::Sqlite)?;
2257    let wrapped: Option<String> = stmt
2258        .query_row(
2259            rusqlite::params![
2260                ATTRIBUTION_HIGHLIGHT_OPEN,
2261                ATTRIBUTION_HIGHLIGHT_CLOSE,
2262                rowid,
2263                match_expr,
2264            ],
2265            |row| row.get(0),
2266        )
2267        .optional()
2268        .map_err(EngineError::Sqlite)?;
2269    let Some(wrapped) = wrapped else {
2270        return Ok(HitAttribution {
2271            matched_paths: Vec::new(),
2272        });
2273    };
2274
2275    let offsets = parse_highlight_offsets(
2276        &wrapped,
2277        ATTRIBUTION_HIGHLIGHT_OPEN,
2278        ATTRIBUTION_HIGHLIGHT_CLOSE,
2279    );
2280    if offsets.is_empty() {
2281        return Ok(HitAttribution {
2282            matched_paths: Vec::new(),
2283        });
2284    }
2285
2286    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
2287    if positions.is_empty() {
2288        // Scalar-only schemas have no position-map entries; attribution
2289        // degrades to an empty vector rather than erroring.
2290        return Ok(HitAttribution {
2291            matched_paths: Vec::new(),
2292        });
2293    }
2294
2295    let mut matched_paths: Vec<String> = Vec::new();
2296    for offset in offsets {
2297        if let Some(path) = find_leaf_for_offset(&positions, offset)
2298            && !matched_paths.iter().any(|p| p == path)
2299        {
2300            matched_paths.push(path.to_owned());
2301        }
2302    }
2303    Ok(HitAttribution { matched_paths })
2304}
2305
2306fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
2307    match value {
2308        fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
2309        fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
2310        fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
2311    }
2312}
2313
2314#[cfg(test)]
2315#[allow(clippy::expect_used)]
2316mod tests {
2317    use std::panic::{AssertUnwindSafe, catch_unwind};
2318    use std::sync::Arc;
2319
2320    use fathomdb_query::{BindValue, QueryBuilder};
2321    use fathomdb_schema::SchemaManager;
2322    use rusqlite::types::Value;
2323    use tempfile::NamedTempFile;
2324
2325    use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
2326
2327    use fathomdb_query::{
2328        NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
2329    };
2330
2331    use super::{
2332        bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
2333        wrap_node_row_projection_sql,
2334    };
2335
2336    fn mk_hit(
2337        logical_id: &str,
2338        score: f64,
2339        match_mode: SearchMatchMode,
2340        source: SearchHitSource,
2341    ) -> SearchHit {
2342        SearchHit {
2343            node: NodeRowLite {
2344                row_id: format!("{logical_id}-row"),
2345                logical_id: logical_id.to_owned(),
2346                kind: "Goal".to_owned(),
2347                properties: "{}".to_owned(),
2348                content_ref: None,
2349                last_accessed_at: None,
2350            },
2351            score,
2352            modality: RetrievalModality::Text,
2353            source,
2354            match_mode: Some(match_mode),
2355            snippet: None,
2356            written_at: 0,
2357            projection_row_id: None,
2358            vector_distance: None,
2359            attribution: None,
2360        }
2361    }
2362
2363    #[test]
2364    fn merge_places_strict_block_before_relaxed_regardless_of_score() {
2365        let strict = vec![mk_hit(
2366            "a",
2367            1.0,
2368            SearchMatchMode::Strict,
2369            SearchHitSource::Chunk,
2370        )];
2371        // Relaxed has a higher score but must still come second.
2372        let relaxed = vec![mk_hit(
2373            "b",
2374            9.9,
2375            SearchMatchMode::Relaxed,
2376            SearchHitSource::Chunk,
2377        )];
2378        let merged = merge_search_branches(strict, relaxed, 10);
2379        assert_eq!(merged.len(), 2);
2380        assert_eq!(merged[0].node.logical_id, "a");
2381        assert!(matches!(
2382            merged[0].match_mode,
2383            Some(SearchMatchMode::Strict)
2384        ));
2385        assert_eq!(merged[1].node.logical_id, "b");
2386        assert!(matches!(
2387            merged[1].match_mode,
2388            Some(SearchMatchMode::Relaxed)
2389        ));
2390    }
2391
2392    #[test]
2393    fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
2394        let strict = vec![mk_hit(
2395            "shared",
2396            1.0,
2397            SearchMatchMode::Strict,
2398            SearchHitSource::Chunk,
2399        )];
2400        let relaxed = vec![
2401            mk_hit(
2402                "shared",
2403                9.9,
2404                SearchMatchMode::Relaxed,
2405                SearchHitSource::Chunk,
2406            ),
2407            mk_hit(
2408                "other",
2409                2.0,
2410                SearchMatchMode::Relaxed,
2411                SearchHitSource::Chunk,
2412            ),
2413        ];
2414        let merged = merge_search_branches(strict, relaxed, 10);
2415        assert_eq!(merged.len(), 2);
2416        assert_eq!(merged[0].node.logical_id, "shared");
2417        assert!(matches!(
2418            merged[0].match_mode,
2419            Some(SearchMatchMode::Strict)
2420        ));
2421        assert_eq!(merged[1].node.logical_id, "other");
2422        assert!(matches!(
2423            merged[1].match_mode,
2424            Some(SearchMatchMode::Relaxed)
2425        ));
2426    }
2427
2428    #[test]
2429    fn merge_sorts_within_block_by_score_desc_then_logical_id() {
2430        let strict = vec![
2431            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2432            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2433            mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2434        ];
2435        let merged = merge_search_branches(strict, vec![], 10);
2436        assert_eq!(
2437            merged
2438                .iter()
2439                .map(|h| &h.node.logical_id)
2440                .collect::<Vec<_>>(),
2441            vec!["a", "c", "b"]
2442        );
2443    }
2444
2445    #[test]
2446    fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
2447        let strict = vec![
2448            mk_hit(
2449                "shared",
2450                1.0,
2451                SearchMatchMode::Strict,
2452                SearchHitSource::Property,
2453            ),
2454            mk_hit(
2455                "shared",
2456                1.0,
2457                SearchMatchMode::Strict,
2458                SearchHitSource::Chunk,
2459            ),
2460        ];
2461        let merged = merge_search_branches(strict, vec![], 10);
2462        assert_eq!(merged.len(), 1);
2463        assert!(matches!(merged[0].source, SearchHitSource::Chunk));
2464    }
2465
2466    #[test]
2467    fn merge_truncates_to_limit_after_block_merge() {
2468        let strict = vec![
2469            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2470            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2471        ];
2472        let relaxed = vec![mk_hit(
2473            "c",
2474            9.0,
2475            SearchMatchMode::Relaxed,
2476            SearchHitSource::Chunk,
2477        )];
2478        let merged = merge_search_branches(strict, relaxed, 2);
2479        assert_eq!(merged.len(), 2);
2480        assert_eq!(merged[0].node.logical_id, "a");
2481        assert_eq!(merged[1].node.logical_id, "b");
2482    }
2483
2484    /// P12 architectural pin: the generalized three-branch merger must
2485    /// produce strict -> relaxed -> vector block ordering with cross-branch
2486    /// dedup resolved by branch precedence (strict > relaxed > vector).
2487    /// v1 `search()` policy never fires the vector branch through the
2488    /// unified planner because read-time embedding is deferred, but the
2489    /// merge helper itself must be ready for the day the planner does so —
2490    /// otherwise wiring the future phase requires touching the core merge
2491    /// code as well as the planner.
2492    #[test]
2493    fn search_architecturally_supports_three_branch_fusion() {
2494        let strict = vec![mk_hit(
2495            "alpha",
2496            1.0,
2497            SearchMatchMode::Strict,
2498            SearchHitSource::Chunk,
2499        )];
2500        let relaxed = vec![mk_hit(
2501            "bravo",
2502            5.0,
2503            SearchMatchMode::Relaxed,
2504            SearchHitSource::Chunk,
2505        )];
2506        // Synthetic vector hit with the highest score. Three-block ordering
2507        // must still place it last.
2508        let mut vector_hit = mk_hit(
2509            "charlie",
2510            9.9,
2511            SearchMatchMode::Strict,
2512            SearchHitSource::Vector,
2513        );
2514        // Vector hits actually carry match_mode=None per the addendum, but
2515        // the merge helper's ordering is mode-agnostic; we override here to
2516        // pin the modality field for the test.
2517        vector_hit.match_mode = None;
2518        vector_hit.modality = RetrievalModality::Vector;
2519        let vector = vec![vector_hit];
2520
2521        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
2522        assert_eq!(merged.len(), 3);
2523        assert_eq!(merged[0].node.logical_id, "alpha");
2524        assert_eq!(merged[1].node.logical_id, "bravo");
2525        assert_eq!(merged[2].node.logical_id, "charlie");
2526        // Vector block comes last regardless of its higher score.
2527        assert!(matches!(merged[2].source, SearchHitSource::Vector));
2528
2529        // Cross-branch dedup: a logical_id that appears in multiple branches
2530        // is attributed to its highest-priority originating branch only.
2531        let strict2 = vec![mk_hit(
2532            "shared",
2533            0.5,
2534            SearchMatchMode::Strict,
2535            SearchHitSource::Chunk,
2536        )];
2537        let relaxed2 = vec![mk_hit(
2538            "shared",
2539            5.0,
2540            SearchMatchMode::Relaxed,
2541            SearchHitSource::Chunk,
2542        )];
2543        let mut vshared = mk_hit(
2544            "shared",
2545            9.9,
2546            SearchMatchMode::Strict,
2547            SearchHitSource::Vector,
2548        );
2549        vshared.match_mode = None;
2550        vshared.modality = RetrievalModality::Vector;
2551        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
2552        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
2553        assert!(matches!(
2554            merged2[0].match_mode,
2555            Some(SearchMatchMode::Strict)
2556        ));
2557        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));
2558
2559        // Relaxed wins over vector when strict is absent.
2560        let mut vshared2 = mk_hit(
2561            "shared",
2562            9.9,
2563            SearchMatchMode::Strict,
2564            SearchHitSource::Vector,
2565        );
2566        vshared2.match_mode = None;
2567        vshared2.modality = RetrievalModality::Vector;
2568        let merged3 = merge_search_branches_three(
2569            vec![],
2570            vec![mk_hit(
2571                "shared",
2572                1.0,
2573                SearchMatchMode::Relaxed,
2574                SearchHitSource::Chunk,
2575            )],
2576            vec![vshared2],
2577            10,
2578        );
2579        assert_eq!(merged3.len(), 1);
2580        assert!(matches!(
2581            merged3[0].match_mode,
2582            Some(SearchMatchMode::Relaxed)
2583        ));
2584    }
2585
2586    /// P12-N-3: production-realistic vector-only fusion. The v1 planner
2587    /// never fires this shape today (read-time embedding is deferred), but
2588    /// when it does the merger will see empty strict + empty relaxed + a
2589    /// non-empty vector block. The three-branch merger must pass that
2590    /// block through unchanged, preserving `RetrievalModality::Vector`,
2591    /// `SearchHitSource::Vector`, and `match_mode == None` semantics.
2592    ///
2593    /// Note: the review spec asked for `vector_hit_count == 1` /
2594    /// `strict_hit_count == 0` assertions. Those are fields on
2595    /// `SearchRows`, which is assembled one layer up in
2596    /// `execute_compiled_retrieval_plan`. The merger returns a bare
2597    /// `Vec<SearchHit>`, so this test asserts the corresponding invariants
2598    /// directly on the returned vec (block shape + per-hit fields).
2599    #[test]
2600    fn merge_search_branches_three_vector_only_preserves_vector_block() {
2601        let mut vector_hit = mk_hit(
2602            "solo",
2603            0.75,
2604            SearchMatchMode::Strict,
2605            SearchHitSource::Vector,
2606        );
2607        vector_hit.match_mode = None;
2608        vector_hit.modality = RetrievalModality::Vector;
2609
2610        let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);
2611
2612        assert_eq!(merged.len(), 1);
2613        assert_eq!(merged[0].node.logical_id, "solo");
2614        assert!(matches!(merged[0].source, SearchHitSource::Vector));
2615        assert!(matches!(merged[0].modality, RetrievalModality::Vector));
2616        assert!(
2617            merged[0].match_mode.is_none(),
2618            "vector hits carry match_mode=None per addendum 1"
2619        );
2620    }
2621
2622    /// P12-N-3: limit truncation must preserve block precedence — when the
2623    /// strict block alone already exceeds the limit, relaxed and vector
2624    /// hits must be dropped entirely even if they have higher raw scores.
2625    ///
2626    /// Note: the review spec asked for `strict_hit_count == 2` /
2627    /// `relaxed_hit_count == 0` / `vector_hit_count == 0` assertions, which
2628    /// are `SearchRows` fields assembled one layer up. Since
2629    /// `merge_search_branches_three` only returns a `Vec<SearchHit>`, this
2630    /// test asserts the corresponding invariants directly: the returned
2631    /// vec contains exactly the top two strict hits, with no relaxed or
2632    /// vector hits leaking past the limit.
2633    #[test]
2634    fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
2635        let strict = vec![
2636            mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2637            mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2638            mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2639        ];
2640        let relaxed = vec![mk_hit(
2641            "d",
2642            9.0,
2643            SearchMatchMode::Relaxed,
2644            SearchHitSource::Chunk,
2645        )];
2646        let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
2647        vector_hit.match_mode = None;
2648        vector_hit.modality = RetrievalModality::Vector;
2649        let vector = vec![vector_hit];
2650
2651        let merged = merge_search_branches_three(strict, relaxed, vector, 2);
2652
2653        assert_eq!(merged.len(), 2);
2654        assert_eq!(merged[0].node.logical_id, "a");
2655        assert_eq!(merged[1].node.logical_id, "b");
2656        // Neither relaxed nor vector hits made it past the limit.
2657        assert!(
2658            merged
2659                .iter()
2660                .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
2661            "strict block must win limit contention against higher-scored relaxed/vector hits"
2662        );
2663        assert!(
2664            merged
2665                .iter()
2666                .all(|h| matches!(h.source, SearchHitSource::Chunk)),
2667            "no vector source hits should leak past the limit"
2668        );
2669    }
2670
2671    #[test]
2672    fn is_vec_table_absent_matches_known_error_messages() {
2673        use rusqlite::ffi;
2674        fn make_err(msg: &str) -> rusqlite::Error {
2675            rusqlite::Error::SqliteFailure(
2676                ffi::Error {
2677                    code: ffi::ErrorCode::Unknown,
2678                    extended_code: 1,
2679                },
2680                Some(msg.to_owned()),
2681            )
2682        }
2683        assert!(is_vec_table_absent(&make_err(
2684            "no such table: vec_nodes_active"
2685        )));
2686        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
2687        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
2688        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
2689        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
2690    }
2691
2692    #[test]
2693    fn bind_value_text_maps_to_sql_text() {
2694        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
2695        assert_eq!(val, Value::Text("hello".to_owned()));
2696    }
2697
2698    #[test]
2699    fn bind_value_integer_maps_to_sql_integer() {
2700        let val = bind_value_to_sql(&BindValue::Integer(42));
2701        assert_eq!(val, Value::Integer(42));
2702    }
2703
2704    #[test]
2705    fn bind_value_bool_true_maps_to_integer_one() {
2706        let val = bind_value_to_sql(&BindValue::Bool(true));
2707        assert_eq!(val, Value::Integer(1));
2708    }
2709
2710    #[test]
2711    fn bind_value_bool_false_maps_to_integer_zero() {
2712        let val = bind_value_to_sql(&BindValue::Bool(false));
2713        assert_eq!(val, Value::Integer(0));
2714    }
2715
2716    #[test]
2717    fn same_shape_queries_share_one_cache_entry() {
2718        let db = NamedTempFile::new().expect("temporary db");
2719        let coordinator = ExecutionCoordinator::open(
2720            db.path(),
2721            Arc::new(SchemaManager::new()),
2722            None,
2723            1,
2724            Arc::new(TelemetryCounters::default()),
2725            None,
2726        )
2727        .expect("coordinator");
2728
2729        let compiled_a = QueryBuilder::nodes("Meeting")
2730            .text_search("budget", 5)
2731            .limit(10)
2732            .compile()
2733            .expect("compiled a");
2734        let compiled_b = QueryBuilder::nodes("Meeting")
2735            .text_search("standup", 5)
2736            .limit(10)
2737            .compile()
2738            .expect("compiled b");
2739
2740        coordinator
2741            .execute_compiled_read(&compiled_a)
2742            .expect("read a");
2743        coordinator
2744            .execute_compiled_read(&compiled_b)
2745            .expect("read b");
2746
2747        assert_eq!(
2748            compiled_a.shape_hash, compiled_b.shape_hash,
2749            "different bind values, same structural shape → same hash"
2750        );
2751        assert_eq!(coordinator.shape_sql_count(), 1);
2752    }
2753
2754    #[test]
2755    fn vector_read_degrades_gracefully_when_vec_table_absent() {
2756        let db = NamedTempFile::new().expect("temporary db");
2757        let coordinator = ExecutionCoordinator::open(
2758            db.path(),
2759            Arc::new(SchemaManager::new()),
2760            None,
2761            1,
2762            Arc::new(TelemetryCounters::default()),
2763            None,
2764        )
2765        .expect("coordinator");
2766
2767        let compiled = QueryBuilder::nodes("Meeting")
2768            .vector_search("budget embeddings", 5)
2769            .compile()
2770            .expect("vector query compiles");
2771
2772        let result = coordinator.execute_compiled_read(&compiled);
2773        let rows = result.expect("degraded read must succeed, not error");
2774        assert!(
2775            rows.was_degraded,
2776            "result must be flagged as degraded when vec_nodes_active is absent"
2777        );
2778        assert!(
2779            rows.nodes.is_empty(),
2780            "degraded result must return empty nodes"
2781        );
2782    }
2783
2784    #[test]
2785    fn coordinator_caches_by_shape_hash() {
2786        let db = NamedTempFile::new().expect("temporary db");
2787        let coordinator = ExecutionCoordinator::open(
2788            db.path(),
2789            Arc::new(SchemaManager::new()),
2790            None,
2791            1,
2792            Arc::new(TelemetryCounters::default()),
2793            None,
2794        )
2795        .expect("coordinator");
2796
2797        let compiled = QueryBuilder::nodes("Meeting")
2798            .text_search("budget", 5)
2799            .compile()
2800            .expect("compiled query");
2801
2802        coordinator
2803            .execute_compiled_read(&compiled)
2804            .expect("execute compiled read");
2805        assert_eq!(coordinator.shape_sql_count(), 1);
2806    }
2807
2808    // --- Item 6: explain_compiled_read tests ---
2809
2810    #[test]
2811    fn explain_returns_correct_sql() {
2812        let db = NamedTempFile::new().expect("temporary db");
2813        let coordinator = ExecutionCoordinator::open(
2814            db.path(),
2815            Arc::new(SchemaManager::new()),
2816            None,
2817            1,
2818            Arc::new(TelemetryCounters::default()),
2819            None,
2820        )
2821        .expect("coordinator");
2822
2823        let compiled = QueryBuilder::nodes("Meeting")
2824            .text_search("budget", 5)
2825            .compile()
2826            .expect("compiled query");
2827
2828        let plan = coordinator.explain_compiled_read(&compiled);
2829
2830        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
2831    }
2832
2833    #[test]
2834    fn explain_returns_correct_driving_table() {
2835        use fathomdb_query::DrivingTable;
2836
2837        let db = NamedTempFile::new().expect("temporary db");
2838        let coordinator = ExecutionCoordinator::open(
2839            db.path(),
2840            Arc::new(SchemaManager::new()),
2841            None,
2842            1,
2843            Arc::new(TelemetryCounters::default()),
2844            None,
2845        )
2846        .expect("coordinator");
2847
2848        let compiled = QueryBuilder::nodes("Meeting")
2849            .text_search("budget", 5)
2850            .compile()
2851            .expect("compiled query");
2852
2853        let plan = coordinator.explain_compiled_read(&compiled);
2854
2855        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
2856    }
2857
2858    #[test]
2859    fn explain_reports_cache_miss_then_hit() {
2860        let db = NamedTempFile::new().expect("temporary db");
2861        let coordinator = ExecutionCoordinator::open(
2862            db.path(),
2863            Arc::new(SchemaManager::new()),
2864            None,
2865            1,
2866            Arc::new(TelemetryCounters::default()),
2867            None,
2868        )
2869        .expect("coordinator");
2870
2871        let compiled = QueryBuilder::nodes("Meeting")
2872            .text_search("budget", 5)
2873            .compile()
2874            .expect("compiled query");
2875
2876        // Before execution: cache miss
2877        let plan_before = coordinator.explain_compiled_read(&compiled);
2878        assert!(
2879            !plan_before.cache_hit,
2880            "cache miss expected before first execute"
2881        );
2882
2883        // Execute to populate cache
2884        coordinator
2885            .execute_compiled_read(&compiled)
2886            .expect("execute read");
2887
2888        // After execution: cache hit
2889        let plan_after = coordinator.explain_compiled_read(&compiled);
2890        assert!(
2891            plan_after.cache_hit,
2892            "cache hit expected after first execute"
2893        );
2894    }
2895
2896    #[test]
2897    fn explain_does_not_execute_query() {
2898        // Call explain_compiled_read on an empty database. If explain were
2899        // actually executing SQL, it would return Ok with 0 rows. But the
2900        // key assertion is that it returns a QueryPlan (not an error) even
2901        // without touching the database.
2902        let db = NamedTempFile::new().expect("temporary db");
2903        let coordinator = ExecutionCoordinator::open(
2904            db.path(),
2905            Arc::new(SchemaManager::new()),
2906            None,
2907            1,
2908            Arc::new(TelemetryCounters::default()),
2909            None,
2910        )
2911        .expect("coordinator");
2912
2913        let compiled = QueryBuilder::nodes("Meeting")
2914            .text_search("anything", 5)
2915            .compile()
2916            .expect("compiled query");
2917
2918        // This must not error, even though the database is empty
2919        let plan = coordinator.explain_compiled_read(&compiled);
2920
2921        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
2922        assert_eq!(plan.bind_count, compiled.binds.len());
2923    }
2924
2925    #[test]
2926    fn coordinator_executes_compiled_read() {
2927        let db = NamedTempFile::new().expect("temporary db");
2928        let coordinator = ExecutionCoordinator::open(
2929            db.path(),
2930            Arc::new(SchemaManager::new()),
2931            None,
2932            1,
2933            Arc::new(TelemetryCounters::default()),
2934            None,
2935        )
2936        .expect("coordinator");
2937        let conn = rusqlite::Connection::open(db.path()).expect("open db");
2938
2939        conn.execute_batch(
2940            r#"
2941            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
2942            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
2943            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
2944            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
2945            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
2946            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
2947            "#,
2948        )
2949        .expect("seed data");
2950
2951        let compiled = QueryBuilder::nodes("Meeting")
2952            .text_search("budget", 5)
2953            .limit(5)
2954            .compile()
2955            .expect("compiled query");
2956
2957        let rows = coordinator
2958            .execute_compiled_read(&compiled)
2959            .expect("execute read");
2960
2961        assert_eq!(rows.nodes.len(), 1);
2962        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
2963    }
2964
2965    #[test]
2966    fn text_search_finds_structured_only_node_via_property_fts() {
2967        let db = NamedTempFile::new().expect("temporary db");
2968        let coordinator = ExecutionCoordinator::open(
2969            db.path(),
2970            Arc::new(SchemaManager::new()),
2971            None,
2972            1,
2973            Arc::new(TelemetryCounters::default()),
2974            None,
2975        )
2976        .expect("coordinator");
2977        let conn = rusqlite::Connection::open(db.path()).expect("open db");
2978
2979        // Insert a structured-only node (no chunks) with a property FTS row.
2980        conn.execute_batch(
2981            r#"
2982            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
2983            VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
2984            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
2985            VALUES ('goal-1', 'Goal', 'Ship v2');
2986            "#,
2987        )
2988        .expect("seed data");
2989
2990        let compiled = QueryBuilder::nodes("Goal")
2991            .text_search("Ship", 5)
2992            .limit(5)
2993            .compile()
2994            .expect("compiled query");
2995
2996        let rows = coordinator
2997            .execute_compiled_read(&compiled)
2998            .expect("execute read");
2999
3000        assert_eq!(rows.nodes.len(), 1);
3001        assert_eq!(rows.nodes[0].logical_id, "goal-1");
3002    }
3003
3004    #[test]
3005    fn text_search_returns_both_chunk_and_property_backed_hits() {
3006        let db = NamedTempFile::new().expect("temporary db");
3007        let coordinator = ExecutionCoordinator::open(
3008            db.path(),
3009            Arc::new(SchemaManager::new()),
3010            None,
3011            1,
3012            Arc::new(TelemetryCounters::default()),
3013            None,
3014        )
3015        .expect("coordinator");
3016        let conn = rusqlite::Connection::open(db.path()).expect("open db");
3017
3018        // Chunk-backed hit: a Meeting with a chunk containing "quarterly".
3019        conn.execute_batch(
3020            r"
3021            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
3022            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
3023            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
3024            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
3025            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
3026            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
3027            ",
3028        )
3029        .expect("seed chunk-backed node");
3030
3031        // Property-backed hit: a Meeting with property FTS containing "quarterly".
3032        conn.execute_batch(
3033            r#"
3034            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
3035            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
3036            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
3037            VALUES ('meeting-2', 'Meeting', 'quarterly sync');
3038            "#,
3039        )
3040        .expect("seed property-backed node");
3041
3042        let compiled = QueryBuilder::nodes("Meeting")
3043            .text_search("quarterly", 10)
3044            .limit(10)
3045            .compile()
3046            .expect("compiled query");
3047
3048        let rows = coordinator
3049            .execute_compiled_read(&compiled)
3050            .expect("execute read");
3051
3052        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
3053        ids.sort_unstable();
3054        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
3055    }
3056
3057    #[test]
3058    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
3059        let db = NamedTempFile::new().expect("temporary db");
3060        let coordinator = ExecutionCoordinator::open(
3061            db.path(),
3062            Arc::new(SchemaManager::new()),
3063            None,
3064            1,
3065            Arc::new(TelemetryCounters::default()),
3066            None,
3067        )
3068        .expect("coordinator");
3069        let conn = rusqlite::Connection::open(db.path()).expect("open db");
3070
3071        conn.execute_batch(
3072            r"
3073            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
3074            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
3075            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
3076            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
3077            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
3078            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
3079            ",
3080        )
3081        .expect("seed chunk-backed node");
3082
3083        let compiled = QueryBuilder::nodes("Meeting")
3084            .text_search("not a ship", 10)
3085            .limit(10)
3086            .compile()
3087            .expect("compiled query");
3088
3089        let rows = coordinator
3090            .execute_compiled_read(&compiled)
3091            .expect("execute read");
3092
3093        assert_eq!(rows.nodes.len(), 1);
3094        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
3095    }
3096
3097    // --- Item 1: capability gate tests ---
3098
3099    #[test]
3100    fn capability_gate_reports_false_without_feature() {
3101        let db = NamedTempFile::new().expect("temporary db");
3102        // Open without vector_dimension: regardless of feature flag, vector_enabled must be false
3103        // when no dimension is requested (the vector profile is never bootstrapped).
3104        let coordinator = ExecutionCoordinator::open(
3105            db.path(),
3106            Arc::new(SchemaManager::new()),
3107            None,
3108            1,
3109            Arc::new(TelemetryCounters::default()),
3110            None,
3111        )
3112        .expect("coordinator");
3113        assert!(
3114            !coordinator.vector_enabled(),
3115            "vector_enabled must be false when no dimension is requested"
3116        );
3117    }
3118
3119    #[cfg(feature = "sqlite-vec")]
3120    #[test]
3121    fn capability_gate_reports_true_when_feature_enabled() {
3122        let db = NamedTempFile::new().expect("temporary db");
3123        let coordinator = ExecutionCoordinator::open(
3124            db.path(),
3125            Arc::new(SchemaManager::new()),
3126            Some(128),
3127            1,
3128            Arc::new(TelemetryCounters::default()),
3129            None,
3130        )
3131        .expect("coordinator");
3132        assert!(
3133            coordinator.vector_enabled(),
3134            "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
3135        );
3136    }
3137
3138    // --- Item 4: runtime table read tests ---
3139
3140    #[test]
3141    fn read_run_returns_inserted_run() {
3142        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
3143
3144        let db = NamedTempFile::new().expect("temporary db");
3145        let writer = WriterActor::start(
3146            db.path(),
3147            Arc::new(SchemaManager::new()),
3148            ProvenanceMode::Warn,
3149            Arc::new(TelemetryCounters::default()),
3150        )
3151        .expect("writer");
3152        writer
3153            .submit(WriteRequest {
3154                label: "runtime".to_owned(),
3155                nodes: vec![],
3156                node_retires: vec![],
3157                edges: vec![],
3158                edge_retires: vec![],
3159                chunks: vec![],
3160                runs: vec![RunInsert {
3161                    id: "run-r1".to_owned(),
3162                    kind: "session".to_owned(),
3163                    status: "active".to_owned(),
3164                    properties: "{}".to_owned(),
3165                    source_ref: Some("src-1".to_owned()),
3166                    upsert: false,
3167                    supersedes_id: None,
3168                }],
3169                steps: vec![],
3170                actions: vec![],
3171                optional_backfills: vec![],
3172                vec_inserts: vec![],
3173                operational_writes: vec![],
3174            })
3175            .expect("write run");
3176
3177        let coordinator = ExecutionCoordinator::open(
3178            db.path(),
3179            Arc::new(SchemaManager::new()),
3180            None,
3181            1,
3182            Arc::new(TelemetryCounters::default()),
3183            None,
3184        )
3185        .expect("coordinator");
3186        let row = coordinator
3187            .read_run("run-r1")
3188            .expect("read_run")
3189            .expect("row exists");
3190        assert_eq!(row.id, "run-r1");
3191        assert_eq!(row.kind, "session");
3192        assert_eq!(row.status, "active");
3193    }
3194
3195    #[test]
3196    fn read_step_returns_inserted_step() {
3197        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};
3198
3199        let db = NamedTempFile::new().expect("temporary db");
3200        let writer = WriterActor::start(
3201            db.path(),
3202            Arc::new(SchemaManager::new()),
3203            ProvenanceMode::Warn,
3204            Arc::new(TelemetryCounters::default()),
3205        )
3206        .expect("writer");
3207        writer
3208            .submit(WriteRequest {
3209                label: "runtime".to_owned(),
3210                nodes: vec![],
3211                node_retires: vec![],
3212                edges: vec![],
3213                edge_retires: vec![],
3214                chunks: vec![],
3215                runs: vec![RunInsert {
3216                    id: "run-s1".to_owned(),
3217                    kind: "session".to_owned(),
3218                    status: "active".to_owned(),
3219                    properties: "{}".to_owned(),
3220                    source_ref: Some("src-1".to_owned()),
3221                    upsert: false,
3222                    supersedes_id: None,
3223                }],
3224                steps: vec![StepInsert {
3225                    id: "step-s1".to_owned(),
3226                    run_id: "run-s1".to_owned(),
3227                    kind: "llm".to_owned(),
3228                    status: "completed".to_owned(),
3229                    properties: "{}".to_owned(),
3230                    source_ref: Some("src-1".to_owned()),
3231                    upsert: false,
3232                    supersedes_id: None,
3233                }],
3234                actions: vec![],
3235                optional_backfills: vec![],
3236                vec_inserts: vec![],
3237                operational_writes: vec![],
3238            })
3239            .expect("write step");
3240
3241        let coordinator = ExecutionCoordinator::open(
3242            db.path(),
3243            Arc::new(SchemaManager::new()),
3244            None,
3245            1,
3246            Arc::new(TelemetryCounters::default()),
3247            None,
3248        )
3249        .expect("coordinator");
3250        let row = coordinator
3251            .read_step("step-s1")
3252            .expect("read_step")
3253            .expect("row exists");
3254        assert_eq!(row.id, "step-s1");
3255        assert_eq!(row.run_id, "run-s1");
3256        assert_eq!(row.kind, "llm");
3257    }
3258
3259    #[test]
3260    fn read_action_returns_inserted_action() {
3261        use crate::{
3262            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
3263            writer::{ActionInsert, StepInsert},
3264        };
3265
3266        let db = NamedTempFile::new().expect("temporary db");
3267        let writer = WriterActor::start(
3268            db.path(),
3269            Arc::new(SchemaManager::new()),
3270            ProvenanceMode::Warn,
3271            Arc::new(TelemetryCounters::default()),
3272        )
3273        .expect("writer");
3274        writer
3275            .submit(WriteRequest {
3276                label: "runtime".to_owned(),
3277                nodes: vec![],
3278                node_retires: vec![],
3279                edges: vec![],
3280                edge_retires: vec![],
3281                chunks: vec![],
3282                runs: vec![RunInsert {
3283                    id: "run-a1".to_owned(),
3284                    kind: "session".to_owned(),
3285                    status: "active".to_owned(),
3286                    properties: "{}".to_owned(),
3287                    source_ref: Some("src-1".to_owned()),
3288                    upsert: false,
3289                    supersedes_id: None,
3290                }],
3291                steps: vec![StepInsert {
3292                    id: "step-a1".to_owned(),
3293                    run_id: "run-a1".to_owned(),
3294                    kind: "llm".to_owned(),
3295                    status: "completed".to_owned(),
3296                    properties: "{}".to_owned(),
3297                    source_ref: Some("src-1".to_owned()),
3298                    upsert: false,
3299                    supersedes_id: None,
3300                }],
3301                actions: vec![ActionInsert {
3302                    id: "action-a1".to_owned(),
3303                    step_id: "step-a1".to_owned(),
3304                    kind: "emit".to_owned(),
3305                    status: "completed".to_owned(),
3306                    properties: "{}".to_owned(),
3307                    source_ref: Some("src-1".to_owned()),
3308                    upsert: false,
3309                    supersedes_id: None,
3310                }],
3311                optional_backfills: vec![],
3312                vec_inserts: vec![],
3313                operational_writes: vec![],
3314            })
3315            .expect("write action");
3316
3317        let coordinator = ExecutionCoordinator::open(
3318            db.path(),
3319            Arc::new(SchemaManager::new()),
3320            None,
3321            1,
3322            Arc::new(TelemetryCounters::default()),
3323            None,
3324        )
3325        .expect("coordinator");
3326        let row = coordinator
3327            .read_action("action-a1")
3328            .expect("read_action")
3329            .expect("row exists");
3330        assert_eq!(row.id, "action-a1");
3331        assert_eq!(row.step_id, "step-a1");
3332        assert_eq!(row.kind, "emit");
3333    }
3334
3335    #[test]
3336    fn read_active_runs_excludes_superseded() {
3337        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
3338
3339        let db = NamedTempFile::new().expect("temporary db");
3340        let writer = WriterActor::start(
3341            db.path(),
3342            Arc::new(SchemaManager::new()),
3343            ProvenanceMode::Warn,
3344            Arc::new(TelemetryCounters::default()),
3345        )
3346        .expect("writer");
3347
3348        // Insert original run
3349        writer
3350            .submit(WriteRequest {
3351                label: "v1".to_owned(),
3352                nodes: vec![],
3353                node_retires: vec![],
3354                edges: vec![],
3355                edge_retires: vec![],
3356                chunks: vec![],
3357                runs: vec![RunInsert {
3358                    id: "run-v1".to_owned(),
3359                    kind: "session".to_owned(),
3360                    status: "active".to_owned(),
3361                    properties: "{}".to_owned(),
3362                    source_ref: Some("src-1".to_owned()),
3363                    upsert: false,
3364                    supersedes_id: None,
3365                }],
3366                steps: vec![],
3367                actions: vec![],
3368                optional_backfills: vec![],
3369                vec_inserts: vec![],
3370                operational_writes: vec![],
3371            })
3372            .expect("v1 write");
3373
3374        // Supersede original run with v2
3375        writer
3376            .submit(WriteRequest {
3377                label: "v2".to_owned(),
3378                nodes: vec![],
3379                node_retires: vec![],
3380                edges: vec![],
3381                edge_retires: vec![],
3382                chunks: vec![],
3383                runs: vec![RunInsert {
3384                    id: "run-v2".to_owned(),
3385                    kind: "session".to_owned(),
3386                    status: "completed".to_owned(),
3387                    properties: "{}".to_owned(),
3388                    source_ref: Some("src-2".to_owned()),
3389                    upsert: true,
3390                    supersedes_id: Some("run-v1".to_owned()),
3391                }],
3392                steps: vec![],
3393                actions: vec![],
3394                optional_backfills: vec![],
3395                vec_inserts: vec![],
3396                operational_writes: vec![],
3397            })
3398            .expect("v2 write");
3399
3400        let coordinator = ExecutionCoordinator::open(
3401            db.path(),
3402            Arc::new(SchemaManager::new()),
3403            None,
3404            1,
3405            Arc::new(TelemetryCounters::default()),
3406            None,
3407        )
3408        .expect("coordinator");
3409        let active = coordinator.read_active_runs().expect("read_active_runs");
3410
3411        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
3412        assert_eq!(active[0].id, "run-v2");
3413    }
3414
3415    #[allow(clippy::panic)]
3416    fn poison_connection(coordinator: &ExecutionCoordinator) {
3417        let result = catch_unwind(AssertUnwindSafe(|| {
3418            let _guard = coordinator.pool.connections[0]
3419                .lock()
3420                .expect("poison test lock");
3421            panic!("poison coordinator connection mutex");
3422        }));
3423        assert!(
3424            result.is_err(),
3425            "poison test must unwind while holding the connection mutex"
3426        );
3427    }
3428
3429    #[allow(clippy::panic)]
3430    fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
3431    where
3432        F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
3433    {
3434        match op(coordinator) {
3435            Err(EngineError::Bridge(message)) => {
3436                assert_eq!(message, "connection mutex poisoned");
3437            }
3438            Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
3439            Err(error) => panic!("expected poisoned connection error, got {error:?}"),
3440        }
3441    }
3442
3443    #[test]
3444    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
3445        let db = NamedTempFile::new().expect("temporary db");
3446        let coordinator = ExecutionCoordinator::open(
3447            db.path(),
3448            Arc::new(SchemaManager::new()),
3449            None,
3450            1,
3451            Arc::new(TelemetryCounters::default()),
3452            None,
3453        )
3454        .expect("coordinator");
3455
3456        poison_connection(&coordinator);
3457
3458        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
3459        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
3460        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
3461        assert_poisoned_connection_error(
3462            &coordinator,
3463            super::ExecutionCoordinator::read_active_runs,
3464        );
3465        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
3466        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
3467    }
3468
3469    // --- M-2: Bounded shape cache ---
3470
3471    #[test]
3472    fn shape_cache_stays_bounded() {
3473        use fathomdb_query::ShapeHash;
3474
3475        let db = NamedTempFile::new().expect("temporary db");
3476        let coordinator = ExecutionCoordinator::open(
3477            db.path(),
3478            Arc::new(SchemaManager::new()),
3479            None,
3480            1,
3481            Arc::new(TelemetryCounters::default()),
3482            None,
3483        )
3484        .expect("coordinator");
3485
3486        // Directly populate the cache with MAX_SHAPE_CACHE_SIZE + 1 entries.
3487        {
3488            let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
3489            for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
3490                cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
3491            }
3492        }
3493        // The cache is now over the limit but hasn't been pruned yet (pruning
3494        // happens on the insert path in execute_compiled_read).
3495
3496        // Execute a compiled read to trigger the bounded-cache check.
3497        let compiled = QueryBuilder::nodes("Meeting")
3498            .text_search("budget", 5)
3499            .limit(10)
3500            .compile()
3501            .expect("compiled query");
3502
3503        coordinator
3504            .execute_compiled_read(&compiled)
3505            .expect("execute read");
3506
3507        assert!(
3508            coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
3509            "shape cache must stay bounded: got {} entries, max {}",
3510            coordinator.shape_sql_count(),
3511            super::MAX_SHAPE_CACHE_SIZE
3512        );
3513    }
3514
3515    // --- M-1: Read pool size ---
3516
3517    #[test]
3518    fn read_pool_size_configurable() {
3519        let db = NamedTempFile::new().expect("temporary db");
3520        let coordinator = ExecutionCoordinator::open(
3521            db.path(),
3522            Arc::new(SchemaManager::new()),
3523            None,
3524            2,
3525            Arc::new(TelemetryCounters::default()),
3526            None,
3527        )
3528        .expect("coordinator with pool_size=2");
3529
3530        assert_eq!(coordinator.pool.size(), 2);
3531
3532        // Basic read should succeed through the pool.
3533        let compiled = QueryBuilder::nodes("Meeting")
3534            .text_search("budget", 5)
3535            .limit(10)
3536            .compile()
3537            .expect("compiled query");
3538
3539        let result = coordinator.execute_compiled_read(&compiled);
3540        assert!(result.is_ok(), "read through pool must succeed");
3541    }
3542
3543    // --- M-4: Grouped read batching ---
3544
3545    #[test]
3546    fn grouped_read_results_match_baseline() {
3547        use fathomdb_query::TraverseDirection;
3548
3549        let db = NamedTempFile::new().expect("temporary db");
3550
3551        // Bootstrap the database via coordinator (creates schema).
3552        let coordinator = ExecutionCoordinator::open(
3553            db.path(),
3554            Arc::new(SchemaManager::new()),
3555            None,
3556            1,
3557            Arc::new(TelemetryCounters::default()),
3558            None,
3559        )
3560        .expect("coordinator");
3561
3562        // Seed data: 10 root nodes (Meeting-0..9) with 2 outbound edges each
3563        // to expansion nodes (Task-0-a, Task-0-b, etc.).
3564        {
3565            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
3566            for i in 0..10 {
3567                conn.execute_batch(&format!(
3568                    r#"
3569                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
3570                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
3571                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
3572                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
3573                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
3574                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');
3575
3576                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
3577                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
3578                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
3579                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());
3580
3581                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
3582                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
3583                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
3584                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
3585                    "#,
3586                )).expect("seed data");
3587            }
3588        }
3589
3590        let compiled = QueryBuilder::nodes("Meeting")
3591            .text_search("meeting", 10)
3592            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1)
3593            .limit(10)
3594            .compile_grouped()
3595            .expect("compiled grouped query");
3596
3597        let result = coordinator
3598            .execute_compiled_grouped_read(&compiled)
3599            .expect("grouped read");
3600
3601        assert!(!result.was_degraded, "grouped read should not be degraded");
3602        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
3603        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
3604        assert_eq!(result.expansions[0].slot, "tasks");
3605        assert_eq!(
3606            result.expansions[0].roots.len(),
3607            10,
3608            "each expansion slot should have entries for all 10 roots"
3609        );
3610
3611        // Each root should have exactly 2 expansion nodes (task-X-a, task-X-b).
3612        for root_expansion in &result.expansions[0].roots {
3613            assert_eq!(
3614                root_expansion.nodes.len(),
3615                2,
3616                "root {} should have 2 expansion nodes, got {}",
3617                root_expansion.root_logical_id,
3618                root_expansion.nodes.len()
3619            );
3620        }
3621    }
3622}