1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8 BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9 CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, ExpansionSlot,
10 FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch,
11 SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
12};
13use fathomdb_schema::SchemaManager;
14use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
15
16use crate::embedder::QueryEmbedder;
17use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
18use crate::{EngineError, sqlite};
19
/// Maximum number of entries retained in the shape-hash → SQL cache; once the
/// cache reaches this size it is cleared in full rather than evicted per-entry
/// (see `execute_compiled_read`).
const MAX_SHAPE_CACHE_SIZE: usize = 4096;

/// Chunk size for batched operations.
/// NOTE(review): not referenced in this portion of the file — presumably used
/// by batching helpers elsewhere; confirm before removing.
const BATCH_CHUNK_SIZE: usize = 200;
30
31fn compile_expansion_filter(
44 filter: Option<&Predicate>,
45 first_param: usize,
46) -> (String, Vec<Value>) {
47 let Some(predicate) = filter else {
48 return (String::new(), vec![]);
49 };
50 let p = first_param;
51 match predicate {
52 Predicate::JsonPathEq { path, value } => {
53 let val = match value {
54 ScalarValue::Text(t) => Value::Text(t.clone()),
55 ScalarValue::Integer(i) => Value::Integer(*i),
56 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
57 };
58 (
59 format!(
60 "\n AND json_extract(n.properties, ?{p}) = ?{}",
61 p + 1
62 ),
63 vec![Value::Text(path.clone()), val],
64 )
65 }
66 Predicate::JsonPathCompare { path, op, value } => {
67 let val = match value {
68 ScalarValue::Text(t) => Value::Text(t.clone()),
69 ScalarValue::Integer(i) => Value::Integer(*i),
70 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
71 };
72 let operator = match op {
73 ComparisonOp::Gt => ">",
74 ComparisonOp::Gte => ">=",
75 ComparisonOp::Lt => "<",
76 ComparisonOp::Lte => "<=",
77 };
78 (
79 format!(
80 "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
81 p + 1
82 ),
83 vec![Value::Text(path.clone()), val],
84 )
85 }
86 Predicate::JsonPathFusedEq { path, value } => (
87 format!(
88 "\n AND json_extract(n.properties, ?{p}) = ?{}",
89 p + 1
90 ),
91 vec![Value::Text(path.clone()), Value::Text(value.clone())],
92 ),
93 Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
94 let operator = match op {
95 ComparisonOp::Gt => ">",
96 ComparisonOp::Gte => ">=",
97 ComparisonOp::Lt => "<",
98 ComparisonOp::Lte => "<=",
99 };
100 (
101 format!(
102 "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
103 p + 1
104 ),
105 vec![Value::Text(path.clone()), Value::Integer(*value)],
106 )
107 }
108 Predicate::KindEq(kind) => (
109 format!("\n AND n.kind = ?{p}"),
110 vec![Value::Text(kind.clone())],
111 ),
112 Predicate::LogicalIdEq(logical_id) => (
113 format!("\n AND n.logical_id = ?{p}"),
114 vec![Value::Text(logical_id.clone())],
115 ),
116 Predicate::SourceRefEq(source_ref) => (
117 format!("\n AND n.source_ref = ?{p}"),
118 vec![Value::Text(source_ref.clone())],
119 ),
120 Predicate::ContentRefEq(uri) => (
121 format!("\n AND n.content_ref = ?{p}"),
122 vec![Value::Text(uri.clone())],
123 ),
124 Predicate::ContentRefNotNull => (
125 "\n AND n.content_ref IS NOT NULL".to_owned(),
126 vec![],
127 ),
128 }
129}
130
/// A fixed-size pool of read-only SQLite connections, each guarded by its own
/// mutex so concurrent readers can proceed on different connections.
struct ReadPool {
    /// One mutex-wrapped connection per pool slot.
    connections: Vec<Mutex<Connection>>,
}
138
impl fmt::Debug for ReadPool {
    // Report only the pool size; individual connections carry no useful
    // printable state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReadPool")
            .field("size", &self.connections.len())
            .finish()
    }
}
146
147impl ReadPool {
148 fn new(
159 db_path: &Path,
160 pool_size: usize,
161 schema_manager: &SchemaManager,
162 vector_enabled: bool,
163 ) -> Result<Self, EngineError> {
164 let mut connections = Vec::with_capacity(pool_size);
165 for _ in 0..pool_size {
166 let conn = if vector_enabled {
167 #[cfg(feature = "sqlite-vec")]
168 {
169 sqlite::open_readonly_connection_with_vec(db_path)?
170 }
171 #[cfg(not(feature = "sqlite-vec"))]
172 {
173 sqlite::open_readonly_connection(db_path)?
174 }
175 } else {
176 sqlite::open_readonly_connection(db_path)?
177 };
178 schema_manager
179 .initialize_reader_connection(&conn)
180 .map_err(EngineError::Schema)?;
181 connections.push(Mutex::new(conn));
182 }
183 Ok(Self { connections })
184 }
185
186 fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
195 for conn in &self.connections {
197 if let Ok(guard) = conn.try_lock() {
198 return Ok(guard);
199 }
200 }
201 self.connections[0].lock().map_err(|_| {
203 trace_error!("read pool: connection mutex poisoned");
204 EngineError::Bridge("connection mutex poisoned".to_owned())
205 })
206 }
207
208 #[cfg(test)]
210 fn size(&self) -> usize {
211 self.connections.len()
212 }
213}
214
/// A planned query: its rendered SQL, bind arity, driving table, structural
/// shape hash, and whether the shape was served from the cache.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// Rendered SQL text for the plan.
    pub sql: String,
    /// Number of bind parameters the SQL expects.
    pub bind_count: usize,
    /// Table family that drives the query (e.g. nodes vs. FTS).
    pub driving_table: DrivingTable,
    /// Structural hash of the compiled query shape (cache key).
    pub shape_hash: ShapeHash,
    /// True when the shape's SQL was found in the shape cache.
    pub cache_hit: bool,
}
226
/// A projected node row as returned by compiled reads.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row identifier.
    pub row_id: String,
    /// Stable logical identifier shared across versions of the node.
    pub logical_id: String,
    /// Node kind discriminator.
    pub kind: String,
    /// JSON-encoded properties payload (queried via `json_extract`).
    pub properties: String,
    /// Optional reference to external content.
    pub content_ref: Option<String>,
    /// Last access timestamp, when access metadata exists for the node.
    pub last_accessed_at: Option<i64>,
}
243
/// A run record returned by compiled reads.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Run identifier.
    pub id: String,
    /// Run kind discriminator.
    pub kind: String,
    /// Current run status.
    pub status: String,
    /// JSON-encoded properties payload.
    pub properties: String,
}
256
/// A step record belonging to a run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Step identifier.
    pub id: String,
    /// Identifier of the owning run.
    pub run_id: String,
    /// Step kind discriminator.
    pub kind: String,
    /// Current step status.
    pub status: String,
    /// JSON-encoded properties payload.
    pub properties: String,
}
271
/// An action record belonging to a step.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Action identifier.
    pub id: String,
    /// Identifier of the owning step.
    pub step_id: String,
    /// Action kind discriminator.
    pub kind: String,
    /// Current action status.
    pub status: String,
    /// JSON-encoded properties payload.
    pub properties: String,
}
286
/// A provenance/audit event row.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Event identifier.
    pub id: String,
    /// Event type discriminator.
    pub event_type: String,
    /// Subject the event refers to.
    pub subject: String,
    /// Optional originating source reference.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation timestamp.
    pub created_at: i64,
}
297
/// Result set of a compiled read, one vector per entity family.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Matching node rows.
    pub nodes: Vec<NodeRow>,
    /// Matching run rows.
    pub runs: Vec<RunRow>,
    /// Matching step rows.
    pub steps: Vec<StepRow>,
    /// Matching action rows.
    pub actions: Vec<ActionRow>,
    /// True when the query degraded (e.g. the vector table was absent) and
    /// results may therefore be incomplete.
    pub was_degraded: bool,
}
313
/// Nodes expanded under a single root, keyed by the root's logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical id of the root node these expansions belong to.
    pub root_logical_id: String,
    /// Expanded nodes for this root.
    pub nodes: Vec<NodeRow>,
}
322
/// All expansion results for one named expansion slot.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot.
    pub slot: String,
    /// Per-root expansion results for this slot.
    pub roots: Vec<ExpansionRootRows>,
}
331
/// Result of a grouped query: root nodes plus their slot expansions.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// Root nodes matched by the grouped query.
    pub roots: Vec<NodeRow>,
    /// Expansion results, one entry per expansion slot.
    pub expansions: Vec<ExpansionSlotRows>,
    /// True when any part of the query degraded and results may be incomplete.
    pub was_degraded: bool,
}
342
/// Read-path coordinator for a single database file: owns the reader
/// connection pool, the compiled-shape SQL cache, vector-search state,
/// telemetry, and the optional query embedder.
pub struct ExecutionCoordinator {
    /// Filesystem path of the SQLite database.
    database_path: PathBuf,
    /// Schema manager used for bootstrap and reader-connection initialization.
    schema_manager: Arc<SchemaManager>,
    /// Pool of read-only connections.
    pool: ReadPool,
    /// Cache mapping a compiled query's shape hash to its rendered row SQL.
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// Whether the vector (sqlite-vec) profile is active.
    vector_enabled: bool,
    /// Set when the first vector-degradation warning is logged, so the
    /// warning is emitted at most once per coordinator.
    vec_degradation_warned: AtomicBool,
    /// Shared query/error counters.
    telemetry: Arc<TelemetryCounters>,
    /// Optional embedder used to build the vector branch of retrieval plans.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
}
360
impl fmt::Debug for ExecutionCoordinator {
    // Only the database path is printed; the remaining fields (pool, caches,
    // embedder) are elided via `finish_non_exhaustive`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ExecutionCoordinator")
            .field("database_path", &self.database_path)
            .finish_non_exhaustive()
    }
}
368
369impl ExecutionCoordinator {
    /// Opens the database at `path`, bootstraps the schema, rebuilds the
    /// property-FTS projections when they are missing, optionally enables the
    /// vector profile, and constructs the read-only connection pool.
    ///
    /// # Errors
    /// Fails when the database cannot be opened, schema bootstrap fails, the
    /// FTS rebuild fails, the vector profile cannot be ensured, or the pool
    /// cannot be constructed.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap uses a read-write connection; vector extension is loaded
        // only when a vector dimension was requested and the feature exists.
        #[cfg(feature = "sqlite-vec")]
        let conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        // Property FTS must be rebuilt when schemas exist but the FTS table
        // is empty (e.g. the projection table was added after data existed).
        let needs_property_fts_rebuild = {
            let schema_count: i64 =
                conn.query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
                    row.get(0)
                })?;
            if schema_count == 0 {
                false
            } else {
                let fts_count: i64 =
                    conn.query_row("SELECT COUNT(*) FROM fts_node_properties", [], |row| {
                        row.get(0)
                    })?;
                fts_count == 0
            }
        };
        // Position rows must be backfilled when any schema uses recursive
        // mode but the positions table is empty.
        let needs_position_backfill = {
            let recursive_schema_count: i64 = conn.query_row(
                "SELECT COUNT(*) FROM fts_property_schemas \
                 WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
                [],
                |row| row.get(0),
            )?;
            if recursive_schema_count == 0 {
                false
            } else {
                let position_count: i64 = conn.query_row(
                    "SELECT COUNT(*) FROM fts_node_property_positions",
                    [],
                    |row| row.get(0),
                )?;
                position_count == 0
            }
        };
        if needs_property_fts_rebuild || needs_position_backfill {
            // Clear and repopulate both projection tables atomically.
            let tx = conn.unchecked_transaction()?;
            tx.execute("DELETE FROM fts_node_properties", [])?;
            tx.execute("DELETE FROM fts_node_property_positions", [])?;
            // NOTE(review): this SQL carries a `?1` placeholder; presumably
            // `insert_property_fts_rows` binds the kind per schema — confirm.
            crate::projection::insert_property_fts_rows(
                &tx,
                "SELECT logical_id, properties FROM nodes \
                 WHERE kind = ?1 AND superseded_at IS NULL",
            )?;
            tx.commit()?;
        }

        // Without the sqlite-vec feature, vector support is always off.
        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            let _ = &report;
            false
        };

        if let Some(dim) = vector_dimension {
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", dim)
                .map_err(EngineError::Schema)?;
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        // Close the bootstrap connection before opening the reader pool.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
        })
    }
520
521 pub fn database_path(&self) -> &Path {
523 &self.database_path
524 }
525
    /// Whether the vector (sqlite-vec) retrieval profile is enabled.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
531
    /// The configured query embedder, if any; used to build the vector branch
    /// of retrieval plans.
    #[must_use]
    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
        self.query_embedder.as_ref()
    }
542
    /// Acquires a read connection guard from the pool.
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
546
547 #[must_use]
553 pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
554 let mut total = SqliteCacheStatus::default();
555 for conn_mutex in &self.pool.connections {
556 if let Ok(conn) = conn_mutex.try_lock() {
557 total.add(&read_db_cache_status(&conn));
558 }
559 }
560 total
561 }
562
    /// Executes a compiled node read and returns the matching rows.
    ///
    /// Special cases, in order: an FTS-driven query whose kind was only just
    /// registered falls back to a direct scan; a missing vector table degrades
    /// to an empty `was_degraded` result. The rendered row SQL is cached by
    /// shape hash before execution.
    ///
    /// # Errors
    /// Returns SQLite or pool errors; telemetry error counters are bumped on
    /// each failure path.
    #[allow(clippy::expect_used)]
    pub fn execute_compiled_read(
        &self,
        compiled: &CompiledQuery,
    ) -> Result<QueryRows, EngineError> {
        // FTS projections can lag the first registration of a kind; detect
        // that window and serve a table scan instead. Bind index 1 holding
        // the root kind is assumed for FtsNodes-driven plans — the pattern
        // match below simply skips the fallback when that does not hold.
        if compiled.driving_table == DrivingTable::FtsNodes
            && let Some(BindValue::Text(root_kind)) = compiled.binds.get(1)
            && let Some(nodes) = self.scan_fallback_if_first_registration(root_kind)?
        {
            self.telemetry.increment_queries();
            return Ok(QueryRows {
                nodes,
                runs: Vec::new(),
                steps: Vec::new(),
                actions: Vec::new(),
                was_degraded: false,
            });
        }

        let row_sql = wrap_node_row_projection_sql(&compiled.sql);
        {
            // Shape cache: cleared wholesale once it reaches the cap; a
            // poisoned lock is recovered rather than propagated since the
            // cache holds no invariants worth failing over.
            let mut cache = self
                .shape_sql_map
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
                cache.clear();
            }
            cache.insert(compiled.shape_hash, row_sql.clone());
        }

        let bind_values = compiled
            .binds
            .iter()
            .map(bind_value_to_sql)
            .collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&row_sql) {
            Ok(stmt) => stmt,
            Err(e) if is_vec_table_absent(&e) => {
                // Vector table missing: degrade to an empty result instead of
                // failing; warn only once per coordinator lifetime.
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading to non-vector query");
                }
                return Ok(QueryRows {
                    was_degraded: true,
                    ..Default::default()
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        // Column order matches wrap_node_row_projection_sql's projection.
        let nodes = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .and_then(Iterator::collect)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        self.telemetry.increment_queries();
        Ok(QueryRows {
            nodes,
            runs: Vec::new(),
            steps: Vec::new(),
            actions: Vec::new(),
            was_degraded: false,
        })
    }
668
669 pub fn execute_compiled_search(
684 &self,
685 compiled: &CompiledSearch,
686 ) -> Result<SearchRows, EngineError> {
687 let (relaxed_query, was_degraded_at_plan_time) =
694 fathomdb_query::derive_relaxed(&compiled.text_query);
695 let relaxed = relaxed_query.map(|q| CompiledSearch {
696 root_kind: compiled.root_kind.clone(),
697 text_query: q,
698 limit: compiled.limit,
699 fusable_filters: compiled.fusable_filters.clone(),
700 residual_filters: compiled.residual_filters.clone(),
701 attribution_requested: compiled.attribution_requested,
702 });
703 let plan = CompiledSearchPlan {
704 strict: compiled.clone(),
705 relaxed,
706 was_degraded_at_plan_time,
707 };
708 self.execute_compiled_search_plan(&plan)
709 }
710
711 pub fn execute_compiled_search_plan(
730 &self,
731 plan: &CompiledSearchPlan,
732 ) -> Result<SearchRows, EngineError> {
733 let strict = &plan.strict;
734 let limit = strict.limit;
735 let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;
736
737 let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
738 let strict_underfilled = strict_hits.len() < fallback_threshold;
739
740 let mut relaxed_hits: Vec<SearchHit> = Vec::new();
741 let mut fallback_used = false;
742 let mut was_degraded = false;
743 if let Some(relaxed) = plan.relaxed.as_ref()
744 && strict_underfilled
745 {
746 relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
747 fallback_used = true;
748 was_degraded = plan.was_degraded_at_plan_time;
749 }
750
751 let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
752 if strict.attribution_requested {
756 let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
757 self.populate_attribution_for_hits(
758 &mut merged,
759 &strict.text_query,
760 relaxed_text_query,
761 )?;
762 }
763 let strict_hit_count = merged
764 .iter()
765 .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
766 .count();
767 let relaxed_hit_count = merged
768 .iter()
769 .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
770 .count();
771 let vector_hit_count = 0;
775
776 Ok(SearchRows {
777 hits: merged,
778 strict_hit_count,
779 relaxed_hit_count,
780 vector_hit_count,
781 fallback_used,
782 was_degraded,
783 })
784 }
785
    /// Executes a compiled ANN search against `vec_nodes_active`, joining the
    /// nearest chunks back to their live (non-superseded) nodes, applying
    /// fused filters inside the CTE and residual filters outside it, ordered
    /// by ascending distance.
    ///
    /// Degrades to an empty `was_degraded` result when the vector table is
    /// absent (at prepare or at query time).
    ///
    /// # Errors
    /// Returns pool or SQLite errors for any other failure; telemetry error
    /// counters are bumped on each failure path.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // A zero limit can never produce rows; skip all SQL work.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        // Bind 1 is always the query vector; bind 2 is the root kind when a
        // kind filter applies. All later binds are appended positionally, so
        // each clause records `binds.len()` immediately after pushing.
        let filter_by_kind = !compiled.root_kind.is_empty();
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // Fused filters run inside the vector_hits CTE against the source
        // node alias `src`, i.e. before the candidate limit is applied.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses
                        .push_str("\n AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                // Non-fused JSON predicates are handled as residual filters
                // below; nothing to emit here.
                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
                }
            }
        }

        // Residual filters run over the CTE output alias `h`, i.e. after the
        // candidate limit.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                // Fusable predicates were already emitted into the CTE above.
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. } => {
                }
            }
        }

        // Outer LIMIT is bound as the final parameter; the inner KNN LIMIT is
        // interpolated directly since it is a trusted usize.
        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        let base_limit = limit;
        let kind_clause = if filter_by_kind {
            "\n AND src.kind = ?2"
        } else {
            ""
        };

        let sql = format!(
            "WITH vector_hits AS (
    SELECT
        src.row_id AS row_id,
        src.logical_id AS logical_id,
        src.kind AS kind,
        src.properties AS properties,
        src.source_ref AS source_ref,
        src.content_ref AS content_ref,
        src.created_at AS created_at,
        vc.distance AS distance,
        vc.chunk_id AS chunk_id
    FROM (
        SELECT chunk_id, distance
        FROM vec_nodes_active
        WHERE embedding MATCH ?1
        LIMIT {base_limit}
    ) vc
    JOIN chunks c ON c.id = vc.chunk_id
    JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
    WHERE 1 = 1{kind_clause}{fused_clauses}
)
SELECT
    h.row_id,
    h.logical_id,
    h.kind,
    h.properties,
    h.content_ref,
    am.last_accessed_at,
    h.created_at,
    h.distance,
    h.chunk_id
FROM vector_hits h
LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
WHERE 1 = 1{filter_clauses}
ORDER BY h.distance ASC
LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            Err(e) if is_vec_table_absent(&e) => {
                // Vector table missing at prepare: degrade to an empty result,
                // warning only once per coordinator lifetime.
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Smaller distance = better match; negate so larger score wins
                // during merge, matching text-search score ordering.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    match_mode: None,
                    snippet: None,
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    // Attribution paths are filled by the caller when needed;
                    // here only the empty shell is attached on request.
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // The absence error can also surface lazily at step time.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
1121
    /// Executes a hybrid retrieval plan: strict text search, relaxed text
    /// fallback when strict underfills, and a vector branch (built lazily
    /// from `raw_query` via the query embedder) only when both text branches
    /// return nothing.
    ///
    /// # Errors
    /// Propagates errors from any executed branch.
    pub fn execute_retrieval_plan(
        &self,
        plan: &CompiledRetrievalPlan,
        raw_query: &str,
    ) -> Result<SearchRows, EngineError> {
        // Cloned because the vector branch may be filled in below.
        let mut plan = plan.clone();
        let limit = plan.text.strict.limit;

        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;

        // Relaxed fallback triggers when strict returned fewer than
        // min(FALLBACK_TRIGGER_K, limit) hits.
        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;
        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.text.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            was_degraded = plan.was_degraded_at_plan_time;
        }

        // The vector branch is a last resort: it is attempted only when both
        // text branches are empty and an embedder is configured.
        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
        if text_branches_empty && self.query_embedder.is_some() {
            self.fill_vector_branch(&mut plan, raw_query);
        }

        let mut vector_hits: Vec<SearchHit> = Vec::new();
        if let Some(vector) = plan.vector.as_ref()
            && strict_hits.is_empty()
            && relaxed_hits.is_empty()
        {
            let vector_rows = self.execute_compiled_vector_search(vector)?;
            vector_hits = vector_rows.hits;
            if vector_rows.was_degraded {
                was_degraded = true;
            }
        }
        // Text came back empty, the plan is marked degraded, and no vector
        // branch exists to run (e.g. fill_vector_branch failed to embed):
        // surface the degradation to the caller.
        if text_branches_empty
            && plan.was_degraded_at_plan_time
            && plan.vector.is_none()
            && self.query_embedder.is_some()
        {
            was_degraded = true;
        }

        let strict = &plan.text.strict;
        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
        if strict.attribution_requested {
            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }

        // Hit counts are derived from the merged set, not the raw branches,
        // so they reflect what the caller actually receives.
        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        let vector_hit_count = merged
            .iter()
            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
            .count();

        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
1262
    /// Attempts to attach a vector branch to `plan` by embedding `raw_query`.
    ///
    /// On embedder failure or serialization failure the plan is left without
    /// a vector branch and `was_degraded_at_plan_time` is set instead. A
    /// missing embedder is a silent no-op.
    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
        let Some(embedder) = self.query_embedder.as_ref() else {
            return;
        };
        match embedder.embed_query(raw_query) {
            Ok(vec) => {
                // Serialize the embedding as a JSON array string; it becomes
                // the `embedding MATCH ?1` bind of the vector search.
                let literal = match serde_json::to_string(&vec) {
                    Ok(s) => s,
                    Err(err) => {
                        trace_warn!(
                            error = %err,
                            "query embedder vector serialization failed; skipping vector branch"
                        );
                        // `let _ = err;` presumably keeps `err` "used" when
                        // the trace macro compiles to nothing — confirm
                        // before removing.
                        let _ = err; plan.was_degraded_at_plan_time = true;
                        return;
                    }
                };
                // The vector branch mirrors the strict text branch's scope:
                // same kind, limit, and filters.
                let strict = &plan.text.strict;
                plan.vector = Some(CompiledVectorSearch {
                    root_kind: strict.root_kind.clone(),
                    query_text: literal,
                    limit: strict.limit,
                    fusable_filters: strict.fusable_filters.clone(),
                    residual_filters: strict.residual_filters.clone(),
                    attribution_requested: strict.attribution_requested,
                });
            }
            Err(err) => {
                trace_warn!(
                    error = %err,
                    "query embedder unavailable, skipping vector branch"
                );
                // See note above about `let _ = err;`.
                let _ = err; plan.was_degraded_at_plan_time = true;
            }
        }
    }
1321
1322 #[allow(clippy::too_many_lines)]
1331 fn run_search_branch(
1332 &self,
1333 compiled: &CompiledSearch,
1334 branch: SearchBranch,
1335 ) -> Result<Vec<SearchHit>, EngineError> {
1336 use std::fmt::Write as _;
1337 if matches!(
1349 compiled.text_query,
1350 fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
1351 ) {
1352 return Ok(Vec::new());
1353 }
1354 let rendered = render_text_query_fts5(&compiled.text_query);
1355 let filter_by_kind = !compiled.root_kind.is_empty();
1361 let mut binds: Vec<BindValue> = if filter_by_kind {
1362 vec![
1363 BindValue::Text(rendered.clone()),
1364 BindValue::Text(compiled.root_kind.clone()),
1365 BindValue::Text(rendered),
1366 BindValue::Text(compiled.root_kind.clone()),
1367 ]
1368 } else {
1369 vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
1370 };
1371
1372 let mut fused_clauses = String::new();
1381 for predicate in &compiled.fusable_filters {
1382 match predicate {
1383 Predicate::KindEq(kind) => {
1384 binds.push(BindValue::Text(kind.clone()));
1385 let idx = binds.len();
1386 let _ = write!(fused_clauses, "\n AND u.kind = ?{idx}");
1387 }
1388 Predicate::LogicalIdEq(logical_id) => {
1389 binds.push(BindValue::Text(logical_id.clone()));
1390 let idx = binds.len();
1391 let _ = write!(
1392 fused_clauses,
1393 "\n AND u.logical_id = ?{idx}"
1394 );
1395 }
1396 Predicate::SourceRefEq(source_ref) => {
1397 binds.push(BindValue::Text(source_ref.clone()));
1398 let idx = binds.len();
1399 let _ = write!(
1400 fused_clauses,
1401 "\n AND u.source_ref = ?{idx}"
1402 );
1403 }
1404 Predicate::ContentRefEq(uri) => {
1405 binds.push(BindValue::Text(uri.clone()));
1406 let idx = binds.len();
1407 let _ = write!(
1408 fused_clauses,
1409 "\n AND u.content_ref = ?{idx}"
1410 );
1411 }
1412 Predicate::ContentRefNotNull => {
1413 fused_clauses.push_str("\n AND u.content_ref IS NOT NULL");
1414 }
1415 Predicate::JsonPathFusedEq { path, value } => {
1416 binds.push(BindValue::Text(path.clone()));
1417 let path_idx = binds.len();
1418 binds.push(BindValue::Text(value.clone()));
1419 let value_idx = binds.len();
1420 let _ = write!(
1421 fused_clauses,
1422 "\n AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
1423 );
1424 }
1425 Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
1426 binds.push(BindValue::Text(path.clone()));
1427 let path_idx = binds.len();
1428 binds.push(BindValue::Integer(*value));
1429 let value_idx = binds.len();
1430 let operator = match op {
1431 ComparisonOp::Gt => ">",
1432 ComparisonOp::Gte => ">=",
1433 ComparisonOp::Lt => "<",
1434 ComparisonOp::Lte => "<=",
1435 };
1436 let _ = write!(
1437 fused_clauses,
1438 "\n AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
1439 );
1440 }
1441 Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
1442 }
1445 }
1446 }
1447
1448 let mut filter_clauses = String::new();
1449 for predicate in &compiled.residual_filters {
1450 match predicate {
1451 Predicate::JsonPathEq { path, value } => {
1452 binds.push(BindValue::Text(path.clone()));
1453 let path_idx = binds.len();
1454 binds.push(scalar_to_bind(value));
1455 let value_idx = binds.len();
1456 let _ = write!(
1457 filter_clauses,
1458 "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
1459 );
1460 }
1461 Predicate::JsonPathCompare { path, op, value } => {
1462 binds.push(BindValue::Text(path.clone()));
1463 let path_idx = binds.len();
1464 binds.push(scalar_to_bind(value));
1465 let value_idx = binds.len();
1466 let operator = match op {
1467 ComparisonOp::Gt => ">",
1468 ComparisonOp::Gte => ">=",
1469 ComparisonOp::Lt => "<",
1470 ComparisonOp::Lte => "<=",
1471 };
1472 let _ = write!(
1473 filter_clauses,
1474 "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
1475 );
1476 }
1477 Predicate::KindEq(_)
1478 | Predicate::LogicalIdEq(_)
1479 | Predicate::SourceRefEq(_)
1480 | Predicate::ContentRefEq(_)
1481 | Predicate::ContentRefNotNull
1482 | Predicate::JsonPathFusedEq { .. }
1483 | Predicate::JsonPathFusedTimestampCmp { .. } => {
1484 }
1487 }
1488 }
1489
1490 let limit = compiled.limit;
1497 binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
1498 let limit_idx = binds.len();
1499 let (chunk_fts_bind, chunk_kind_clause, prop_fts_bind, prop_kind_clause) = if filter_by_kind
1509 {
1510 (
1511 "?1",
1512 "\n AND src.kind = ?2",
1513 "?3",
1514 "\n AND fp.kind = ?4",
1515 )
1516 } else {
1517 ("?1", "", "?2", "")
1518 };
1519 let sql = format!(
1520 "WITH search_hits AS (
1521 SELECT
1522 u.row_id AS row_id,
1523 u.logical_id AS logical_id,
1524 u.kind AS kind,
1525 u.properties AS properties,
1526 u.source_ref AS source_ref,
1527 u.content_ref AS content_ref,
1528 u.created_at AS created_at,
1529 u.score AS score,
1530 u.source AS source,
1531 u.snippet AS snippet,
1532 u.projection_row_id AS projection_row_id
1533 FROM (
1534 SELECT
1535 src.row_id AS row_id,
1536 c.node_logical_id AS logical_id,
1537 src.kind AS kind,
1538 src.properties AS properties,
1539 src.source_ref AS source_ref,
1540 src.content_ref AS content_ref,
1541 src.created_at AS created_at,
1542 -bm25(fts_nodes) AS score,
1543 'chunk' AS source,
1544 snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
1545 f.chunk_id AS projection_row_id
1546 FROM fts_nodes f
1547 JOIN chunks c ON c.id = f.chunk_id
1548 JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
1549 WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}
1550 UNION ALL
1551 SELECT
1552 src.row_id AS row_id,
1553 fp.node_logical_id AS logical_id,
1554 src.kind AS kind,
1555 src.properties AS properties,
1556 src.source_ref AS source_ref,
1557 src.content_ref AS content_ref,
1558 src.created_at AS created_at,
1559 -bm25(fts_node_properties) AS score,
1560 'property' AS source,
1561 substr(fp.text_content, 1, 200) AS snippet,
1562 CAST(fp.rowid AS TEXT) AS projection_row_id
1563 FROM fts_node_properties fp
1564 JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
1565 WHERE fts_node_properties MATCH {prop_fts_bind}{prop_kind_clause}
1566 ) u
1567 WHERE 1 = 1{fused_clauses}
1568 ORDER BY u.score DESC
1569 LIMIT ?{limit_idx}
1570 )
1571 SELECT
1572 h.row_id,
1573 h.logical_id,
1574 h.kind,
1575 h.properties,
1576 h.content_ref,
1577 am.last_accessed_at,
1578 h.created_at,
1579 h.score,
1580 h.source,
1581 h.snippet,
1582 h.projection_row_id
1583 FROM search_hits h
1584 LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
1585 WHERE 1 = 1{filter_clauses}
1586 ORDER BY h.score DESC"
1587 );
1588
1589 let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
1590
1591 let conn_guard = match self.lock_connection() {
1592 Ok(g) => g,
1593 Err(e) => {
1594 self.telemetry.increment_errors();
1595 return Err(e);
1596 }
1597 };
1598 let mut statement = match conn_guard.prepare_cached(&sql) {
1599 Ok(stmt) => stmt,
1600 Err(e) => {
1601 self.telemetry.increment_errors();
1602 return Err(EngineError::Sqlite(e));
1603 }
1604 };
1605
1606 let hits = match statement
1607 .query_map(params_from_iter(bind_values.iter()), |row| {
1608 let source_str: String = row.get(8)?;
1609 let source = if source_str == "property" {
1614 SearchHitSource::Property
1615 } else {
1616 SearchHitSource::Chunk
1617 };
1618 let match_mode = match branch {
1619 SearchBranch::Strict => SearchMatchMode::Strict,
1620 SearchBranch::Relaxed => SearchMatchMode::Relaxed,
1621 };
1622 Ok(SearchHit {
1623 node: fathomdb_query::NodeRowLite {
1624 row_id: row.get(0)?,
1625 logical_id: row.get(1)?,
1626 kind: row.get(2)?,
1627 properties: row.get(3)?,
1628 content_ref: row.get(4)?,
1629 last_accessed_at: row.get(5)?,
1630 },
1631 written_at: row.get(6)?,
1632 score: row.get(7)?,
1633 modality: RetrievalModality::Text,
1635 source,
1636 match_mode: Some(match_mode),
1637 snippet: row.get(9)?,
1638 projection_row_id: row.get(10)?,
1639 vector_distance: None,
1640 attribution: None,
1641 })
1642 })
1643 .and_then(Iterator::collect::<Result<Vec<_>, _>>)
1644 {
1645 Ok(rows) => rows,
1646 Err(e) => {
1647 self.telemetry.increment_errors();
1648 return Err(EngineError::Sqlite(e));
1649 }
1650 };
1651
1652 drop(statement);
1656 drop(conn_guard);
1657
1658 self.telemetry.increment_queries();
1659 Ok(hits)
1660 }
1661
1662 fn populate_attribution_for_hits(
1666 &self,
1667 hits: &mut [SearchHit],
1668 strict_text_query: &fathomdb_query::TextQuery,
1669 relaxed_text_query: Option<&fathomdb_query::TextQuery>,
1670 ) -> Result<(), EngineError> {
1671 let conn_guard = match self.lock_connection() {
1672 Ok(g) => g,
1673 Err(e) => {
1674 self.telemetry.increment_errors();
1675 return Err(e);
1676 }
1677 };
1678 let strict_expr = render_text_query_fts5(strict_text_query);
1679 let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
1680 for hit in hits.iter_mut() {
1681 let match_expr = match hit.match_mode {
1686 Some(SearchMatchMode::Strict) => strict_expr.as_str(),
1687 Some(SearchMatchMode::Relaxed) => {
1688 relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
1689 }
1690 None => continue,
1691 };
1692 match resolve_hit_attribution(&conn_guard, hit, match_expr) {
1693 Ok(att) => hit.attribution = Some(att),
1694 Err(e) => {
1695 self.telemetry.increment_errors();
1696 return Err(e);
1697 }
1698 }
1699 }
1700 Ok(())
1701 }
1702
1703 pub fn execute_compiled_grouped_read(
1707 &self,
1708 compiled: &CompiledGroupedQuery,
1709 ) -> Result<GroupedQueryRows, EngineError> {
1710 let root_rows = self.execute_compiled_read(&compiled.root)?;
1711 if root_rows.was_degraded {
1712 return Ok(GroupedQueryRows {
1713 roots: Vec::new(),
1714 expansions: Vec::new(),
1715 was_degraded: true,
1716 });
1717 }
1718
1719 let roots = root_rows.nodes;
1720 let mut expansions = Vec::with_capacity(compiled.expansions.len());
1721 for expansion in &compiled.expansions {
1722 let slot_rows = if roots.is_empty() {
1723 Vec::new()
1724 } else {
1725 self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
1726 };
1727 expansions.push(ExpansionSlotRows {
1728 slot: expansion.slot.clone(),
1729 roots: slot_rows,
1730 });
1731 }
1732
1733 Ok(GroupedQueryRows {
1734 roots,
1735 expansions,
1736 was_degraded: false,
1737 })
1738 }
1739
1740 fn read_expansion_nodes_chunked(
1746 &self,
1747 roots: &[NodeRow],
1748 expansion: &ExpansionSlot,
1749 hard_limit: usize,
1750 ) -> Result<Vec<ExpansionRootRows>, EngineError> {
1751 if roots.len() <= BATCH_CHUNK_SIZE {
1752 return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
1753 }
1754
1755 let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
1758 for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
1759 for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
1760 per_root
1761 .entry(group.root_logical_id)
1762 .or_default()
1763 .extend(group.nodes);
1764 }
1765 }
1766
1767 Ok(roots
1768 .iter()
1769 .map(|root| ExpansionRootRows {
1770 root_logical_id: root.logical_id.clone(),
1771 nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
1772 })
1773 .collect())
1774 }
1775
    /// Expands all `roots` along one edge label in a single recursive-CTE
    /// statement and returns the reached nodes grouped per root, in `roots`
    /// order.
    ///
    /// Bind-parameter layout is strictly positional: `?1..?N` are the root
    /// logical ids, `?(N+1)` is the edge label, and `?(N+2)..` are the
    /// optional filter binds produced by `compile_expansion_filter`. The
    /// `bind_values` vector below must be pushed in exactly that order.
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Direction picks which edge endpoint joins the current frontier and
        // which endpoint becomes the next frontier node.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // Fused filters require a registered property-FTS schema for the
        // kinds reachable over this edge label; fail fast with a clear error.
        if expansion.filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. } | Predicate::JsonPathFusedTimestampCmp { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }

        // Seed the CTE with one "SELECT ?i" per root so every root traverses
        // independently.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        let edge_kind_param = root_ids.len() + 1;
        let filter_param_start = root_ids.len() + 2;

        let (filter_sql, filter_binds) =
            compile_expansion_filter(expansion.filter.as_ref(), filter_param_start);

        // `visited` is a ',id,'-delimited CSV used as a cycle guard (instr()
        // check below); `emitted` caps how many rows each traversal branch can
        // produce before recursion stops. `numbered` then applies the per-root
        // hard limit via ROW_NUMBER() PARTITION BY root_id.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted) AS (
                SELECT rid, rid, 0, printf(',%s,', rid), 0
                FROM root_ids
                UNION ALL
                SELECT
                    t.root_id,
                    {next_logical_id},
                    t.depth + 1,
                    t.visited || {next_logical_id} || ',',
                    t.emitted + 1
                FROM traversed t
                JOIN edges e ON {join_condition}
                    AND e.kind = ?{edge_kind_param}
                    AND e.superseded_at IS NULL
                WHERE t.depth < {max_depth}
                    AND t.emitted < {hard_limit}
                    AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
                    , n.content_ref, am.last_accessed_at
                    , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
                FROM traversed t
                JOIN nodes n ON n.logical_id = t.logical_id
                    AND n.superseded_at IS NULL
                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
                WHERE t.depth > 0{filter_sql}
            )
            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, logical_id",
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind order mirrors the positional layout documented above:
        // root ids, then the edge label, then filter binds.
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(filter_binds);

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Regroup flat (root_id, node) rows by root.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        // Preserve caller root order; roots with no matches get empty groups.
        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
1915
    /// Ensures every node kind reachable over `edge_label` has a registered
    /// property-FTS schema before a fused filter is allowed on an expansion
    /// slot; returns `InvalidConfig` naming the offending kind otherwise.
    ///
    /// NOTE(review): only kinds on the *target* end of the edge are checked
    /// (`e.target_logical_id`), even though In-direction expansions traverse
    /// toward source nodes — confirm whether source kinds should also be
    /// validated for `TraverseDirection::In`.
    fn validate_fused_filter_for_edge_label(&self, edge_label: &str) -> Result<(), EngineError> {
        let conn = self.lock_connection()?;
        let mut stmt = conn
            .prepare_cached(
                "SELECT DISTINCT n.kind \
                 FROM edges e \
                 JOIN nodes n ON n.logical_id = e.target_logical_id \
                 WHERE e.kind = ?1 AND e.superseded_at IS NULL",
            )
            .map_err(EngineError::Sqlite)?;
        let target_kinds: Vec<String> = stmt
            .query_map(rusqlite::params![edge_label], |row| row.get(0))
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // One existence probe per distinct kind; target_kinds is expected to
        // be small (DISTINCT over edge targets).
        for kind in &target_kinds {
            let has_schema: bool = conn
                .query_row(
                    "SELECT COUNT(*) > 0 FROM fts_property_schemas WHERE kind = ?1",
                    rusqlite::params![kind],
                    |row| row.get(0),
                )
                .map_err(EngineError::Sqlite)?;
            if !has_schema {
                return Err(EngineError::InvalidConfig(format!(
                    "kind {kind:?} has no registered property-FTS schema; register one with \
                     admin.register_fts_property_schema(..) before using fused filters on \
                     expansion slots, or use JsonPathEq for non-fused semantics \
                     (expand slot uses edge label {edge_label:?})"
                )));
            }
        }
        Ok(())
    }
1965
1966 pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
1972 let conn = self.lock_connection()?;
1973 conn.query_row(
1974 "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
1975 rusqlite::params![id],
1976 |row| {
1977 Ok(RunRow {
1978 id: row.get(0)?,
1979 kind: row.get(1)?,
1980 status: row.get(2)?,
1981 properties: row.get(3)?,
1982 })
1983 },
1984 )
1985 .optional()
1986 .map_err(EngineError::Sqlite)
1987 }
1988
1989 pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
1995 let conn = self.lock_connection()?;
1996 conn.query_row(
1997 "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
1998 rusqlite::params![id],
1999 |row| {
2000 Ok(StepRow {
2001 id: row.get(0)?,
2002 run_id: row.get(1)?,
2003 kind: row.get(2)?,
2004 status: row.get(3)?,
2005 properties: row.get(4)?,
2006 })
2007 },
2008 )
2009 .optional()
2010 .map_err(EngineError::Sqlite)
2011 }
2012
2013 pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
2019 let conn = self.lock_connection()?;
2020 conn.query_row(
2021 "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
2022 rusqlite::params![id],
2023 |row| {
2024 Ok(ActionRow {
2025 id: row.get(0)?,
2026 step_id: row.get(1)?,
2027 kind: row.get(2)?,
2028 status: row.get(3)?,
2029 properties: row.get(4)?,
2030 })
2031 },
2032 )
2033 .optional()
2034 .map_err(EngineError::Sqlite)
2035 }
2036
2037 pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
2043 let conn = self.lock_connection()?;
2044 let mut stmt = conn
2045 .prepare_cached(
2046 "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
2047 )
2048 .map_err(EngineError::Sqlite)?;
2049 let rows = stmt
2050 .query_map([], |row| {
2051 Ok(RunRow {
2052 id: row.get(0)?,
2053 kind: row.get(1)?,
2054 status: row.get(2)?,
2055 properties: row.get(3)?,
2056 })
2057 })
2058 .map_err(EngineError::Sqlite)?
2059 .collect::<Result<Vec<_>, _>>()
2060 .map_err(EngineError::Sqlite)?;
2061 Ok(rows)
2062 }
2063
2064 #[must_use]
2074 #[allow(clippy::expect_used)]
2075 pub fn shape_sql_count(&self) -> usize {
2076 self.shape_sql_map
2077 .lock()
2078 .unwrap_or_else(PoisonError::into_inner)
2079 .len()
2080 }
2081
    /// Returns a shared handle to the engine's schema manager (cheap
    /// refcount bump, no deep copy).
    #[must_use]
    pub fn schema_manager(&self) -> Arc<SchemaManager> {
        Arc::clone(&self.schema_manager)
    }
2087
2088 #[must_use]
2097 pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
2098 let cache_hit = self
2099 .shape_sql_map
2100 .lock()
2101 .unwrap_or_else(PoisonError::into_inner)
2102 .contains_key(&compiled.shape_hash);
2103 QueryPlan {
2104 sql: wrap_node_row_projection_sql(&compiled.sql),
2105 bind_count: compiled.binds.len(),
2106 driving_table: compiled.driving_table,
2107 shape_hash: compiled.shape_hash,
2108 cache_hit,
2109 }
2110 }
2111
2112 #[doc(hidden)]
2119 pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
2120 let conn = self.lock_connection()?;
2121 let result = conn
2122 .query_row(&format!("PRAGMA {name}"), [], |row| {
2123 row.get::<_, rusqlite::types::Value>(0)
2125 })
2126 .map_err(EngineError::Sqlite)?;
2127 let s = match result {
2128 rusqlite::types::Value::Text(t) => t,
2129 rusqlite::types::Value::Integer(i) => i.to_string(),
2130 rusqlite::types::Value::Real(f) => f.to_string(),
2131 rusqlite::types::Value::Blob(_) => {
2132 return Err(EngineError::InvalidWrite(format!(
2133 "PRAGMA {name} returned an unexpected BLOB value"
2134 )));
2135 }
2136 rusqlite::types::Value::Null => String::new(),
2137 };
2138 Ok(s)
2139 }
2140
2141 pub fn query_provenance_events(
2150 &self,
2151 subject: &str,
2152 ) -> Result<Vec<ProvenanceEvent>, EngineError> {
2153 let conn = self.lock_connection()?;
2154 let mut stmt = conn
2155 .prepare_cached(
2156 "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
2157 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
2158 )
2159 .map_err(EngineError::Sqlite)?;
2160 let events = stmt
2161 .query_map(rusqlite::params![subject], |row| {
2162 Ok(ProvenanceEvent {
2163 id: row.get(0)?,
2164 event_type: row.get(1)?,
2165 subject: row.get(2)?,
2166 source_ref: row.get(3)?,
2167 metadata_json: row.get(4)?,
2168 created_at: row.get(5)?,
2169 })
2170 })
2171 .map_err(EngineError::Sqlite)?
2172 .collect::<Result<Vec<_>, _>>()
2173 .map_err(EngineError::Sqlite)?;
2174 Ok(events)
2175 }
2176
    /// Returns a full scan of all active nodes of `kind` when a first-time
    /// property-FTS registration for that kind is still rebuilding and no
    /// FTS rows exist yet; otherwise `Ok(None)` (no fallback needed).
    fn scan_fallback_if_first_registration(
        &self,
        kind: &str,
    ) -> Result<Option<Vec<NodeRow>>, EngineError> {
        let conn = self.lock_connection()?;

        // Fallback is needed only while ALL of these hold: a rebuild-state
        // row marks this kind's FIRST registration, the rebuild is still in
        // flight (PENDING/BUILDING/SWAPPING), and the FTS table has no rows
        // for the kind yet. (`.optional()?` converts via the crate's
        // From<rusqlite::Error> impl, as at the other `.optional()?` sites.)
        let needs_scan: bool = conn
            .query_row(
                "SELECT 1 FROM fts_property_rebuild_state \
                 WHERE kind = ?1 AND is_first_registration = 1 \
                 AND state IN ('PENDING','BUILDING','SWAPPING') \
                 AND NOT EXISTS (SELECT 1 FROM fts_node_properties WHERE kind = ?1) \
                 LIMIT 1",
                rusqlite::params![kind],
                |_| Ok(true),
            )
            .optional()?
            .unwrap_or(false);

        if !needs_scan {
            return Ok(None);
        }

        // Full table scan of the kind's active (non-superseded) nodes,
        // decorated with access metadata where present.
        let mut stmt = conn
            .prepare_cached(
                "SELECT n.row_id, n.logical_id, n.kind, n.properties, n.content_ref, \
                 am.last_accessed_at \
                 FROM nodes n \
                 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id \
                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
            )
            .map_err(EngineError::Sqlite)?;

        let nodes = stmt
            .query_map(rusqlite::params![kind], |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        Ok(Some(nodes))
    }
2235
2236 pub fn get_property_fts_rebuild_progress(
2242 &self,
2243 kind: &str,
2244 ) -> Result<Option<crate::rebuild_actor::RebuildProgress>, EngineError> {
2245 let conn = self.lock_connection()?;
2246 let row = conn
2247 .query_row(
2248 "SELECT state, rows_total, rows_done, started_at, last_progress_at, error_message \
2249 FROM fts_property_rebuild_state WHERE kind = ?1",
2250 rusqlite::params![kind],
2251 |r| {
2252 Ok(crate::rebuild_actor::RebuildProgress {
2253 state: r.get(0)?,
2254 rows_total: r.get(1)?,
2255 rows_done: r.get(2)?,
2256 started_at: r.get(3)?,
2257 last_progress_at: r.get(4)?,
2258 error_message: r.get(5)?,
2259 })
2260 },
2261 )
2262 .optional()?;
2263 Ok(row)
2264 }
2265}
2266
/// Wraps a compiled node query so every projected row also carries the
/// node's `last_accessed_at` from `node_access_metadata` (NULL when the
/// node has no access-metadata row).
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut sql = String::with_capacity(base_sql.len() + 160);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, am.last_accessed_at \
         FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
2274
2275pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
2278 match err {
2279 rusqlite::Error::SqliteFailure(_, Some(msg)) => {
2280 msg.contains("vec_nodes_active") || msg.contains("no such module: vec0")
2281 }
2282 _ => false,
2283 }
2284}
2285
2286fn scalar_to_bind(value: &ScalarValue) -> BindValue {
2287 match value {
2288 ScalarValue::Text(text) => BindValue::Text(text.clone()),
2289 ScalarValue::Integer(integer) => BindValue::Integer(*integer),
2290 ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
2291 }
2292}
2293
/// Two-branch convenience wrapper over `merge_search_branches_three` with an
/// empty vector branch: strict hits precede relaxed hits, duplicates (by
/// logical id) keep their earlier-block occurrence, and the merged list is
/// truncated to `limit`.
fn merge_search_branches(
    strict: Vec<SearchHit>,
    relaxed: Vec<SearchHit>,
    limit: usize,
) -> Vec<SearchHit> {
    merge_search_branches_three(strict, relaxed, Vec::new(), limit)
}
2318
2319fn merge_search_branches_three(
2331 strict: Vec<SearchHit>,
2332 relaxed: Vec<SearchHit>,
2333 vector: Vec<SearchHit>,
2334 limit: usize,
2335) -> Vec<SearchHit> {
2336 let strict_block = dedup_branch_hits(strict);
2337 let relaxed_block = dedup_branch_hits(relaxed);
2338 let vector_block = dedup_branch_hits(vector);
2339
2340 let mut seen: std::collections::HashSet<String> = strict_block
2341 .iter()
2342 .map(|h| h.node.logical_id.clone())
2343 .collect();
2344
2345 let mut merged = strict_block;
2346 for hit in relaxed_block {
2347 if seen.insert(hit.node.logical_id.clone()) {
2348 merged.push(hit);
2349 }
2350 }
2351 for hit in vector_block {
2352 if seen.insert(hit.node.logical_id.clone()) {
2353 merged.push(hit);
2354 }
2355 }
2356
2357 if merged.len() > limit {
2358 merged.truncate(limit);
2359 }
2360 merged
2361}
2362
/// Sorts a single branch's hits and collapses duplicates by logical id.
///
/// Sort order: score descending, then logical id ascending, then source
/// priority (chunk < property < vector). `retain` keeps the FIRST row seen
/// per logical id; for duplicate rows of the same logical id the id
/// comparison is Equal, so at equal score the source-priority tiebreaker
/// decides which duplicate survives (chunk beats property — pinned by the
/// `merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score`
/// test).
fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
    hits.sort_by(|a, b| {
        // Incomparable floats (NaN scores) are treated as equal rather than
        // panicking, falling through to the deterministic tiebreakers.
        b.score
            .partial_cmp(&a.score)
            .unwrap_or(std::cmp::Ordering::Equal)
            .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
            .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
    });

    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
    hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
    hits
}
2379
2380fn source_priority(source: SearchHitSource) -> u8 {
2381 match source {
2384 SearchHitSource::Chunk => 0,
2385 SearchHitSource::Property => 1,
2386 SearchHitSource::Vector => 2,
2387 }
2388}
2389
// Sentinel delimiters passed to FTS5 highlight() when resolving attribution.
// Control bytes are used because they should not normally occur in indexed
// property text, so offsets recovered by scanning for them stay exact
// (see parse_highlight_offsets).
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
2409
2410fn load_position_map(
2414 conn: &Connection,
2415 logical_id: &str,
2416 kind: &str,
2417) -> Result<Vec<(usize, usize, String)>, EngineError> {
2418 let mut stmt = conn
2419 .prepare_cached(
2420 "SELECT start_offset, end_offset, leaf_path \
2421 FROM fts_node_property_positions \
2422 WHERE node_logical_id = ?1 AND kind = ?2 \
2423 ORDER BY start_offset ASC",
2424 )
2425 .map_err(EngineError::Sqlite)?;
2426 let rows = stmt
2427 .query_map(rusqlite::params![logical_id, kind], |row| {
2428 let start: i64 = row.get(0)?;
2429 let end: i64 = row.get(1)?;
2430 let path: String = row.get(2)?;
2431 let start = usize::try_from(start).unwrap_or(0);
2435 let end = usize::try_from(end).unwrap_or(0);
2436 Ok((start, end, path))
2437 })
2438 .map_err(EngineError::Sqlite)?;
2439 let mut out = Vec::new();
2440 for row in rows {
2441 out.push(row.map_err(EngineError::Sqlite)?);
2442 }
2443 Ok(out)
2444}
2445
/// Extracts, from a highlight()-wrapped string, the byte offsets (in the
/// ORIGINAL, unwrapped text) at which each `open` marker begins.
///
/// Marker bytes are excluded from the running offset, so each reported
/// position refers to the text as it was before wrapping.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    let mut offsets = Vec::new();
    let mut remaining = wrapped.as_bytes();
    // Position within the original (marker-free) text.
    let mut original_pos = 0usize;
    while !remaining.is_empty() {
        if remaining.starts_with(open.as_bytes()) {
            offsets.push(original_pos);
            remaining = &remaining[open.len()..];
        } else if remaining.starts_with(close.as_bytes()) {
            remaining = &remaining[close.len()..];
        } else {
            // Ordinary content byte: advances the original position.
            remaining = &remaining[1..];
            original_pos += 1;
        }
    }
    offsets
}
2478
/// Finds the leaf path whose half-open span [start, end) contains `offset`.
///
/// `positions` must be sorted by start offset (as produced by
/// `load_position_map`); returns `None` when the offset falls before the
/// first span or into a gap between spans.
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    // Locate the last span starting at or before `offset`.
    let idx = match positions.binary_search_by(|entry| entry.0.cmp(&offset)) {
        Ok(exact) => exact,
        Err(insertion) => insertion.checked_sub(1)?,
    };
    let (start, end, path) = &positions[idx];
    // The span is half-open: its end offset is exclusive.
    (offset >= *start && offset < *end).then_some(path.as_str())
}
2495
2496fn resolve_hit_attribution(
2505 conn: &Connection,
2506 hit: &SearchHit,
2507 match_expr: &str,
2508) -> Result<HitAttribution, EngineError> {
2509 if !matches!(hit.source, SearchHitSource::Property) {
2510 return Ok(HitAttribution {
2511 matched_paths: Vec::new(),
2512 });
2513 }
2514 let Some(rowid_str) = hit.projection_row_id.as_deref() else {
2515 return Ok(HitAttribution {
2516 matched_paths: Vec::new(),
2517 });
2518 };
2519 let rowid: i64 = match rowid_str.parse() {
2520 Ok(v) => v,
2521 Err(_) => {
2522 return Ok(HitAttribution {
2523 matched_paths: Vec::new(),
2524 });
2525 }
2526 };
2527
2528 let mut stmt = conn
2532 .prepare_cached(
2533 "SELECT highlight(fts_node_properties, 2, ?1, ?2) \
2534 FROM fts_node_properties \
2535 WHERE rowid = ?3 AND fts_node_properties MATCH ?4",
2536 )
2537 .map_err(EngineError::Sqlite)?;
2538 let wrapped: Option<String> = stmt
2539 .query_row(
2540 rusqlite::params![
2541 ATTRIBUTION_HIGHLIGHT_OPEN,
2542 ATTRIBUTION_HIGHLIGHT_CLOSE,
2543 rowid,
2544 match_expr,
2545 ],
2546 |row| row.get(0),
2547 )
2548 .optional()
2549 .map_err(EngineError::Sqlite)?;
2550 let Some(wrapped) = wrapped else {
2551 return Ok(HitAttribution {
2552 matched_paths: Vec::new(),
2553 });
2554 };
2555
2556 let offsets = parse_highlight_offsets(
2557 &wrapped,
2558 ATTRIBUTION_HIGHLIGHT_OPEN,
2559 ATTRIBUTION_HIGHLIGHT_CLOSE,
2560 );
2561 if offsets.is_empty() {
2562 return Ok(HitAttribution {
2563 matched_paths: Vec::new(),
2564 });
2565 }
2566
2567 let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
2568 if positions.is_empty() {
2569 return Ok(HitAttribution {
2572 matched_paths: Vec::new(),
2573 });
2574 }
2575
2576 let mut matched_paths: Vec<String> = Vec::new();
2577 for offset in offsets {
2578 if let Some(path) = find_leaf_for_offset(&positions, offset)
2579 && !matched_paths.iter().any(|p| p == path)
2580 {
2581 matched_paths.push(path.to_owned());
2582 }
2583 }
2584 Ok(HitAttribution { matched_paths })
2585}
2586
2587fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
2588 match value {
2589 fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
2590 fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
2591 fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
2592 }
2593}
2594
2595#[cfg(test)]
2596#[allow(clippy::expect_used)]
2597mod tests {
2598 use std::panic::{AssertUnwindSafe, catch_unwind};
2599 use std::sync::Arc;
2600
2601 use fathomdb_query::{BindValue, QueryBuilder};
2602 use fathomdb_schema::SchemaManager;
2603 use rusqlite::types::Value;
2604 use tempfile::NamedTempFile;
2605
2606 use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
2607
2608 use fathomdb_query::{
2609 NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
2610 };
2611
2612 use super::{
2613 bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
2614 wrap_node_row_projection_sql,
2615 };
2616
    /// Test helper: builds a minimal text-modality `SearchHit`. Only the
    /// fields the merge/dedup logic inspects (logical_id, score, match_mode,
    /// source) vary per call; everything else is fixed filler.
    fn mk_hit(
        logical_id: &str,
        score: f64,
        match_mode: SearchMatchMode,
        source: SearchHitSource,
    ) -> SearchHit {
        SearchHit {
            node: NodeRowLite {
                row_id: format!("{logical_id}-row"),
                logical_id: logical_id.to_owned(),
                kind: "Goal".to_owned(),
                properties: "{}".to_owned(),
                content_ref: None,
                last_accessed_at: None,
            },
            score,
            modality: RetrievalModality::Text,
            source,
            match_mode: Some(match_mode),
            snippet: None,
            written_at: 0,
            projection_row_id: None,
            vector_distance: None,
            attribution: None,
        }
    }
2643
    #[test]
    // Block precedence: strict hits rank ahead of relaxed hits even when the
    // relaxed hit has a far higher score.
    fn merge_places_strict_block_before_relaxed_regardless_of_score() {
        let strict = vec![mk_hit(
            "a",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![mk_hit(
            "b",
            9.9,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "b");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
2672
    #[test]
    // Cross-block dedup: when the same logical id appears in both branches,
    // the strict occurrence survives regardless of score.
    fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
        let strict = vec![mk_hit(
            "shared",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![
            mk_hit(
                "shared",
                9.9,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
            mk_hit(
                "other",
                2.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "shared");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "other");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
2708
    #[test]
    // Intra-block ordering: score descending, logical id ascending on ties.
    fn merge_sorts_within_block_by_score_desc_then_logical_id() {
        let strict = vec![
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(
            merged
                .iter()
                .map(|h| &h.node.logical_id)
                .collect::<Vec<_>>(),
            vec!["a", "c", "b"]
        );
    }
2725
    #[test]
    // Intra-branch dedup tiebreak: at equal score the chunk-sourced duplicate
    // beats the property-sourced one (source_priority: chunk < property).
    fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
        let strict = vec![
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Property,
            ),
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(merged.len(), 1);
        assert!(matches!(merged[0].source, SearchHitSource::Chunk));
    }
2746
    #[test]
    // Truncation happens AFTER block concatenation, so strict hits win limit
    // contention even against a higher-scored relaxed hit.
    fn merge_truncates_to_limit_after_block_merge() {
        let strict = vec![
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let relaxed = vec![mk_hit(
            "c",
            9.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 2);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert_eq!(merged[1].node.logical_id, "b");
    }
2764
    #[test]
    // Three-branch fusion: strict -> relaxed -> vector block order, with
    // cross-block dedup always keeping the earliest block's occurrence.
    fn search_architecturally_supports_three_branch_fusion() {
        let strict = vec![mk_hit(
            "alpha",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![mk_hit(
            "bravo",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        // Vector hits carry match_mode=None and Vector modality.
        let mut vector_hit = mk_hit(
            "charlie",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;
        let vector = vec![vector_hit];

        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
        assert_eq!(merged.len(), 3);
        assert_eq!(merged[0].node.logical_id, "alpha");
        assert_eq!(merged[1].node.logical_id, "bravo");
        assert_eq!(merged[2].node.logical_id, "charlie");
        assert!(matches!(merged[2].source, SearchHitSource::Vector));

        // Same logical id in all three branches: strict wins the dedup.
        let strict2 = vec![mk_hit(
            "shared",
            0.5,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed2 = vec![mk_hit(
            "shared",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vshared = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared.match_mode = None;
        vshared.modality = RetrievalModality::Vector;
        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
        assert!(matches!(
            merged2[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));

        // No strict branch: the relaxed occurrence beats the vector one.
        let mut vshared2 = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared2.match_mode = None;
        vshared2.modality = RetrievalModality::Vector;
        let merged3 = merge_search_branches_three(
            vec![],
            vec![mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            )],
            vec![vshared2],
            10,
        );
        assert_eq!(merged3.len(), 1);
        assert!(matches!(
            merged3[0].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
2866
    #[test]
    // A vector-only merge passes vector hits through intact, keeping their
    // Vector source/modality and match_mode=None.
    fn merge_search_branches_three_vector_only_preserves_vector_block() {
        let mut vector_hit = mk_hit(
            "solo",
            0.75,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;

        let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);

        assert_eq!(merged.len(), 1);
        assert_eq!(merged[0].node.logical_id, "solo");
        assert!(matches!(merged[0].source, SearchHitSource::Vector));
        assert!(matches!(merged[0].modality, RetrievalModality::Vector));
        assert!(
            merged[0].match_mode.is_none(),
            "vector hits carry match_mode=None per addendum 1"
        );
    }
2902
2903 #[test]
2915 fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
2916 let strict = vec![
2917 mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2918 mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2919 mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2920 ];
2921 let relaxed = vec![mk_hit(
2922 "d",
2923 9.0,
2924 SearchMatchMode::Relaxed,
2925 SearchHitSource::Chunk,
2926 )];
2927 let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
2928 vector_hit.match_mode = None;
2929 vector_hit.modality = RetrievalModality::Vector;
2930 let vector = vec![vector_hit];
2931
2932 let merged = merge_search_branches_three(strict, relaxed, vector, 2);
2933
2934 assert_eq!(merged.len(), 2);
2935 assert_eq!(merged[0].node.logical_id, "a");
2936 assert_eq!(merged[1].node.logical_id, "b");
2937 assert!(
2939 merged
2940 .iter()
2941 .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
2942 "strict block must win limit contention against higher-scored relaxed/vector hits"
2943 );
2944 assert!(
2945 merged
2946 .iter()
2947 .all(|h| matches!(h.source, SearchHitSource::Chunk)),
2948 "no vector source hits should leak past the limit"
2949 );
2950 }
2951
2952 #[test]
2953 fn is_vec_table_absent_matches_known_error_messages() {
2954 use rusqlite::ffi;
2955 fn make_err(msg: &str) -> rusqlite::Error {
2956 rusqlite::Error::SqliteFailure(
2957 ffi::Error {
2958 code: ffi::ErrorCode::Unknown,
2959 extended_code: 1,
2960 },
2961 Some(msg.to_owned()),
2962 )
2963 }
2964 assert!(is_vec_table_absent(&make_err(
2965 "no such table: vec_nodes_active"
2966 )));
2967 assert!(is_vec_table_absent(&make_err("no such module: vec0")));
2968 assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
2969 assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
2970 assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
2971 }
2972
2973 #[test]
2974 fn bind_value_text_maps_to_sql_text() {
2975 let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
2976 assert_eq!(val, Value::Text("hello".to_owned()));
2977 }
2978
2979 #[test]
2980 fn bind_value_integer_maps_to_sql_integer() {
2981 let val = bind_value_to_sql(&BindValue::Integer(42));
2982 assert_eq!(val, Value::Integer(42));
2983 }
2984
2985 #[test]
2986 fn bind_value_bool_true_maps_to_integer_one() {
2987 let val = bind_value_to_sql(&BindValue::Bool(true));
2988 assert_eq!(val, Value::Integer(1));
2989 }
2990
2991 #[test]
2992 fn bind_value_bool_false_maps_to_integer_zero() {
2993 let val = bind_value_to_sql(&BindValue::Bool(false));
2994 assert_eq!(val, Value::Integer(0));
2995 }
2996
2997 #[test]
2998 fn same_shape_queries_share_one_cache_entry() {
2999 let db = NamedTempFile::new().expect("temporary db");
3000 let coordinator = ExecutionCoordinator::open(
3001 db.path(),
3002 Arc::new(SchemaManager::new()),
3003 None,
3004 1,
3005 Arc::new(TelemetryCounters::default()),
3006 None,
3007 )
3008 .expect("coordinator");
3009
3010 let compiled_a = QueryBuilder::nodes("Meeting")
3011 .text_search("budget", 5)
3012 .limit(10)
3013 .compile()
3014 .expect("compiled a");
3015 let compiled_b = QueryBuilder::nodes("Meeting")
3016 .text_search("standup", 5)
3017 .limit(10)
3018 .compile()
3019 .expect("compiled b");
3020
3021 coordinator
3022 .execute_compiled_read(&compiled_a)
3023 .expect("read a");
3024 coordinator
3025 .execute_compiled_read(&compiled_b)
3026 .expect("read b");
3027
3028 assert_eq!(
3029 compiled_a.shape_hash, compiled_b.shape_hash,
3030 "different bind values, same structural shape → same hash"
3031 );
3032 assert_eq!(coordinator.shape_sql_count(), 1);
3033 }
3034
3035 #[test]
3036 fn vector_read_degrades_gracefully_when_vec_table_absent() {
3037 let db = NamedTempFile::new().expect("temporary db");
3038 let coordinator = ExecutionCoordinator::open(
3039 db.path(),
3040 Arc::new(SchemaManager::new()),
3041 None,
3042 1,
3043 Arc::new(TelemetryCounters::default()),
3044 None,
3045 )
3046 .expect("coordinator");
3047
3048 let compiled = QueryBuilder::nodes("Meeting")
3049 .vector_search("budget embeddings", 5)
3050 .compile()
3051 .expect("vector query compiles");
3052
3053 let result = coordinator.execute_compiled_read(&compiled);
3054 let rows = result.expect("degraded read must succeed, not error");
3055 assert!(
3056 rows.was_degraded,
3057 "result must be flagged as degraded when vec_nodes_active is absent"
3058 );
3059 assert!(
3060 rows.nodes.is_empty(),
3061 "degraded result must return empty nodes"
3062 );
3063 }
3064
3065 #[test]
3066 fn coordinator_caches_by_shape_hash() {
3067 let db = NamedTempFile::new().expect("temporary db");
3068 let coordinator = ExecutionCoordinator::open(
3069 db.path(),
3070 Arc::new(SchemaManager::new()),
3071 None,
3072 1,
3073 Arc::new(TelemetryCounters::default()),
3074 None,
3075 )
3076 .expect("coordinator");
3077
3078 let compiled = QueryBuilder::nodes("Meeting")
3079 .text_search("budget", 5)
3080 .compile()
3081 .expect("compiled query");
3082
3083 coordinator
3084 .execute_compiled_read(&compiled)
3085 .expect("execute compiled read");
3086 assert_eq!(coordinator.shape_sql_count(), 1);
3087 }
3088
3089 #[test]
3092 fn explain_returns_correct_sql() {
3093 let db = NamedTempFile::new().expect("temporary db");
3094 let coordinator = ExecutionCoordinator::open(
3095 db.path(),
3096 Arc::new(SchemaManager::new()),
3097 None,
3098 1,
3099 Arc::new(TelemetryCounters::default()),
3100 None,
3101 )
3102 .expect("coordinator");
3103
3104 let compiled = QueryBuilder::nodes("Meeting")
3105 .text_search("budget", 5)
3106 .compile()
3107 .expect("compiled query");
3108
3109 let plan = coordinator.explain_compiled_read(&compiled);
3110
3111 assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
3112 }
3113
3114 #[test]
3115 fn explain_returns_correct_driving_table() {
3116 use fathomdb_query::DrivingTable;
3117
3118 let db = NamedTempFile::new().expect("temporary db");
3119 let coordinator = ExecutionCoordinator::open(
3120 db.path(),
3121 Arc::new(SchemaManager::new()),
3122 None,
3123 1,
3124 Arc::new(TelemetryCounters::default()),
3125 None,
3126 )
3127 .expect("coordinator");
3128
3129 let compiled = QueryBuilder::nodes("Meeting")
3130 .text_search("budget", 5)
3131 .compile()
3132 .expect("compiled query");
3133
3134 let plan = coordinator.explain_compiled_read(&compiled);
3135
3136 assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
3137 }
3138
3139 #[test]
3140 fn explain_reports_cache_miss_then_hit() {
3141 let db = NamedTempFile::new().expect("temporary db");
3142 let coordinator = ExecutionCoordinator::open(
3143 db.path(),
3144 Arc::new(SchemaManager::new()),
3145 None,
3146 1,
3147 Arc::new(TelemetryCounters::default()),
3148 None,
3149 )
3150 .expect("coordinator");
3151
3152 let compiled = QueryBuilder::nodes("Meeting")
3153 .text_search("budget", 5)
3154 .compile()
3155 .expect("compiled query");
3156
3157 let plan_before = coordinator.explain_compiled_read(&compiled);
3159 assert!(
3160 !plan_before.cache_hit,
3161 "cache miss expected before first execute"
3162 );
3163
3164 coordinator
3166 .execute_compiled_read(&compiled)
3167 .expect("execute read");
3168
3169 let plan_after = coordinator.explain_compiled_read(&compiled);
3171 assert!(
3172 plan_after.cache_hit,
3173 "cache hit expected after first execute"
3174 );
3175 }
3176
3177 #[test]
3178 fn explain_does_not_execute_query() {
3179 let db = NamedTempFile::new().expect("temporary db");
3184 let coordinator = ExecutionCoordinator::open(
3185 db.path(),
3186 Arc::new(SchemaManager::new()),
3187 None,
3188 1,
3189 Arc::new(TelemetryCounters::default()),
3190 None,
3191 )
3192 .expect("coordinator");
3193
3194 let compiled = QueryBuilder::nodes("Meeting")
3195 .text_search("anything", 5)
3196 .compile()
3197 .expect("compiled query");
3198
3199 let plan = coordinator.explain_compiled_read(&compiled);
3201
3202 assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
3203 assert_eq!(plan.bind_count, compiled.binds.len());
3204 }
3205
    #[test]
    fn coordinator_executes_compiled_read() {
        // End-to-end read path: seed one node with an FTS-indexed chunk, then
        // verify a compiled text search returns exactly that node.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Seed through a separate connection so the data is committed before
        // the coordinator's pooled readers run.
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // Only the seeded meeting matches "budget".
        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
3245
    #[test]
    fn text_search_finds_structured_only_node_via_property_fts() {
        // A node with no chunks — only an fts_node_properties row — must
        // still be reachable through a plain text search.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Seed a structured-only node: properties indexed for FTS, no chunks.
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
            VALUES ('goal-1', 'Goal', 'Ship v2');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Goal")
            .text_search("Ship", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "goal-1");
    }
3284
    #[test]
    fn text_search_returns_both_chunk_and_property_backed_hits() {
        // One query term ("quarterly") must surface both a chunk-indexed node
        // and a property-indexed node in the same result set.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Node 1: matched via its chunk text through fts_nodes.
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
            ",
        )
        .expect("seed chunk-backed node");

        // Node 2: matched via its properties through fts_node_properties.
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
            VALUES ('meeting-2', 'Meeting', 'quarterly sync');
            "#,
        )
        .expect("seed property-backed node");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("quarterly", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // Sort so the assertion is independent of ranking order.
        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
        ids.sort_unstable();
        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
    }
3337
    #[test]
    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
        // The word "not" in a user query must be treated as a literal search
        // token, not as an FTS boolean NOT operator that would exclude rows.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
            ",
        )
        .expect("seed chunk-backed node");

        // If "not" were parsed as an operator this query would match nothing.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("not a ship", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
3377
3378 #[test]
3381 fn capability_gate_reports_false_without_feature() {
3382 let db = NamedTempFile::new().expect("temporary db");
3383 let coordinator = ExecutionCoordinator::open(
3386 db.path(),
3387 Arc::new(SchemaManager::new()),
3388 None,
3389 1,
3390 Arc::new(TelemetryCounters::default()),
3391 None,
3392 )
3393 .expect("coordinator");
3394 assert!(
3395 !coordinator.vector_enabled(),
3396 "vector_enabled must be false when no dimension is requested"
3397 );
3398 }
3399
3400 #[cfg(feature = "sqlite-vec")]
3401 #[test]
3402 fn capability_gate_reports_true_when_feature_enabled() {
3403 let db = NamedTempFile::new().expect("temporary db");
3404 let coordinator = ExecutionCoordinator::open(
3405 db.path(),
3406 Arc::new(SchemaManager::new()),
3407 Some(128),
3408 1,
3409 Arc::new(TelemetryCounters::default()),
3410 None,
3411 )
3412 .expect("coordinator");
3413 assert!(
3414 coordinator.vector_enabled(),
3415 "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
3416 );
3417 }
3418
    #[test]
    fn read_run_returns_inserted_run() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        // Insert a run through the writer actor, then read it back through a
        // fresh coordinator and check id/kind/status round-trip intact.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Outer expect: the read itself succeeded; inner: the row was found.
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
3475
    #[test]
    fn read_step_returns_inserted_step() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        // Insert a run plus one step belonging to it, then read the step back
        // and verify id, parent run id, and kind round-trip intact.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // The step's run_id must reference this run.
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
3539
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        // Insert the full run → step → action chain, then read the action
        // back and verify id, parent step id, and kind round-trip intact.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // The action references the step, which references the run.
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
3615
    #[test]
    fn read_active_runs_excludes_superseded() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        // Write run-v1, then write run-v2 superseding it; only run-v2 should
        // appear in the active-runs view.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First version of the run.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Second version: supersedes_id marks run-v1 as replaced.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
3695
3696 #[allow(clippy::panic)]
3697 fn poison_connection(coordinator: &ExecutionCoordinator) {
3698 let result = catch_unwind(AssertUnwindSafe(|| {
3699 let _guard = coordinator.pool.connections[0]
3700 .lock()
3701 .expect("poison test lock");
3702 panic!("poison coordinator connection mutex");
3703 }));
3704 assert!(
3705 result.is_err(),
3706 "poison test must unwind while holding the connection mutex"
3707 );
3708 }
3709
3710 #[allow(clippy::panic)]
3711 fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
3712 where
3713 F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
3714 {
3715 match op(coordinator) {
3716 Err(EngineError::Bridge(message)) => {
3717 assert_eq!(message, "connection mutex poisoned");
3718 }
3719 Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
3720 Err(error) => panic!("expected poisoned connection error, got {error:?}"),
3721 }
3722 }
3723
3724 #[test]
3725 fn poisoned_connection_returns_bridge_error_for_read_helpers() {
3726 let db = NamedTempFile::new().expect("temporary db");
3727 let coordinator = ExecutionCoordinator::open(
3728 db.path(),
3729 Arc::new(SchemaManager::new()),
3730 None,
3731 1,
3732 Arc::new(TelemetryCounters::default()),
3733 None,
3734 )
3735 .expect("coordinator");
3736
3737 poison_connection(&coordinator);
3738
3739 assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
3740 assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
3741 assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
3742 assert_poisoned_connection_error(
3743 &coordinator,
3744 super::ExecutionCoordinator::read_active_runs,
3745 );
3746 assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
3747 assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
3748 }
3749
3750 #[test]
3753 fn shape_cache_stays_bounded() {
3754 use fathomdb_query::ShapeHash;
3755
3756 let db = NamedTempFile::new().expect("temporary db");
3757 let coordinator = ExecutionCoordinator::open(
3758 db.path(),
3759 Arc::new(SchemaManager::new()),
3760 None,
3761 1,
3762 Arc::new(TelemetryCounters::default()),
3763 None,
3764 )
3765 .expect("coordinator");
3766
3767 {
3769 let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
3770 for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
3771 cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
3772 }
3773 }
3774 let compiled = QueryBuilder::nodes("Meeting")
3779 .text_search("budget", 5)
3780 .limit(10)
3781 .compile()
3782 .expect("compiled query");
3783
3784 coordinator
3785 .execute_compiled_read(&compiled)
3786 .expect("execute read");
3787
3788 assert!(
3789 coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
3790 "shape cache must stay bounded: got {} entries, max {}",
3791 coordinator.shape_sql_count(),
3792 super::MAX_SHAPE_CACHE_SIZE
3793 );
3794 }
3795
3796 #[test]
3799 fn read_pool_size_configurable() {
3800 let db = NamedTempFile::new().expect("temporary db");
3801 let coordinator = ExecutionCoordinator::open(
3802 db.path(),
3803 Arc::new(SchemaManager::new()),
3804 None,
3805 2,
3806 Arc::new(TelemetryCounters::default()),
3807 None,
3808 )
3809 .expect("coordinator with pool_size=2");
3810
3811 assert_eq!(coordinator.pool.size(), 2);
3812
3813 let compiled = QueryBuilder::nodes("Meeting")
3815 .text_search("budget", 5)
3816 .limit(10)
3817 .compile()
3818 .expect("compiled query");
3819
3820 let result = coordinator.execute_compiled_read(&compiled);
3821 assert!(result.is_ok(), "read through pool must succeed");
3822 }
3823
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        // Seed 10 Meeting roots, each linked to two Task children via
        // HAS_TASK edges, then verify a grouped read returns all roots with a
        // fully populated "tasks" expansion slot.
        let db = NamedTempFile::new().expect("temporary db");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        {
            // Scope the seeding connection so it closes before the read runs.
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                conn.execute_batch(&format!(
                    r#"
                INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                "#,
                )).expect("seed data");
            }
        }

        // Text search for the roots plus a one-hop outbound expansion.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1, None)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Every root must expand to exactly its two seeded Task children.
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
3903}