Skip to main content

fathomdb_engine/
coordinator.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8    BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9    CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, ExpansionSlot,
10    FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch,
11    SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
12};
13use fathomdb_schema::SchemaManager;
14use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
15
16use crate::embedder::QueryEmbedder;
17use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
18use crate::{EngineError, sqlite};
19
/// Maximum number of cached shape-hash to SQL mappings before the cache is
/// cleared entirely.  A clear-all strategy is simpler than partial eviction
/// and the cost of re-compiling on a miss is negligible.
///
/// The limit is checked against the cache's entry count with `>=` before an
/// insert, so the map never grows beyond this many entries.
const MAX_SHAPE_CACHE_SIZE: usize = 4096;
24
/// Maximum number of root IDs per batched expansion query.  Kept well below
/// `SQLITE_MAX_VARIABLE_NUMBER` (default 999) because each batch also binds
/// the edge-kind parameter.  Larger root sets are chunked into multiple
/// batches of this size rather than falling back to per-root queries.
///
/// NOTE(review): 999 is the default only for SQLite builds older than 3.32.0;
/// newer builds default to 32766. The chunk size is safe under either limit —
/// confirm against the bundled SQLite version before raising it.
const BATCH_CHUNK_SIZE: usize = 200;
30
/// A pool of read-only `SQLite` connections for concurrent read access.
///
/// Each connection is wrapped in its own [`Mutex`] so multiple readers can
/// proceed in parallel when they happen to grab different slots.
struct ReadPool {
    /// One independently lockable slot per pooled connection. The vector is
    /// filled once in `ReadPool::new` and never resized afterwards.
    connections: Vec<Mutex<Connection>>,
}
38
39impl fmt::Debug for ReadPool {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        f.debug_struct("ReadPool")
42            .field("size", &self.connections.len())
43            .finish()
44    }
45}
46
47impl ReadPool {
48    /// Open `pool_size` read-only connections to the database at `path`.
49    ///
50    /// Each connection has PRAGMAs initialized via
51    /// [`SchemaManager::initialize_connection`] and, when the `sqlite-vec`
52    /// feature is enabled and `vector_enabled` is true, the vec extension
53    /// auto-loaded.
54    ///
55    /// # Errors
56    ///
57    /// Returns [`EngineError`] if any connection fails to open or initialize.
58    fn new(
59        db_path: &Path,
60        pool_size: usize,
61        schema_manager: &SchemaManager,
62        vector_enabled: bool,
63    ) -> Result<Self, EngineError> {
64        let mut connections = Vec::with_capacity(pool_size);
65        for _ in 0..pool_size {
66            let conn = if vector_enabled {
67                #[cfg(feature = "sqlite-vec")]
68                {
69                    sqlite::open_readonly_connection_with_vec(db_path)?
70                }
71                #[cfg(not(feature = "sqlite-vec"))]
72                {
73                    sqlite::open_readonly_connection(db_path)?
74                }
75            } else {
76                sqlite::open_readonly_connection(db_path)?
77            };
78            schema_manager
79                .initialize_reader_connection(&conn)
80                .map_err(EngineError::Schema)?;
81            connections.push(Mutex::new(conn));
82        }
83        Ok(Self { connections })
84    }
85
86    /// Acquire a connection from the pool.
87    ///
88    /// Tries [`Mutex::try_lock`] on each slot first (fast non-blocking path).
89    /// If every slot is held, falls back to a blocking lock on the first slot.
90    ///
91    /// # Errors
92    ///
93    /// Returns [`EngineError::Bridge`] if the underlying mutex is poisoned.
94    fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
95        // Fast path: try each connection without blocking.
96        for conn in &self.connections {
97            if let Ok(guard) = conn.try_lock() {
98                return Ok(guard);
99            }
100        }
101        // Fallback: block on the first connection.
102        self.connections[0].lock().map_err(|_| {
103            trace_error!("read pool: connection mutex poisoned");
104            EngineError::Bridge("connection mutex poisoned".to_owned())
105        })
106    }
107
108    /// Return the number of connections in the pool.
109    #[cfg(test)]
110    fn size(&self) -> usize {
111        self.connections.len()
112    }
113}
114
/// Execution plan returned by [`ExecutionCoordinator::explain_compiled_read`].
///
/// This is a read-only introspection struct. It does not execute SQL.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// SQL text of the plan.
    pub sql: String,
    /// Number of bind parameters.
    /// NOTE(review): presumably the number of binds carried by the compiled
    /// query — confirm in `explain_compiled_read`.
    pub bind_count: usize,
    /// Driving table of the compiled query.
    pub driving_table: DrivingTable,
    /// Shape hash of the compiled query.
    pub shape_hash: ShapeHash,
    /// NOTE(review): presumably `true` when the shape hash was already present
    /// in the coordinator's shape-to-SQL cache — confirm in
    /// `explain_compiled_read`.
    pub cache_hit: bool,
}
126
/// A single node row returned from a query.
///
/// `ExecutionCoordinator::execute_compiled_read` fills the fields from
/// result columns 0 through 5, in declaration order.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row ID.
    pub row_id: String,
    /// Logical ID of the node.
    pub logical_id: String,
    /// Node kind.
    pub kind: String,
    /// JSON-encoded node properties.
    pub properties: String,
    /// Optional URI referencing external content.
    pub content_ref: Option<String>,
    /// Unix timestamp of last access, if tracked.
    pub last_accessed_at: Option<i64>,
}
143
/// A single run row returned from a query.
///
/// Carried in [`QueryRows::runs`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Unique run ID.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded run properties.
    pub properties: String,
}
156
/// A single step row returned from a query.
///
/// Carried in [`QueryRows::steps`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Unique step ID.
    pub id: String,
    /// ID of the parent run.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded step properties.
    pub properties: String,
}
171
/// A single action row returned from a query.
///
/// Carried in [`QueryRows::actions`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Unique action ID.
    pub id: String,
    /// ID of the parent step.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Current status.
    pub status: String,
    /// JSON-encoded action properties.
    pub properties: String,
}
186
/// A single row from the `provenance_events` table.
///
/// NOTE(review): the field notes below are inferred from the field names and
/// types only; the table definition is not visible here — confirm against
/// the schema.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Identifier of the event row.
    pub id: String,
    /// Event type label.
    pub event_type: String,
    /// Subject the event refers to.
    pub subject: String,
    /// Optional source reference associated with the event.
    pub source_ref: Option<String>,
    /// Event metadata; presumably a JSON-encoded string — confirm.
    pub metadata_json: String,
    /// Creation time; presumably a Unix timestamp — confirm the unit.
    pub created_at: i64,
}
197
/// Result set from executing a flat (non-grouped) compiled query.
///
/// `ExecutionCoordinator::execute_compiled_read` populates only `nodes`;
/// it returns `runs`, `steps`, and `actions` as empty vectors.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Matched node rows.
    pub nodes: Vec<NodeRow>,
    /// Runs associated with the matched nodes.
    pub runs: Vec<RunRow>,
    /// Steps associated with the matched runs.
    pub steps: Vec<StepRow>,
    /// Actions associated with the matched steps.
    pub actions: Vec<ActionRow>,
    /// `true` when a capability miss (e.g. missing sqlite-vec) caused the query
    /// to degrade to an empty result instead of propagating an error.
    pub was_degraded: bool,
}
213
/// Expansion results for a single root node within a grouped query.
///
/// Instances are collected per slot in [`ExpansionSlotRows::roots`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical ID of the root node that seeded this expansion.
    pub root_logical_id: String,
    /// Nodes reached by traversing from the root.
    pub nodes: Vec<NodeRow>,
}
222
/// All expansion results for a single named slot across all roots.
///
/// Instances are collected in [`GroupedQueryRows::expansions`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot.
    pub slot: String,
    /// Per-root expansion results.
    pub roots: Vec<ExpansionRootRows>,
}
231
/// Result set from executing a grouped compiled query.
///
/// The `Default` value has no roots, no expansions, and
/// `was_degraded == false`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// Root node rows matched by the base query.
    pub roots: Vec<NodeRow>,
    /// Per-slot expansion results.
    pub expansions: Vec<ExpansionSlotRows>,
    /// `true` when a capability miss caused the query to degrade to an empty result.
    pub was_degraded: bool,
}
242
/// Manages a pool of read-only `SQLite` connections and executes compiled queries.
pub struct ExecutionCoordinator {
    /// Filesystem path the coordinator was opened with; exposed through
    /// `database_path()`.
    database_path: PathBuf,
    /// Schema manager used at open time to bootstrap the schema and to
    /// initialize the pooled reader connections.
    schema_manager: Arc<SchemaManager>,
    /// Read-only connections that queries run on.
    pool: ReadPool,
    /// Cache from query shape hash to the wrapped row-projection SQL. It is
    /// cleared wholesale once it holds `MAX_SHAPE_CACHE_SIZE` entries.
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// Vector-profile flag computed in `open`; always `false` when the crate
    /// is built without the `sqlite-vec` feature.
    vector_enabled: bool,
    /// Set the first time a missing vector table forces a degraded result,
    /// so the corresponding warning is logged only once.
    vec_degradation_warned: AtomicBool,
    /// Shared counters incremented on query success and failure.
    telemetry: Arc<TelemetryCounters>,
    /// Phase 12.5a: optional read-time query embedder. When present,
    /// [`Self::execute_retrieval_plan`] invokes it via
    /// [`Self::fill_vector_branch`] after compile to populate
    /// `plan.vector`. When `None`, the Phase 12 v1 vector-dormancy
    /// invariant on `search()` is preserved: the vector slot stays empty
    /// and the coordinator's stage-gating check skips the vector branch.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
}
260
261impl fmt::Debug for ExecutionCoordinator {
262    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263        f.debug_struct("ExecutionCoordinator")
264            .field("database_path", &self.database_path)
265            .finish_non_exhaustive()
266    }
267}
268
269impl ExecutionCoordinator {
270    /// # Errors
271    /// Returns [`EngineError`] if the database connection cannot be opened or schema bootstrap fails.
272    pub fn open(
273        path: impl AsRef<Path>,
274        schema_manager: Arc<SchemaManager>,
275        vector_dimension: Option<usize>,
276        pool_size: usize,
277        telemetry: Arc<TelemetryCounters>,
278        query_embedder: Option<Arc<dyn QueryEmbedder>>,
279    ) -> Result<Self, EngineError> {
280        let path = path.as_ref().to_path_buf();
281        #[cfg(feature = "sqlite-vec")]
282        let conn = if vector_dimension.is_some() {
283            sqlite::open_connection_with_vec(&path)?
284        } else {
285            sqlite::open_connection(&path)?
286        };
287        #[cfg(not(feature = "sqlite-vec"))]
288        let conn = sqlite::open_connection(&path)?;
289
290        let report = schema_manager.bootstrap(&conn)?;
291
292        // ----- Open-time rebuild guards for derived FTS state -----
293        //
294        // `fts_node_properties` and `fts_node_property_positions` are both
295        // derived from the canonical `nodes.properties` blobs and the set
296        // of registered `fts_property_schemas`. They are NOT source of
297        // truth, so whenever a schema migration or crash leaves either
298        // table out of sync with the canonical state we repopulate them
299        // from scratch at open() time.
300        //
301        // The derived-state invariant is: if `fts_property_schemas` is
302        // non-empty, then `fts_node_properties` must also be non-empty,
303        // and if any schema is recursive-mode then
304        // `fts_node_property_positions` must also be non-empty. If either
305        // guard trips, both tables are cleared and rebuilt together — it
306        // is never correct to rebuild only one half.
307        //
308        // Guard 1 (P2-1, FX pack, v15→v16 migration): detects an empty
309        // `fts_node_properties` while schemas exist. This covers the
310        // Phase 2 tokenizer swap (migration 16 drops the FTS5 virtual
311        // table before recreating it under `porter unicode61`) and the
312        // crash-recovery case where a prior open applied migration 16 but
313        // crashed before the subsequent property-FTS rebuild committed.
314        //
315        // Guard 2 (P4-P2-3, FX2 pack, v16→v17 migration, extended to
316        // v17→v18 by P4-P2-4): detects an empty
317        // `fts_node_property_positions` while any recursive-mode schema
318        // exists. This covers migration 17 (which added the sidecar table
319        // but left it empty) and migration 18 (which drops and recreates
320        // the sidecar to add the UNIQUE constraint, also leaving it
321        // empty). Both migrations land on the same guard because both
322        // leave the positions table empty while recursive schemas remain
323        // registered.
324        //
325        // Both guards are no-ops on an already-consistent database, so
326        // running them on every open is safe and cheap.
327        let needs_property_fts_rebuild = {
328            let schema_count: i64 =
329                conn.query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
330                    row.get(0)
331                })?;
332            if schema_count == 0 {
333                false
334            } else {
335                let fts_count: i64 =
336                    conn.query_row("SELECT COUNT(*) FROM fts_node_properties", [], |row| {
337                        row.get(0)
338                    })?;
339                fts_count == 0
340            }
341        };
342        // Guard 2 (see block comment above): recursive schemas registered
343        // but `fts_node_property_positions` empty. Rebuild regenerates
344        // both the FTS blob and the position map from canonical state, so
345        // it is safe to trigger even when `fts_node_properties` is
346        // already populated.
347        let needs_position_backfill = {
348            // NOTE: This LIKE pattern assumes `property_paths_json` is
349            // serialized with compact formatting (no whitespace around
350            // `:`). All current writers go through `serde_json`'s compact
351            // output so this holds. If a future writer emits pretty-
352            // printed JSON (`"mode": "recursive"` with a space), this
353            // guard would silently fail. A more robust check would use
354            // `json_extract(property_paths_json, '$[*].mode')` or a
355            // parsed scan, at the cost of a per-row JSON walk.
356            let recursive_schema_count: i64 = conn.query_row(
357                "SELECT COUNT(*) FROM fts_property_schemas \
358                 WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
359                [],
360                |row| row.get(0),
361            )?;
362            if recursive_schema_count == 0 {
363                false
364            } else {
365                let position_count: i64 = conn.query_row(
366                    "SELECT COUNT(*) FROM fts_node_property_positions",
367                    [],
368                    |row| row.get(0),
369                )?;
370                position_count == 0
371            }
372        };
373        if needs_property_fts_rebuild || needs_position_backfill {
374            let tx = conn.unchecked_transaction()?;
375            tx.execute("DELETE FROM fts_node_properties", [])?;
376            tx.execute("DELETE FROM fts_node_property_positions", [])?;
377            crate::projection::insert_property_fts_rows(
378                &tx,
379                "SELECT logical_id, properties FROM nodes \
380                 WHERE kind = ?1 AND superseded_at IS NULL",
381            )?;
382            tx.commit()?;
383        }
384
385        #[cfg(feature = "sqlite-vec")]
386        let mut vector_enabled = report.vector_profile_enabled;
387        #[cfg(not(feature = "sqlite-vec"))]
388        let vector_enabled = {
389            let _ = &report;
390            false
391        };
392
393        if let Some(dim) = vector_dimension {
394            schema_manager
395                .ensure_vector_profile(&conn, "default", "vec_nodes_active", dim)
396                .map_err(EngineError::Schema)?;
397            // Profile was just created or updated — mark as enabled.
398            #[cfg(feature = "sqlite-vec")]
399            {
400                vector_enabled = true;
401            }
402        }
403
404        // Drop the bootstrap connection — pool connections are used for reads.
405        drop(conn);
406
407        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;
408
409        Ok(Self {
410            database_path: path,
411            schema_manager,
412            pool,
413            shape_sql_map: Mutex::new(HashMap::new()),
414            vector_enabled,
415            vec_degradation_warned: AtomicBool::new(false),
416            telemetry,
417            query_embedder,
418        })
419    }
420
421    /// Returns the filesystem path to the `SQLite` database.
422    pub fn database_path(&self) -> &Path {
423        &self.database_path
424    }
425
426    /// Returns `true` when sqlite-vec was loaded and a vector profile is active.
427    #[must_use]
428    pub fn vector_enabled(&self) -> bool {
429        self.vector_enabled
430    }
431
432    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
433        self.pool.acquire()
434    }
435
436    /// Aggregate `SQLite` page-cache counters across all pool connections.
437    ///
438    /// Uses `try_lock` to avoid blocking reads for telemetry reporting.
439    /// Connections that are currently locked by a query are skipped — this
440    /// is acceptable for statistical counters.
441    #[must_use]
442    pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
443        let mut total = SqliteCacheStatus::default();
444        for conn_mutex in &self.pool.connections {
445            if let Ok(conn) = conn_mutex.try_lock() {
446                total.add(&read_db_cache_status(&conn));
447            }
448        }
449        total
450    }
451
452    /// # Errors
453    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
454    #[allow(clippy::expect_used)]
455    pub fn execute_compiled_read(
456        &self,
457        compiled: &CompiledQuery,
458    ) -> Result<QueryRows, EngineError> {
459        let row_sql = wrap_node_row_projection_sql(&compiled.sql);
460        // FIX(review): was .expect() — panics on mutex poisoning, cascading failure.
461        // Options: (A) into_inner() for all, (B) EngineError for all, (C) mixed.
462        // Chose (C): shape_sql_map is a pure cache — into_inner() is safe to recover.
463        // conn wraps a SQLite connection whose state may be corrupt after a thread panic,
464        // so we propagate EngineError there instead.
465        {
466            let mut cache = self
467                .shape_sql_map
468                .lock()
469                .unwrap_or_else(PoisonError::into_inner);
470            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
471                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
472                cache.clear();
473            }
474            cache.insert(compiled.shape_hash, row_sql.clone());
475        }
476
477        let bind_values = compiled
478            .binds
479            .iter()
480            .map(bind_value_to_sql)
481            .collect::<Vec<_>>();
482
483        // FIX(review) + Security fix M-8: was .expect() — panics on mutex poisoning.
484        // shape_sql_map uses into_inner() (pure cache, safe to recover).
485        // conn uses map_err → EngineError (connection state may be corrupt after panic;
486        // into_inner() would risk using a connection with partial transaction state).
487        let conn_guard = match self.lock_connection() {
488            Ok(g) => g,
489            Err(e) => {
490                self.telemetry.increment_errors();
491                return Err(e);
492            }
493        };
494        let mut statement = match conn_guard.prepare_cached(&row_sql) {
495            Ok(stmt) => stmt,
496            Err(e) if is_vec_table_absent(&e) => {
497                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
498                    trace_warn!("vector table absent, degrading to non-vector query");
499                }
500                return Ok(QueryRows {
501                    was_degraded: true,
502                    ..Default::default()
503                });
504            }
505            Err(e) => {
506                self.telemetry.increment_errors();
507                return Err(EngineError::Sqlite(e));
508            }
509        };
510        let nodes = match statement
511            .query_map(params_from_iter(bind_values.iter()), |row| {
512                Ok(NodeRow {
513                    row_id: row.get(0)?,
514                    logical_id: row.get(1)?,
515                    kind: row.get(2)?,
516                    properties: row.get(3)?,
517                    content_ref: row.get(4)?,
518                    last_accessed_at: row.get(5)?,
519                })
520            })
521            .and_then(Iterator::collect)
522        {
523            Ok(rows) => rows,
524            Err(e) => {
525                self.telemetry.increment_errors();
526                return Err(EngineError::Sqlite(e));
527            }
528        };
529
530        self.telemetry.increment_queries();
531        Ok(QueryRows {
532            nodes,
533            runs: Vec::new(),
534            steps: Vec::new(),
535            actions: Vec::new(),
536            was_degraded: false,
537        })
538    }
539
540    /// Execute a compiled adaptive search and return matching hits.
541    ///
542    /// Phase 2 splits filters: fusable predicates (`KindEq`, `LogicalIdEq`,
543    /// `SourceRefEq`, `ContentRefEq`, `ContentRefNotNull`) are injected into
544    /// the `search_hits` CTE so the CTE `LIMIT` applies after filtering,
545    /// while residual predicates (JSON path filters) stay in the outer
546    /// `WHERE`. The chunk and property FTS
547    /// indexes are `UNION ALL`-ed, BM25-scored (flipped so larger values mean
548    /// better matches), ordered, and limited. All hits return
549    /// `match_mode = Strict` — the relaxed branch and fallback arrive in
550    /// later phases.
551    ///
552    /// # Errors
553    /// Returns [`EngineError`] if the SQL statement cannot be prepared or executed.
554    pub fn execute_compiled_search(
555        &self,
556        compiled: &CompiledSearch,
557    ) -> Result<SearchRows, EngineError> {
558        // Build the two-branch plan from the strict text query and delegate
559        // to the shared plan-execution routine. The relaxed branch is derived
560        // via `derive_relaxed` and only fires when strict returned fewer than
561        // `min(FALLBACK_TRIGGER_K, limit)` hits. With K = 1 this collapses to
562        // "relaxed iff strict is empty," but the routine spells the rule out
563        // explicitly so raising K later is a one-line constant bump.
564        let (relaxed_query, was_degraded_at_plan_time) =
565            fathomdb_query::derive_relaxed(&compiled.text_query);
566        let relaxed = relaxed_query.map(|q| CompiledSearch {
567            root_kind: compiled.root_kind.clone(),
568            text_query: q,
569            limit: compiled.limit,
570            fusable_filters: compiled.fusable_filters.clone(),
571            residual_filters: compiled.residual_filters.clone(),
572            attribution_requested: compiled.attribution_requested,
573        });
574        let plan = CompiledSearchPlan {
575            strict: compiled.clone(),
576            relaxed,
577            was_degraded_at_plan_time,
578        };
579        self.execute_compiled_search_plan(&plan)
580    }
581
582    /// Execute a two-branch [`CompiledSearchPlan`] and return the merged,
583    /// deduped result rows.
584    ///
585    /// This is the shared retrieval/merge routine that both
586    /// [`Self::execute_compiled_search`] (adaptive path) and
587    /// `Engine::fallback_search` (narrow two-shape path) call into. Strict
588    /// runs first; the relaxed branch only fires when it is present AND the
589    /// strict branch returned fewer than `min(FALLBACK_TRIGGER_K, limit)`
590    /// hits. Merge and dedup semantics are identical to the adaptive path
591    /// regardless of how the plan was constructed.
592    ///
593    /// Error contract: if the relaxed branch errors, the error propagates;
594    /// strict hits are not returned. This matches the rest of the engine's
595    /// fail-hard posture.
596    ///
597    /// # Errors
598    /// Returns [`EngineError`] if either branch's SQL cannot be prepared or
599    /// executed.
600    pub fn execute_compiled_search_plan(
601        &self,
602        plan: &CompiledSearchPlan,
603    ) -> Result<SearchRows, EngineError> {
604        let strict = &plan.strict;
605        let limit = strict.limit;
606        let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;
607
608        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
609        let strict_underfilled = strict_hits.len() < fallback_threshold;
610
611        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
612        let mut fallback_used = false;
613        let mut was_degraded = false;
614        if let Some(relaxed) = plan.relaxed.as_ref()
615            && strict_underfilled
616        {
617            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
618            fallback_used = true;
619            was_degraded = plan.was_degraded_at_plan_time;
620        }
621
622        let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
623        // Attribution runs AFTER dedup so that duplicate hits dropped by
624        // `merge_search_branches` do not waste a highlight+position-map
625        // lookup.
626        if strict.attribution_requested {
627            let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
628            self.populate_attribution_for_hits(
629                &mut merged,
630                &strict.text_query,
631                relaxed_text_query,
632            )?;
633        }
634        let strict_hit_count = merged
635            .iter()
636            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
637            .count();
638        let relaxed_hit_count = merged
639            .iter()
640            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
641            .count();
642        // Phase 10: no vector execution path yet, so vector_hit_count is
643        // always zero. Future phases that wire a vector branch will
644        // contribute here.
645        let vector_hit_count = 0;
646
647        Ok(SearchRows {
648            hits: merged,
649            strict_hit_count,
650            relaxed_hit_count,
651            vector_hit_count,
652            fallback_used,
653            was_degraded,
654        })
655    }
656
657    /// Execute a compiled vector-only search and return matching hits.
658    ///
659    /// Phase 11 delivers the standalone vector retrieval path. The emitted
660    /// SQL performs a vec0 KNN scan over `vec_nodes_active`, joins to
661    /// `chunks` and `nodes` (active rows only), and pushes fusable filters
662    /// into the candidate CTE. The outer `SELECT` applies residual JSON
663    /// predicates and orders by score descending, where `score = -distance`
664    /// (higher is better) per addendum 1 §Vector-Specific Behavior.
665    ///
666    /// ## Capability-miss handling
667    ///
668    /// If the `sqlite-vec` capability is absent (feature disabled or the
669    /// `vec_nodes_active` virtual table has not been created because the
670    /// engine was not opened with a `vector_dimension`), this method returns
671    /// an empty [`SearchRows`] with `was_degraded = true`. This is
672    /// **non-fatal** — the error does not propagate — matching the addendum's
673    /// §Vector-Specific Behavior / Degradation.
674    ///
675    /// ## Attribution
676    ///
677    /// When `compiled.attribution_requested == true`, every returned hit
678    /// carries `attribution: Some(HitAttribution { matched_paths: vec![] })`
679    /// per addendum 1 §Attribution on vector hits (Phase 5 chunk-hit rule
680    /// extended uniformly).
681    ///
682    /// # Errors
683    /// Returns [`EngineError`] if the SQL statement cannot be prepared or
684    /// executed for reasons other than a vec-table capability miss.
685    #[allow(clippy::too_many_lines)]
686    pub fn execute_compiled_vector_search(
687        &self,
688        compiled: &CompiledVectorSearch,
689    ) -> Result<SearchRows, EngineError> {
690        use std::fmt::Write as _;
691
692        // Short-circuit zero-limit: callers that pass `limit == 0` expect an
693        // empty result rather than a SQL error from `LIMIT 0` semantics in
694        // the inner vec0 scan.
695        if compiled.limit == 0 {
696            return Ok(SearchRows::default());
697        }
698
699        let filter_by_kind = !compiled.root_kind.is_empty();
700        let mut binds: Vec<BindValue> = Vec::new();
701        binds.push(BindValue::Text(compiled.query_text.clone()));
702        if filter_by_kind {
703            binds.push(BindValue::Text(compiled.root_kind.clone()));
704        }
705
706        // Build fusable-filter clauses, aliased against `src` inside the
707        // candidate CTE. Same predicate set the text path fuses.
708        let mut fused_clauses = String::new();
709        for predicate in &compiled.fusable_filters {
710            match predicate {
711                Predicate::KindEq(kind) => {
712                    binds.push(BindValue::Text(kind.clone()));
713                    let idx = binds.len();
714                    let _ = write!(
715                        fused_clauses,
716                        "\n                      AND src.kind = ?{idx}"
717                    );
718                }
719                Predicate::LogicalIdEq(logical_id) => {
720                    binds.push(BindValue::Text(logical_id.clone()));
721                    let idx = binds.len();
722                    let _ = write!(
723                        fused_clauses,
724                        "\n                      AND src.logical_id = ?{idx}"
725                    );
726                }
727                Predicate::SourceRefEq(source_ref) => {
728                    binds.push(BindValue::Text(source_ref.clone()));
729                    let idx = binds.len();
730                    let _ = write!(
731                        fused_clauses,
732                        "\n                      AND src.source_ref = ?{idx}"
733                    );
734                }
735                Predicate::ContentRefEq(uri) => {
736                    binds.push(BindValue::Text(uri.clone()));
737                    let idx = binds.len();
738                    let _ = write!(
739                        fused_clauses,
740                        "\n                      AND src.content_ref = ?{idx}"
741                    );
742                }
743                Predicate::ContentRefNotNull => {
744                    fused_clauses
745                        .push_str("\n                      AND src.content_ref IS NOT NULL");
746                }
747                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
748                    // JSON predicates are residual; compile_vector_search
749                    // guarantees they never appear here, but stay defensive.
750                }
751            }
752        }
753
754        // Build residual JSON clauses, aliased against `h` in the outer SELECT.
755        let mut filter_clauses = String::new();
756        for predicate in &compiled.residual_filters {
757            match predicate {
758                Predicate::JsonPathEq { path, value } => {
759                    binds.push(BindValue::Text(path.clone()));
760                    let path_idx = binds.len();
761                    binds.push(scalar_to_bind(value));
762                    let value_idx = binds.len();
763                    let _ = write!(
764                        filter_clauses,
765                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
766                    );
767                }
768                Predicate::JsonPathCompare { path, op, value } => {
769                    binds.push(BindValue::Text(path.clone()));
770                    let path_idx = binds.len();
771                    binds.push(scalar_to_bind(value));
772                    let value_idx = binds.len();
773                    let operator = match op {
774                        ComparisonOp::Gt => ">",
775                        ComparisonOp::Gte => ">=",
776                        ComparisonOp::Lt => "<",
777                        ComparisonOp::Lte => "<=",
778                    };
779                    let _ = write!(
780                        filter_clauses,
781                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
782                    );
783                }
784                Predicate::KindEq(_)
785                | Predicate::LogicalIdEq(_)
786                | Predicate::SourceRefEq(_)
787                | Predicate::ContentRefEq(_)
788                | Predicate::ContentRefNotNull => {
789                    // Fusable predicates live in fused_clauses above.
790                }
791            }
792        }
793
794        // Bind the outer limit as a named parameter for prepare_cached
795        // stability across calls that vary only by limit value.
796        let limit = compiled.limit;
797        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
798        let limit_idx = binds.len();
799
800        // sqlite-vec requires the LIMIT/k constraint to be visible directly
801        // on the vec0 KNN scan, so we isolate it in a sub-select. The vec0
802        // LIMIT overfetches `base_limit` = limit (Phase 11 keeps it simple;
803        // Phase 12's planner may raise this to compensate for fusion
804        // narrowing the candidate pool).
805        let base_limit = limit;
806        let kind_clause = if filter_by_kind {
807            "\n                      AND src.kind = ?2"
808        } else {
809            ""
810        };
811
812        let sql = format!(
813            "WITH vector_hits AS (
814                SELECT
815                    src.row_id AS row_id,
816                    src.logical_id AS logical_id,
817                    src.kind AS kind,
818                    src.properties AS properties,
819                    src.source_ref AS source_ref,
820                    src.content_ref AS content_ref,
821                    src.created_at AS created_at,
822                    vc.distance AS distance,
823                    vc.chunk_id AS chunk_id
824                FROM (
825                    SELECT chunk_id, distance
826                    FROM vec_nodes_active
827                    WHERE embedding MATCH ?1
828                    LIMIT {base_limit}
829                ) vc
830                JOIN chunks c ON c.id = vc.chunk_id
831                JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
832                WHERE 1 = 1{kind_clause}{fused_clauses}
833            )
834            SELECT
835                h.row_id,
836                h.logical_id,
837                h.kind,
838                h.properties,
839                h.content_ref,
840                am.last_accessed_at,
841                h.created_at,
842                h.distance,
843                h.chunk_id
844            FROM vector_hits h
845            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
846            WHERE 1 = 1{filter_clauses}
847            ORDER BY h.distance ASC
848            LIMIT ?{limit_idx}"
849        );
850
851        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
852
853        let conn_guard = match self.lock_connection() {
854            Ok(g) => g,
855            Err(e) => {
856                self.telemetry.increment_errors();
857                return Err(e);
858            }
859        };
860        let mut statement = match conn_guard.prepare_cached(&sql) {
861            Ok(stmt) => stmt,
862            Err(e) if is_vec_table_absent(&e) => {
863                // Capability miss: non-fatal — surface as was_degraded.
864                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
865                    trace_warn!("vector table absent, degrading vector_search to empty result");
866                }
867                return Ok(SearchRows {
868                    hits: Vec::new(),
869                    strict_hit_count: 0,
870                    relaxed_hit_count: 0,
871                    vector_hit_count: 0,
872                    fallback_used: false,
873                    was_degraded: true,
874                });
875            }
876            Err(e) => {
877                self.telemetry.increment_errors();
878                return Err(EngineError::Sqlite(e));
879            }
880        };
881
882        let attribution_requested = compiled.attribution_requested;
883        let hits = match statement
884            .query_map(params_from_iter(bind_values.iter()), |row| {
885                let distance: f64 = row.get(7)?;
886                // Score is the negated distance per addendum 1
887                // §Vector-Specific Behavior / Score and distance. For
888                // distance metrics (sqlite-vec's default) lower distance =
889                // better match, so negating yields the higher-is-better
890                // convention that dedup_branch_hits and the unified result
891                // surface rely on.
892                let score = -distance;
893                Ok(SearchHit {
894                    node: fathomdb_query::NodeRowLite {
895                        row_id: row.get(0)?,
896                        logical_id: row.get(1)?,
897                        kind: row.get(2)?,
898                        properties: row.get(3)?,
899                        content_ref: row.get(4)?,
900                        last_accessed_at: row.get(5)?,
901                    },
902                    written_at: row.get(6)?,
903                    score,
904                    modality: RetrievalModality::Vector,
905                    source: SearchHitSource::Vector,
906                    // Vector hits have no strict/relaxed notion.
907                    match_mode: None,
908                    // Vector hits have no snippet.
909                    snippet: None,
910                    projection_row_id: row.get::<_, Option<String>>(8)?,
911                    vector_distance: Some(distance),
912                    attribution: if attribution_requested {
913                        Some(HitAttribution {
914                            matched_paths: Vec::new(),
915                        })
916                    } else {
917                        None
918                    },
919                })
920            })
921            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
922        {
923            Ok(rows) => rows,
924            Err(e) => {
925                // Some SQLite errors surface during row iteration (e.g. when
926                // the vec0 extension is not loaded but the table exists as a
927                // stub). Classify as capability-miss when the shape matches.
928                if is_vec_table_absent(&e) {
929                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
930                        trace_warn!(
931                            "vector table absent at query time, degrading vector_search to empty result"
932                        );
933                    }
934                    drop(statement);
935                    drop(conn_guard);
936                    return Ok(SearchRows {
937                        hits: Vec::new(),
938                        strict_hit_count: 0,
939                        relaxed_hit_count: 0,
940                        vector_hit_count: 0,
941                        fallback_used: false,
942                        was_degraded: true,
943                    });
944                }
945                self.telemetry.increment_errors();
946                return Err(EngineError::Sqlite(e));
947            }
948        };
949
950        drop(statement);
951        drop(conn_guard);
952
953        self.telemetry.increment_queries();
954        let vector_hit_count = hits.len();
955        Ok(SearchRows {
956            hits,
957            strict_hit_count: 0,
958            relaxed_hit_count: 0,
959            vector_hit_count,
960            fallback_used: false,
961            was_degraded: false,
962        })
963    }
964
965    /// Execute a unified [`CompiledRetrievalPlan`] (Phase 12 `search()`
966    /// entry point) and return deterministically ranked, block-ordered
967    /// [`SearchRows`].
968    ///
969    /// Stages, per addendum 1 §Retrieval Planner Model:
970    ///
971    /// 1. **Text strict.** Always runs (empty query short-circuits to an
972    ///    empty branch result inside `run_search_branch`).
973    /// 2. **Text relaxed.** Runs iff the plan carries a relaxed branch AND
974    ///    the strict branch returned fewer than `min(FALLBACK_TRIGGER_K,
975    ///    limit)` hits — same v1 (`K = 1`) zero-hits-only trigger as the
976    ///    Phase 6 text-only path.
977    /// 3. **Vector.** Runs iff text retrieval (strict + relaxed combined)
978    ///    returned zero hits AND `plan.vector` is `Some`. **In v1 the
979    ///    planner never wires a vector branch through `search()`, so this
980    ///    code path is structurally present but dormant.** A future phase
981    ///    that wires read-time embedding into `compile_retrieval_plan` will
982    ///    immediately light it up.
983    /// 4. **Fusion.** All collected hits are merged via
984    ///    [`merge_search_branches_three`], which produces strict ->
985    ///    relaxed -> vector block ordering with cross-branch dedup
986    ///    resolved by branch precedence.
987    ///
988    /// `was_degraded` covers only the relaxed-branch cap miss in v1. The
989    /// addendum's "vector capability miss => `was_degraded`" semantics
990    /// applies to `search()` only when the unified planner actually fires
991    /// the vector branch, which v1 never does.
992    ///
993    /// # Errors
994    /// Returns [`EngineError`] if any stage's SQL cannot be prepared or
995    /// executed for a non-capability-miss reason.
996    pub fn execute_retrieval_plan(
997        &self,
998        plan: &CompiledRetrievalPlan,
999        raw_query: &str,
1000    ) -> Result<SearchRows, EngineError> {
1001        // Phase 12.5a: we work against a local owned copy so
1002        // `fill_vector_branch` can mutate `plan.vector` and
1003        // `plan.was_degraded_at_plan_time`. Cloning is cheap (the plan is
1004        // a bounded set of predicates + text AST nodes) and avoids
1005        // forcing callers to hand us `&mut` access.
1006        let mut plan = plan.clone();
1007        let limit = plan.text.strict.limit;
1008
1009        // Stage 1: text strict.
1010        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;
1011
1012        // Stage 2: text relaxed. Same K=1 zero-hits-only trigger the Phase 6
1013        // path uses.
1014        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
1015        let strict_underfilled = strict_hits.len() < fallback_threshold;
1016        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
1017        let mut fallback_used = false;
1018        let mut was_degraded = false;
1019        if let Some(relaxed) = plan.text.relaxed.as_ref()
1020            && strict_underfilled
1021        {
1022            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
1023            fallback_used = true;
1024            was_degraded = plan.was_degraded_at_plan_time;
1025        }
1026
1027        // Phase 12.5a: fill the vector branch from the configured
1028        // read-time query embedder, if any. Option (b) from the spec:
1029        // only pay the embedding cost when the text branches returned
1030        // nothing, because the three-branch stage gate below only runs
1031        // the vector stage under exactly that condition. This keeps the
1032        // hot path (strict text matched) embedder-free.
1033        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
1034        if text_branches_empty && self.query_embedder.is_some() {
1035            self.fill_vector_branch(&mut plan, raw_query);
1036        }
1037
1038        // Stage 3: vector. Runs only when text retrieval is empty AND a
1039        // vector branch is present. When no embedder is configured (Phase
1040        // 12.5a default) `plan.vector` stays `None` and this stage is a
1041        // no-op, preserving the Phase 12 v1 dormancy invariant.
1042        let mut vector_hits: Vec<SearchHit> = Vec::new();
1043        if let Some(vector) = plan.vector.as_ref()
1044            && strict_hits.is_empty()
1045            && relaxed_hits.is_empty()
1046        {
1047            let vector_rows = self.execute_compiled_vector_search(vector)?;
1048            // `execute_compiled_vector_search` returns a fully populated
1049            // `SearchRows`. Promote its hits into the merge stage and lift
1050            // its capability-miss `was_degraded` flag onto the unified
1051            // result, per addendum §Vector-Specific Behavior.
1052            vector_hits = vector_rows.hits;
1053            if vector_rows.was_degraded {
1054                was_degraded = true;
1055            }
1056        }
1057        // Phase 12.5a: an embedder-reported capability miss surfaces as
1058        // `plan.was_degraded_at_plan_time = true` set inside
1059        // `fill_vector_branch`. Lift it onto the response so callers see
1060        // the graceful degradation even when the vector stage-gate never
1061        // had the chance to fire (the embedder call itself failed and
1062        // the vector slot stayed `None`).
1063        if text_branches_empty
1064            && plan.was_degraded_at_plan_time
1065            && plan.vector.is_none()
1066            && self.query_embedder.is_some()
1067        {
1068            was_degraded = true;
1069        }
1070
1071        // Stage 4: fusion.
1072        let strict = &plan.text.strict;
1073        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
1074        if strict.attribution_requested {
1075            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
1076            self.populate_attribution_for_hits(
1077                &mut merged,
1078                &strict.text_query,
1079                relaxed_text_query,
1080            )?;
1081        }
1082
1083        let strict_hit_count = merged
1084            .iter()
1085            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
1086            .count();
1087        let relaxed_hit_count = merged
1088            .iter()
1089            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
1090            .count();
1091        let vector_hit_count = merged
1092            .iter()
1093            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
1094            .count();
1095
1096        Ok(SearchRows {
1097            hits: merged,
1098            strict_hit_count,
1099            relaxed_hit_count,
1100            vector_hit_count,
1101            fallback_used,
1102            was_degraded,
1103        })
1104    }
1105
1106    /// Phase 12.5a: populate `plan.vector` from the configured read-time
1107    /// query embedder, if any.
1108    ///
1109    /// Preconditions (enforced by the caller in `execute_retrieval_plan`):
1110    /// - `self.query_embedder.is_some()` — no point calling otherwise.
1111    /// - Both the strict and relaxed text branches already ran and
1112    ///   returned zero hits, so the existing three-branch stage gate
1113    ///   will actually fire the vector stage once the slot is populated.
1114    ///   This is option (b) from the Phase 12.5a spec: skip the embedding
1115    ///   cost entirely when text retrieval already won.
1116    ///
1117    /// Contract: never panics, never returns an error. On embedder error
1118    /// it sets `plan.was_degraded_at_plan_time = true` and leaves
1119    /// `plan.vector` as `None`; the coordinator's normal error-free
1120    /// degradation path then reports `was_degraded` on the result.
1121    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
1122        let Some(embedder) = self.query_embedder.as_ref() else {
1123            return;
1124        };
1125        match embedder.embed_query(raw_query) {
1126            Ok(vec) => {
1127                // `CompiledVectorSearch::query_text` is a JSON float-array
1128                // literal at the time the coordinator binds it (see the
1129                // `CompiledVectorSearch` docs). `serde_json::to_string`
1130                // on a `Vec<f32>` produces exactly that shape — no
1131                // wire-format change required.
1132                let literal = match serde_json::to_string(&vec) {
1133                    Ok(s) => s,
1134                    Err(err) => {
1135                        trace_warn!(
1136                            error = %err,
1137                            "query embedder vector serialization failed; skipping vector branch"
1138                        );
1139                        let _ = err; // Used by trace_warn! when tracing feature is active
1140                        plan.was_degraded_at_plan_time = true;
1141                        return;
1142                    }
1143                };
1144                let strict = &plan.text.strict;
1145                plan.vector = Some(CompiledVectorSearch {
1146                    root_kind: strict.root_kind.clone(),
1147                    query_text: literal,
1148                    limit: strict.limit,
1149                    fusable_filters: strict.fusable_filters.clone(),
1150                    residual_filters: strict.residual_filters.clone(),
1151                    attribution_requested: strict.attribution_requested,
1152                });
1153            }
1154            Err(err) => {
1155                trace_warn!(
1156                    error = %err,
1157                    "query embedder unavailable, skipping vector branch"
1158                );
1159                let _ = err; // Used by trace_warn! when tracing feature is active
1160                plan.was_degraded_at_plan_time = true;
1161            }
1162        }
1163    }
1164
1165    /// Execute a single search branch against the underlying FTS surfaces.
1166    ///
1167    /// This is the shared SQL emission path used by
1168    /// [`Self::execute_compiled_search_plan`] to run strict and (when
1169    /// present) relaxed branches of a [`CompiledSearchPlan`] in sequence.
1170    /// The returned hits are tagged with `branch`'s corresponding
1171    /// [`SearchMatchMode`] and are **not** yet deduped or truncated — the
1172    /// caller is responsible for merging multiple branches.
1173    #[allow(clippy::too_many_lines)]
1174    fn run_search_branch(
1175        &self,
1176        compiled: &CompiledSearch,
1177        branch: SearchBranch,
1178    ) -> Result<Vec<SearchHit>, EngineError> {
1179        use std::fmt::Write as _;
1180        // Short-circuit an empty/whitespace-only query: rendering it would
1181        // yield `MATCH ""`, which FTS5 rejects as a syntax error. Callers
1182        // (including the adaptive path when strict is Empty and derive_relaxed
1183        // returns None) must see an empty result, not an error. Each branch
1184        // is short-circuited independently so a strict-Empty + relaxed-Some
1185        // plan still exercises the relaxed branch.
1186        // A top-level `TextQuery::Not` renders to an FTS5 expression that
1187        // matches "every row not containing X" — a complement-of-corpus scan
1188        // that no caller would intentionally want. Short-circuit to empty at
1189        // the root only; a `Not` nested inside an `And` is a legitimate
1190        // exclusion and must still run.
1191        if matches!(
1192            compiled.text_query,
1193            fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
1194        ) {
1195            return Ok(Vec::new());
1196        }
1197        let rendered = render_text_query_fts5(&compiled.text_query);
1198        // An empty `root_kind` means "unkind-filtered" — the fallback_search
1199        // helper uses this when the caller did not add `.filter_kind_eq(...)`.
1200        // The adaptive `text_search()` path never produces an empty root_kind
1201        // because `QueryBuilder::nodes(kind)` requires a non-empty string at
1202        // the entry point.
1203        let filter_by_kind = !compiled.root_kind.is_empty();
1204        let mut binds: Vec<BindValue> = if filter_by_kind {
1205            vec![
1206                BindValue::Text(rendered.clone()),
1207                BindValue::Text(compiled.root_kind.clone()),
1208                BindValue::Text(rendered),
1209                BindValue::Text(compiled.root_kind.clone()),
1210            ]
1211        } else {
1212            vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
1213        };
1214
1215        // P2-5: both fusable and residual predicates now match against the
1216        // CTE's projected columns (`u.kind`, `u.logical_id`, `u.source_ref`,
1217        // `u.content_ref`, `u.properties`) because the inner UNION arms
1218        // project the full active-row column set through the
1219        // `JOIN nodes src` already present in each arm. The previous
1220        // implementation re-joined `nodes hn` at the CTE level and
1221        // `nodes n` again at the outer SELECT, which was triple work on
1222        // the hot search path.
1223        let mut fused_clauses = String::new();
1224        for predicate in &compiled.fusable_filters {
1225            match predicate {
1226                Predicate::KindEq(kind) => {
1227                    binds.push(BindValue::Text(kind.clone()));
1228                    let idx = binds.len();
1229                    let _ = write!(fused_clauses, "\n                  AND u.kind = ?{idx}");
1230                }
1231                Predicate::LogicalIdEq(logical_id) => {
1232                    binds.push(BindValue::Text(logical_id.clone()));
1233                    let idx = binds.len();
1234                    let _ = write!(
1235                        fused_clauses,
1236                        "\n                  AND u.logical_id = ?{idx}"
1237                    );
1238                }
1239                Predicate::SourceRefEq(source_ref) => {
1240                    binds.push(BindValue::Text(source_ref.clone()));
1241                    let idx = binds.len();
1242                    let _ = write!(
1243                        fused_clauses,
1244                        "\n                  AND u.source_ref = ?{idx}"
1245                    );
1246                }
1247                Predicate::ContentRefEq(uri) => {
1248                    binds.push(BindValue::Text(uri.clone()));
1249                    let idx = binds.len();
1250                    let _ = write!(
1251                        fused_clauses,
1252                        "\n                  AND u.content_ref = ?{idx}"
1253                    );
1254                }
1255                Predicate::ContentRefNotNull => {
1256                    fused_clauses.push_str("\n                  AND u.content_ref IS NOT NULL");
1257                }
1258                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
1259                    // Should be in residual_filters; compile_search guarantees
1260                    // this, but stay defensive.
1261                }
1262            }
1263        }
1264
1265        let mut filter_clauses = String::new();
1266        for predicate in &compiled.residual_filters {
1267            match predicate {
1268                Predicate::JsonPathEq { path, value } => {
1269                    binds.push(BindValue::Text(path.clone()));
1270                    let path_idx = binds.len();
1271                    binds.push(scalar_to_bind(value));
1272                    let value_idx = binds.len();
1273                    let _ = write!(
1274                        filter_clauses,
1275                        "\n  AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
1276                    );
1277                }
1278                Predicate::JsonPathCompare { path, op, value } => {
1279                    binds.push(BindValue::Text(path.clone()));
1280                    let path_idx = binds.len();
1281                    binds.push(scalar_to_bind(value));
1282                    let value_idx = binds.len();
1283                    let operator = match op {
1284                        ComparisonOp::Gt => ">",
1285                        ComparisonOp::Gte => ">=",
1286                        ComparisonOp::Lt => "<",
1287                        ComparisonOp::Lte => "<=",
1288                    };
1289                    let _ = write!(
1290                        filter_clauses,
1291                        "\n  AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
1292                    );
1293                }
1294                Predicate::KindEq(_)
1295                | Predicate::LogicalIdEq(_)
1296                | Predicate::SourceRefEq(_)
1297                | Predicate::ContentRefEq(_)
1298                | Predicate::ContentRefNotNull => {
1299                    // Fusable predicates live in fused_clauses; compile_search
1300                    // partitions them out of residual_filters.
1301                }
1302            }
1303        }
1304
1305        // Bind `limit` as an integer parameter rather than formatting it into
1306        // the SQL string. Interpolating the limit made the prepared-statement
1307        // SQL vary by limit value, so rusqlite's default 16-slot
1308        // `prepare_cached` cache thrashed for paginated callers that varied
1309        // limits per call. With the bind the SQL is structurally stable for
1310        // a given filter shape regardless of `limit` value.
1311        let limit = compiled.limit;
1312        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
1313        let limit_idx = binds.len();
1314        // P2-5: the inner UNION arms project the full active-row column
1315        // set through `JOIN nodes src` (kind, row_id, source_ref,
1316        // content_ref, content_hash, created_at, properties). Both the
1317        // CTE's outer WHERE and the final SELECT consume those columns
1318        // directly, which eliminates the previous `JOIN nodes hn` at the
1319        // CTE level and `JOIN nodes n` at the outer SELECT — saving two
1320        // redundant joins on the hot search path. `src.superseded_at IS
1321        // NULL` in each arm already filters retired rows, which is what
1322        // the dropped outer joins used to do.
1323        let (chunk_fts_bind, chunk_kind_clause, prop_fts_bind, prop_kind_clause) = if filter_by_kind
1324        {
1325            (
1326                "?1",
1327                "\n                      AND src.kind = ?2",
1328                "?3",
1329                "\n                      AND fp.kind = ?4",
1330            )
1331        } else {
1332            ("?1", "", "?2", "")
1333        };
1334        let sql = format!(
1335            "WITH search_hits AS (
1336                SELECT
1337                    u.row_id AS row_id,
1338                    u.logical_id AS logical_id,
1339                    u.kind AS kind,
1340                    u.properties AS properties,
1341                    u.source_ref AS source_ref,
1342                    u.content_ref AS content_ref,
1343                    u.created_at AS created_at,
1344                    u.score AS score,
1345                    u.source AS source,
1346                    u.snippet AS snippet,
1347                    u.projection_row_id AS projection_row_id
1348                FROM (
1349                    SELECT
1350                        src.row_id AS row_id,
1351                        c.node_logical_id AS logical_id,
1352                        src.kind AS kind,
1353                        src.properties AS properties,
1354                        src.source_ref AS source_ref,
1355                        src.content_ref AS content_ref,
1356                        src.created_at AS created_at,
1357                        -bm25(fts_nodes) AS score,
1358                        'chunk' AS source,
1359                        snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
1360                        f.chunk_id AS projection_row_id
1361                    FROM fts_nodes f
1362                    JOIN chunks c ON c.id = f.chunk_id
1363                    JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
1364                    WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}
1365                    UNION ALL
1366                    SELECT
1367                        src.row_id AS row_id,
1368                        fp.node_logical_id AS logical_id,
1369                        src.kind AS kind,
1370                        src.properties AS properties,
1371                        src.source_ref AS source_ref,
1372                        src.content_ref AS content_ref,
1373                        src.created_at AS created_at,
1374                        -bm25(fts_node_properties) AS score,
1375                        'property' AS source,
1376                        substr(fp.text_content, 1, 200) AS snippet,
1377                        CAST(fp.rowid AS TEXT) AS projection_row_id
1378                    FROM fts_node_properties fp
1379                    JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
1380                    WHERE fts_node_properties MATCH {prop_fts_bind}{prop_kind_clause}
1381                ) u
1382                WHERE 1 = 1{fused_clauses}
1383                ORDER BY u.score DESC
1384                LIMIT ?{limit_idx}
1385            )
1386            SELECT
1387                h.row_id,
1388                h.logical_id,
1389                h.kind,
1390                h.properties,
1391                h.content_ref,
1392                am.last_accessed_at,
1393                h.created_at,
1394                h.score,
1395                h.source,
1396                h.snippet,
1397                h.projection_row_id
1398            FROM search_hits h
1399            LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
1400            WHERE 1 = 1{filter_clauses}
1401            ORDER BY h.score DESC"
1402        );
1403
1404        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
1405
1406        let conn_guard = match self.lock_connection() {
1407            Ok(g) => g,
1408            Err(e) => {
1409                self.telemetry.increment_errors();
1410                return Err(e);
1411            }
1412        };
1413        let mut statement = match conn_guard.prepare_cached(&sql) {
1414            Ok(stmt) => stmt,
1415            Err(e) => {
1416                self.telemetry.increment_errors();
1417                return Err(EngineError::Sqlite(e));
1418            }
1419        };
1420
1421        let hits = match statement
1422            .query_map(params_from_iter(bind_values.iter()), |row| {
1423                let source_str: String = row.get(8)?;
1424                // The CTE emits only two literal values here: `'chunk'` and
1425                // `'property'`. Default to `Chunk` on anything unexpected so a
1426                // schema drift surfaces as a mislabelled hit rather than a
1427                // row-level error.
1428                let source = if source_str == "property" {
1429                    SearchHitSource::Property
1430                } else {
1431                    SearchHitSource::Chunk
1432                };
1433                let match_mode = match branch {
1434                    SearchBranch::Strict => SearchMatchMode::Strict,
1435                    SearchBranch::Relaxed => SearchMatchMode::Relaxed,
1436                };
1437                Ok(SearchHit {
1438                    node: fathomdb_query::NodeRowLite {
1439                        row_id: row.get(0)?,
1440                        logical_id: row.get(1)?,
1441                        kind: row.get(2)?,
1442                        properties: row.get(3)?,
1443                        content_ref: row.get(4)?,
1444                        last_accessed_at: row.get(5)?,
1445                    },
1446                    written_at: row.get(6)?,
1447                    score: row.get(7)?,
1448                    // Phase 10: every branch currently emits text hits.
1449                    modality: RetrievalModality::Text,
1450                    source,
1451                    match_mode: Some(match_mode),
1452                    snippet: row.get(9)?,
1453                    projection_row_id: row.get(10)?,
1454                    vector_distance: None,
1455                    attribution: None,
1456                })
1457            })
1458            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
1459        {
1460            Ok(rows) => rows,
1461            Err(e) => {
1462                self.telemetry.increment_errors();
1463                return Err(EngineError::Sqlite(e));
1464            }
1465        };
1466
1467        // Drop the statement so `conn_guard` is free (attribution is
1468        // resolved after dedup in `execute_compiled_search_plan` to avoid
1469        // spending highlight lookups on hits that will be discarded).
1470        drop(statement);
1471        drop(conn_guard);
1472
1473        self.telemetry.increment_queries();
1474        Ok(hits)
1475    }
1476
1477    /// Populate per-hit attribution for the given deduped merged hits.
1478    /// Runs after [`merge_search_branches`] so dropped duplicates do not
1479    /// incur the highlight+position-map lookup cost.
1480    fn populate_attribution_for_hits(
1481        &self,
1482        hits: &mut [SearchHit],
1483        strict_text_query: &fathomdb_query::TextQuery,
1484        relaxed_text_query: Option<&fathomdb_query::TextQuery>,
1485    ) -> Result<(), EngineError> {
1486        let conn_guard = match self.lock_connection() {
1487            Ok(g) => g,
1488            Err(e) => {
1489                self.telemetry.increment_errors();
1490                return Err(e);
1491            }
1492        };
1493        let strict_expr = render_text_query_fts5(strict_text_query);
1494        let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
1495        for hit in hits.iter_mut() {
1496            // Phase 10: text hits always carry `Some(match_mode)`. Vector
1497            // hits (when a future phase adds them) have `None` here and
1498            // are skipped by the attribution resolver because attribution
1499            // is meaningless for vector matches.
1500            let match_expr = match hit.match_mode {
1501                Some(SearchMatchMode::Strict) => strict_expr.as_str(),
1502                Some(SearchMatchMode::Relaxed) => {
1503                    relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
1504                }
1505                None => continue,
1506            };
1507            match resolve_hit_attribution(&conn_guard, hit, match_expr) {
1508                Ok(att) => hit.attribution = Some(att),
1509                Err(e) => {
1510                    self.telemetry.increment_errors();
1511                    return Err(e);
1512                }
1513            }
1514        }
1515        Ok(())
1516    }
1517
1518    /// # Errors
1519    /// Returns [`EngineError`] if the root query or any bounded expansion
1520    /// query cannot be prepared or executed.
1521    pub fn execute_compiled_grouped_read(
1522        &self,
1523        compiled: &CompiledGroupedQuery,
1524    ) -> Result<GroupedQueryRows, EngineError> {
1525        let root_rows = self.execute_compiled_read(&compiled.root)?;
1526        if root_rows.was_degraded {
1527            return Ok(GroupedQueryRows {
1528                roots: Vec::new(),
1529                expansions: Vec::new(),
1530                was_degraded: true,
1531            });
1532        }
1533
1534        let roots = root_rows.nodes;
1535        let mut expansions = Vec::with_capacity(compiled.expansions.len());
1536        for expansion in &compiled.expansions {
1537            let slot_rows = if roots.is_empty() {
1538                Vec::new()
1539            } else {
1540                self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
1541            };
1542            expansions.push(ExpansionSlotRows {
1543                slot: expansion.slot.clone(),
1544                roots: slot_rows,
1545            });
1546        }
1547
1548        Ok(GroupedQueryRows {
1549            roots,
1550            expansions,
1551            was_degraded: false,
1552        })
1553    }
1554
1555    /// Chunked batched expansion: splits roots into chunks of
1556    /// `BATCH_CHUNK_SIZE` and runs one batched query per chunk, then merges
1557    /// results while preserving root ordering.  This keeps bind-parameter
1558    /// counts within `SQLite` limits while avoiding the N+1 per-root pattern
1559    /// for large result sets.
1560    fn read_expansion_nodes_chunked(
1561        &self,
1562        roots: &[NodeRow],
1563        expansion: &ExpansionSlot,
1564        hard_limit: usize,
1565    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
1566        if roots.len() <= BATCH_CHUNK_SIZE {
1567            return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
1568        }
1569
1570        // Merge chunk results keyed by root logical_id, then reassemble in
1571        // root order.
1572        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
1573        for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
1574            for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
1575                per_root
1576                    .entry(group.root_logical_id)
1577                    .or_default()
1578                    .extend(group.nodes);
1579            }
1580        }
1581
1582        Ok(roots
1583            .iter()
1584            .map(|root| ExpansionRootRows {
1585                root_logical_id: root.logical_id.clone(),
1586                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
1587            })
1588            .collect())
1589    }
1590
1591    /// Batched expansion: one recursive CTE query per expansion slot that
1592    /// processes all root IDs at once. Uses `ROW_NUMBER() OVER (PARTITION BY
1593    /// source_logical_id ...)` to enforce the per-root hard limit inside the
1594    /// database rather than in Rust.
1595    fn read_expansion_nodes_batched(
1596        &self,
1597        roots: &[NodeRow],
1598        expansion: &ExpansionSlot,
1599        hard_limit: usize,
1600    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
1601        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
1602        let (join_condition, next_logical_id) = match expansion.direction {
1603            fathomdb_query::TraverseDirection::Out => {
1604                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
1605            }
1606            fathomdb_query::TraverseDirection::In => {
1607                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
1608            }
1609        };
1610
1611        // Build a UNION ALL of SELECT literals for the root seed rows.
1612        // SQLite does not support `VALUES ... AS alias(col)` in older versions,
1613        // so we use `SELECT ?1 UNION ALL SELECT ?2 ...` instead.
1614        let root_seed_union: String = (1..=root_ids.len())
1615            .map(|i| format!("SELECT ?{i}"))
1616            .collect::<Vec<_>>()
1617            .join(" UNION ALL ");
1618
1619        // The `root_id` column tracks which root each traversal path
1620        // originated from. The `ROW_NUMBER()` window in the outer query
1621        // enforces the per-root hard limit.
1622        let sql = format!(
1623            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
1624            traversed(root_id, logical_id, depth, visited, emitted) AS (
1625                SELECT rid, rid, 0, printf(',%s,', rid), 0
1626                FROM root_ids
1627                UNION ALL
1628                SELECT
1629                    t.root_id,
1630                    {next_logical_id},
1631                    t.depth + 1,
1632                    t.visited || {next_logical_id} || ',',
1633                    t.emitted + 1
1634                FROM traversed t
1635                JOIN edges e ON {join_condition}
1636                    AND e.kind = ?{edge_kind_param}
1637                    AND e.superseded_at IS NULL
1638                WHERE t.depth < {max_depth}
1639                  AND t.emitted < {hard_limit}
1640                  AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
1641            ),
1642            numbered AS (
1643                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
1644                     , n.content_ref, am.last_accessed_at
1645                     , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
1646                FROM traversed t
1647                JOIN nodes n ON n.logical_id = t.logical_id
1648                    AND n.superseded_at IS NULL
1649                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
1650                WHERE t.depth > 0
1651            )
1652            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
1653            FROM numbered
1654            WHERE rn <= {hard_limit}
1655            ORDER BY root_id, logical_id",
1656            edge_kind_param = root_ids.len() + 1,
1657            max_depth = expansion.max_depth,
1658        );
1659
1660        let conn_guard = self.lock_connection()?;
1661        let mut statement = conn_guard
1662            .prepare_cached(&sql)
1663            .map_err(EngineError::Sqlite)?;
1664
1665        // Bind root IDs (1..=N) and edge kind (N+1).
1666        let mut bind_values: Vec<Value> = root_ids
1667            .iter()
1668            .map(|id| Value::Text((*id).to_owned()))
1669            .collect();
1670        bind_values.push(Value::Text(expansion.label.clone()));
1671
1672        let rows = statement
1673            .query_map(params_from_iter(bind_values.iter()), |row| {
1674                Ok((
1675                    row.get::<_, String>(0)?, // root_id
1676                    NodeRow {
1677                        row_id: row.get(1)?,
1678                        logical_id: row.get(2)?,
1679                        kind: row.get(3)?,
1680                        properties: row.get(4)?,
1681                        content_ref: row.get(5)?,
1682                        last_accessed_at: row.get(6)?,
1683                    },
1684                ))
1685            })
1686            .map_err(EngineError::Sqlite)?
1687            .collect::<Result<Vec<_>, _>>()
1688            .map_err(EngineError::Sqlite)?;
1689
1690        // Partition results back into per-root groups, preserving root order.
1691        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
1692        for (root_id, node) in rows {
1693            per_root.entry(root_id).or_default().push(node);
1694        }
1695
1696        let root_groups = roots
1697            .iter()
1698            .map(|root| ExpansionRootRows {
1699                root_logical_id: root.logical_id.clone(),
1700                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
1701            })
1702            .collect();
1703
1704        Ok(root_groups)
1705    }
1706
1707    /// Read a single run by id.
1708    ///
1709    /// # Errors
1710    /// Returns [`EngineError`] if the query fails or if the connection mutex
1711    /// has been poisoned.
1712    pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
1713        let conn = self.lock_connection()?;
1714        conn.query_row(
1715            "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
1716            rusqlite::params![id],
1717            |row| {
1718                Ok(RunRow {
1719                    id: row.get(0)?,
1720                    kind: row.get(1)?,
1721                    status: row.get(2)?,
1722                    properties: row.get(3)?,
1723                })
1724            },
1725        )
1726        .optional()
1727        .map_err(EngineError::Sqlite)
1728    }
1729
1730    /// Read a single step by id.
1731    ///
1732    /// # Errors
1733    /// Returns [`EngineError`] if the query fails or if the connection mutex
1734    /// has been poisoned.
1735    pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
1736        let conn = self.lock_connection()?;
1737        conn.query_row(
1738            "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
1739            rusqlite::params![id],
1740            |row| {
1741                Ok(StepRow {
1742                    id: row.get(0)?,
1743                    run_id: row.get(1)?,
1744                    kind: row.get(2)?,
1745                    status: row.get(3)?,
1746                    properties: row.get(4)?,
1747                })
1748            },
1749        )
1750        .optional()
1751        .map_err(EngineError::Sqlite)
1752    }
1753
1754    /// Read a single action by id.
1755    ///
1756    /// # Errors
1757    /// Returns [`EngineError`] if the query fails or if the connection mutex
1758    /// has been poisoned.
1759    pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
1760        let conn = self.lock_connection()?;
1761        conn.query_row(
1762            "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
1763            rusqlite::params![id],
1764            |row| {
1765                Ok(ActionRow {
1766                    id: row.get(0)?,
1767                    step_id: row.get(1)?,
1768                    kind: row.get(2)?,
1769                    status: row.get(3)?,
1770                    properties: row.get(4)?,
1771                })
1772            },
1773        )
1774        .optional()
1775        .map_err(EngineError::Sqlite)
1776    }
1777
1778    /// Read all active (non-superseded) runs.
1779    ///
1780    /// # Errors
1781    /// Returns [`EngineError`] if the query fails or if the connection mutex
1782    /// has been poisoned.
1783    pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
1784        let conn = self.lock_connection()?;
1785        let mut stmt = conn
1786            .prepare_cached(
1787                "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
1788            )
1789            .map_err(EngineError::Sqlite)?;
1790        let rows = stmt
1791            .query_map([], |row| {
1792                Ok(RunRow {
1793                    id: row.get(0)?,
1794                    kind: row.get(1)?,
1795                    status: row.get(2)?,
1796                    properties: row.get(3)?,
1797                })
1798            })
1799            .map_err(EngineError::Sqlite)?
1800            .collect::<Result<Vec<_>, _>>()
1801            .map_err(EngineError::Sqlite)?;
1802        Ok(rows)
1803    }
1804
1805    /// Returns the number of shape→SQL entries currently indexed.
1806    ///
1807    /// Each distinct query shape (structural hash of kind + steps + limits)
1808    /// maps to exactly one SQL string.  This is a test-oriented introspection
1809    /// helper; it does not reflect rusqlite's internal prepared-statement
1810    /// cache, which is keyed by SQL text.
1811    ///
1812    /// # Panics
1813    /// Panics if the internal shape-SQL-map mutex is poisoned.
1814    #[must_use]
1815    #[allow(clippy::expect_used)]
1816    pub fn shape_sql_count(&self) -> usize {
1817        self.shape_sql_map
1818            .lock()
1819            .unwrap_or_else(PoisonError::into_inner)
1820            .len()
1821    }
1822
1823    /// Returns a cloned `Arc` to the schema manager.
1824    #[must_use]
1825    pub fn schema_manager(&self) -> Arc<SchemaManager> {
1826        Arc::clone(&self.schema_manager)
1827    }
1828
1829    /// Return the execution plan for a compiled query without executing it.
1830    ///
1831    /// Useful for debugging, testing shape-hash caching, and operator
1832    /// diagnostics. Does not open a transaction or touch the database beyond
1833    /// checking the statement cache.
1834    ///
1835    /// # Panics
1836    /// Panics if the internal shape-SQL-map mutex is poisoned.
1837    #[must_use]
1838    pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
1839        let cache_hit = self
1840            .shape_sql_map
1841            .lock()
1842            .unwrap_or_else(PoisonError::into_inner)
1843            .contains_key(&compiled.shape_hash);
1844        QueryPlan {
1845            sql: wrap_node_row_projection_sql(&compiled.sql),
1846            bind_count: compiled.binds.len(),
1847            driving_table: compiled.driving_table,
1848            shape_hash: compiled.shape_hash,
1849            cache_hit,
1850        }
1851    }
1852
1853    /// Execute a named PRAGMA and return the result as a String.
1854    /// Used by Layer 1 tests to verify startup pragma initialization.
1855    ///
1856    /// # Errors
1857    /// Returns [`EngineError`] if the PRAGMA query fails or if the connection
1858    /// mutex has been poisoned.
1859    #[doc(hidden)]
1860    pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
1861        let conn = self.lock_connection()?;
1862        let result = conn
1863            .query_row(&format!("PRAGMA {name}"), [], |row| {
1864                // PRAGMAs may return TEXT or INTEGER; normalise to String.
1865                row.get::<_, rusqlite::types::Value>(0)
1866            })
1867            .map_err(EngineError::Sqlite)?;
1868        let s = match result {
1869            rusqlite::types::Value::Text(t) => t,
1870            rusqlite::types::Value::Integer(i) => i.to_string(),
1871            rusqlite::types::Value::Real(f) => f.to_string(),
1872            rusqlite::types::Value::Blob(_) => {
1873                return Err(EngineError::InvalidWrite(format!(
1874                    "PRAGMA {name} returned an unexpected BLOB value"
1875                )));
1876            }
1877            rusqlite::types::Value::Null => String::new(),
1878        };
1879        Ok(s)
1880    }
1881
1882    /// Return all provenance events whose `subject` matches the given value.
1883    ///
1884    /// Subjects are logical node IDs (for retire/upsert events) or `source_ref`
1885    /// values (for excise events).
1886    ///
1887    /// # Errors
1888    /// Returns [`EngineError`] if the query fails or if the connection mutex
1889    /// has been poisoned.
1890    pub fn query_provenance_events(
1891        &self,
1892        subject: &str,
1893    ) -> Result<Vec<ProvenanceEvent>, EngineError> {
1894        let conn = self.lock_connection()?;
1895        let mut stmt = conn
1896            .prepare_cached(
1897                "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
1898                 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
1899            )
1900            .map_err(EngineError::Sqlite)?;
1901        let events = stmt
1902            .query_map(rusqlite::params![subject], |row| {
1903                Ok(ProvenanceEvent {
1904                    id: row.get(0)?,
1905                    event_type: row.get(1)?,
1906                    subject: row.get(2)?,
1907                    source_ref: row.get(3)?,
1908                    metadata_json: row.get(4)?,
1909                    created_at: row.get(5)?,
1910                })
1911            })
1912            .map_err(EngineError::Sqlite)?
1913            .collect::<Result<Vec<_>, _>>()
1914            .map_err(EngineError::Sqlite)?;
1915        Ok(events)
1916    }
1917}
1918
/// Wrap `base_sql` as a subquery aliased `q` and project the node-row
/// columns from it, left-joining `node_access_metadata` on `logical_id` to
/// pick up `last_accessed_at`.
///
/// `base_sql` is embedded verbatim between the parentheses.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    const PROJECTION_HEAD: &str = "SELECT q.row_id, q.logical_id, q.kind, q.properties, \
                                   q.content_ref, am.last_accessed_at FROM (";
    const PROJECTION_TAIL: &str =
        ") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id";

    let mut sql =
        String::with_capacity(PROJECTION_HEAD.len() + base_sql.len() + PROJECTION_TAIL.len());
    sql.push_str(PROJECTION_HEAD);
    sql.push_str(base_sql);
    sql.push_str(PROJECTION_TAIL);
    sql
}
1926
1927/// Returns `true` when `err` indicates the vec virtual table is absent
1928/// (sqlite-vec feature enabled but `vec_nodes_active` not yet created).
1929pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
1930    match err {
1931        rusqlite::Error::SqliteFailure(_, Some(msg)) => {
1932            msg.contains("vec_nodes_active") || msg.contains("no such module: vec0")
1933        }
1934        _ => false,
1935    }
1936}
1937
1938fn scalar_to_bind(value: &ScalarValue) -> BindValue {
1939    match value {
1940        ScalarValue::Text(text) => BindValue::Text(text.clone()),
1941        ScalarValue::Integer(integer) => BindValue::Integer(*integer),
1942        ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
1943    }
1944}
1945
1946/// Merge strict and relaxed search branches into a single block-ordered,
1947/// deduplicated, limit-truncated hit list.
1948///
1949/// Phase 3 rules, in order:
1950///
1951/// 1. Each branch is sorted internally by score descending with `logical_id`
1952///    ascending as the deterministic tiebreak.
1953/// 2. Within a single branch, if the same `logical_id` appears twice (e.g.
1954///    once from the chunk surface and once from the property surface) the
1955///    higher-score row wins, then chunk > property > vector, then declaration
1956///    order (chunk first).
1957/// 3. Strict hits form one block and relaxed hits form the next. Strict
1958///    always precedes relaxed in the merged output regardless of per-hit
1959///    score.
1960/// 4. Cross-branch dedup is strict-wins: any relaxed hit whose `logical_id`
1961///    already appears in the strict block is dropped.
1962/// 5. The merged output is truncated to `limit`.
1963fn merge_search_branches(
1964    strict: Vec<SearchHit>,
1965    relaxed: Vec<SearchHit>,
1966    limit: usize,
1967) -> Vec<SearchHit> {
1968    merge_search_branches_three(strict, relaxed, Vec::new(), limit)
1969}
1970
1971/// Three-branch generalization of [`merge_search_branches`]: orders hits as
1972/// (strict block, relaxed block, vector block) per addendum 1 §Fusion
1973/// Semantics, with cross-branch dedup resolved by branch precedence
1974/// (strict > relaxed > vector). Within each block the existing
1975/// [`dedup_branch_hits`] rule applies (score desc, `logical_id` asc, source
1976/// priority chunk > property > vector).
1977///
1978/// Phase 12 (the unified `search()` entry point) calls this directly. The
1979/// two-branch [`merge_search_branches`] wrapper is preserved as a
1980/// convenience for the text-only `execute_compiled_search_plan` path; both
1981/// reduce to the same code.
1982fn merge_search_branches_three(
1983    strict: Vec<SearchHit>,
1984    relaxed: Vec<SearchHit>,
1985    vector: Vec<SearchHit>,
1986    limit: usize,
1987) -> Vec<SearchHit> {
1988    let strict_block = dedup_branch_hits(strict);
1989    let relaxed_block = dedup_branch_hits(relaxed);
1990    let vector_block = dedup_branch_hits(vector);
1991
1992    let mut seen: std::collections::HashSet<String> = strict_block
1993        .iter()
1994        .map(|h| h.node.logical_id.clone())
1995        .collect();
1996
1997    let mut merged = strict_block;
1998    for hit in relaxed_block {
1999        if seen.insert(hit.node.logical_id.clone()) {
2000            merged.push(hit);
2001        }
2002    }
2003    for hit in vector_block {
2004        if seen.insert(hit.node.logical_id.clone()) {
2005            merged.push(hit);
2006        }
2007    }
2008
2009    if merged.len() > limit {
2010        merged.truncate(limit);
2011    }
2012    merged
2013}
2014
2015/// Sort a branch's hits by score descending + `logical_id` ascending, then
2016/// dedup duplicate `logical_id`s within the branch using source priority
2017/// (chunk > property > vector) and declaration order.
2018fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
2019    hits.sort_by(|a, b| {
2020        b.score
2021            .partial_cmp(&a.score)
2022            .unwrap_or(std::cmp::Ordering::Equal)
2023            .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
2024            .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
2025    });
2026
2027    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2028    hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
2029    hits
2030}
2031
2032fn source_priority(source: SearchHitSource) -> u8 {
2033    // Lower is better. Chunk is declared before property in the CTE; vector
2034    // is reserved for future wiring but comes last among the known variants.
2035    match source {
2036        SearchHitSource::Chunk => 0,
2037        SearchHitSource::Property => 1,
2038        SearchHitSource::Vector => 2,
2039    }
2040}
2041
/// Opening sentinel used to wrap FTS5-matched terms in the `highlight()`
/// output so the coordinator can recover per-term byte offsets in the
/// original `text_content` column. Its closing counterpart is
/// [`ATTRIBUTION_HIGHLIGHT_CLOSE`].
///
/// Each sentinel is a single byte: `U+0001` ("start of heading") for the
/// opening marker and `U+0002` ("start of text") for the closing marker.
/// These bytes are safe for all valid JSON text *except* deliberately
/// escape-injected `\u0001` / `\u0002` sequences: a payload like
/// `{"x":"\u0001"}` decodes to a literal 0x01 byte in the extracted blob,
/// making the sentinel ambiguous for that row. Such input is treated as out
/// of scope for attribution correctness — hits on those rows may have
/// misattributed `matched_paths`, but no panic or query failure occurs. A
/// future hardening step could strip bytes < 0x20 at blob-emission time in
/// `RecursiveWalker::emit_leaf` to close this gap.
///
/// Using one-byte markers keeps the original-to-highlighted offset
/// accounting trivial: every sentinel adds exactly one byte to the
/// highlighted string.
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
/// Closing sentinel (`U+0002`) paired with [`ATTRIBUTION_HIGHLIGHT_OPEN`];
/// it marks the end of a matched term in the `highlight()` output.
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
2061
2062/// Load the `fts_node_property_positions` sidecar rows for a given
2063/// `(logical_id, kind)` ordered by `start_offset`. Returns a vector of
2064/// `(start_offset, end_offset, leaf_path)` tuples ready for binary search.
2065fn load_position_map(
2066    conn: &Connection,
2067    logical_id: &str,
2068    kind: &str,
2069) -> Result<Vec<(usize, usize, String)>, EngineError> {
2070    let mut stmt = conn
2071        .prepare_cached(
2072            "SELECT start_offset, end_offset, leaf_path \
2073             FROM fts_node_property_positions \
2074             WHERE node_logical_id = ?1 AND kind = ?2 \
2075             ORDER BY start_offset ASC",
2076        )
2077        .map_err(EngineError::Sqlite)?;
2078    let rows = stmt
2079        .query_map(rusqlite::params![logical_id, kind], |row| {
2080            let start: i64 = row.get(0)?;
2081            let end: i64 = row.get(1)?;
2082            let path: String = row.get(2)?;
2083            // Offsets are non-negative and within blob byte limits; on the
2084            // off chance a corrupt row is encountered, fall back to 0 so
2085            // lookups silently skip it rather than panicking.
2086            let start = usize::try_from(start).unwrap_or(0);
2087            let end = usize::try_from(end).unwrap_or(0);
2088            Ok((start, end, path))
2089        })
2090        .map_err(EngineError::Sqlite)?;
2091    let mut out = Vec::new();
2092    for row in rows {
2093        out.push(row.map_err(EngineError::Sqlite)?);
2094    }
2095    Ok(out)
2096}
2097
/// Parse a `highlight()`-wrapped string, returning the list of original-text
/// byte offsets at which matched terms begin. `wrapped` is the
/// highlight-decorated form of a text column; `open` / `close` are the
/// sentinel markers passed to `highlight()`. The returned offsets refer to
/// positions in the *original* text (i.e. the column as it would be stored
/// without highlight decoration).
///
/// The scan is byte-wise and checks for `open` before `close` at each
/// position. An empty `open` or `close` marker is treated as never
/// matching: it would otherwise match at every position without consuming
/// any input, and the scan would never terminate.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    let mut offsets = Vec::new();
    let bytes = wrapped.as_bytes();
    let open_bytes = open.as_bytes();
    let close_bytes = close.as_bytes();
    let mut i = 0usize;
    // Number of sentinel bytes consumed so far — every marker encountered
    // subtracts from the wrapped-string index to get the original offset.
    let mut marker_bytes_seen = 0usize;
    while i < bytes.len() {
        let rest = &bytes[i..];
        if !open_bytes.is_empty() && rest.starts_with(open_bytes) {
            // Record the original-text offset of the term following the open
            // marker.
            offsets.push(i - marker_bytes_seen);
            i += open_bytes.len();
            marker_bytes_seen += open_bytes.len();
        } else if !close_bytes.is_empty() && rest.starts_with(close_bytes) {
            i += close_bytes.len();
            marker_bytes_seen += close_bytes.len();
        } else {
            // Ordinary content byte: part of the original text.
            i += 1;
        }
    }
    offsets
}
2130
/// Locate the leaf path whose half-open `[start, end)` range contains
/// `offset`.
///
/// `positions` must be sorted by start offset. The candidate is the entry
/// with the greatest start that is `<= offset`; it is returned only when
/// `offset` also lies before that entry's end. Returns `None` when no leaf
/// covers the offset.
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    let candidate = match positions.binary_search_by(|(start, _, _)| start.cmp(&offset)) {
        // A leaf starts exactly at `offset`.
        Ok(exact) => exact,
        // Otherwise step back to the preceding entry; an insertion point of
        // 0 means every leaf starts after `offset`.
        Err(insert_at) => insert_at.checked_sub(1)?,
    };
    let (start, end, path) = positions.get(candidate)?;
    (*start..*end).contains(&offset).then(|| path.as_str())
}
2147
2148/// Resolve per-hit match attribution by introspecting the FTS5 match state
2149/// for the hit's row via `highlight()` and mapping the resulting original-
2150/// text offsets back to recursive-leaf paths via the Phase 4 position map.
2151///
2152/// Chunk-backed hits carry no leaf structure and always return an empty
2153/// `matched_paths` vector. Property-backed hits without a `projection_row_id`
2154/// (which should not happen — the search CTE always populates it) also
2155/// return empty attribution rather than erroring.
2156fn resolve_hit_attribution(
2157    conn: &Connection,
2158    hit: &SearchHit,
2159    match_expr: &str,
2160) -> Result<HitAttribution, EngineError> {
2161    if !matches!(hit.source, SearchHitSource::Property) {
2162        return Ok(HitAttribution {
2163            matched_paths: Vec::new(),
2164        });
2165    }
2166    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
2167        return Ok(HitAttribution {
2168            matched_paths: Vec::new(),
2169        });
2170    };
2171    let rowid: i64 = match rowid_str.parse() {
2172        Ok(v) => v,
2173        Err(_) => {
2174            return Ok(HitAttribution {
2175                matched_paths: Vec::new(),
2176            });
2177        }
2178    };
2179
2180    // Fetch the highlight-wrapped text_content for this hit's FTS row. The
2181    // FTS5 MATCH in the WHERE clause re-establishes the match state that
2182    // `highlight()` needs to decorate the returned text.
2183    let mut stmt = conn
2184        .prepare_cached(
2185            "SELECT highlight(fts_node_properties, 2, ?1, ?2) \
2186             FROM fts_node_properties \
2187             WHERE rowid = ?3 AND fts_node_properties MATCH ?4",
2188        )
2189        .map_err(EngineError::Sqlite)?;
2190    let wrapped: Option<String> = stmt
2191        .query_row(
2192            rusqlite::params![
2193                ATTRIBUTION_HIGHLIGHT_OPEN,
2194                ATTRIBUTION_HIGHLIGHT_CLOSE,
2195                rowid,
2196                match_expr,
2197            ],
2198            |row| row.get(0),
2199        )
2200        .optional()
2201        .map_err(EngineError::Sqlite)?;
2202    let Some(wrapped) = wrapped else {
2203        return Ok(HitAttribution {
2204            matched_paths: Vec::new(),
2205        });
2206    };
2207
2208    let offsets = parse_highlight_offsets(
2209        &wrapped,
2210        ATTRIBUTION_HIGHLIGHT_OPEN,
2211        ATTRIBUTION_HIGHLIGHT_CLOSE,
2212    );
2213    if offsets.is_empty() {
2214        return Ok(HitAttribution {
2215            matched_paths: Vec::new(),
2216        });
2217    }
2218
2219    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
2220    if positions.is_empty() {
2221        // Scalar-only schemas have no position-map entries; attribution
2222        // degrades to an empty vector rather than erroring.
2223        return Ok(HitAttribution {
2224            matched_paths: Vec::new(),
2225        });
2226    }
2227
2228    let mut matched_paths: Vec<String> = Vec::new();
2229    for offset in offsets {
2230        if let Some(path) = find_leaf_for_offset(&positions, offset)
2231            && !matched_paths.iter().any(|p| p == path)
2232        {
2233            matched_paths.push(path.to_owned());
2234        }
2235    }
2236    Ok(HitAttribution { matched_paths })
2237}
2238
2239fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
2240    match value {
2241        fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
2242        fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
2243        fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
2244    }
2245}
2246
2247#[cfg(test)]
2248#[allow(clippy::expect_used)]
2249mod tests {
2250    use std::panic::{AssertUnwindSafe, catch_unwind};
2251    use std::sync::Arc;
2252
2253    use fathomdb_query::{BindValue, QueryBuilder};
2254    use fathomdb_schema::SchemaManager;
2255    use rusqlite::types::Value;
2256    use tempfile::NamedTempFile;
2257
2258    use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
2259
2260    use fathomdb_query::{
2261        NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
2262    };
2263
2264    use super::{
2265        bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
2266        wrap_node_row_projection_sql,
2267    };
2268
2269    fn mk_hit(
2270        logical_id: &str,
2271        score: f64,
2272        match_mode: SearchMatchMode,
2273        source: SearchHitSource,
2274    ) -> SearchHit {
2275        SearchHit {
2276            node: NodeRowLite {
2277                row_id: format!("{logical_id}-row"),
2278                logical_id: logical_id.to_owned(),
2279                kind: "Goal".to_owned(),
2280                properties: "{}".to_owned(),
2281                content_ref: None,
2282                last_accessed_at: None,
2283            },
2284            score,
2285            modality: RetrievalModality::Text,
2286            source,
2287            match_mode: Some(match_mode),
2288            snippet: None,
2289            written_at: 0,
2290            projection_row_id: None,
2291            vector_distance: None,
2292            attribution: None,
2293        }
2294    }
2295
2296    #[test]
2297    fn merge_places_strict_block_before_relaxed_regardless_of_score() {
2298        let strict = vec![mk_hit(
2299            "a",
2300            1.0,
2301            SearchMatchMode::Strict,
2302            SearchHitSource::Chunk,
2303        )];
2304        // Relaxed has a higher score but must still come second.
2305        let relaxed = vec![mk_hit(
2306            "b",
2307            9.9,
2308            SearchMatchMode::Relaxed,
2309            SearchHitSource::Chunk,
2310        )];
2311        let merged = merge_search_branches(strict, relaxed, 10);
2312        assert_eq!(merged.len(), 2);
2313        assert_eq!(merged[0].node.logical_id, "a");
2314        assert!(matches!(
2315            merged[0].match_mode,
2316            Some(SearchMatchMode::Strict)
2317        ));
2318        assert_eq!(merged[1].node.logical_id, "b");
2319        assert!(matches!(
2320            merged[1].match_mode,
2321            Some(SearchMatchMode::Relaxed)
2322        ));
2323    }
2324
2325    #[test]
2326    fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
2327        let strict = vec![mk_hit(
2328            "shared",
2329            1.0,
2330            SearchMatchMode::Strict,
2331            SearchHitSource::Chunk,
2332        )];
2333        let relaxed = vec![
2334            mk_hit(
2335                "shared",
2336                9.9,
2337                SearchMatchMode::Relaxed,
2338                SearchHitSource::Chunk,
2339            ),
2340            mk_hit(
2341                "other",
2342                2.0,
2343                SearchMatchMode::Relaxed,
2344                SearchHitSource::Chunk,
2345            ),
2346        ];
2347        let merged = merge_search_branches(strict, relaxed, 10);
2348        assert_eq!(merged.len(), 2);
2349        assert_eq!(merged[0].node.logical_id, "shared");
2350        assert!(matches!(
2351            merged[0].match_mode,
2352            Some(SearchMatchMode::Strict)
2353        ));
2354        assert_eq!(merged[1].node.logical_id, "other");
2355        assert!(matches!(
2356            merged[1].match_mode,
2357            Some(SearchMatchMode::Relaxed)
2358        ));
2359    }
2360
2361    #[test]
2362    fn merge_sorts_within_block_by_score_desc_then_logical_id() {
2363        let strict = vec![
2364            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2365            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2366            mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2367        ];
2368        let merged = merge_search_branches(strict, vec![], 10);
2369        assert_eq!(
2370            merged
2371                .iter()
2372                .map(|h| &h.node.logical_id)
2373                .collect::<Vec<_>>(),
2374            vec!["a", "c", "b"]
2375        );
2376    }
2377
2378    #[test]
2379    fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
2380        let strict = vec![
2381            mk_hit(
2382                "shared",
2383                1.0,
2384                SearchMatchMode::Strict,
2385                SearchHitSource::Property,
2386            ),
2387            mk_hit(
2388                "shared",
2389                1.0,
2390                SearchMatchMode::Strict,
2391                SearchHitSource::Chunk,
2392            ),
2393        ];
2394        let merged = merge_search_branches(strict, vec![], 10);
2395        assert_eq!(merged.len(), 1);
2396        assert!(matches!(merged[0].source, SearchHitSource::Chunk));
2397    }
2398
2399    #[test]
2400    fn merge_truncates_to_limit_after_block_merge() {
2401        let strict = vec![
2402            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2403            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2404        ];
2405        let relaxed = vec![mk_hit(
2406            "c",
2407            9.0,
2408            SearchMatchMode::Relaxed,
2409            SearchHitSource::Chunk,
2410        )];
2411        let merged = merge_search_branches(strict, relaxed, 2);
2412        assert_eq!(merged.len(), 2);
2413        assert_eq!(merged[0].node.logical_id, "a");
2414        assert_eq!(merged[1].node.logical_id, "b");
2415    }
2416
2417    /// P12 architectural pin: the generalized three-branch merger must
2418    /// produce strict -> relaxed -> vector block ordering with cross-branch
2419    /// dedup resolved by branch precedence (strict > relaxed > vector).
2420    /// v1 `search()` policy never fires the vector branch through the
2421    /// unified planner because read-time embedding is deferred, but the
2422    /// merge helper itself must be ready for the day the planner does so —
2423    /// otherwise wiring the future phase requires touching the core merge
2424    /// code as well as the planner.
2425    #[test]
2426    fn search_architecturally_supports_three_branch_fusion() {
2427        let strict = vec![mk_hit(
2428            "alpha",
2429            1.0,
2430            SearchMatchMode::Strict,
2431            SearchHitSource::Chunk,
2432        )];
2433        let relaxed = vec![mk_hit(
2434            "bravo",
2435            5.0,
2436            SearchMatchMode::Relaxed,
2437            SearchHitSource::Chunk,
2438        )];
2439        // Synthetic vector hit with the highest score. Three-block ordering
2440        // must still place it last.
2441        let mut vector_hit = mk_hit(
2442            "charlie",
2443            9.9,
2444            SearchMatchMode::Strict,
2445            SearchHitSource::Vector,
2446        );
2447        // Vector hits actually carry match_mode=None per the addendum, but
2448        // the merge helper's ordering is mode-agnostic; we override here to
2449        // pin the modality field for the test.
2450        vector_hit.match_mode = None;
2451        vector_hit.modality = RetrievalModality::Vector;
2452        let vector = vec![vector_hit];
2453
2454        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
2455        assert_eq!(merged.len(), 3);
2456        assert_eq!(merged[0].node.logical_id, "alpha");
2457        assert_eq!(merged[1].node.logical_id, "bravo");
2458        assert_eq!(merged[2].node.logical_id, "charlie");
2459        // Vector block comes last regardless of its higher score.
2460        assert!(matches!(merged[2].source, SearchHitSource::Vector));
2461
2462        // Cross-branch dedup: a logical_id that appears in multiple branches
2463        // is attributed to its highest-priority originating branch only.
2464        let strict2 = vec![mk_hit(
2465            "shared",
2466            0.5,
2467            SearchMatchMode::Strict,
2468            SearchHitSource::Chunk,
2469        )];
2470        let relaxed2 = vec![mk_hit(
2471            "shared",
2472            5.0,
2473            SearchMatchMode::Relaxed,
2474            SearchHitSource::Chunk,
2475        )];
2476        let mut vshared = mk_hit(
2477            "shared",
2478            9.9,
2479            SearchMatchMode::Strict,
2480            SearchHitSource::Vector,
2481        );
2482        vshared.match_mode = None;
2483        vshared.modality = RetrievalModality::Vector;
2484        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
2485        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
2486        assert!(matches!(
2487            merged2[0].match_mode,
2488            Some(SearchMatchMode::Strict)
2489        ));
2490        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));
2491
2492        // Relaxed wins over vector when strict is absent.
2493        let mut vshared2 = mk_hit(
2494            "shared",
2495            9.9,
2496            SearchMatchMode::Strict,
2497            SearchHitSource::Vector,
2498        );
2499        vshared2.match_mode = None;
2500        vshared2.modality = RetrievalModality::Vector;
2501        let merged3 = merge_search_branches_three(
2502            vec![],
2503            vec![mk_hit(
2504                "shared",
2505                1.0,
2506                SearchMatchMode::Relaxed,
2507                SearchHitSource::Chunk,
2508            )],
2509            vec![vshared2],
2510            10,
2511        );
2512        assert_eq!(merged3.len(), 1);
2513        assert!(matches!(
2514            merged3[0].match_mode,
2515            Some(SearchMatchMode::Relaxed)
2516        ));
2517    }
2518
2519    /// P12-N-3: production-realistic vector-only fusion. The v1 planner
2520    /// never fires this shape today (read-time embedding is deferred), but
2521    /// when it does the merger will see empty strict + empty relaxed + a
2522    /// non-empty vector block. The three-branch merger must pass that
2523    /// block through unchanged, preserving `RetrievalModality::Vector`,
2524    /// `SearchHitSource::Vector`, and `match_mode == None` semantics.
2525    ///
2526    /// Note: the review spec asked for `vector_hit_count == 1` /
2527    /// `strict_hit_count == 0` assertions. Those are fields on
2528    /// `SearchRows`, which is assembled one layer up in
2529    /// `execute_compiled_retrieval_plan`. The merger returns a bare
2530    /// `Vec<SearchHit>`, so this test asserts the corresponding invariants
2531    /// directly on the returned vec (block shape + per-hit fields).
2532    #[test]
2533    fn merge_search_branches_three_vector_only_preserves_vector_block() {
2534        let mut vector_hit = mk_hit(
2535            "solo",
2536            0.75,
2537            SearchMatchMode::Strict,
2538            SearchHitSource::Vector,
2539        );
2540        vector_hit.match_mode = None;
2541        vector_hit.modality = RetrievalModality::Vector;
2542
2543        let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);
2544
2545        assert_eq!(merged.len(), 1);
2546        assert_eq!(merged[0].node.logical_id, "solo");
2547        assert!(matches!(merged[0].source, SearchHitSource::Vector));
2548        assert!(matches!(merged[0].modality, RetrievalModality::Vector));
2549        assert!(
2550            merged[0].match_mode.is_none(),
2551            "vector hits carry match_mode=None per addendum 1"
2552        );
2553    }
2554
2555    /// P12-N-3: limit truncation must preserve block precedence — when the
2556    /// strict block alone already exceeds the limit, relaxed and vector
2557    /// hits must be dropped entirely even if they have higher raw scores.
2558    ///
2559    /// Note: the review spec asked for `strict_hit_count == 2` /
2560    /// `relaxed_hit_count == 0` / `vector_hit_count == 0` assertions, which
2561    /// are `SearchRows` fields assembled one layer up. Since
2562    /// `merge_search_branches_three` only returns a `Vec<SearchHit>`, this
2563    /// test asserts the corresponding invariants directly: the returned
2564    /// vec contains exactly the top two strict hits, with no relaxed or
2565    /// vector hits leaking past the limit.
2566    #[test]
2567    fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
2568        let strict = vec![
2569            mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2570            mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2571            mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2572        ];
2573        let relaxed = vec![mk_hit(
2574            "d",
2575            9.0,
2576            SearchMatchMode::Relaxed,
2577            SearchHitSource::Chunk,
2578        )];
2579        let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
2580        vector_hit.match_mode = None;
2581        vector_hit.modality = RetrievalModality::Vector;
2582        let vector = vec![vector_hit];
2583
2584        let merged = merge_search_branches_three(strict, relaxed, vector, 2);
2585
2586        assert_eq!(merged.len(), 2);
2587        assert_eq!(merged[0].node.logical_id, "a");
2588        assert_eq!(merged[1].node.logical_id, "b");
2589        // Neither relaxed nor vector hits made it past the limit.
2590        assert!(
2591            merged
2592                .iter()
2593                .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
2594            "strict block must win limit contention against higher-scored relaxed/vector hits"
2595        );
2596        assert!(
2597            merged
2598                .iter()
2599                .all(|h| matches!(h.source, SearchHitSource::Chunk)),
2600            "no vector source hits should leak past the limit"
2601        );
2602    }
2603
2604    #[test]
2605    fn is_vec_table_absent_matches_known_error_messages() {
2606        use rusqlite::ffi;
2607        fn make_err(msg: &str) -> rusqlite::Error {
2608            rusqlite::Error::SqliteFailure(
2609                ffi::Error {
2610                    code: ffi::ErrorCode::Unknown,
2611                    extended_code: 1,
2612                },
2613                Some(msg.to_owned()),
2614            )
2615        }
2616        assert!(is_vec_table_absent(&make_err(
2617            "no such table: vec_nodes_active"
2618        )));
2619        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
2620        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
2621        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
2622        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
2623    }
2624
2625    #[test]
2626    fn bind_value_text_maps_to_sql_text() {
2627        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
2628        assert_eq!(val, Value::Text("hello".to_owned()));
2629    }
2630
2631    #[test]
2632    fn bind_value_integer_maps_to_sql_integer() {
2633        let val = bind_value_to_sql(&BindValue::Integer(42));
2634        assert_eq!(val, Value::Integer(42));
2635    }
2636
2637    #[test]
2638    fn bind_value_bool_true_maps_to_integer_one() {
2639        let val = bind_value_to_sql(&BindValue::Bool(true));
2640        assert_eq!(val, Value::Integer(1));
2641    }
2642
2643    #[test]
2644    fn bind_value_bool_false_maps_to_integer_zero() {
2645        let val = bind_value_to_sql(&BindValue::Bool(false));
2646        assert_eq!(val, Value::Integer(0));
2647    }
2648
2649    #[test]
2650    fn same_shape_queries_share_one_cache_entry() {
2651        let db = NamedTempFile::new().expect("temporary db");
2652        let coordinator = ExecutionCoordinator::open(
2653            db.path(),
2654            Arc::new(SchemaManager::new()),
2655            None,
2656            1,
2657            Arc::new(TelemetryCounters::default()),
2658            None,
2659        )
2660        .expect("coordinator");
2661
2662        let compiled_a = QueryBuilder::nodes("Meeting")
2663            .text_search("budget", 5)
2664            .limit(10)
2665            .compile()
2666            .expect("compiled a");
2667        let compiled_b = QueryBuilder::nodes("Meeting")
2668            .text_search("standup", 5)
2669            .limit(10)
2670            .compile()
2671            .expect("compiled b");
2672
2673        coordinator
2674            .execute_compiled_read(&compiled_a)
2675            .expect("read a");
2676        coordinator
2677            .execute_compiled_read(&compiled_b)
2678            .expect("read b");
2679
2680        assert_eq!(
2681            compiled_a.shape_hash, compiled_b.shape_hash,
2682            "different bind values, same structural shape → same hash"
2683        );
2684        assert_eq!(coordinator.shape_sql_count(), 1);
2685    }
2686
2687    #[test]
2688    fn vector_read_degrades_gracefully_when_vec_table_absent() {
2689        let db = NamedTempFile::new().expect("temporary db");
2690        let coordinator = ExecutionCoordinator::open(
2691            db.path(),
2692            Arc::new(SchemaManager::new()),
2693            None,
2694            1,
2695            Arc::new(TelemetryCounters::default()),
2696            None,
2697        )
2698        .expect("coordinator");
2699
2700        let compiled = QueryBuilder::nodes("Meeting")
2701            .vector_search("budget embeddings", 5)
2702            .compile()
2703            .expect("vector query compiles");
2704
2705        let result = coordinator.execute_compiled_read(&compiled);
2706        let rows = result.expect("degraded read must succeed, not error");
2707        assert!(
2708            rows.was_degraded,
2709            "result must be flagged as degraded when vec_nodes_active is absent"
2710        );
2711        assert!(
2712            rows.nodes.is_empty(),
2713            "degraded result must return empty nodes"
2714        );
2715    }
2716
2717    #[test]
2718    fn coordinator_caches_by_shape_hash() {
2719        let db = NamedTempFile::new().expect("temporary db");
2720        let coordinator = ExecutionCoordinator::open(
2721            db.path(),
2722            Arc::new(SchemaManager::new()),
2723            None,
2724            1,
2725            Arc::new(TelemetryCounters::default()),
2726            None,
2727        )
2728        .expect("coordinator");
2729
2730        let compiled = QueryBuilder::nodes("Meeting")
2731            .text_search("budget", 5)
2732            .compile()
2733            .expect("compiled query");
2734
2735        coordinator
2736            .execute_compiled_read(&compiled)
2737            .expect("execute compiled read");
2738        assert_eq!(coordinator.shape_sql_count(), 1);
2739    }
2740
2741    // --- Item 6: explain_compiled_read tests ---
2742
2743    #[test]
2744    fn explain_returns_correct_sql() {
2745        let db = NamedTempFile::new().expect("temporary db");
2746        let coordinator = ExecutionCoordinator::open(
2747            db.path(),
2748            Arc::new(SchemaManager::new()),
2749            None,
2750            1,
2751            Arc::new(TelemetryCounters::default()),
2752            None,
2753        )
2754        .expect("coordinator");
2755
2756        let compiled = QueryBuilder::nodes("Meeting")
2757            .text_search("budget", 5)
2758            .compile()
2759            .expect("compiled query");
2760
2761        let plan = coordinator.explain_compiled_read(&compiled);
2762
2763        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
2764    }
2765
2766    #[test]
2767    fn explain_returns_correct_driving_table() {
2768        use fathomdb_query::DrivingTable;
2769
2770        let db = NamedTempFile::new().expect("temporary db");
2771        let coordinator = ExecutionCoordinator::open(
2772            db.path(),
2773            Arc::new(SchemaManager::new()),
2774            None,
2775            1,
2776            Arc::new(TelemetryCounters::default()),
2777            None,
2778        )
2779        .expect("coordinator");
2780
2781        let compiled = QueryBuilder::nodes("Meeting")
2782            .text_search("budget", 5)
2783            .compile()
2784            .expect("compiled query");
2785
2786        let plan = coordinator.explain_compiled_read(&compiled);
2787
2788        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
2789    }
2790
2791    #[test]
2792    fn explain_reports_cache_miss_then_hit() {
2793        let db = NamedTempFile::new().expect("temporary db");
2794        let coordinator = ExecutionCoordinator::open(
2795            db.path(),
2796            Arc::new(SchemaManager::new()),
2797            None,
2798            1,
2799            Arc::new(TelemetryCounters::default()),
2800            None,
2801        )
2802        .expect("coordinator");
2803
2804        let compiled = QueryBuilder::nodes("Meeting")
2805            .text_search("budget", 5)
2806            .compile()
2807            .expect("compiled query");
2808
2809        // Before execution: cache miss
2810        let plan_before = coordinator.explain_compiled_read(&compiled);
2811        assert!(
2812            !plan_before.cache_hit,
2813            "cache miss expected before first execute"
2814        );
2815
2816        // Execute to populate cache
2817        coordinator
2818            .execute_compiled_read(&compiled)
2819            .expect("execute read");
2820
2821        // After execution: cache hit
2822        let plan_after = coordinator.explain_compiled_read(&compiled);
2823        assert!(
2824            plan_after.cache_hit,
2825            "cache hit expected after first execute"
2826        );
2827    }
2828
2829    #[test]
2830    fn explain_does_not_execute_query() {
2831        // Call explain_compiled_read on an empty database. If explain were
2832        // actually executing SQL, it would return Ok with 0 rows. But the
2833        // key assertion is that it returns a QueryPlan (not an error) even
2834        // without touching the database.
2835        let db = NamedTempFile::new().expect("temporary db");
2836        let coordinator = ExecutionCoordinator::open(
2837            db.path(),
2838            Arc::new(SchemaManager::new()),
2839            None,
2840            1,
2841            Arc::new(TelemetryCounters::default()),
2842            None,
2843        )
2844        .expect("coordinator");
2845
2846        let compiled = QueryBuilder::nodes("Meeting")
2847            .text_search("anything", 5)
2848            .compile()
2849            .expect("compiled query");
2850
2851        // This must not error, even though the database is empty
2852        let plan = coordinator.explain_compiled_read(&compiled);
2853
2854        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
2855        assert_eq!(plan.bind_count, compiled.binds.len());
2856    }
2857
2858    #[test]
2859    fn coordinator_executes_compiled_read() {
2860        let db = NamedTempFile::new().expect("temporary db");
2861        let coordinator = ExecutionCoordinator::open(
2862            db.path(),
2863            Arc::new(SchemaManager::new()),
2864            None,
2865            1,
2866            Arc::new(TelemetryCounters::default()),
2867            None,
2868        )
2869        .expect("coordinator");
2870        let conn = rusqlite::Connection::open(db.path()).expect("open db");
2871
2872        conn.execute_batch(
2873            r#"
2874            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
2875            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
2876            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
2877            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
2878            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
2879            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
2880            "#,
2881        )
2882        .expect("seed data");
2883
2884        let compiled = QueryBuilder::nodes("Meeting")
2885            .text_search("budget", 5)
2886            .limit(5)
2887            .compile()
2888            .expect("compiled query");
2889
2890        let rows = coordinator
2891            .execute_compiled_read(&compiled)
2892            .expect("execute read");
2893
2894        assert_eq!(rows.nodes.len(), 1);
2895        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
2896    }
2897
2898    #[test]
2899    fn text_search_finds_structured_only_node_via_property_fts() {
2900        let db = NamedTempFile::new().expect("temporary db");
2901        let coordinator = ExecutionCoordinator::open(
2902            db.path(),
2903            Arc::new(SchemaManager::new()),
2904            None,
2905            1,
2906            Arc::new(TelemetryCounters::default()),
2907            None,
2908        )
2909        .expect("coordinator");
2910        let conn = rusqlite::Connection::open(db.path()).expect("open db");
2911
2912        // Insert a structured-only node (no chunks) with a property FTS row.
2913        conn.execute_batch(
2914            r#"
2915            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
2916            VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
2917            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
2918            VALUES ('goal-1', 'Goal', 'Ship v2');
2919            "#,
2920        )
2921        .expect("seed data");
2922
2923        let compiled = QueryBuilder::nodes("Goal")
2924            .text_search("Ship", 5)
2925            .limit(5)
2926            .compile()
2927            .expect("compiled query");
2928
2929        let rows = coordinator
2930            .execute_compiled_read(&compiled)
2931            .expect("execute read");
2932
2933        assert_eq!(rows.nodes.len(), 1);
2934        assert_eq!(rows.nodes[0].logical_id, "goal-1");
2935    }
2936
2937    #[test]
2938    fn text_search_returns_both_chunk_and_property_backed_hits() {
2939        let db = NamedTempFile::new().expect("temporary db");
2940        let coordinator = ExecutionCoordinator::open(
2941            db.path(),
2942            Arc::new(SchemaManager::new()),
2943            None,
2944            1,
2945            Arc::new(TelemetryCounters::default()),
2946            None,
2947        )
2948        .expect("coordinator");
2949        let conn = rusqlite::Connection::open(db.path()).expect("open db");
2950
2951        // Chunk-backed hit: a Meeting with a chunk containing "quarterly".
2952        conn.execute_batch(
2953            r"
2954            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
2955            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
2956            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
2957            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
2958            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
2959            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
2960            ",
2961        )
2962        .expect("seed chunk-backed node");
2963
2964        // Property-backed hit: a Meeting with property FTS containing "quarterly".
2965        conn.execute_batch(
2966            r#"
2967            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
2968            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
2969            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
2970            VALUES ('meeting-2', 'Meeting', 'quarterly sync');
2971            "#,
2972        )
2973        .expect("seed property-backed node");
2974
2975        let compiled = QueryBuilder::nodes("Meeting")
2976            .text_search("quarterly", 10)
2977            .limit(10)
2978            .compile()
2979            .expect("compiled query");
2980
2981        let rows = coordinator
2982            .execute_compiled_read(&compiled)
2983            .expect("execute read");
2984
2985        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
2986        ids.sort_unstable();
2987        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
2988    }
2989
2990    #[test]
2991    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
2992        let db = NamedTempFile::new().expect("temporary db");
2993        let coordinator = ExecutionCoordinator::open(
2994            db.path(),
2995            Arc::new(SchemaManager::new()),
2996            None,
2997            1,
2998            Arc::new(TelemetryCounters::default()),
2999            None,
3000        )
3001        .expect("coordinator");
3002        let conn = rusqlite::Connection::open(db.path()).expect("open db");
3003
3004        conn.execute_batch(
3005            r"
3006            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
3007            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
3008            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
3009            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
3010            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
3011            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
3012            ",
3013        )
3014        .expect("seed chunk-backed node");
3015
3016        let compiled = QueryBuilder::nodes("Meeting")
3017            .text_search("not a ship", 10)
3018            .limit(10)
3019            .compile()
3020            .expect("compiled query");
3021
3022        let rows = coordinator
3023            .execute_compiled_read(&compiled)
3024            .expect("execute read");
3025
3026        assert_eq!(rows.nodes.len(), 1);
3027        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
3028    }
3029
3030    // --- Item 1: capability gate tests ---
3031
3032    #[test]
3033    fn capability_gate_reports_false_without_feature() {
3034        let db = NamedTempFile::new().expect("temporary db");
3035        // Open without vector_dimension: regardless of feature flag, vector_enabled must be false
3036        // when no dimension is requested (the vector profile is never bootstrapped).
3037        let coordinator = ExecutionCoordinator::open(
3038            db.path(),
3039            Arc::new(SchemaManager::new()),
3040            None,
3041            1,
3042            Arc::new(TelemetryCounters::default()),
3043            None,
3044        )
3045        .expect("coordinator");
3046        assert!(
3047            !coordinator.vector_enabled(),
3048            "vector_enabled must be false when no dimension is requested"
3049        );
3050    }
3051
3052    #[cfg(feature = "sqlite-vec")]
3053    #[test]
3054    fn capability_gate_reports_true_when_feature_enabled() {
3055        let db = NamedTempFile::new().expect("temporary db");
3056        let coordinator = ExecutionCoordinator::open(
3057            db.path(),
3058            Arc::new(SchemaManager::new()),
3059            Some(128),
3060            1,
3061            Arc::new(TelemetryCounters::default()),
3062            None,
3063        )
3064        .expect("coordinator");
3065        assert!(
3066            coordinator.vector_enabled(),
3067            "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
3068        );
3069    }
3070
3071    // --- Item 4: runtime table read tests ---
3072
3073    #[test]
3074    fn read_run_returns_inserted_run() {
3075        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
3076
3077        let db = NamedTempFile::new().expect("temporary db");
3078        let writer = WriterActor::start(
3079            db.path(),
3080            Arc::new(SchemaManager::new()),
3081            ProvenanceMode::Warn,
3082            Arc::new(TelemetryCounters::default()),
3083        )
3084        .expect("writer");
3085        writer
3086            .submit(WriteRequest {
3087                label: "runtime".to_owned(),
3088                nodes: vec![],
3089                node_retires: vec![],
3090                edges: vec![],
3091                edge_retires: vec![],
3092                chunks: vec![],
3093                runs: vec![RunInsert {
3094                    id: "run-r1".to_owned(),
3095                    kind: "session".to_owned(),
3096                    status: "active".to_owned(),
3097                    properties: "{}".to_owned(),
3098                    source_ref: Some("src-1".to_owned()),
3099                    upsert: false,
3100                    supersedes_id: None,
3101                }],
3102                steps: vec![],
3103                actions: vec![],
3104                optional_backfills: vec![],
3105                vec_inserts: vec![],
3106                operational_writes: vec![],
3107            })
3108            .expect("write run");
3109
3110        let coordinator = ExecutionCoordinator::open(
3111            db.path(),
3112            Arc::new(SchemaManager::new()),
3113            None,
3114            1,
3115            Arc::new(TelemetryCounters::default()),
3116            None,
3117        )
3118        .expect("coordinator");
3119        let row = coordinator
3120            .read_run("run-r1")
3121            .expect("read_run")
3122            .expect("row exists");
3123        assert_eq!(row.id, "run-r1");
3124        assert_eq!(row.kind, "session");
3125        assert_eq!(row.status, "active");
3126    }
3127
3128    #[test]
3129    fn read_step_returns_inserted_step() {
3130        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};
3131
3132        let db = NamedTempFile::new().expect("temporary db");
3133        let writer = WriterActor::start(
3134            db.path(),
3135            Arc::new(SchemaManager::new()),
3136            ProvenanceMode::Warn,
3137            Arc::new(TelemetryCounters::default()),
3138        )
3139        .expect("writer");
3140        writer
3141            .submit(WriteRequest {
3142                label: "runtime".to_owned(),
3143                nodes: vec![],
3144                node_retires: vec![],
3145                edges: vec![],
3146                edge_retires: vec![],
3147                chunks: vec![],
3148                runs: vec![RunInsert {
3149                    id: "run-s1".to_owned(),
3150                    kind: "session".to_owned(),
3151                    status: "active".to_owned(),
3152                    properties: "{}".to_owned(),
3153                    source_ref: Some("src-1".to_owned()),
3154                    upsert: false,
3155                    supersedes_id: None,
3156                }],
3157                steps: vec![StepInsert {
3158                    id: "step-s1".to_owned(),
3159                    run_id: "run-s1".to_owned(),
3160                    kind: "llm".to_owned(),
3161                    status: "completed".to_owned(),
3162                    properties: "{}".to_owned(),
3163                    source_ref: Some("src-1".to_owned()),
3164                    upsert: false,
3165                    supersedes_id: None,
3166                }],
3167                actions: vec![],
3168                optional_backfills: vec![],
3169                vec_inserts: vec![],
3170                operational_writes: vec![],
3171            })
3172            .expect("write step");
3173
3174        let coordinator = ExecutionCoordinator::open(
3175            db.path(),
3176            Arc::new(SchemaManager::new()),
3177            None,
3178            1,
3179            Arc::new(TelemetryCounters::default()),
3180            None,
3181        )
3182        .expect("coordinator");
3183        let row = coordinator
3184            .read_step("step-s1")
3185            .expect("read_step")
3186            .expect("row exists");
3187        assert_eq!(row.id, "step-s1");
3188        assert_eq!(row.run_id, "run-s1");
3189        assert_eq!(row.kind, "llm");
3190    }
3191
3192    #[test]
3193    fn read_action_returns_inserted_action() {
3194        use crate::{
3195            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
3196            writer::{ActionInsert, StepInsert},
3197        };
3198
3199        let db = NamedTempFile::new().expect("temporary db");
3200        let writer = WriterActor::start(
3201            db.path(),
3202            Arc::new(SchemaManager::new()),
3203            ProvenanceMode::Warn,
3204            Arc::new(TelemetryCounters::default()),
3205        )
3206        .expect("writer");
3207        writer
3208            .submit(WriteRequest {
3209                label: "runtime".to_owned(),
3210                nodes: vec![],
3211                node_retires: vec![],
3212                edges: vec![],
3213                edge_retires: vec![],
3214                chunks: vec![],
3215                runs: vec![RunInsert {
3216                    id: "run-a1".to_owned(),
3217                    kind: "session".to_owned(),
3218                    status: "active".to_owned(),
3219                    properties: "{}".to_owned(),
3220                    source_ref: Some("src-1".to_owned()),
3221                    upsert: false,
3222                    supersedes_id: None,
3223                }],
3224                steps: vec![StepInsert {
3225                    id: "step-a1".to_owned(),
3226                    run_id: "run-a1".to_owned(),
3227                    kind: "llm".to_owned(),
3228                    status: "completed".to_owned(),
3229                    properties: "{}".to_owned(),
3230                    source_ref: Some("src-1".to_owned()),
3231                    upsert: false,
3232                    supersedes_id: None,
3233                }],
3234                actions: vec![ActionInsert {
3235                    id: "action-a1".to_owned(),
3236                    step_id: "step-a1".to_owned(),
3237                    kind: "emit".to_owned(),
3238                    status: "completed".to_owned(),
3239                    properties: "{}".to_owned(),
3240                    source_ref: Some("src-1".to_owned()),
3241                    upsert: false,
3242                    supersedes_id: None,
3243                }],
3244                optional_backfills: vec![],
3245                vec_inserts: vec![],
3246                operational_writes: vec![],
3247            })
3248            .expect("write action");
3249
3250        let coordinator = ExecutionCoordinator::open(
3251            db.path(),
3252            Arc::new(SchemaManager::new()),
3253            None,
3254            1,
3255            Arc::new(TelemetryCounters::default()),
3256            None,
3257        )
3258        .expect("coordinator");
3259        let row = coordinator
3260            .read_action("action-a1")
3261            .expect("read_action")
3262            .expect("row exists");
3263        assert_eq!(row.id, "action-a1");
3264        assert_eq!(row.step_id, "step-a1");
3265        assert_eq!(row.kind, "emit");
3266    }
3267
3268    #[test]
3269    fn read_active_runs_excludes_superseded() {
3270        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
3271
3272        let db = NamedTempFile::new().expect("temporary db");
3273        let writer = WriterActor::start(
3274            db.path(),
3275            Arc::new(SchemaManager::new()),
3276            ProvenanceMode::Warn,
3277            Arc::new(TelemetryCounters::default()),
3278        )
3279        .expect("writer");
3280
3281        // Insert original run
3282        writer
3283            .submit(WriteRequest {
3284                label: "v1".to_owned(),
3285                nodes: vec![],
3286                node_retires: vec![],
3287                edges: vec![],
3288                edge_retires: vec![],
3289                chunks: vec![],
3290                runs: vec![RunInsert {
3291                    id: "run-v1".to_owned(),
3292                    kind: "session".to_owned(),
3293                    status: "active".to_owned(),
3294                    properties: "{}".to_owned(),
3295                    source_ref: Some("src-1".to_owned()),
3296                    upsert: false,
3297                    supersedes_id: None,
3298                }],
3299                steps: vec![],
3300                actions: vec![],
3301                optional_backfills: vec![],
3302                vec_inserts: vec![],
3303                operational_writes: vec![],
3304            })
3305            .expect("v1 write");
3306
3307        // Supersede original run with v2
3308        writer
3309            .submit(WriteRequest {
3310                label: "v2".to_owned(),
3311                nodes: vec![],
3312                node_retires: vec![],
3313                edges: vec![],
3314                edge_retires: vec![],
3315                chunks: vec![],
3316                runs: vec![RunInsert {
3317                    id: "run-v2".to_owned(),
3318                    kind: "session".to_owned(),
3319                    status: "completed".to_owned(),
3320                    properties: "{}".to_owned(),
3321                    source_ref: Some("src-2".to_owned()),
3322                    upsert: true,
3323                    supersedes_id: Some("run-v1".to_owned()),
3324                }],
3325                steps: vec![],
3326                actions: vec![],
3327                optional_backfills: vec![],
3328                vec_inserts: vec![],
3329                operational_writes: vec![],
3330            })
3331            .expect("v2 write");
3332
3333        let coordinator = ExecutionCoordinator::open(
3334            db.path(),
3335            Arc::new(SchemaManager::new()),
3336            None,
3337            1,
3338            Arc::new(TelemetryCounters::default()),
3339            None,
3340        )
3341        .expect("coordinator");
3342        let active = coordinator.read_active_runs().expect("read_active_runs");
3343
3344        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
3345        assert_eq!(active[0].id, "run-v2");
3346    }
3347
3348    #[allow(clippy::panic)]
3349    fn poison_connection(coordinator: &ExecutionCoordinator) {
3350        let result = catch_unwind(AssertUnwindSafe(|| {
3351            let _guard = coordinator.pool.connections[0]
3352                .lock()
3353                .expect("poison test lock");
3354            panic!("poison coordinator connection mutex");
3355        }));
3356        assert!(
3357            result.is_err(),
3358            "poison test must unwind while holding the connection mutex"
3359        );
3360    }
3361
3362    #[allow(clippy::panic)]
3363    fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
3364    where
3365        F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
3366    {
3367        match op(coordinator) {
3368            Err(EngineError::Bridge(message)) => {
3369                assert_eq!(message, "connection mutex poisoned");
3370            }
3371            Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
3372            Err(error) => panic!("expected poisoned connection error, got {error:?}"),
3373        }
3374    }
3375
3376    #[test]
3377    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
3378        let db = NamedTempFile::new().expect("temporary db");
3379        let coordinator = ExecutionCoordinator::open(
3380            db.path(),
3381            Arc::new(SchemaManager::new()),
3382            None,
3383            1,
3384            Arc::new(TelemetryCounters::default()),
3385            None,
3386        )
3387        .expect("coordinator");
3388
3389        poison_connection(&coordinator);
3390
3391        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
3392        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
3393        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
3394        assert_poisoned_connection_error(
3395            &coordinator,
3396            super::ExecutionCoordinator::read_active_runs,
3397        );
3398        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
3399        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
3400    }
3401
3402    // --- M-2: Bounded shape cache ---
3403
3404    #[test]
3405    fn shape_cache_stays_bounded() {
3406        use fathomdb_query::ShapeHash;
3407
3408        let db = NamedTempFile::new().expect("temporary db");
3409        let coordinator = ExecutionCoordinator::open(
3410            db.path(),
3411            Arc::new(SchemaManager::new()),
3412            None,
3413            1,
3414            Arc::new(TelemetryCounters::default()),
3415            None,
3416        )
3417        .expect("coordinator");
3418
3419        // Directly populate the cache with MAX_SHAPE_CACHE_SIZE + 1 entries.
3420        {
3421            let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
3422            for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
3423                cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
3424            }
3425        }
3426        // The cache is now over the limit but hasn't been pruned yet (pruning
3427        // happens on the insert path in execute_compiled_read).
3428
3429        // Execute a compiled read to trigger the bounded-cache check.
3430        let compiled = QueryBuilder::nodes("Meeting")
3431            .text_search("budget", 5)
3432            .limit(10)
3433            .compile()
3434            .expect("compiled query");
3435
3436        coordinator
3437            .execute_compiled_read(&compiled)
3438            .expect("execute read");
3439
3440        assert!(
3441            coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
3442            "shape cache must stay bounded: got {} entries, max {}",
3443            coordinator.shape_sql_count(),
3444            super::MAX_SHAPE_CACHE_SIZE
3445        );
3446    }
3447
3448    // --- M-1: Read pool size ---
3449
3450    #[test]
3451    fn read_pool_size_configurable() {
3452        let db = NamedTempFile::new().expect("temporary db");
3453        let coordinator = ExecutionCoordinator::open(
3454            db.path(),
3455            Arc::new(SchemaManager::new()),
3456            None,
3457            2,
3458            Arc::new(TelemetryCounters::default()),
3459            None,
3460        )
3461        .expect("coordinator with pool_size=2");
3462
3463        assert_eq!(coordinator.pool.size(), 2);
3464
3465        // Basic read should succeed through the pool.
3466        let compiled = QueryBuilder::nodes("Meeting")
3467            .text_search("budget", 5)
3468            .limit(10)
3469            .compile()
3470            .expect("compiled query");
3471
3472        let result = coordinator.execute_compiled_read(&compiled);
3473        assert!(result.is_ok(), "read through pool must succeed");
3474    }
3475
3476    // --- M-4: Grouped read batching ---
3477
3478    #[test]
3479    fn grouped_read_results_match_baseline() {
3480        use fathomdb_query::TraverseDirection;
3481
3482        let db = NamedTempFile::new().expect("temporary db");
3483
3484        // Bootstrap the database via coordinator (creates schema).
3485        let coordinator = ExecutionCoordinator::open(
3486            db.path(),
3487            Arc::new(SchemaManager::new()),
3488            None,
3489            1,
3490            Arc::new(TelemetryCounters::default()),
3491            None,
3492        )
3493        .expect("coordinator");
3494
3495        // Seed data: 10 root nodes (Meeting-0..9) with 2 outbound edges each
3496        // to expansion nodes (Task-0-a, Task-0-b, etc.).
3497        {
3498            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
3499            for i in 0..10 {
3500                conn.execute_batch(&format!(
3501                    r#"
3502                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
3503                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
3504                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
3505                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
3506                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
3507                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');
3508
3509                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
3510                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
3511                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
3512                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());
3513
3514                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
3515                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
3516                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
3517                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
3518                    "#,
3519                )).expect("seed data");
3520            }
3521        }
3522
3523        let compiled = QueryBuilder::nodes("Meeting")
3524            .text_search("meeting", 10)
3525            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1)
3526            .limit(10)
3527            .compile_grouped()
3528            .expect("compiled grouped query");
3529
3530        let result = coordinator
3531            .execute_compiled_grouped_read(&compiled)
3532            .expect("grouped read");
3533
3534        assert!(!result.was_degraded, "grouped read should not be degraded");
3535        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
3536        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
3537        assert_eq!(result.expansions[0].slot, "tasks");
3538        assert_eq!(
3539            result.expansions[0].roots.len(),
3540            10,
3541            "each expansion slot should have entries for all 10 roots"
3542        );
3543
3544        // Each root should have exactly 2 expansion nodes (task-X-a, task-X-b).
3545        for root_expansion in &result.expansions[0].roots {
3546            assert_eq!(
3547                root_expansion.nodes.len(),
3548                2,
3549                "root {} should have 2 expansion nodes, got {}",
3550                root_expansion.root_logical_id,
3551                root_expansion.nodes.len()
3552            );
3553        }
3554    }
3555}