1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8 BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRawVectorSearch,
9 CompiledRetrievalPlan, CompiledSearch, CompiledSearchPlan, CompiledSemanticSearch,
10 CompiledVectorSearch, DrivingTable, EdgeExpansionSlot, ExpansionSlot, FALLBACK_TRIGGER_K,
11 HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch, SearchHit,
12 SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
13};
14use fathomdb_schema::SchemaManager;
15use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
16
17use crate::embedder::QueryEmbedder;
18use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
19use crate::{EngineError, sqlite};
20
/// Upper bound on cached shape-hash -> SQL entries; when reached, the cache is
/// cleared wholesale rather than evicting individual entries.
const MAX_SHAPE_CACHE_SIZE: usize = 4096;

// NOTE(review): not referenced in this part of the file — presumably sizes
// chunks for batched SQL operations; confirm at the use sites.
const BATCH_CHUNK_SIZE: usize = 200;
31
32fn compile_expansion_in_filter(
37 p: usize,
38 path: &str,
39 value_binds: Vec<Value>,
40) -> (String, Vec<Value>) {
41 let first_val = p + 1;
42 let placeholders = (0..value_binds.len())
43 .map(|i| format!("?{}", first_val + i))
44 .collect::<Vec<_>>()
45 .join(", ");
46 let mut binds = vec![Value::Text(path.to_owned())];
47 binds.extend(value_binds);
48 (
49 format!("\n AND json_extract(n.properties, ?{p}) IN ({placeholders})"),
50 binds,
51 )
52}
53
54#[allow(clippy::too_many_lines)]
69fn compile_expansion_filter(
70 filter: Option<&Predicate>,
71 first_param: usize,
72) -> (String, Vec<Value>) {
73 let Some(predicate) = filter else {
74 return (String::new(), vec![]);
75 };
76 let p = first_param;
77 match predicate {
78 Predicate::JsonPathEq { path, value } => {
79 let val = match value {
80 ScalarValue::Text(t) => Value::Text(t.clone()),
81 ScalarValue::Integer(i) => Value::Integer(*i),
82 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
83 };
84 (
85 format!(
86 "\n AND json_extract(n.properties, ?{p}) = ?{}",
87 p + 1
88 ),
89 vec![Value::Text(path.clone()), val],
90 )
91 }
92 Predicate::JsonPathCompare { path, op, value } => {
93 let val = match value {
94 ScalarValue::Text(t) => Value::Text(t.clone()),
95 ScalarValue::Integer(i) => Value::Integer(*i),
96 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
97 };
98 let operator = match op {
99 ComparisonOp::Gt => ">",
100 ComparisonOp::Gte => ">=",
101 ComparisonOp::Lt => "<",
102 ComparisonOp::Lte => "<=",
103 };
104 (
105 format!(
106 "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
107 p + 1
108 ),
109 vec![Value::Text(path.clone()), val],
110 )
111 }
112 Predicate::JsonPathFusedEq { path, value } => (
113 format!(
114 "\n AND json_extract(n.properties, ?{p}) = ?{}",
115 p + 1
116 ),
117 vec![Value::Text(path.clone()), Value::Text(value.clone())],
118 ),
119 Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
120 let operator = match op {
121 ComparisonOp::Gt => ">",
122 ComparisonOp::Gte => ">=",
123 ComparisonOp::Lt => "<",
124 ComparisonOp::Lte => "<=",
125 };
126 (
127 format!(
128 "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
129 p + 1
130 ),
131 vec![Value::Text(path.clone()), Value::Integer(*value)],
132 )
133 }
134 Predicate::JsonPathFusedBoolEq { path, value } => (
135 format!(
136 "\n AND json_extract(n.properties, ?{p}) = ?{}",
137 p + 1
138 ),
139 vec![Value::Text(path.clone()), Value::Integer(i64::from(*value))],
140 ),
141 Predicate::KindEq(kind) => (
142 format!("\n AND n.kind = ?{p}"),
143 vec![Value::Text(kind.clone())],
144 ),
145 Predicate::LogicalIdEq(logical_id) => (
146 format!("\n AND n.logical_id = ?{p}"),
147 vec![Value::Text(logical_id.clone())],
148 ),
149 Predicate::SourceRefEq(source_ref) => (
150 format!("\n AND n.source_ref = ?{p}"),
151 vec![Value::Text(source_ref.clone())],
152 ),
153 Predicate::ContentRefEq(uri) => (
154 format!("\n AND n.content_ref = ?{p}"),
155 vec![Value::Text(uri.clone())],
156 ),
157 Predicate::ContentRefNotNull => (
158 "\n AND n.content_ref IS NOT NULL".to_owned(),
159 vec![],
160 ),
161 Predicate::EdgePropertyEq { .. } | Predicate::EdgePropertyCompare { .. } => {
162 unreachable!(
163 "compile_expansion_filter: EdgeProperty* variants must use compile_edge_filter"
164 );
165 }
166 Predicate::JsonPathFusedIn { path, values } => compile_expansion_in_filter(
167 p,
168 path,
169 values.iter().map(|v| Value::Text(v.clone())).collect(),
170 ),
171 Predicate::JsonPathIn { path, values } => compile_expansion_in_filter(
172 p,
173 path,
174 values
175 .iter()
176 .map(|v| match v {
177 ScalarValue::Text(t) => Value::Text(t.clone()),
178 ScalarValue::Integer(i) => Value::Integer(*i),
179 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
180 })
181 .collect(),
182 ),
183 }
184}
185
186fn compile_edge_filter(filter: Option<&Predicate>, first_param: usize) -> (String, Vec<Value>) {
196 let Some(predicate) = filter else {
197 return (String::new(), vec![]);
198 };
199 let p = first_param;
200 match predicate {
201 Predicate::EdgePropertyEq { path, value } => {
202 let val = match value {
203 ScalarValue::Text(t) => Value::Text(t.clone()),
204 ScalarValue::Integer(i) => Value::Integer(*i),
205 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
206 };
207 (
208 format!(
209 "\n AND json_extract(e.properties, ?{p}) = ?{}",
210 p + 1
211 ),
212 vec![Value::Text(path.clone()), val],
213 )
214 }
215 Predicate::EdgePropertyCompare { path, op, value } => {
216 let val = match value {
217 ScalarValue::Text(t) => Value::Text(t.clone()),
218 ScalarValue::Integer(i) => Value::Integer(*i),
219 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
220 };
221 let operator = match op {
222 ComparisonOp::Gt => ">",
223 ComparisonOp::Gte => ">=",
224 ComparisonOp::Lt => "<",
225 ComparisonOp::Lte => "<=",
226 };
227 (
228 format!(
229 "\n AND json_extract(e.properties, ?{p}) {operator} ?{}",
230 p + 1
231 ),
232 vec![Value::Text(path.clone()), val],
233 )
234 }
235 _ => {
236 unreachable!("compile_edge_filter: non-edge predicate {predicate:?}");
237 }
238 }
239}
240
/// FTS tokenizer strategy derived from the tokenizer string stored in a
/// kind's `fts` projection profile.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TokenizerStrategy {
    /// Matches the exact string "porter unicode61 remove_diacritics 2".
    RecallOptimizedEnglish,
    /// Matches the exact string "unicode61 remove_diacritics 2".
    PrecisionOptimized,
    /// Matches the exact string "trigram".
    SubstringTrigram,
    /// Matches the exact string "icu".
    GlobalCjk,
    /// Matches any string beginning with "unicode61 tokenchars".
    SourceCode,
    /// Any other tokenizer string, preserved verbatim.
    Custom(String),
}

impl TokenizerStrategy {
    /// Maps a raw tokenizer configuration string to a strategy; unrecognized
    /// strings are kept as [`TokenizerStrategy::Custom`]. Never fails.
    pub fn from_str(s: &str) -> Self {
        if s == "porter unicode61 remove_diacritics 2" {
            Self::RecallOptimizedEnglish
        } else if s == "unicode61 remove_diacritics 2" {
            Self::PrecisionOptimized
        } else if s == "trigram" {
            Self::SubstringTrigram
        } else if s == "icu" {
            Self::GlobalCjk
        } else if s.starts_with("unicode61 tokenchars") {
            Self::SourceCode
        } else {
            Self::Custom(s.to_string())
        }
    }
}
275
/// Fixed-size pool of read-only SQLite connections, each behind its own mutex.
struct ReadPool {
    // One mutex per connection so independent readers never contend on a
    // single pool-wide lock.
    connections: Vec<Mutex<Connection>>,
}
283
284impl fmt::Debug for ReadPool {
285 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286 f.debug_struct("ReadPool")
287 .field("size", &self.connections.len())
288 .finish()
289 }
290}
291
292impl ReadPool {
293 fn new(
304 db_path: &Path,
305 pool_size: usize,
306 schema_manager: &SchemaManager,
307 vector_enabled: bool,
308 ) -> Result<Self, EngineError> {
309 let mut connections = Vec::with_capacity(pool_size);
310 for _ in 0..pool_size {
311 let conn = if vector_enabled {
312 #[cfg(feature = "sqlite-vec")]
313 {
314 sqlite::open_readonly_connection_with_vec(db_path)?
315 }
316 #[cfg(not(feature = "sqlite-vec"))]
317 {
318 sqlite::open_readonly_connection(db_path)?
319 }
320 } else {
321 sqlite::open_readonly_connection(db_path)?
322 };
323 schema_manager
324 .initialize_reader_connection(&conn)
325 .map_err(EngineError::Schema)?;
326 connections.push(Mutex::new(conn));
327 }
328 Ok(Self { connections })
329 }
330
331 fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
340 for conn in &self.connections {
342 if let Ok(guard) = conn.try_lock() {
343 return Ok(guard);
344 }
345 }
346 self.connections[0].lock().map_err(|_| {
348 trace_error!("read pool: connection mutex poisoned");
349 EngineError::Bridge("connection mutex poisoned".to_owned())
350 })
351 }
352
353 #[cfg(test)]
355 fn size(&self) -> usize {
356 self.connections.len()
357 }
358}
359
/// Diagnostic description of a planned query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// Final SQL text after shape compilation.
    pub sql: String,
    /// Number of `?N` bind parameters the SQL expects.
    pub bind_count: usize,
    /// Table that drives the query (plain nodes, FTS, vec, ...).
    pub driving_table: DrivingTable,
    /// Hash of the query shape, used as the SQL cache key.
    pub shape_hash: ShapeHash,
    /// Whether the SQL was served from the shape cache.
    pub cache_hit: bool,
}
371
/// A fully materialized graph node row.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row identifier.
    pub row_id: String,
    /// Logical node identifier (used for joins and access metadata lookups).
    pub logical_id: String,
    /// Node kind.
    pub kind: String,
    /// Node properties serialized as JSON.
    pub properties: String,
    /// Optional reference to external content.
    pub content_ref: Option<String>,
    /// Last-access timestamp when access metadata exists for this node.
    pub last_accessed_at: Option<i64>,
}
388
/// A fully materialized graph edge row.
///
/// Not `Eq` because `confidence` is a float.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeRow {
    /// Physical row identifier.
    pub row_id: String,
    /// Logical edge identifier.
    pub logical_id: String,
    /// Logical id of the source node.
    pub source_logical_id: String,
    /// Logical id of the target node.
    pub target_logical_id: String,
    /// Edge kind.
    pub kind: String,
    /// Edge properties serialized as JSON.
    pub properties: String,
    /// Optional reference to the edge's source material.
    pub source_ref: Option<String>,
    /// Optional confidence score attached to the edge.
    pub confidence: Option<f64>,
}
417
/// A materialized run record.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Run identifier.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Current status of the run.
    pub status: String,
    /// Run properties serialized as JSON.
    pub properties: String,
}
430
/// A materialized step record belonging to a run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Step identifier.
    pub id: String,
    /// Identifier of the run this step belongs to.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Current status of the step.
    pub status: String,
    /// Step properties serialized as JSON.
    pub properties: String,
}
445
/// A materialized action record belonging to a step.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Action identifier.
    pub id: String,
    /// Identifier of the step this action belongs to.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Current status of the action.
    pub status: String,
    /// Action properties serialized as JSON.
    pub properties: String,
}
460
/// A single provenance log entry.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Event identifier.
    pub id: String,
    /// Event type discriminator.
    pub event_type: String,
    /// Identifier of the entity the event describes.
    pub subject: String,
    /// Optional reference to the triggering source.
    pub source_ref: Option<String>,
    /// Free-form event metadata serialized as JSON.
    pub metadata_json: String,
    /// Creation timestamp (integer epoch value; unit not visible in this file).
    pub created_at: i64,
}
471
/// Row sets produced by a compiled read query.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Graph node results.
    pub nodes: Vec<NodeRow>,
    /// Run results.
    pub runs: Vec<RunRow>,
    /// Step results.
    pub steps: Vec<StepRow>,
    /// Action results.
    pub actions: Vec<ActionRow>,
    /// True when execution degraded (e.g. the backing vector table was absent)
    /// and the result may be empty or incomplete rather than an error.
    pub was_degraded: bool,
}
487
/// Nodes reached by expanding from a single root node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical id of the root the expansion started from.
    pub root_logical_id: String,
    /// Nodes reached from that root.
    pub nodes: Vec<NodeRow>,
}
496
/// All per-root expansion results for one named expansion slot.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot these results belong to.
    pub slot: String,
    /// One entry per root node that was expanded.
    pub roots: Vec<ExpansionRootRows>,
}
505
/// Edge/node pairs reached by an edge expansion from a single root.
///
/// Not `Eq` because `EdgeRow` carries a float confidence.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeExpansionRootRows {
    /// Logical id of the root the expansion started from.
    pub root_logical_id: String,
    /// (edge, neighbor node) pairs reached from that root.
    pub pairs: Vec<(EdgeRow, NodeRow)>,
}
519
/// All per-root edge-expansion results for one named expansion slot.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeExpansionSlotRows {
    /// Name of the edge-expansion slot these results belong to.
    pub slot: String,
    /// One entry per root node that was expanded.
    pub roots: Vec<EdgeExpansionRootRows>,
}
530
/// Results of a grouped query: root nodes plus their node and edge expansions.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GroupedQueryRows {
    /// Root nodes the expansions hang off.
    pub roots: Vec<NodeRow>,
    /// Node-expansion results, one entry per expansion slot.
    pub expansions: Vec<ExpansionSlotRows>,
    /// Edge-expansion results, one entry per edge-expansion slot.
    pub edge_expansions: Vec<EdgeExpansionSlotRows>,
    /// True when execution degraded and results may be incomplete.
    pub was_degraded: bool,
}
548
/// Read-side query executor over a SQLite database.
///
/// Owns a pool of read-only connections, a bounded shape-hash -> SQL cache,
/// optional vector-search support, and per-kind FTS tokenizer strategies
/// loaded at open time.
pub struct ExecutionCoordinator {
    /// Filesystem path of the backing SQLite database.
    database_path: PathBuf,
    /// Schema manager used to bootstrap the database and initialize readers.
    schema_manager: Arc<SchemaManager>,
    /// Pool of read-only connections handed out per query.
    pool: ReadPool,
    /// Cache of generated SQL keyed by compiled query shape; cleared wholesale
    /// once it reaches `MAX_SHAPE_CACHE_SIZE`.
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// Whether vector search is available for this database.
    vector_enabled: bool,
    /// Latched after the first "vector table absent" warning to avoid log spam.
    vec_degradation_warned: AtomicBool,
    /// Query/error counters.
    telemetry: Arc<TelemetryCounters>,
    /// Optional embedder used to turn semantic-search text into query vectors.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
    /// Per-kind FTS tokenizer strategies read from `projection_profiles`.
    fts_strategies: HashMap<String, TokenizerStrategy>,
}
577
578impl fmt::Debug for ExecutionCoordinator {
579 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
580 f.debug_struct("ExecutionCoordinator")
581 .field("database_path", &self.database_path)
582 .finish_non_exhaustive()
583 }
584}
585
586impl ExecutionCoordinator {
    /// Opens the database at `path`, bootstraps the schema, runs open-time FTS
    /// guards, loads per-kind FTS tokenizer strategies, and builds the
    /// read-only connection pool.
    ///
    /// # Errors
    /// Fails if the database cannot be opened, schema bootstrap or the FTS
    /// guards fail, the embedder identity check fails, tokenizer strategies
    /// cannot be read, or the read pool cannot be built.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap on a read-write connection; with the sqlite-vec feature
        // the vec extension is loaded only when a dimension was requested.
        #[cfg(feature = "sqlite-vec")]
        let mut conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let mut conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        run_open_time_fts_guards(&mut conn)?;

        // Without the sqlite-vec feature the bootstrap report is ignored and
        // vector support stays off.
        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            let _ = &report;
            false
        };

        if vector_dimension.is_some() {
            // An explicitly requested dimension force-enables vector support
            // (feature permitting).
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        if let Some(ref emb) = query_embedder {
            // Validate the configured embedder against the database before
            // serving any queries (see `check_vec_identity_at_open`).
            check_vec_identity_at_open(&conn, emb.as_ref())?;
        }

        // Load per-kind FTS tokenizer strategies from the 'fts' projection
        // profiles; rows with unparseable config or no "tokenizer" key are
        // silently skipped.
        let fts_strategies: HashMap<String, TokenizerStrategy> = {
            let mut map = HashMap::new();
            let mut stmt = conn
                .prepare("SELECT kind, config_json FROM projection_profiles WHERE facet='fts'")?;
            let rows = stmt.query_map([], |row| {
                let kind: String = row.get(0)?;
                let config_json: String = row.get(1)?;
                Ok((kind, config_json))
            })?;
            for row in rows.flatten() {
                let (kind, config_json) = row;
                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&config_json)
                    && let Some(tok) = v["tokenizer"].as_str()
                {
                    map.insert(kind, TokenizerStrategy::from_str(tok));
                }
            }
            map
        };

        // The bootstrap connection is not reused; readers come from the pool.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
            fts_strategies,
        })
    }
692
693 pub fn database_path(&self) -> &Path {
695 &self.database_path
696 }
697
    /// Whether vector-search support was enabled for this database at open time.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
703
    /// The configured query embedder, if any.
    #[must_use]
    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
        self.query_embedder.as_ref()
    }
714
    /// Acquires a read connection from the pool (blocks only when every
    /// connection is busy).
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
718
719 #[must_use]
725 pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
726 let mut total = SqliteCacheStatus::default();
727 for conn_mutex in &self.pool.connections {
728 if let Ok(conn) = conn_mutex.try_lock() {
729 total.add(&read_db_cache_status(&conn));
730 }
731 }
732 total
733 }
734
    /// Executes a compiled read query and materializes node rows.
    ///
    /// Dispatch order: semantic search, raw vector search, FTS
    /// first-registration scan fallback, then the generic SQL path (with
    /// per-kind FTS/vec table rewriting). The run/step/action row sets are
    /// always empty on this path.
    ///
    /// # Errors
    /// Propagates connection and SQLite errors; a missing vector table is not
    /// an error but an empty result with `was_degraded` set.
    #[allow(clippy::expect_used, clippy::too_many_lines)]
    pub fn execute_compiled_read(
        &self,
        compiled: &CompiledQuery,
    ) -> Result<QueryRows, EngineError> {
        // Semantic / raw-vector searches bypass the generic SQL path entirely.
        if let Some(semantic) = compiled.semantic_search.as_ref() {
            let search_rows = self.execute_compiled_semantic_search(semantic)?;
            return Ok(search_rows_to_query_rows(search_rows));
        }
        if let Some(raw) = compiled.raw_vector_search.as_ref() {
            let search_rows = self.execute_compiled_raw_vector_search(raw)?;
            return Ok(search_rows_to_query_rows(search_rows));
        }
        // FTS-driven queries: bind index 1 carries the root kind; fall back to
        // a plain scan when the kind's FTS projection was only just registered.
        if compiled.driving_table == DrivingTable::FtsNodes
            && let Some(BindValue::Text(root_kind)) = compiled.binds.get(1)
            && let Some(nodes) = self.scan_fallback_if_first_registration(root_kind)?
        {
            self.telemetry.increment_queries();
            return Ok(QueryRows {
                nodes,
                runs: Vec::new(),
                steps: Vec::new(),
                actions: Vec::new(),
                was_degraded: false,
            });
        }

        // Rewrite generic fts_nodes / vec_nodes_active references to the
        // concrete per-kind projection tables before execution.
        let (adapted_sql, adapted_binds) = if compiled.driving_table == DrivingTable::FtsNodes {
            let conn_check = match self.lock_connection() {
                Ok(g) => g,
                Err(e) => {
                    self.telemetry.increment_errors();
                    return Err(e);
                }
            };
            let result = adapt_fts_nodes_sql_for_per_kind_tables(compiled, &conn_check);
            drop(conn_check);
            result?
        } else if compiled.driving_table == DrivingTable::VecNodes {
            let root_kind = compiled
                .binds
                .get(1)
                .and_then(|b| {
                    if let BindValue::Text(k) = b {
                        Some(k.as_str())
                    } else {
                        None
                    }
                })
                .unwrap_or("");
            // "vec__unknown" deliberately names a missing table so the
            // absent-table degradation path below handles kind-less queries.
            let vec_table = if root_kind.is_empty() {
                "vec__unknown".to_owned()
            } else {
                fathomdb_schema::vec_kind_table_name(root_kind)
            };
            let new_sql = compiled.sql.replace("vec_nodes_active", &vec_table);
            (new_sql, compiled.binds.clone())
        } else {
            (compiled.sql.clone(), compiled.binds.clone())
        };

        let row_sql = wrap_node_row_projection_sql(&adapted_sql);
        {
            // Record the final SQL under the shape hash. The cache is cleared
            // wholesale when full; a poisoned lock is recovered, not an error.
            let mut cache = self
                .shape_sql_map
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
                cache.clear();
            }
            cache.insert(compiled.shape_hash, row_sql.clone());
        }

        let bind_values = adapted_binds
            .iter()
            .map(bind_value_to_sql)
            .collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&row_sql) {
            Ok(stmt) => stmt,
            // Missing vector table: degrade to an empty flagged result instead
            // of surfacing an error (warn only once per coordinator).
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading to non-vector query");
                }
                return Ok(QueryRows {
                    was_degraded: true,
                    ..Default::default()
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        // Column order matches wrap_node_row_projection_sql's projection.
        let nodes = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .and_then(Iterator::collect)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        self.telemetry.increment_queries();
        Ok(QueryRows {
            nodes,
            runs: Vec::new(),
            steps: Vec::new(),
            actions: Vec::new(),
            was_degraded: false,
        })
    }
894
895 pub fn execute_compiled_search(
910 &self,
911 compiled: &CompiledSearch,
912 ) -> Result<SearchRows, EngineError> {
913 let (relaxed_query, was_degraded_at_plan_time) =
920 fathomdb_query::derive_relaxed(&compiled.text_query);
921 let relaxed = relaxed_query.map(|q| CompiledSearch {
922 root_kind: compiled.root_kind.clone(),
923 text_query: q,
924 limit: compiled.limit,
925 fusable_filters: compiled.fusable_filters.clone(),
926 residual_filters: compiled.residual_filters.clone(),
927 attribution_requested: compiled.attribution_requested,
928 });
929 let plan = CompiledSearchPlan {
930 strict: compiled.clone(),
931 relaxed,
932 was_degraded_at_plan_time,
933 };
934 self.execute_compiled_search_plan(&plan)
935 }
936
937 pub fn execute_compiled_search_plan(
956 &self,
957 plan: &CompiledSearchPlan,
958 ) -> Result<SearchRows, EngineError> {
959 let strict = &plan.strict;
960 let limit = strict.limit;
961 let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;
962
963 let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
964 let strict_underfilled = strict_hits.len() < fallback_threshold;
965
966 let mut relaxed_hits: Vec<SearchHit> = Vec::new();
967 let mut fallback_used = false;
968 let mut was_degraded = false;
969 if let Some(relaxed) = plan.relaxed.as_ref()
970 && strict_underfilled
971 {
972 relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
973 fallback_used = true;
974 was_degraded = plan.was_degraded_at_plan_time;
975 }
976
977 let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
978 if strict.attribution_requested {
982 let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
983 self.populate_attribution_for_hits(
984 &mut merged,
985 &strict.text_query,
986 relaxed_text_query,
987 )?;
988 }
989 let strict_hit_count = merged
990 .iter()
991 .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
992 .count();
993 let relaxed_hit_count = merged
994 .iter()
995 .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
996 .count();
997 let vector_hit_count = 0;
1001
1002 Ok(SearchRows {
1003 hits: merged,
1004 strict_hit_count,
1005 relaxed_hit_count,
1006 vector_hit_count,
1007 fallback_used,
1008 was_degraded,
1009 })
1010 }
1011
    /// Executes a compiled vector search whose query embedding is already
    /// serialized into `compiled.query_text`.
    ///
    /// SQL layout: an inner KNN scan of the per-kind vec table (`embedding
    /// MATCH ?1`) is joined through `chunks` back to live `nodes`. Fusable
    /// filters apply inside the CTE (alias `src`); residual JSON filters apply
    /// to the outer hit set (alias `h`). A missing vec table degrades to an
    /// empty `was_degraded` result instead of an error.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // A zero limit can never return hits; skip the database entirely.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        // ?1 is always the serialized query vector; ?2 is the root kind when
        // one is present. All later placeholders are numbered off binds.len().
        let filter_by_kind = !compiled.root_kind.is_empty();
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // Fusable predicates compile into the inner CTE against `src`.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses
                        .push_str("\n AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedBoolEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(i64::from(*value)));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(BindValue::Text(v.clone()));
                    }
                    // Value placeholders start immediately after the path bind.
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                Predicate::JsonPathEq { .. }
                | Predicate::JsonPathCompare { .. }
                | Predicate::JsonPathIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                    // Not fusable into the CTE: JsonPath* variants run as
                    // residual filters below; EdgeProperty* never apply here.
                }
            }
        }

        // Residual JSON predicates apply to the outer hit set `h`.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(scalar_to_bind(v));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. }
                | Predicate::JsonPathFusedBoolEq { .. }
                | Predicate::JsonPathFusedIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                    // These are fusable (handled above) or edge-only; they are
                    // never residual filters for a vector search.
                }
            }
        }

        // The outer LIMIT bind is placed last so earlier indexes stay stable.
        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        // Inner KNN scan fetches `limit` candidates before any filtering.
        let base_limit = limit;
        let kind_clause = if filter_by_kind {
            "\n AND src.kind = ?2"
        } else {
            ""
        };

        // An empty kind targets a deliberately nonexistent table so the
        // absent-table degradation path below applies.
        let vec_table = if compiled.root_kind.is_empty() {
            "vec__unknown".to_owned()
        } else {
            fathomdb_schema::vec_kind_table_name(&compiled.root_kind)
        };

        let sql = format!(
            "WITH vector_hits AS (
 SELECT
 src.row_id AS row_id,
 src.logical_id AS logical_id,
 src.kind AS kind,
 src.properties AS properties,
 src.source_ref AS source_ref,
 src.content_ref AS content_ref,
 src.created_at AS created_at,
 vc.distance AS distance,
 vc.chunk_id AS chunk_id
 FROM (
 SELECT chunk_id, distance
 FROM {vec_table}
 WHERE embedding MATCH ?1
 LIMIT {base_limit}
 ) vc
 JOIN chunks c ON c.id = vc.chunk_id
 JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
 WHERE 1 = 1{kind_clause}{fused_clauses}
 )
 SELECT
 h.row_id,
 h.logical_id,
 h.kind,
 h.properties,
 h.content_ref,
 am.last_accessed_at,
 h.created_at,
 h.distance,
 h.chunk_id
 FROM vector_hits h
 LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
 WHERE 1 = 1{filter_clauses}
 ORDER BY h.distance ASC
 LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            // Missing vec table at prepare time: degrade to an empty flagged
            // result (warn only once per coordinator).
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Negate distance so larger scores rank better, consistent
                // with the ascending-distance SQL ordering.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    // Strict/relaxed match modes are text-search concepts.
                    match_mode: None,
                    snippet: None,
                    // Column 8 is the chunk id, reused as the projection row id.
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // The missing-table error can also surface at step time, not
                // just at prepare time; degrade identically.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
1406
    /// Embeds `compiled.text` with the configured query embedder and runs a
    /// vector search over the kind's vec table.
    ///
    /// Degrades (empty result with `was_degraded`) when the kind's vector
    /// index is stale, no embedder is configured, or embedding fails.
    ///
    /// # Errors
    /// - `EmbedderNotConfigured` when no active embedding profile exists.
    /// - `KindNotVectorIndexed` when the kind has no enabled vector schema.
    /// - `DimensionMismatch` when the embedding length differs from the profile.
    pub fn execute_compiled_semantic_search(
        &self,
        compiled: &CompiledSemanticSearch,
    ) -> Result<SearchRows, EngineError> {
        // Without an active embedding profile, semantic search cannot run.
        let profile_dim = self
            .active_profile_dimensions()?
            .ok_or(EngineError::EmbedderNotConfigured)?;

        // The kind must have an enabled vector-index schema.
        let schema_state = self.enabled_vec_kind_state(&compiled.root_kind)?;
        let Some(state) = schema_state else {
            return Err(EngineError::KindNotVectorIndexed {
                kind: compiled.root_kind.clone(),
            });
        };

        // A stale index degrades rather than erroring.
        if state == "stale" {
            return Ok(SearchRows {
                was_degraded: true,
                ..SearchRows::default()
            });
        }

        // A missing embedder or a failed embedding likewise degrades.
        let Some(embedder) = self.query_embedder.as_ref() else {
            return Ok(SearchRows {
                was_degraded: true,
                ..SearchRows::default()
            });
        };
        let Ok(embedding) = embedder.embed_query(&compiled.text) else {
            return Ok(SearchRows {
                was_degraded: true,
                ..SearchRows::default()
            });
        };

        // Guard against embedder/profile dimension drift.
        if embedding.len() != profile_dim {
            return Err(EngineError::DimensionMismatch {
                expected: profile_dim,
                actual: embedding.len(),
            });
        }

        // The query vector is passed on as a JSON array literal.
        let literal = serde_json::to_string(&embedding)
            .map_err(|e| EngineError::Bridge(format!("serialize query embedding: {e}")))?;
        let inner = CompiledVectorSearch {
            root_kind: compiled.root_kind.clone(),
            query_text: literal,
            limit: compiled.limit,
            fusable_filters: Vec::new(),
            residual_filters: Vec::new(),
            attribution_requested: false,
        };
        self.execute_compiled_vector_search(&inner)
    }
1488
1489 pub fn execute_compiled_raw_vector_search(
1507 &self,
1508 compiled: &CompiledRawVectorSearch,
1509 ) -> Result<SearchRows, EngineError> {
1510 let profile_dim = self
1511 .active_profile_dimensions()?
1512 .ok_or(EngineError::EmbedderNotConfigured)?;
1513
1514 let schema_state = self.enabled_vec_kind_state(&compiled.root_kind)?;
1515 let Some(state) = schema_state else {
1516 return Err(EngineError::KindNotVectorIndexed {
1517 kind: compiled.root_kind.clone(),
1518 });
1519 };
1520
1521 if compiled.vec.len() != profile_dim {
1522 return Err(EngineError::DimensionMismatch {
1523 expected: profile_dim,
1524 actual: compiled.vec.len(),
1525 });
1526 }
1527
1528 if state == "stale" {
1529 return Ok(SearchRows {
1530 was_degraded: true,
1531 ..SearchRows::default()
1532 });
1533 }
1534
1535 let literal = serde_json::to_string(&compiled.vec)
1536 .map_err(|e| EngineError::Bridge(format!("serialize raw vector: {e}")))?;
1537 let inner = CompiledVectorSearch {
1538 root_kind: compiled.root_kind.clone(),
1539 query_text: literal,
1540 limit: compiled.limit,
1541 fusable_filters: Vec::new(),
1542 residual_filters: Vec::new(),
1543 attribution_requested: false,
1544 };
1545 self.execute_compiled_vector_search(&inner)
1546 }
1547
1548 fn active_profile_dimensions(&self) -> Result<Option<usize>, EngineError> {
1551 let conn = self.lock_connection()?;
1552 let dim: Option<i64> = conn
1553 .query_row(
1554 "SELECT dimensions FROM vector_embedding_profiles WHERE active = 1",
1555 [],
1556 |row| row.get::<_, i64>(0),
1557 )
1558 .optional()?;
1559 Ok(dim.and_then(|d| usize::try_from(d).ok()))
1560 }
1561
1562 fn enabled_vec_kind_state(&self, kind: &str) -> Result<Option<String>, EngineError> {
1565 let conn = self.lock_connection()?;
1566 let row: Option<(i64, String)> = conn
1567 .query_row(
1568 "SELECT enabled, state FROM vector_index_schemas WHERE kind = ?1",
1569 rusqlite::params![kind],
1570 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
1571 )
1572 .optional()?;
1573 Ok(row.and_then(|(enabled, state)| if enabled == 1 { Some(state) } else { None }))
1574 }
1575
    /// Executes a full retrieval plan: strict text branch, optional relaxed
    /// fallback, and an optional vector branch, merging all three into one
    /// ranked hit list.
    ///
    /// `raw_query` is the original user text, used only to synthesize the
    /// vector branch on demand (see [`Self::fill_vector_branch`]).
    pub fn execute_retrieval_plan(
        &self,
        plan: &CompiledRetrievalPlan,
        raw_query: &str,
    ) -> Result<SearchRows, EngineError> {
        // Cloned because fill_vector_branch mutates the plan in place.
        let mut plan = plan.clone();
        let limit = plan.text.strict.limit;

        // 1) The strict text branch always runs.
        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;

        // 2) The relaxed fallback runs only when strict returned fewer than
        //    min(FALLBACK_TRIGGER_K, limit) hits AND a relaxed branch exists.
        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;
        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.text.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            // Plan-time degradation only surfaces once the fallback actually runs.
            was_degraded = plan.was_degraded_at_plan_time;
        }

        // 3) Vector branch: synthesized lazily, and only consulted when both
        //    text branches came back empty.
        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
        if text_branches_empty && self.query_embedder.is_some() {
            self.fill_vector_branch(&mut plan, raw_query);
        }

        let mut vector_hits: Vec<SearchHit> = Vec::new();
        if let Some(vector) = plan.vector.as_ref()
            && strict_hits.is_empty()
            && relaxed_hits.is_empty()
        {
            let vector_rows = self.execute_compiled_vector_search(vector)?;
            vector_hits = vector_rows.hits;
            if vector_rows.was_degraded {
                was_degraded = true;
            }
        }
        // fill_vector_branch may flag plan-time degradation (e.g. embedding
        // failure) without producing a vector branch at all; surface it.
        if text_branches_empty
            && plan.was_degraded_at_plan_time
            && plan.vector.is_none()
            && self.query_embedder.is_some()
        {
            was_degraded = true;
        }

        // 4) Merge the three branches, then optionally enrich with attribution.
        let strict = &plan.text.strict;
        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
        if strict.attribution_requested {
            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }

        // Per-branch counts are recomputed over the MERGED set so they reflect
        // what survived merging, not what each branch originally produced.
        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        let vector_hit_count = merged
            .iter()
            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
            .count();

        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
1716
1717 fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
1733 let Some(embedder) = self.query_embedder.as_ref() else {
1734 return;
1735 };
1736 match embedder.embed_query(raw_query) {
1737 Ok(vec) => {
1738 let literal = match serde_json::to_string(&vec) {
1744 Ok(s) => s,
1745 Err(err) => {
1746 trace_warn!(
1747 error = %err,
1748 "query embedder vector serialization failed; skipping vector branch"
1749 );
1750 let _ = err; plan.was_degraded_at_plan_time = true;
1752 return;
1753 }
1754 };
1755 let strict = &plan.text.strict;
1756 plan.vector = Some(CompiledVectorSearch {
1757 root_kind: strict.root_kind.clone(),
1758 query_text: literal,
1759 limit: strict.limit,
1760 fusable_filters: strict.fusable_filters.clone(),
1761 residual_filters: strict.residual_filters.clone(),
1762 attribution_requested: strict.attribution_requested,
1763 });
1764 }
1765 Err(err) => {
1766 trace_warn!(
1767 error = %err,
1768 "query embedder unavailable, skipping vector branch"
1769 );
1770 let _ = err; plan.was_degraded_at_plan_time = true;
1772 }
1773 }
1774 }
1775
    /// Runs one compiled FTS branch (strict or relaxed) and returns raw hits.
    ///
    /// Builds a single SQL statement that UNIONs the chunk-level index
    /// (`fts_nodes`) with any applicable per-kind property-FTS tables,
    /// applies "fusable" predicates inside that union, limits by score, and
    /// finally applies "residual" predicates to the surviving rows.
    ///
    /// Positional-bind layout (order matters throughout):
    ///   ?1            chunk-arm MATCH expression
    ///   ?2            root kind (only when `filter_by_kind`)
    ///   ?2 or ?3      property-arm MATCH expression (only when `use_prop_fts`)
    ///   …             fused-filter binds, then residual-filter binds
    ///   ?last         LIMIT
    #[allow(clippy::too_many_lines)]
    fn run_search_branch(
        &self,
        compiled: &CompiledSearch,
        branch: SearchBranch,
    ) -> Result<Vec<SearchHit>, EngineError> {
        use std::fmt::Write as _;
        // Queries with no positive terms can never match anything in FTS5.
        if matches!(
            compiled.text_query,
            fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
        ) {
            return Ok(Vec::new());
        }
        let rendered_base = render_text_query_fts5(&compiled.text_query);
        let strategy = self.fts_strategies.get(compiled.root_kind.as_str());
        // Trigram tokenizers need at least 3 alphanumeric characters to emit
        // a token; shorter queries are short-circuited to an empty result.
        if matches!(strategy, Some(TokenizerStrategy::SubstringTrigram))
            && rendered_base
                .chars()
                .filter(|c| c.is_alphanumeric())
                .count()
                < 3
        {
            return Ok(Vec::new());
        }
        let rendered = rendered_base;
        let filter_by_kind = !compiled.root_kind.is_empty();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };

        // Decide which per-kind property-FTS tables participate:
        //   - explicit root kind -> just that kind's table (if it exists);
        //   - exactly one KindEq fused filter -> that kind's table;
        //   - otherwise -> every registered schema whose table exists.
        let prop_fts_tables: Vec<(String, String)> = if filter_by_kind {
            let kind = compiled.root_kind.clone();
            let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
            let exists: bool = conn_guard
                .query_row(
                    "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                    rusqlite::params![prop_table],
                    |_| Ok(true),
                )
                .optional()
                .map_err(EngineError::Sqlite)?
                .unwrap_or(false);
            if exists {
                vec![(kind, prop_table)]
            } else {
                vec![]
            }
        } else {
            let kind_eq_values: Vec<String> = compiled
                .fusable_filters
                .iter()
                .filter_map(|p| match p {
                    Predicate::KindEq(k) => Some(k.clone()),
                    _ => None,
                })
                .collect();
            if kind_eq_values.len() == 1 {
                let kind = kind_eq_values[0].clone();
                let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
                let exists: bool = conn_guard
                    .query_row(
                        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                        rusqlite::params![prop_table],
                        |_| Ok(true),
                    )
                    .optional()
                    .map_err(EngineError::Sqlite)?
                    .unwrap_or(false);
                if exists {
                    vec![(kind, prop_table)]
                } else {
                    vec![]
                }
            } else {
                // No usable kind hint: fan out across every registered schema.
                let mut stmt = conn_guard
                    .prepare("SELECT kind FROM fts_property_schemas")
                    .map_err(EngineError::Sqlite)?;
                let all_kinds: Vec<String> = stmt
                    .query_map([], |r| r.get::<_, String>(0))
                    .map_err(EngineError::Sqlite)?
                    .collect::<Result<Vec<_>, _>>()
                    .map_err(EngineError::Sqlite)?;
                drop(stmt);
                let mut result = Vec::new();
                for kind in all_kinds {
                    let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
                    let exists: bool = conn_guard
                        .query_row(
                            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                            rusqlite::params![prop_table],
                            |_| Ok(true),
                        )
                        .optional()
                        .map_err(EngineError::Sqlite)?
                        .unwrap_or(false);
                    if exists {
                        result.push((kind, prop_table));
                    }
                }
                result
            }
        };
        let use_prop_fts = !prop_fts_tables.is_empty();

        // Seed the bind list; see the layout in the doc comment above. The
        // rendered MATCH text is bound twice when the property arm is active.
        let mut binds: Vec<BindValue> = if filter_by_kind {
            if use_prop_fts {
                vec![
                    BindValue::Text(rendered.clone()),
                    BindValue::Text(compiled.root_kind.clone()),
                    BindValue::Text(rendered),
                ]
            } else {
                vec![
                    BindValue::Text(rendered.clone()),
                    BindValue::Text(compiled.root_kind.clone()),
                ]
            }
        } else if use_prop_fts {
            vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
        } else {
            vec![BindValue::Text(rendered)]
        };

        // Fusable predicates are applied inside the union (alias `u`), BEFORE
        // the score ORDER BY + LIMIT. Parameter indices are derived from the
        // bind list's length at push time, so push order must match SQL order.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(fused_clauses, "\n AND u.kind = ?{idx}");
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND u.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND u.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND u.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses.push_str("\n AND u.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedBoolEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    // Bools are stored as 0/1 integers in the JSON extract.
                    binds.push(BindValue::Integer(i64::from(*value)));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedIn { path, values } => {
                    // Path at ?first_param, values at ?first_param+1 ..= +len.
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(BindValue::Text(v.clone()));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(u.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                // Non-fusable predicates are handled in the residual pass (or
                // elsewhere entirely) and intentionally ignored here.
                Predicate::JsonPathEq { .. }
                | Predicate::JsonPathCompare { .. }
                | Predicate::JsonPathIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                }
            }
        }

        // Residual predicates run on the OUTER select (alias `h`), i.e. AFTER
        // the inner ORDER BY + LIMIT have already trimmed the candidate set.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(scalar_to_bind(v));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                // These are fused (handled above) or edge-only predicates.
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. }
                | Predicate::JsonPathFusedBoolEq { .. }
                | Predicate::JsonPathFusedIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                }
            }
        }

        // LIMIT is always the last bind.
        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();
        // The property arm's MATCH text sits after [query, kind] or [query].
        let prop_bind_idx: usize = if filter_by_kind { 3 } else { 2 };
        // One UNION ALL arm per participating property-FTS table. All arms
        // reuse the same ?{prop_bind_idx} MATCH bind. Weighted bm25 schemas
        // suppress the raw-text snippet (no single source column to excerpt).
        let prop_arm_sql: String = if use_prop_fts {
            prop_fts_tables.iter().fold(String::new(), |mut acc, (kind, prop_table)| {
                let bm25_expr = conn_guard
                    .query_row(
                        "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
                        rusqlite::params![kind],
                        |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
                    )
                    .ok()
                    .map_or_else(
                        || format!("bm25({prop_table})"),
                        |(json, sep)| build_bm25_expr(prop_table, &json, &sep),
                    );
                let is_weighted = bm25_expr != format!("bm25({prop_table})");
                let snippet_expr = if is_weighted {
                    "'' AS snippet".to_owned()
                } else {
                    "substr(fp.text_content, 1, 200) AS snippet".to_owned()
                };
                let _ = write!(
                    acc,
                    "
 UNION ALL
 SELECT
 src.row_id AS row_id,
 fp.node_logical_id AS logical_id,
 src.kind AS kind,
 src.properties AS properties,
 src.source_ref AS source_ref,
 src.content_ref AS content_ref,
 src.created_at AS created_at,
 -{bm25_expr} AS score,
 'property' AS source,
 {snippet_expr},
 CAST(fp.rowid AS TEXT) AS projection_row_id
 FROM {prop_table} fp
 JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
 WHERE {prop_table} MATCH ?{prop_bind_idx}"
                );
                acc
            })
        } else {
            String::new()
        };
        let (chunk_fts_bind, chunk_kind_clause) = if filter_by_kind {
            ("?1", "\n AND src.kind = ?2")
        } else {
            ("?1", "")
        };
        // Score is -bm25(...) so that higher is better and ORDER BY ... DESC
        // ranks best matches first in both arms.
        let sql = format!(
            "WITH search_hits AS (
 SELECT
 u.row_id AS row_id,
 u.logical_id AS logical_id,
 u.kind AS kind,
 u.properties AS properties,
 u.source_ref AS source_ref,
 u.content_ref AS content_ref,
 u.created_at AS created_at,
 u.score AS score,
 u.source AS source,
 u.snippet AS snippet,
 u.projection_row_id AS projection_row_id
 FROM (
 SELECT
 src.row_id AS row_id,
 c.node_logical_id AS logical_id,
 src.kind AS kind,
 src.properties AS properties,
 src.source_ref AS source_ref,
 src.content_ref AS content_ref,
 src.created_at AS created_at,
 -bm25(fts_nodes) AS score,
 'chunk' AS source,
 snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
 f.chunk_id AS projection_row_id
 FROM fts_nodes f
 JOIN chunks c ON c.id = f.chunk_id
 JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
 WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}{prop_arm_sql}
 ) u
 WHERE 1 = 1{fused_clauses}
 ORDER BY u.score DESC
 LIMIT ?{limit_idx}
 )
 SELECT
 h.row_id,
 h.logical_id,
 h.kind,
 h.properties,
 h.content_ref,
 am.last_accessed_at,
 h.created_at,
 h.score,
 h.source,
 h.snippet,
 h.projection_row_id
 FROM search_hits h
 LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
 WHERE 1 = 1{filter_clauses}
 ORDER BY h.score DESC"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                // Column 8 is the 'source' discriminator emitted by each arm.
                let source_str: String = row.get(8)?;
                let source = if source_str == "property" {
                    SearchHitSource::Property
                } else {
                    SearchHitSource::Chunk
                };
                let match_mode = match branch {
                    SearchBranch::Strict => SearchMatchMode::Strict,
                    SearchBranch::Relaxed => SearchMatchMode::Relaxed,
                };
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score: row.get(7)?,
                    modality: RetrievalModality::Text,
                    source,
                    match_mode: Some(match_mode),
                    snippet: row.get(9)?,
                    projection_row_id: row.get(10)?,
                    vector_distance: None,
                    attribution: None,
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        // Release the statement before the connection guard it borrows.
        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        Ok(hits)
    }
2337
2338 fn populate_attribution_for_hits(
2342 &self,
2343 hits: &mut [SearchHit],
2344 strict_text_query: &fathomdb_query::TextQuery,
2345 relaxed_text_query: Option<&fathomdb_query::TextQuery>,
2346 ) -> Result<(), EngineError> {
2347 let conn_guard = match self.lock_connection() {
2348 Ok(g) => g,
2349 Err(e) => {
2350 self.telemetry.increment_errors();
2351 return Err(e);
2352 }
2353 };
2354 let strict_expr = render_text_query_fts5(strict_text_query);
2355 let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
2356 for hit in hits.iter_mut() {
2357 let match_expr = match hit.match_mode {
2362 Some(SearchMatchMode::Strict) => strict_expr.as_str(),
2363 Some(SearchMatchMode::Relaxed) => {
2364 relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
2365 }
2366 None => continue,
2367 };
2368 match resolve_hit_attribution(&conn_guard, hit, match_expr) {
2369 Ok(att) => hit.attribution = Some(att),
2370 Err(e) => {
2371 self.telemetry.increment_errors();
2372 return Err(e);
2373 }
2374 }
2375 }
2376 Ok(())
2377 }
2378
2379 pub fn execute_compiled_grouped_read(
2383 &self,
2384 compiled: &CompiledGroupedQuery,
2385 ) -> Result<GroupedQueryRows, EngineError> {
2386 let root_rows = self.execute_compiled_read(&compiled.root)?;
2387 if root_rows.was_degraded {
2388 return Ok(GroupedQueryRows {
2389 roots: Vec::new(),
2390 expansions: Vec::new(),
2391 edge_expansions: Vec::new(),
2392 was_degraded: true,
2393 });
2394 }
2395
2396 let roots = root_rows.nodes;
2397 let mut expansions = Vec::with_capacity(compiled.expansions.len());
2398 for expansion in &compiled.expansions {
2399 let slot_rows = if roots.is_empty() {
2400 Vec::new()
2401 } else {
2402 self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
2403 };
2404 expansions.push(ExpansionSlotRows {
2405 slot: expansion.slot.clone(),
2406 roots: slot_rows,
2407 });
2408 }
2409
2410 let mut edge_expansions = Vec::with_capacity(compiled.edge_expansions.len());
2411 for edge_expansion in &compiled.edge_expansions {
2412 let slot_rows = if roots.is_empty() {
2413 Vec::new()
2414 } else {
2415 self.read_edge_expansion_chunked(&roots, edge_expansion, compiled.hints.hard_limit)?
2416 };
2417 edge_expansions.push(EdgeExpansionSlotRows {
2418 slot: edge_expansion.slot.clone(),
2419 roots: slot_rows,
2420 });
2421 }
2422
2423 Ok(GroupedQueryRows {
2424 roots,
2425 expansions,
2426 edge_expansions,
2427 was_degraded: false,
2428 })
2429 }
2430
2431 fn read_expansion_nodes_chunked(
2437 &self,
2438 roots: &[NodeRow],
2439 expansion: &ExpansionSlot,
2440 hard_limit: usize,
2441 ) -> Result<Vec<ExpansionRootRows>, EngineError> {
2442 if roots.len() <= BATCH_CHUNK_SIZE {
2443 return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
2444 }
2445
2446 let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
2449 for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
2450 for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
2451 per_root
2452 .entry(group.root_logical_id)
2453 .or_default()
2454 .extend(group.nodes);
2455 }
2456 }
2457
2458 Ok(roots
2459 .iter()
2460 .map(|root| ExpansionRootRows {
2461 root_logical_id: root.logical_id.clone(),
2462 nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
2463 })
2464 .collect())
2465 }
2466
    /// Expands one batch of roots along `expansion` using a single recursive
    /// CTE, returning the reached nodes grouped per root (root order preserved).
    ///
    /// Positional-bind layout: roots at ?1..?N, edge kind at ?N+1, then edge
    /// filter binds, then node (endpoint) filter binds.
    #[allow(clippy::too_many_lines)]
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Traversal direction decides both the join side and which endpoint
        // becomes the next frontier node.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // Fused JSON-path filters require a registered property-FTS schema on
        // the reachable kinds; validate up front so the failure is explicit.
        if expansion.filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. }
                    | Predicate::JsonPathFusedTimestampCmp { .. }
                    | Predicate::JsonPathFusedIn { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }

        // Root ids are seeded as "SELECT ?1 UNION ALL SELECT ?2 ..." so the
        // statement shape (and thus the prepared-statement cache key) depends
        // only on the chunk size.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        let edge_kind_param = root_ids.len() + 1;
        let edge_filter_param_start = root_ids.len() + 2;

        let (edge_filter_sql, edge_filter_binds) =
            compile_edge_filter(expansion.edge_filter.as_ref(), edge_filter_param_start);

        // Node-filter params start right after the edge-filter params.
        let filter_param_start = edge_filter_param_start + edge_filter_binds.len();

        let (filter_sql, filter_binds) =
            compile_expansion_filter(expansion.filter.as_ref(), filter_param_start);

        // `visited` is a ',id,'-delimited list used as a cycle guard via
        // instr(); `emitted` caps total expansion per root path. ROW_NUMBER
        // then enforces hard_limit per root on the final node set.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
 traversed(root_id, logical_id, depth, visited, emitted) AS (
 SELECT rid, rid, 0, printf(',%s,', rid), 0
 FROM root_ids
 UNION ALL
 SELECT
 t.root_id,
 {next_logical_id},
 t.depth + 1,
 t.visited || {next_logical_id} || ',',
 t.emitted + 1
 FROM traversed t
 JOIN edges e ON {join_condition}
 AND e.kind = ?{edge_kind_param}
 AND e.superseded_at IS NULL{edge_filter_sql}
 WHERE t.depth < {max_depth}
 AND t.emitted < {hard_limit}
 AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
 ),
 numbered AS (
 SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
 , n.content_ref, am.last_accessed_at
 , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
 FROM traversed t
 JOIN nodes n ON n.logical_id = t.logical_id
 AND n.superseded_at IS NULL
 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
 WHERE t.depth > 0{filter_sql}
 )
 SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
 FROM numbered
 WHERE rn <= {hard_limit}
 ORDER BY root_id, logical_id",
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind order mirrors the parameter-index layout computed above.
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(edge_filter_binds);
        bind_values.extend(filter_binds);

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Group by root, then re-emit in the caller's root order so roots
        // with no expansion still appear (with an empty group).
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
2619
2620 fn read_edge_expansion_chunked(
2626 &self,
2627 roots: &[NodeRow],
2628 expansion: &EdgeExpansionSlot,
2629 hard_limit: usize,
2630 ) -> Result<Vec<EdgeExpansionRootRows>, EngineError> {
2631 if roots.len() <= BATCH_CHUNK_SIZE {
2632 return self.read_edge_expansion_batched(roots, expansion, hard_limit);
2633 }
2634
2635 let mut per_root: HashMap<String, Vec<(EdgeRow, NodeRow)>> = HashMap::new();
2636 for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
2637 for group in self.read_edge_expansion_batched(chunk, expansion, hard_limit)? {
2638 per_root
2639 .entry(group.root_logical_id)
2640 .or_default()
2641 .extend(group.pairs);
2642 }
2643 }
2644
2645 Ok(roots
2646 .iter()
2647 .map(|root| EdgeExpansionRootRows {
2648 root_logical_id: root.logical_id.clone(),
2649 pairs: per_root.remove(&root.logical_id).unwrap_or_default(),
2650 })
2651 .collect())
2652 }
2653
    /// Expands one batch of roots along `expansion`, returning (edge, node)
    /// pairs grouped per root. Same recursive-CTE shape as
    /// `read_expansion_nodes_batched`, but the traversal also carries the
    /// row id of the edge that reached each node so the edge can be re-joined.
    ///
    /// Positional-bind layout: roots at ?1..?N, edge kind at ?N+1, then edge
    /// filter binds, then endpoint filter binds.
    #[allow(clippy::too_many_lines)]
    fn read_edge_expansion_batched(
        &self,
        roots: &[NodeRow],
        expansion: &EdgeExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<EdgeExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Direction decides the join side and which endpoint is "next".
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // Fused endpoint filters require a registered property-FTS schema.
        if expansion.endpoint_filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. }
                    | Predicate::JsonPathFusedTimestampCmp { .. }
                    | Predicate::JsonPathFusedIn { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }

        // Statement shape depends only on chunk size -> cache friendly.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        let edge_kind_param = root_ids.len() + 1;
        let edge_filter_param_start = root_ids.len() + 2;

        let (edge_filter_sql, edge_filter_binds) =
            compile_edge_filter(expansion.edge_filter.as_ref(), edge_filter_param_start);

        let endpoint_filter_param_start = edge_filter_param_start + edge_filter_binds.len();
        let (endpoint_filter_sql, endpoint_filter_binds) = compile_expansion_filter(
            expansion.endpoint_filter.as_ref(),
            endpoint_filter_param_start,
        );

        // Seed rows carry edge_row_id = NULL (no edge reached the root
        // itself); the `t.depth > 0` guard below excludes them from output.
        // `visited`/instr() is the cycle guard; ROW_NUMBER caps per-root rows.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
 traversed(root_id, logical_id, depth, visited, emitted, edge_row_id) AS (
 SELECT rid, rid, 0, printf(',%s,', rid), 0, NULL AS edge_row_id
 FROM root_ids
 UNION ALL
 SELECT
 t.root_id,
 {next_logical_id},
 t.depth + 1,
 t.visited || {next_logical_id} || ',',
 t.emitted + 1,
 e.row_id AS edge_row_id
 FROM traversed t
 JOIN edges e ON {join_condition}
 AND e.kind = ?{edge_kind_param}
 AND e.superseded_at IS NULL{edge_filter_sql}
 WHERE t.depth < {max_depth}
 AND t.emitted < {hard_limit}
 AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
 ),
 numbered AS (
 SELECT t.root_id,
 e.row_id AS e_row_id,
 e.logical_id AS e_logical_id,
 e.source_logical_id AS e_source,
 e.target_logical_id AS e_target,
 e.kind AS e_kind,
 e.properties AS e_properties,
 e.source_ref AS e_source_ref,
 e.confidence AS e_confidence,
 n.row_id AS n_row_id,
 n.logical_id AS n_logical_id,
 n.kind AS n_kind,
 n.properties AS n_properties,
 n.content_ref AS n_content_ref,
 am.last_accessed_at AS n_last_accessed_at,
 ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id, e.row_id) AS rn
 FROM traversed t
 JOIN edges e ON e.row_id = t.edge_row_id
 JOIN nodes n ON n.logical_id = t.logical_id
 AND n.superseded_at IS NULL
 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
 WHERE t.depth > 0{endpoint_filter_sql}
 )
 SELECT root_id,
 e_row_id, e_logical_id, e_source, e_target, e_kind, e_properties,
 e_source_ref, e_confidence,
 n_row_id, n_logical_id, n_kind, n_properties, n_content_ref,
 n_last_accessed_at
 FROM numbered
 WHERE rn <= {hard_limit}
 ORDER BY root_id, n_logical_id, e_row_id",
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind order mirrors the parameter-index layout computed above.
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(edge_filter_binds);
        bind_values.extend(endpoint_filter_binds);

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let root_id: String = row.get(0)?;
                let edge_row = EdgeRow {
                    row_id: row.get(1)?,
                    logical_id: row.get(2)?,
                    source_logical_id: row.get(3)?,
                    target_logical_id: row.get(4)?,
                    kind: row.get(5)?,
                    properties: row.get(6)?,
                    source_ref: row.get(7)?,
                    confidence: row.get(8)?,
                };
                let node_row = NodeRow {
                    row_id: row.get(9)?,
                    logical_id: row.get(10)?,
                    kind: row.get(11)?,
                    properties: row.get(12)?,
                    content_ref: row.get(13)?,
                    last_accessed_at: row.get(14)?,
                };
                Ok((root_id, edge_row, node_row))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Group by root, then re-emit in the caller's root order.
        let mut per_root: HashMap<String, Vec<(EdgeRow, NodeRow)>> = HashMap::new();
        for (root_id, edge, node) in rows {
            per_root.entry(root_id).or_default().push((edge, node));
        }

        let root_groups = roots
            .iter()
            .map(|root| EdgeExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                pairs: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
2824
2825 fn validate_fused_filter_for_edge_label(&self, edge_label: &str) -> Result<(), EngineError> {
2839 let conn = self.lock_connection()?;
2840 let mut stmt = conn
2842 .prepare_cached(
2843 "SELECT DISTINCT n.kind \
2844 FROM edges e \
2845 JOIN nodes n ON n.logical_id = e.target_logical_id \
2846 WHERE e.kind = ?1 AND e.superseded_at IS NULL",
2847 )
2848 .map_err(EngineError::Sqlite)?;
2849 let target_kinds: Vec<String> = stmt
2850 .query_map(rusqlite::params![edge_label], |row| row.get(0))
2851 .map_err(EngineError::Sqlite)?
2852 .collect::<Result<Vec<_>, _>>()
2853 .map_err(EngineError::Sqlite)?;
2854
2855 for kind in &target_kinds {
2856 let has_schema: bool = conn
2857 .query_row(
2858 "SELECT COUNT(*) > 0 FROM fts_property_schemas WHERE kind = ?1",
2859 rusqlite::params![kind],
2860 |row| row.get(0),
2861 )
2862 .map_err(EngineError::Sqlite)?;
2863 if !has_schema {
2864 return Err(EngineError::InvalidConfig(format!(
2865 "kind {kind:?} has no registered property-FTS schema; register one with \
2866 admin.register_fts_property_schema(..) before using fused filters on \
2867 expansion slots, or use JsonPathEq for non-fused semantics \
2868 (expand slot uses edge label {edge_label:?})"
2869 )));
2870 }
2871 }
2872 Ok(())
2873 }
2874
2875 pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
2881 let conn = self.lock_connection()?;
2882 conn.query_row(
2883 "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
2884 rusqlite::params![id],
2885 |row| {
2886 Ok(RunRow {
2887 id: row.get(0)?,
2888 kind: row.get(1)?,
2889 status: row.get(2)?,
2890 properties: row.get(3)?,
2891 })
2892 },
2893 )
2894 .optional()
2895 .map_err(EngineError::Sqlite)
2896 }
2897
2898 pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
2904 let conn = self.lock_connection()?;
2905 conn.query_row(
2906 "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
2907 rusqlite::params![id],
2908 |row| {
2909 Ok(StepRow {
2910 id: row.get(0)?,
2911 run_id: row.get(1)?,
2912 kind: row.get(2)?,
2913 status: row.get(3)?,
2914 properties: row.get(4)?,
2915 })
2916 },
2917 )
2918 .optional()
2919 .map_err(EngineError::Sqlite)
2920 }
2921
2922 pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
2928 let conn = self.lock_connection()?;
2929 conn.query_row(
2930 "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
2931 rusqlite::params![id],
2932 |row| {
2933 Ok(ActionRow {
2934 id: row.get(0)?,
2935 step_id: row.get(1)?,
2936 kind: row.get(2)?,
2937 status: row.get(3)?,
2938 properties: row.get(4)?,
2939 })
2940 },
2941 )
2942 .optional()
2943 .map_err(EngineError::Sqlite)
2944 }
2945
2946 pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
2952 let conn = self.lock_connection()?;
2953 let mut stmt = conn
2954 .prepare_cached(
2955 "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
2956 )
2957 .map_err(EngineError::Sqlite)?;
2958 let rows = stmt
2959 .query_map([], |row| {
2960 Ok(RunRow {
2961 id: row.get(0)?,
2962 kind: row.get(1)?,
2963 status: row.get(2)?,
2964 properties: row.get(3)?,
2965 })
2966 })
2967 .map_err(EngineError::Sqlite)?
2968 .collect::<Result<Vec<_>, _>>()
2969 .map_err(EngineError::Sqlite)?;
2970 Ok(rows)
2971 }
2972
2973 #[must_use]
2983 #[allow(clippy::expect_used)]
2984 pub fn shape_sql_count(&self) -> usize {
2985 self.shape_sql_map
2986 .lock()
2987 .unwrap_or_else(PoisonError::into_inner)
2988 .len()
2989 }
2990
2991 #[must_use]
2993 pub fn schema_manager(&self) -> Arc<SchemaManager> {
2994 Arc::clone(&self.schema_manager)
2995 }
2996
2997 #[must_use]
3006 pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
3007 let cache_hit = self
3008 .shape_sql_map
3009 .lock()
3010 .unwrap_or_else(PoisonError::into_inner)
3011 .contains_key(&compiled.shape_hash);
3012 QueryPlan {
3013 sql: wrap_node_row_projection_sql(&compiled.sql),
3014 bind_count: compiled.binds.len(),
3015 driving_table: compiled.driving_table,
3016 shape_hash: compiled.shape_hash,
3017 cache_hit,
3018 }
3019 }
3020
    /// Run `PRAGMA <name>` and return its first result column rendered as a
    /// string (diagnostic/test helper).
    ///
    /// NOTE(review): `name` is interpolated directly into the SQL text, so
    /// callers must pass a trusted pragma name — this is not safe for
    /// untrusted input.
    ///
    /// # Errors
    /// `EngineError::Sqlite` if the pragma fails; `EngineError::InvalidWrite`
    /// when the pragma yields a BLOB, which has no text rendering here.
    #[doc(hidden)]
    pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
        let conn = self.lock_connection()?;
        let result = conn
            .query_row(&format!("PRAGMA {name}"), [], |row| {
                row.get::<_, rusqlite::types::Value>(0)
            })
            .map_err(EngineError::Sqlite)?;
        // Render each SQLite value variant as text; NULL becomes "".
        let s = match result {
            rusqlite::types::Value::Text(t) => t,
            rusqlite::types::Value::Integer(i) => i.to_string(),
            rusqlite::types::Value::Real(f) => f.to_string(),
            rusqlite::types::Value::Blob(_) => {
                return Err(EngineError::InvalidWrite(format!(
                    "PRAGMA {name} returned an unexpected BLOB value"
                )));
            }
            rusqlite::types::Value::Null => String::new(),
        };
        Ok(s)
    }
3049
3050 pub fn query_provenance_events(
3059 &self,
3060 subject: &str,
3061 ) -> Result<Vec<ProvenanceEvent>, EngineError> {
3062 let conn = self.lock_connection()?;
3063 let mut stmt = conn
3064 .prepare_cached(
3065 "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
3066 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
3067 )
3068 .map_err(EngineError::Sqlite)?;
3069 let events = stmt
3070 .query_map(rusqlite::params![subject], |row| {
3071 Ok(ProvenanceEvent {
3072 id: row.get(0)?,
3073 event_type: row.get(1)?,
3074 subject: row.get(2)?,
3075 source_ref: row.get(3)?,
3076 metadata_json: row.get(4)?,
3077 created_at: row.get(5)?,
3078 })
3079 })
3080 .map_err(EngineError::Sqlite)?
3081 .collect::<Result<Vec<_>, _>>()
3082 .map_err(EngineError::Sqlite)?;
3083 Ok(events)
3084 }
3085
    /// If `kind` is mid-way through its FIRST property-FTS registration (its
    /// per-kind table is empty and a rebuild is in flight), return a direct
    /// scan of live nodes of that kind so reads stay correct; otherwise
    /// return `None` so the normal FTS path serves the query.
    fn scan_fallback_if_first_registration(
        &self,
        kind: &str,
    ) -> Result<Option<Vec<NodeRow>>, EngineError> {
        let conn = self.lock_connection()?;

        let prop_table = fathomdb_schema::fts_kind_table_name(kind);
        // A missing per-kind table counts as "empty": registration may not
        // have created it yet.
        let table_exists: bool = conn
            .query_row(
                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                rusqlite::params![prop_table],
                |_| Ok(true),
            )
            .optional()?
            .unwrap_or(false);
        let prop_empty = if table_exists {
            let cnt: i64 =
                conn.query_row(&format!("SELECT COUNT(*) FROM {prop_table}"), [], |r| {
                    r.get(0)
                })?;
            cnt == 0
        } else {
            true
        };
        // Only fall back when the empty table is explained by an in-flight
        // FIRST registration; an empty table otherwise just means no data.
        let needs_scan: bool = if prop_empty {
            conn.query_row(
                "SELECT 1 FROM fts_property_rebuild_state \
                 WHERE kind = ?1 AND is_first_registration = 1 \
                 AND state IN ('PENDING','BUILDING','SWAPPING') \
                 LIMIT 1",
                rusqlite::params![kind],
                |_| Ok(true),
            )
            .optional()?
            .unwrap_or(false)
        } else {
            false
        };

        if !needs_scan {
            return Ok(None);
        }

        // Fallback path: read every live node of the kind directly, joining
        // access metadata the same way the FTS projection would.
        let mut stmt = conn
            .prepare_cached(
                "SELECT n.row_id, n.logical_id, n.kind, n.properties, n.content_ref, \
                 am.last_accessed_at \
                 FROM nodes n \
                 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id \
                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
            )
            .map_err(EngineError::Sqlite)?;

        let nodes = stmt
            .query_map(rusqlite::params![kind], |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        Ok(Some(nodes))
    }
3166
3167 pub fn get_property_fts_rebuild_progress(
3173 &self,
3174 kind: &str,
3175 ) -> Result<Option<crate::rebuild_actor::RebuildProgress>, EngineError> {
3176 let conn = self.lock_connection()?;
3177 let row = conn
3178 .query_row(
3179 "SELECT state, rows_total, rows_done, started_at, last_progress_at, error_message \
3180 FROM fts_property_rebuild_state WHERE kind = ?1",
3181 rusqlite::params![kind],
3182 |r| {
3183 Ok(crate::rebuild_actor::RebuildProgress {
3184 state: r.get(0)?,
3185 rows_total: r.get(1)?,
3186 rows_done: r.get(2)?,
3187 started_at: r.get(3)?,
3188 last_progress_at: r.get(4)?,
3189 error_message: r.get(5)?,
3190 })
3191 },
3192 )
3193 .optional()?;
3194 Ok(row)
3195 }
3196}
3197
3198fn adapt_fts_nodes_sql_for_per_kind_tables(
3208 compiled: &CompiledQuery,
3209 conn: &rusqlite::Connection,
3210) -> Result<(String, Vec<BindValue>), EngineError> {
3211 let root_kind = compiled
3212 .binds
3213 .get(1)
3214 .and_then(|b| {
3215 if let BindValue::Text(k) = b {
3216 Some(k.as_str())
3217 } else {
3218 None
3219 }
3220 })
3221 .unwrap_or("");
3222 let prop_table = fathomdb_schema::fts_kind_table_name(root_kind);
3223 let prop_table_exists: bool = conn
3224 .query_row(
3225 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
3226 rusqlite::params![prop_table],
3227 |_| Ok(true),
3228 )
3229 .optional()
3230 .map_err(EngineError::Sqlite)?
3231 .unwrap_or(false);
3232
3233 Ok(compiled.adapt_fts_for_kind(prop_table_exists, &prop_table))
3234}
3235
3236#[allow(clippy::unnecessary_wraps)]
3242fn check_vec_identity_at_open(
3243 conn: &rusqlite::Connection,
3244 embedder: &dyn QueryEmbedder,
3245) -> Result<(), EngineError> {
3246 let row: Option<String> = conn
3247 .query_row(
3248 "SELECT config_json FROM projection_profiles WHERE kind='*' AND facet='vec'",
3249 [],
3250 |row| row.get(0),
3251 )
3252 .optional()
3253 .unwrap_or(None);
3254
3255 let Some(config_json) = row else {
3256 return Ok(());
3257 };
3258
3259 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&config_json) else {
3261 return Ok(());
3262 };
3263
3264 let identity = embedder.identity();
3265
3266 if let Some(stored_model) = parsed
3267 .get("model_identity")
3268 .and_then(serde_json::Value::as_str)
3269 && stored_model != identity.model_identity
3270 {
3271 trace_warn!(
3272 stored_model_identity = stored_model,
3273 embedder_model_identity = %identity.model_identity,
3274 "vec identity mismatch at open: model_identity differs"
3275 );
3276 }
3277
3278 if let Some(stored_dim) = parsed.get("dimensions").and_then(serde_json::Value::as_u64) {
3279 let stored_dim = usize::try_from(stored_dim).unwrap_or(usize::MAX);
3280 if stored_dim != identity.dimension {
3281 trace_warn!(
3282 stored_dimensions = stored_dim,
3283 embedder_dimensions = identity.dimension,
3284 "vec identity mismatch at open: dimensions differ"
3285 );
3286 }
3287 }
3288
3289 Ok(())
3290}
3291
/// Open-time self-heal: when property-FTS schemas are registered but the FTS
/// tables (or the recursive-position side table) are empty despite live
/// nodes existing, wipe and repopulate the projections inside a single
/// IMMEDIATE transaction.
fn run_open_time_fts_guards(conn: &mut rusqlite::Connection) -> Result<(), EngineError> {
    let schema_count: i64 = conn
        .query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
            row.get(0)
        })
        .map_err(EngineError::Sqlite)?;
    // No schemas registered -> nothing to guard.
    if schema_count == 0 {
        return Ok(());
    }

    let needs_fts_rebuild = open_guard_check_fts_empty(conn)?;
    // A full FTS rebuild repopulates positions as well, so the position
    // check only matters when the FTS side is healthy.
    let needs_position_backfill = if needs_fts_rebuild {
        false
    } else {
        open_guard_check_positions_empty(conn)?
    };

    if needs_fts_rebuild || needs_position_backfill {
        // Enumerate only the per-kind FTS virtual tables (fts_props_*).
        let per_kind_tables: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT name FROM sqlite_master \
                     WHERE type='table' AND name LIKE 'fts_props_%' \
                     AND sql LIKE 'CREATE VIRTUAL TABLE%'",
                )
                .map_err(EngineError::Sqlite)?;
            stmt.query_map([], |r| r.get::<_, String>(0))
                .map_err(EngineError::Sqlite)?
                .collect::<Result<Vec<_>, _>>()
                .map_err(EngineError::Sqlite)?
        };
        let tx = conn
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(EngineError::Sqlite)?;
        for table in &per_kind_tables {
            tx.execute_batch(&format!("DELETE FROM {table}"))
                .map_err(EngineError::Sqlite)?;
        }
        tx.execute("DELETE FROM fts_node_property_positions", [])
            .map_err(EngineError::Sqlite)?;
        // NOTE(review): the ?1 placeholder is presumably bound per registered
        // kind inside insert_property_fts_rows — confirm against that helper.
        crate::projection::insert_property_fts_rows(
            &tx,
            "SELECT logical_id, properties FROM nodes \
             WHERE kind = ?1 AND superseded_at IS NULL",
        )
        .map_err(EngineError::Sqlite)?;
        tx.commit().map_err(EngineError::Sqlite)?;
    }
    Ok(())
}
3353
3354fn open_guard_check_fts_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
3355 let kinds: Vec<String> = {
3356 let mut stmt = conn
3357 .prepare("SELECT kind FROM fts_property_schemas")
3358 .map_err(EngineError::Sqlite)?;
3359 stmt.query_map([], |row| row.get::<_, String>(0))
3360 .map_err(EngineError::Sqlite)?
3361 .collect::<Result<Vec<_>, _>>()
3362 .map_err(EngineError::Sqlite)?
3363 };
3364 for kind in &kinds {
3365 let table = fathomdb_schema::fts_kind_table_name(kind);
3366 let table_exists: bool = conn
3367 .query_row(
3368 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
3369 rusqlite::params![table],
3370 |_| Ok(true),
3371 )
3372 .optional()
3373 .map_err(EngineError::Sqlite)?
3374 .unwrap_or(false);
3375 let fts_count: i64 = if table_exists {
3376 conn.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
3377 row.get(0)
3378 })
3379 .map_err(EngineError::Sqlite)?
3380 } else {
3381 0
3382 };
3383 if fts_count == 0 {
3384 let node_count: i64 = conn
3385 .query_row(
3386 "SELECT COUNT(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
3387 rusqlite::params![kind],
3388 |row| row.get(0),
3389 )
3390 .map_err(EngineError::Sqlite)?;
3391 if node_count > 0 {
3392 return Ok(true);
3393 }
3394 }
3395 }
3396 Ok(false)
3397}
3398
3399fn open_guard_check_positions_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
3400 let recursive_count: i64 = conn
3401 .query_row(
3402 "SELECT COUNT(*) FROM fts_property_schemas \
3403 WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
3404 [],
3405 |row| row.get(0),
3406 )
3407 .map_err(EngineError::Sqlite)?;
3408 if recursive_count == 0 {
3409 return Ok(false);
3410 }
3411 let pos_count: i64 = conn
3412 .query_row(
3413 "SELECT COUNT(*) FROM fts_node_property_positions",
3414 [],
3415 |row| row.get(0),
3416 )
3417 .map_err(EngineError::Sqlite)?;
3418 Ok(pos_count == 0)
3419}
3420
/// Wrap a node-selecting subquery so each row is projected as a full
/// `NodeRow` column set joined with its access metadata.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut sql = String::with_capacity(base_sql.len() + 170);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, \
         am.last_accessed_at FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
3428
3429pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
3437 match err {
3438 rusqlite::Error::SqliteFailure(_, Some(msg)) => {
3439 (msg.contains("no such table: vec_") && !msg.contains("vec_embedding"))
3441 || msg.contains("no such module: vec0")
3442 }
3443 _ => false,
3444 }
3445}
3446
3447fn search_rows_to_query_rows(rows: SearchRows) -> QueryRows {
3453 let nodes = rows
3454 .hits
3455 .into_iter()
3456 .map(|hit| NodeRow {
3457 row_id: hit.node.row_id,
3458 logical_id: hit.node.logical_id,
3459 kind: hit.node.kind,
3460 properties: hit.node.properties,
3461 content_ref: hit.node.content_ref,
3462 last_accessed_at: hit.node.last_accessed_at,
3463 })
3464 .collect();
3465 QueryRows {
3466 nodes,
3467 runs: Vec::new(),
3468 steps: Vec::new(),
3469 actions: Vec::new(),
3470 was_degraded: rows.was_degraded,
3471 }
3472}
3473
3474fn scalar_to_bind(value: &ScalarValue) -> BindValue {
3475 match value {
3476 ScalarValue::Text(text) => BindValue::Text(text.clone()),
3477 ScalarValue::Integer(integer) => BindValue::Integer(*integer),
3478 ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
3479 }
3480}
3481
3482fn merge_search_branches(
3500 strict: Vec<SearchHit>,
3501 relaxed: Vec<SearchHit>,
3502 limit: usize,
3503) -> Vec<SearchHit> {
3504 merge_search_branches_three(strict, relaxed, Vec::new(), limit)
3505}
3506
3507fn merge_search_branches_three(
3519 strict: Vec<SearchHit>,
3520 relaxed: Vec<SearchHit>,
3521 vector: Vec<SearchHit>,
3522 limit: usize,
3523) -> Vec<SearchHit> {
3524 let strict_block = dedup_branch_hits(strict);
3525 let relaxed_block = dedup_branch_hits(relaxed);
3526 let vector_block = dedup_branch_hits(vector);
3527
3528 let mut seen: std::collections::HashSet<String> = strict_block
3529 .iter()
3530 .map(|h| h.node.logical_id.clone())
3531 .collect();
3532
3533 let mut merged = strict_block;
3534 for hit in relaxed_block {
3535 if seen.insert(hit.node.logical_id.clone()) {
3536 merged.push(hit);
3537 }
3538 }
3539 for hit in vector_block {
3540 if seen.insert(hit.node.logical_id.clone()) {
3541 merged.push(hit);
3542 }
3543 }
3544
3545 if merged.len() > limit {
3546 merged.truncate(limit);
3547 }
3548 merged
3549}
3550
3551fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
3555 hits.sort_by(|a, b| {
3556 b.score
3557 .partial_cmp(&a.score)
3558 .unwrap_or(std::cmp::Ordering::Equal)
3559 .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
3560 .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
3561 });
3562
3563 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
3564 hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
3565 hits
3566}
3567
/// Tie-break rank for hit sources within a branch: Chunk outranks Property,
/// which outranks Vector (lower value = higher priority).
fn source_priority(source: SearchHitSource) -> u8 {
    match source {
        SearchHitSource::Chunk => 0,
        SearchHitSource::Property => 1,
        SearchHitSource::Vector => 2,
    }
}
3577
/// Open sentinel passed to FTS5 `highlight()` before each match region — a
/// control byte chosen to be unlikely to occur in stored property text.
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
/// Matching close sentinel inserted after each match region.
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
3597
3598fn load_position_map(
3602 conn: &Connection,
3603 logical_id: &str,
3604 kind: &str,
3605) -> Result<Vec<(usize, usize, String)>, EngineError> {
3606 let mut stmt = conn
3607 .prepare_cached(
3608 "SELECT start_offset, end_offset, leaf_path \
3609 FROM fts_node_property_positions \
3610 WHERE node_logical_id = ?1 AND kind = ?2 \
3611 ORDER BY start_offset ASC",
3612 )
3613 .map_err(EngineError::Sqlite)?;
3614 let rows = stmt
3615 .query_map(rusqlite::params![logical_id, kind], |row| {
3616 let start: i64 = row.get(0)?;
3617 let end: i64 = row.get(1)?;
3618 let path: String = row.get(2)?;
3619 let start = usize::try_from(start).unwrap_or(0);
3623 let end = usize::try_from(end).unwrap_or(0);
3624 Ok((start, end, path))
3625 })
3626 .map_err(EngineError::Sqlite)?;
3627 let mut out = Vec::new();
3628 for row in rows {
3629 out.push(row.map_err(EngineError::Sqlite)?);
3630 }
3631 Ok(out)
3632}
3633
/// Recover the byte offsets (in the ORIGINAL, unwrapped text) at which each
/// highlight region begins, given text wrapped with `open`/`close` markers.
///
/// Marker bytes seen so far are subtracted from the scan position so the
/// returned offsets refer to the un-marked source text.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    // Guard: an empty marker matches at every position, so the scan below
    // would never advance and loop forever.
    if open.is_empty() || close.is_empty() {
        return Vec::new();
    }
    let bytes = wrapped.as_bytes();
    let open_bytes = open.as_bytes();
    let close_bytes = close.as_bytes();
    let mut offsets = Vec::new();
    let mut i = 0usize;
    // Total marker bytes consumed so far; subtracting maps wrapped-text
    // positions back to original-text positions.
    let mut marker_bytes_seen = 0usize;
    while i < bytes.len() {
        if bytes[i..].starts_with(open_bytes) {
            offsets.push(i - marker_bytes_seen);
            i += open_bytes.len();
            marker_bytes_seen += open_bytes.len();
        } else if bytes[i..].starts_with(close_bytes) {
            i += close_bytes.len();
            marker_bytes_seen += close_bytes.len();
        } else {
            i += 1;
        }
    }
    offsets
}
3666
/// Map a byte offset in the concatenated FTS document back to the JSON leaf
/// path whose `[start, end)` span contains it, if any.
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    // Candidate span: either one starting exactly at `offset`, or the
    // nearest span starting before it.
    let idx = match positions.binary_search_by(|probe| probe.0.cmp(&offset)) {
        Ok(exact) => exact,
        Err(insert_at) => insert_at.checked_sub(1)?,
    };
    let (start, end, path) = &positions[idx];
    (*start..*end).contains(&offset).then_some(path.as_str())
}
3683
3684fn resolve_hit_attribution(
3693 conn: &Connection,
3694 hit: &SearchHit,
3695 match_expr: &str,
3696) -> Result<HitAttribution, EngineError> {
3697 if matches!(hit.source, SearchHitSource::Chunk) {
3698 return Ok(HitAttribution {
3699 matched_paths: vec!["text_content".to_owned()],
3700 });
3701 }
3702 if !matches!(hit.source, SearchHitSource::Property) {
3703 return Ok(HitAttribution {
3704 matched_paths: Vec::new(),
3705 });
3706 }
3707 let Some(rowid_str) = hit.projection_row_id.as_deref() else {
3708 return Ok(HitAttribution {
3709 matched_paths: Vec::new(),
3710 });
3711 };
3712 let rowid: i64 = match rowid_str.parse() {
3713 Ok(v) => v,
3714 Err(_) => {
3715 return Ok(HitAttribution {
3716 matched_paths: Vec::new(),
3717 });
3718 }
3719 };
3720
3721 let prop_table = fathomdb_schema::fts_kind_table_name(&hit.node.kind);
3727 let highlight_sql = format!(
3728 "SELECT highlight({prop_table}, 1, ?1, ?2) \
3729 FROM {prop_table} \
3730 WHERE rowid = ?3 AND {prop_table} MATCH ?4"
3731 );
3732 let mut stmt = conn.prepare(&highlight_sql).map_err(EngineError::Sqlite)?;
3733 let wrapped: Option<String> = stmt
3734 .query_row(
3735 rusqlite::params![
3736 ATTRIBUTION_HIGHLIGHT_OPEN,
3737 ATTRIBUTION_HIGHLIGHT_CLOSE,
3738 rowid,
3739 match_expr,
3740 ],
3741 |row| row.get(0),
3742 )
3743 .optional()
3744 .map_err(EngineError::Sqlite)?;
3745 let Some(wrapped) = wrapped else {
3746 return Ok(HitAttribution {
3747 matched_paths: Vec::new(),
3748 });
3749 };
3750
3751 let offsets = parse_highlight_offsets(
3752 &wrapped,
3753 ATTRIBUTION_HIGHLIGHT_OPEN,
3754 ATTRIBUTION_HIGHLIGHT_CLOSE,
3755 );
3756 if offsets.is_empty() {
3757 return Ok(HitAttribution {
3758 matched_paths: Vec::new(),
3759 });
3760 }
3761
3762 let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
3763 if positions.is_empty() {
3764 return Ok(HitAttribution {
3767 matched_paths: Vec::new(),
3768 });
3769 }
3770
3771 let mut matched_paths: Vec<String> = Vec::new();
3772 for offset in offsets {
3773 if let Some(path) = find_leaf_for_offset(&positions, offset)
3774 && !matched_paths.iter().any(|p| p == path)
3775 {
3776 matched_paths.push(path.to_owned());
3777 }
3778 }
3779 Ok(HitAttribution { matched_paths })
3780}
3781
3782fn build_bm25_expr(table: &str, schema_json: &str, sep: &str) -> String {
3789 let schema = crate::writer::parse_property_schema_json(schema_json, sep);
3790 let any_weighted = schema.paths.iter().any(|p| p.weight.is_some());
3791 if !any_weighted {
3792 return format!("bm25({table})");
3793 }
3794 let weights: Vec<String> = std::iter::once("0.0".to_owned())
3796 .chain(
3797 schema
3798 .paths
3799 .iter()
3800 .map(|p| format!("{:.1}", p.weight.unwrap_or(1.0))),
3801 )
3802 .collect();
3803 format!("bm25({table}, {})", weights.join(", "))
3804}
3805
3806fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
3807 match value {
3808 fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
3809 fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
3810 fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
3811 }
3812}
3813
3814#[cfg(test)]
3815#[allow(clippy::expect_used, deprecated)]
3816mod tests {
3817 use std::panic::{AssertUnwindSafe, catch_unwind};
3818 use std::sync::Arc;
3819
3820 use fathomdb_query::{BindValue, QueryBuilder};
3821 use fathomdb_schema::SchemaManager;
3822 use rusqlite::types::Value;
3823 use tempfile::NamedTempFile;
3824
3825 use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
3826
3827 use fathomdb_query::{
3828 NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
3829 };
3830
3831 use super::{
3832 bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
3833 wrap_node_row_projection_sql,
3834 };
3835
    /// Build a minimal `SearchHit` fixture: a `Goal` node with the given id,
    /// score, match mode, and source; everything else defaulted/empty.
    fn mk_hit(
        logical_id: &str,
        score: f64,
        match_mode: SearchMatchMode,
        source: SearchHitSource,
    ) -> SearchHit {
        SearchHit {
            node: NodeRowLite {
                row_id: format!("{logical_id}-row"),
                logical_id: logical_id.to_owned(),
                kind: "Goal".to_owned(),
                properties: "{}".to_owned(),
                content_ref: None,
                last_accessed_at: None,
            },
            score,
            modality: RetrievalModality::Text,
            source,
            match_mode: Some(match_mode),
            snippet: None,
            written_at: 0,
            projection_row_id: None,
            vector_distance: None,
            attribution: None,
        }
    }
3862
    // Strict hits must precede relaxed hits even when the relaxed score is
    // higher: block precedence beats raw score.
    #[test]
    fn merge_places_strict_block_before_relaxed_regardless_of_score() {
        let strict = vec![mk_hit(
            "a",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![mk_hit(
            "b",
            9.9,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "b");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
3891
    // When the same logical_id appears in both blocks, the strict copy wins
    // and the relaxed duplicate is dropped.
    #[test]
    fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
        let strict = vec![mk_hit(
            "shared",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![
            mk_hit(
                "shared",
                9.9,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
            mk_hit(
                "other",
                2.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "shared");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "other");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
3927
    // Within one block: higher score first, ties broken by logical_id.
    #[test]
    fn merge_sorts_within_block_by_score_desc_then_logical_id() {
        let strict = vec![
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(
            merged
                .iter()
                .map(|h| &h.node.logical_id)
                .collect::<Vec<_>>(),
            vec!["a", "c", "b"]
        );
    }
3944
    // Duplicate logical_id inside one branch at equal score: the
    // Chunk-sourced hit outranks the Property-sourced one.
    #[test]
    fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
        let strict = vec![
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Property,
            ),
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(merged.len(), 1);
        assert!(matches!(merged[0].source, SearchHitSource::Chunk));
    }
3965
    // The limit is applied after block merge, so strict hits crowd out a
    // higher-scored relaxed hit.
    #[test]
    fn merge_truncates_to_limit_after_block_merge() {
        let strict = vec![
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let relaxed = vec![mk_hit(
            "c",
            9.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 2);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert_eq!(merged[1].node.logical_id, "b");
    }
3983
    // Three-branch fusion: strict > relaxed > vector block order, and a
    // logical_id shared across branches dedups to the highest-precedence
    // copy.
    #[test]
    fn search_architecturally_supports_three_branch_fusion() {
        let strict = vec![mk_hit(
            "alpha",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![mk_hit(
            "bravo",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vector_hit = mk_hit(
            "charlie",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        // Vector hits carry no match mode and use the Vector modality.
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;
        let vector = vec![vector_hit];

        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
        assert_eq!(merged.len(), 3);
        assert_eq!(merged[0].node.logical_id, "alpha");
        assert_eq!(merged[1].node.logical_id, "bravo");
        assert_eq!(merged[2].node.logical_id, "charlie");
        assert!(matches!(merged[2].source, SearchHitSource::Vector));

        // Same id in all three branches: the strict copy wins.
        let strict2 = vec![mk_hit(
            "shared",
            0.5,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed2 = vec![mk_hit(
            "shared",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vshared = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared.match_mode = None;
        vshared.modality = RetrievalModality::Vector;
        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
        assert!(matches!(
            merged2[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));

        // Same id in relaxed and vector only: the relaxed copy wins.
        let mut vshared2 = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared2.match_mode = None;
        vshared2.modality = RetrievalModality::Vector;
        let merged3 = merge_search_branches_three(
            vec![],
            vec![mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            )],
            vec![vshared2],
            10,
        );
        assert_eq!(merged3.len(), 1);
        assert!(matches!(
            merged3[0].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
4085
    // A vector-only result set survives the merge intact, keeping
    // match_mode=None and the Vector source/modality.
    #[test]
    fn merge_search_branches_three_vector_only_preserves_vector_block() {
        let mut vector_hit = mk_hit(
            "solo",
            0.75,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;

        let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);

        assert_eq!(merged.len(), 1);
        assert_eq!(merged[0].node.logical_id, "solo");
        assert!(matches!(merged[0].source, SearchHitSource::Vector));
        assert!(matches!(merged[0].modality, RetrievalModality::Vector));
        assert!(
            merged[0].match_mode.is_none(),
            "vector hits carry match_mode=None per addendum 1"
        );
    }
4121
    // Under limit pressure, strict hits keep their slots even against
    // higher-scored relaxed and vector hits.
    #[test]
    fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
        let strict = vec![
            mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let relaxed = vec![mk_hit(
            "d",
            9.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;
        let vector = vec![vector_hit];

        let merged = merge_search_branches_three(strict, relaxed, vector, 2);

        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert_eq!(merged[1].node.logical_id, "b");
        assert!(
            merged
                .iter()
                .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
            "strict block must win limit contention against higher-scored relaxed/vector hits"
        );
        assert!(
            merged
                .iter()
                .all(|h| matches!(h.source, SearchHitSource::Chunk)),
            "no vector source hits should leak past the limit"
        );
    }
4170
    // The absent-vec-table classifier matches only the two known messages
    // and exempts vec_embedding and non-SqliteFailure errors.
    #[test]
    fn is_vec_table_absent_matches_known_error_messages() {
        use rusqlite::ffi;
        // Build a SqliteFailure carrying an arbitrary message text.
        fn make_err(msg: &str) -> rusqlite::Error {
            rusqlite::Error::SqliteFailure(
                ffi::Error {
                    code: ffi::ErrorCode::Unknown,
                    extended_code: 1,
                },
                Some(msg.to_owned()),
            )
        }
        assert!(is_vec_table_absent(&make_err(
            "no such table: vec_nodes_active"
        )));
        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
    }
4191
4192 #[test]
4195 fn vector_search_uses_per_kind_table_and_degrades_when_table_absent() {
4196 let db = NamedTempFile::new().expect("temporary db");
4199 let coordinator = ExecutionCoordinator::open(
4200 db.path(),
4201 Arc::new(SchemaManager::new()),
4202 None,
4203 1,
4204 Arc::new(TelemetryCounters::default()),
4205 None,
4206 )
4207 .expect("coordinator");
4208
4209 let compiled = QueryBuilder::nodes("MyKind")
4210 .vector_search("some query", 5)
4211 .compile()
4212 .expect("vector query compiles");
4213
4214 let rows = coordinator
4215 .execute_compiled_read(&compiled)
4216 .expect("degraded read must succeed");
4217 assert!(
4218 rows.was_degraded,
4219 "must degrade when vec_mykind table does not exist"
4220 );
4221 assert!(
4222 rows.nodes.is_empty(),
4223 "degraded result must return empty nodes"
4224 );
4225 }
4226
4227 #[test]
4228 fn bind_value_text_maps_to_sql_text() {
4229 let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
4230 assert_eq!(val, Value::Text("hello".to_owned()));
4231 }
4232
4233 #[test]
4234 fn bind_value_integer_maps_to_sql_integer() {
4235 let val = bind_value_to_sql(&BindValue::Integer(42));
4236 assert_eq!(val, Value::Integer(42));
4237 }
4238
4239 #[test]
4240 fn bind_value_bool_true_maps_to_integer_one() {
4241 let val = bind_value_to_sql(&BindValue::Bool(true));
4242 assert_eq!(val, Value::Integer(1));
4243 }
4244
4245 #[test]
4246 fn bind_value_bool_false_maps_to_integer_zero() {
4247 let val = bind_value_to_sql(&BindValue::Bool(false));
4248 assert_eq!(val, Value::Integer(0));
4249 }
4250
4251 #[test]
4252 fn same_shape_queries_share_one_cache_entry() {
4253 let db = NamedTempFile::new().expect("temporary db");
4254 let coordinator = ExecutionCoordinator::open(
4255 db.path(),
4256 Arc::new(SchemaManager::new()),
4257 None,
4258 1,
4259 Arc::new(TelemetryCounters::default()),
4260 None,
4261 )
4262 .expect("coordinator");
4263
4264 let compiled_a = QueryBuilder::nodes("Meeting")
4265 .text_search("budget", 5)
4266 .limit(10)
4267 .compile()
4268 .expect("compiled a");
4269 let compiled_b = QueryBuilder::nodes("Meeting")
4270 .text_search("standup", 5)
4271 .limit(10)
4272 .compile()
4273 .expect("compiled b");
4274
4275 coordinator
4276 .execute_compiled_read(&compiled_a)
4277 .expect("read a");
4278 coordinator
4279 .execute_compiled_read(&compiled_b)
4280 .expect("read b");
4281
4282 assert_eq!(
4283 compiled_a.shape_hash, compiled_b.shape_hash,
4284 "different bind values, same structural shape → same hash"
4285 );
4286 assert_eq!(coordinator.shape_sql_count(), 1);
4287 }
4288
4289 #[test]
4290 fn vector_read_degrades_gracefully_when_vec_table_absent() {
4291 let db = NamedTempFile::new().expect("temporary db");
4292 let coordinator = ExecutionCoordinator::open(
4293 db.path(),
4294 Arc::new(SchemaManager::new()),
4295 None,
4296 1,
4297 Arc::new(TelemetryCounters::default()),
4298 None,
4299 )
4300 .expect("coordinator");
4301
4302 let compiled = QueryBuilder::nodes("Meeting")
4303 .vector_search("budget embeddings", 5)
4304 .compile()
4305 .expect("vector query compiles");
4306
4307 let result = coordinator.execute_compiled_read(&compiled);
4308 let rows = result.expect("degraded read must succeed, not error");
4309 assert!(
4310 rows.was_degraded,
4311 "result must be flagged as degraded when vec_nodes_active is absent"
4312 );
4313 assert!(
4314 rows.nodes.is_empty(),
4315 "degraded result must return empty nodes"
4316 );
4317 }
4318
4319 #[test]
4320 fn coordinator_caches_by_shape_hash() {
4321 let db = NamedTempFile::new().expect("temporary db");
4322 let coordinator = ExecutionCoordinator::open(
4323 db.path(),
4324 Arc::new(SchemaManager::new()),
4325 None,
4326 1,
4327 Arc::new(TelemetryCounters::default()),
4328 None,
4329 )
4330 .expect("coordinator");
4331
4332 let compiled = QueryBuilder::nodes("Meeting")
4333 .text_search("budget", 5)
4334 .compile()
4335 .expect("compiled query");
4336
4337 coordinator
4338 .execute_compiled_read(&compiled)
4339 .expect("execute compiled read");
4340 assert_eq!(coordinator.shape_sql_count(), 1);
4341 }
4342
4343 #[test]
4346 fn explain_returns_correct_sql() {
4347 let db = NamedTempFile::new().expect("temporary db");
4348 let coordinator = ExecutionCoordinator::open(
4349 db.path(),
4350 Arc::new(SchemaManager::new()),
4351 None,
4352 1,
4353 Arc::new(TelemetryCounters::default()),
4354 None,
4355 )
4356 .expect("coordinator");
4357
4358 let compiled = QueryBuilder::nodes("Meeting")
4359 .text_search("budget", 5)
4360 .compile()
4361 .expect("compiled query");
4362
4363 let plan = coordinator.explain_compiled_read(&compiled);
4364
4365 assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
4366 }
4367
4368 #[test]
4369 fn explain_returns_correct_driving_table() {
4370 use fathomdb_query::DrivingTable;
4371
4372 let db = NamedTempFile::new().expect("temporary db");
4373 let coordinator = ExecutionCoordinator::open(
4374 db.path(),
4375 Arc::new(SchemaManager::new()),
4376 None,
4377 1,
4378 Arc::new(TelemetryCounters::default()),
4379 None,
4380 )
4381 .expect("coordinator");
4382
4383 let compiled = QueryBuilder::nodes("Meeting")
4384 .text_search("budget", 5)
4385 .compile()
4386 .expect("compiled query");
4387
4388 let plan = coordinator.explain_compiled_read(&compiled);
4389
4390 assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
4391 }
4392
4393 #[test]
4394 fn explain_reports_cache_miss_then_hit() {
4395 let db = NamedTempFile::new().expect("temporary db");
4396 let coordinator = ExecutionCoordinator::open(
4397 db.path(),
4398 Arc::new(SchemaManager::new()),
4399 None,
4400 1,
4401 Arc::new(TelemetryCounters::default()),
4402 None,
4403 )
4404 .expect("coordinator");
4405
4406 let compiled = QueryBuilder::nodes("Meeting")
4407 .text_search("budget", 5)
4408 .compile()
4409 .expect("compiled query");
4410
4411 let plan_before = coordinator.explain_compiled_read(&compiled);
4413 assert!(
4414 !plan_before.cache_hit,
4415 "cache miss expected before first execute"
4416 );
4417
4418 coordinator
4420 .execute_compiled_read(&compiled)
4421 .expect("execute read");
4422
4423 let plan_after = coordinator.explain_compiled_read(&compiled);
4425 assert!(
4426 plan_after.cache_hit,
4427 "cache hit expected after first execute"
4428 );
4429 }
4430
4431 #[test]
4432 fn explain_does_not_execute_query() {
4433 let db = NamedTempFile::new().expect("temporary db");
4438 let coordinator = ExecutionCoordinator::open(
4439 db.path(),
4440 Arc::new(SchemaManager::new()),
4441 None,
4442 1,
4443 Arc::new(TelemetryCounters::default()),
4444 None,
4445 )
4446 .expect("coordinator");
4447
4448 let compiled = QueryBuilder::nodes("Meeting")
4449 .text_search("anything", 5)
4450 .compile()
4451 .expect("compiled query");
4452
4453 let plan = coordinator.explain_compiled_read(&compiled);
4455
4456 assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
4457 assert_eq!(plan.bind_count, compiled.binds.len());
4458 }
4459
    #[test]
    fn coordinator_executes_compiled_read() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Seed one node plus one chunk indexed in the shared fts_nodes table.
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        // End-to-end: the compiled text search must return the seeded node.
        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
4499
    #[test]
    fn text_search_finds_structured_only_node_via_property_fts() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Seed a node with NO chunks: it is only reachable through the
        // per-kind property FTS table created here by hand.
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        conn.execute_batch(&format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} USING fts5(\
                node_logical_id UNINDEXED, text_content, \
                tokenize = 'porter unicode61 remove_diacritics 2'\
            )"
        ))
        .expect("create per-kind fts table");
        conn.execute_batch(&format!(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'goal-1', 'Goal', '{{"name":"Ship v2"}}', 100, 'seed');
            INSERT INTO {goal_table} (node_logical_id, text_content)
            VALUES ('goal-1', 'Ship v2');
            "#
        ))
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Goal")
            .text_search("Ship", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // The chunk-less node must still be found via its property FTS row.
        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "goal-1");
    }
4547
    #[test]
    fn text_search_returns_both_chunk_and_property_backed_hits() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // meeting-1: discoverable via the shared chunk-backed fts_nodes index.
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
            ",
        )
        .expect("seed chunk-backed node");

        // meeting-2: no chunks; only the per-kind property FTS table knows it.
        let meeting_table = fathomdb_schema::fts_kind_table_name("Meeting");
        conn.execute_batch(&format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {meeting_table} USING fts5(\
                node_logical_id UNINDEXED, text_content, \
                tokenize = 'porter unicode61 remove_diacritics 2'\
            )"
        ))
        .expect("create per-kind fts table");
        conn.execute_batch(&format!(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-2', 'meeting-2', 'Meeting', '{{"title":"quarterly sync"}}', 100, 'seed');
            INSERT INTO {meeting_table} (node_logical_id, text_content)
            VALUES ('meeting-2', 'quarterly sync');
            "#
        ))
        .expect("seed property-backed node");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("quarterly", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // Both hit sources must contribute; sort for a stable comparison.
        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
        ids.sort_unstable();
        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
    }
4609
    #[test]
    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Chunk text deliberately contains the word "not".
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
            ",
        )
        .expect("seed chunk-backed node");

        // The lowercase "not" in the query must be matched as literal text,
        // not interpreted as the FTS5 NOT operator (which would exclude this row).
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("not a ship", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
4649
4650 #[test]
4653 fn capability_gate_reports_false_without_feature() {
4654 let db = NamedTempFile::new().expect("temporary db");
4655 let coordinator = ExecutionCoordinator::open(
4658 db.path(),
4659 Arc::new(SchemaManager::new()),
4660 None,
4661 1,
4662 Arc::new(TelemetryCounters::default()),
4663 None,
4664 )
4665 .expect("coordinator");
4666 assert!(
4667 !coordinator.vector_enabled(),
4668 "vector_enabled must be false when no dimension is requested"
4669 );
4670 }
4671
4672 #[cfg(feature = "sqlite-vec")]
4673 #[test]
4674 fn capability_gate_reports_true_when_feature_enabled() {
4675 let db = NamedTempFile::new().expect("temporary db");
4676 let coordinator = ExecutionCoordinator::open(
4677 db.path(),
4678 Arc::new(SchemaManager::new()),
4679 Some(128),
4680 1,
4681 Arc::new(TelemetryCounters::default()),
4682 None,
4683 )
4684 .expect("coordinator");
4685 assert!(
4686 coordinator.vector_enabled(),
4687 "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
4688 );
4689 }
4690
    #[test]
    fn read_run_returns_inserted_run() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        // Insert a single run through the writer actor...
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        // ...then read it back through a fresh coordinator over the same file.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
4747
    #[test]
    fn read_step_returns_inserted_step() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        let db = NamedTempFile::new().expect("temporary db");
        // Insert a run plus one step attached to it in a single write...
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        // ...then read the step back through a fresh coordinator.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
4811
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        let db = NamedTempFile::new().expect("temporary db");
        // Insert the full run -> step -> action hierarchy in one write...
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        // ...then read the leaf action back through a fresh coordinator.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
4887
    #[test]
    fn read_active_runs_excludes_superseded() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First write: the original run (run-v1).
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Second write: run-v2 declares that it supersedes run-v1.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        // The superseded run-v1 must be filtered out of the active view.
        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
4967
4968 #[allow(clippy::panic)]
4969 fn poison_connection(coordinator: &ExecutionCoordinator) {
4970 let result = catch_unwind(AssertUnwindSafe(|| {
4971 let _guard = coordinator.pool.connections[0]
4972 .lock()
4973 .expect("poison test lock");
4974 panic!("poison coordinator connection mutex");
4975 }));
4976 assert!(
4977 result.is_err(),
4978 "poison test must unwind while holding the connection mutex"
4979 );
4980 }
4981
4982 #[allow(clippy::panic)]
4983 fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
4984 where
4985 F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
4986 {
4987 match op(coordinator) {
4988 Err(EngineError::Bridge(message)) => {
4989 assert_eq!(message, "connection mutex poisoned");
4990 }
4991 Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
4992 Err(error) => panic!("expected poisoned connection error, got {error:?}"),
4993 }
4994 }
4995
4996 #[test]
4997 fn poisoned_connection_returns_bridge_error_for_read_helpers() {
4998 let db = NamedTempFile::new().expect("temporary db");
4999 let coordinator = ExecutionCoordinator::open(
5000 db.path(),
5001 Arc::new(SchemaManager::new()),
5002 None,
5003 1,
5004 Arc::new(TelemetryCounters::default()),
5005 None,
5006 )
5007 .expect("coordinator");
5008
5009 poison_connection(&coordinator);
5010
5011 assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
5012 assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
5013 assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
5014 assert_poisoned_connection_error(
5015 &coordinator,
5016 super::ExecutionCoordinator::read_active_runs,
5017 );
5018 assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
5019 assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
5020 }
5021
5022 #[test]
5025 fn shape_cache_stays_bounded() {
5026 use fathomdb_query::ShapeHash;
5027
5028 let db = NamedTempFile::new().expect("temporary db");
5029 let coordinator = ExecutionCoordinator::open(
5030 db.path(),
5031 Arc::new(SchemaManager::new()),
5032 None,
5033 1,
5034 Arc::new(TelemetryCounters::default()),
5035 None,
5036 )
5037 .expect("coordinator");
5038
5039 {
5041 let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
5042 for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
5043 cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
5044 }
5045 }
5046 let compiled = QueryBuilder::nodes("Meeting")
5051 .text_search("budget", 5)
5052 .limit(10)
5053 .compile()
5054 .expect("compiled query");
5055
5056 coordinator
5057 .execute_compiled_read(&compiled)
5058 .expect("execute read");
5059
5060 assert!(
5061 coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
5062 "shape cache must stay bounded: got {} entries, max {}",
5063 coordinator.shape_sql_count(),
5064 super::MAX_SHAPE_CACHE_SIZE
5065 );
5066 }
5067
5068 #[test]
5071 fn read_pool_size_configurable() {
5072 let db = NamedTempFile::new().expect("temporary db");
5073 let coordinator = ExecutionCoordinator::open(
5074 db.path(),
5075 Arc::new(SchemaManager::new()),
5076 None,
5077 2,
5078 Arc::new(TelemetryCounters::default()),
5079 None,
5080 )
5081 .expect("coordinator with pool_size=2");
5082
5083 assert_eq!(coordinator.pool.size(), 2);
5084
5085 let compiled = QueryBuilder::nodes("Meeting")
5087 .text_search("budget", 5)
5088 .limit(10)
5089 .compile()
5090 .expect("compiled query");
5091
5092 let result = coordinator.execute_compiled_read(&compiled);
5093 assert!(result.is_ok(), "read through pool must succeed");
5094 }
5095
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        let db = NamedTempFile::new().expect("temporary db");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed 10 FTS-indexed meetings, each linked to two tasks via HAS_TASK.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1, None, None)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Every root must expand to exactly its two seeded tasks.
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
5175
5176 #[test]
5179 fn build_bm25_expr_no_weights() {
5180 let schema_json = r#"["$.title","$.body"]"#;
5181 let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
5182 assert_eq!(result, "bm25(fts_props_testkind)");
5183 }
5184
5185 #[test]
5186 fn build_bm25_expr_with_weights() {
5187 let schema_json = r#"[{"path":"$.title","mode":"scalar","weight":10.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
5188 let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
5189 assert_eq!(result, "bm25(fts_props_testkind, 0.0, 10.0, 1.0)");
5190 }
5191
    #[test]
    #[allow(clippy::too_many_lines)]
    fn weighted_schema_bm25_orders_title_match_above_body_match() {
        use crate::{
            AdminService, FtsPropertyPathSpec, NodeInsert, ProvenanceMode, WriteRequest,
            WriterActor, writer::ChunkPolicy,
        };
        use fathomdb_schema::fts_column_name;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Register an Article FTS schema that weights title 10x over body.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Article",
                    &[
                        FtsPropertyPathSpec::scalar("$.title").with_weight(10.0),
                        FtsPropertyPathSpec::scalar("$.body").with_weight(1.0),
                    ],
                    None,
                    &[],
                    crate::rebuild_actor::RebuildMode::Eager,
                )
                .expect("register schema with weights");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // article-a: "rust" appears in the heavily weighted title.
        writer
            .submit(WriteRequest {
                label: "insert-a".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-a".to_owned(),
                    logical_id: "article-a".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"rust","body":"other"}"#.to_owned(),
                    source_ref: Some("src-a".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node A");

        // article-b: "rust" appears only in the low-weight body.
        writer
            .submit(WriteRequest {
                label: "insert-b".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-b".to_owned(),
                    logical_id: "article-b".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"other","body":"rust"}"#.to_owned(),
                    source_ref: Some("src-b".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node B");

        drop(writer);

        // Sanity-check the per-kind FTS table contents before querying.
        {
            let title_col = fts_column_name("$.title", false);
            let body_col = fts_column_name("$.body", false);
            let article_table = fathomdb_schema::fts_kind_table_name("Article");
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            let count: i64 = conn
                .query_row(&format!("SELECT count(*) FROM {article_table}"), [], |r| {
                    r.get(0)
                })
                .expect("count fts rows");
            assert_eq!(count, 2, "both nodes must have FTS rows in per-kind table");
            let (title_a, body_a): (String, String) = conn
                .query_row(
                    &format!(
                        "SELECT {title_col}, {body_col} FROM {article_table} \
                        WHERE node_logical_id = 'article-a'"
                    ),
                    [],
                    |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
                )
                .expect("select article-a");
            assert_eq!(
                title_a, "rust",
                "article-a must have 'rust' in title column"
            );
            assert_eq!(
                body_a, "other",
                "article-a must have 'other' in body column"
            );
        }

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = fathomdb_query::QueryBuilder::nodes("Article")
            .text_search("rust", 5)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // The weighted bm25 ranking must place the title match first.
        assert_eq!(rows.nodes.len(), 2, "both nodes must be returned");
        assert_eq!(
            rows.nodes[0].logical_id, "article-a",
            "article-a (title match, weight 10) must rank above article-b (body match, weight 1)"
        );
    }
5347
5348 #[test]
5359 fn property_fts_hit_matched_paths_from_positions() {
5360 use crate::{AdminService, rebuild_actor::RebuildMode};
5361 use fathomdb_query::compile_search;
5362
5363 let db = NamedTempFile::new().expect("temporary db");
5364 let schema_manager = Arc::new(SchemaManager::new());
5365
5366 {
5369 let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
5370 admin
5371 .register_fts_property_schema_with_entries(
5372 "Item",
5373 &[
5374 crate::FtsPropertyPathSpec::scalar("$.body"),
5375 crate::FtsPropertyPathSpec::scalar("$.title"),
5376 ],
5377 None,
5378 &[],
5379 RebuildMode::Eager,
5380 )
5381 .expect("register Item FTS schema");
5382 }
5383
5384 let coordinator = ExecutionCoordinator::open(
5385 db.path(),
5386 Arc::clone(&schema_manager),
5387 None,
5388 1,
5389 Arc::new(TelemetryCounters::default()),
5390 None,
5391 )
5392 .expect("coordinator");
5393
5394 let conn = rusqlite::Connection::open(db.path()).expect("open db");
5395
5396 let blob = format!("other{}searchterm", crate::writer::LEAF_SEPARATOR);
5401 assert_eq!(
5403 crate::writer::LEAF_SEPARATOR.len(),
5404 29,
5405 "LEAF_SEPARATOR length changed; update position offsets"
5406 );
5407
5408 conn.execute(
5409 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5410 VALUES ('r1', 'item-1', 'Item', '{\"title\":\"searchterm\",\"body\":\"other\"}', 100)",
5411 [],
5412 )
5413 .expect("insert node");
5414 let item_table = fathomdb_schema::fts_kind_table_name("Item");
5416 conn.execute(
5417 &format!(
5418 "INSERT INTO {item_table} (node_logical_id, text_content) \
5419 VALUES ('item-1', ?1)"
5420 ),
5421 rusqlite::params![blob],
5422 )
5423 .expect("insert fts row");
5424 conn.execute(
5425 "INSERT INTO fts_node_property_positions \
5426 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5427 VALUES ('item-1', 'Item', 0, 5, '$.body')",
5428 [],
5429 )
5430 .expect("insert body position");
5431 conn.execute(
5432 "INSERT INTO fts_node_property_positions \
5433 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5434 VALUES ('item-1', 'Item', 34, 44, '$.title')",
5435 [],
5436 )
5437 .expect("insert title position");
5438
5439 let ast = QueryBuilder::nodes("Item").text_search("searchterm", 10);
5440 let mut compiled = compile_search(ast.ast()).expect("compile search");
5441 compiled.attribution_requested = true;
5442
5443 let rows = coordinator
5444 .execute_compiled_search(&compiled)
5445 .expect("search");
5446
5447 assert!(!rows.hits.is_empty(), "expected at least one hit");
5448 let hit = rows
5449 .hits
5450 .iter()
5451 .find(|h| h.node.logical_id == "item-1")
5452 .expect("item-1 must be in hits");
5453
5454 let att = hit
5455 .attribution
5456 .as_ref()
5457 .expect("attribution must be Some when attribution_requested");
5458 assert!(
5459 att.matched_paths.contains(&"$.title".to_owned()),
5460 "matched_paths must contain '$.title', got {:?}",
5461 att.matched_paths,
5462 );
5463 assert!(
5464 !att.matched_paths.contains(&"$.body".to_owned()),
5465 "matched_paths must NOT contain '$.body', got {:?}",
5466 att.matched_paths,
5467 );
5468 }
5469
5470 #[test]
5478 fn vector_hit_has_no_attribution() {
5479 use fathomdb_query::compile_vector_search;
5480
5481 let db = NamedTempFile::new().expect("temporary db");
5482 let coordinator = ExecutionCoordinator::open(
5483 db.path(),
5484 Arc::new(SchemaManager::new()),
5485 None,
5486 1,
5487 Arc::new(TelemetryCounters::default()),
5488 None,
5489 )
5490 .expect("coordinator");
5491
5492 let ast = QueryBuilder::nodes("Document").vector_search("[1.0, 0.0]", 5);
5494 let mut compiled = compile_vector_search(ast.ast()).expect("compile vector search");
5495 compiled.attribution_requested = true;
5496
5497 let rows = coordinator
5500 .execute_compiled_vector_search(&compiled)
5501 .expect("vector search must not error");
5502
5503 assert!(
5504 rows.was_degraded,
5505 "vector search without vec table must degrade"
5506 );
5507 for hit in &rows.hits {
5508 assert!(
5509 hit.attribution.is_none(),
5510 "vector hits must carry attribution = None, got {:?}",
5511 hit.attribution
5512 );
5513 }
5514 }
5515
5516 #[test]
5530 fn chunk_hit_has_text_content_attribution() {
5531 use fathomdb_query::compile_search;
5532
5533 let db = NamedTempFile::new().expect("temporary db");
5534 let coordinator = ExecutionCoordinator::open(
5535 db.path(),
5536 Arc::new(SchemaManager::new()),
5537 None,
5538 1,
5539 Arc::new(TelemetryCounters::default()),
5540 None,
5541 )
5542 .expect("coordinator");
5543
5544 let conn = rusqlite::Connection::open(db.path()).expect("open db");
5545
5546 conn.execute_batch(
5547 r"
5548 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
5549 VALUES ('r1', 'chunk-node', 'Goal', '{}', 100);
5550 INSERT INTO chunks (id, node_logical_id, text_content, created_at)
5551 VALUES ('c1', 'chunk-node', 'uniquesentinelterm', 100);
5552 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
5553 VALUES ('c1', 'chunk-node', 'Goal', 'uniquesentinelterm');
5554 ",
5555 )
5556 .expect("seed chunk node");
5557
5558 let ast = QueryBuilder::nodes("Goal").text_search("uniquesentinelterm", 10);
5559 let mut compiled = compile_search(ast.ast()).expect("compile search");
5560 compiled.attribution_requested = true;
5561
5562 let rows = coordinator
5563 .execute_compiled_search(&compiled)
5564 .expect("search");
5565
5566 assert!(!rows.hits.is_empty(), "expected chunk hit");
5567 let hit = rows
5568 .hits
5569 .iter()
5570 .find(|h| matches!(h.source, SearchHitSource::Chunk))
5571 .expect("must have a Chunk hit");
5572
5573 let att = hit
5574 .attribution
5575 .as_ref()
5576 .expect("attribution must be Some when attribution_requested");
5577 assert_eq!(
5578 att.matched_paths,
5579 vec!["text_content".to_owned()],
5580 "chunk matched_paths must be [\"text_content\"], got {:?}",
5581 att.matched_paths,
5582 );
5583 }
5584
5585 #[test]
5592 #[allow(clippy::too_many_lines)]
5593 fn mixed_kind_results_get_per_kind_matched_paths() {
5594 use crate::{AdminService, rebuild_actor::RebuildMode};
5595 use fathomdb_query::compile_search;
5596
5597 let db = NamedTempFile::new().expect("temporary db");
5598 let schema_manager = Arc::new(SchemaManager::new());
5599
5600 {
5603 let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
5604 admin
5605 .register_fts_property_schema_with_entries(
5606 "KindA",
5607 &[crate::FtsPropertyPathSpec::scalar("$.alpha")],
5608 None,
5609 &[],
5610 RebuildMode::Eager,
5611 )
5612 .expect("register KindA FTS schema");
5613 admin
5614 .register_fts_property_schema_with_entries(
5615 "KindB",
5616 &[crate::FtsPropertyPathSpec::scalar("$.beta")],
5617 None,
5618 &[],
5619 RebuildMode::Eager,
5620 )
5621 .expect("register KindB FTS schema");
5622 }
5623
5624 let coordinator = ExecutionCoordinator::open(
5625 db.path(),
5626 Arc::clone(&schema_manager),
5627 None,
5628 1,
5629 Arc::new(TelemetryCounters::default()),
5630 None,
5631 )
5632 .expect("coordinator");
5633
5634 let conn = rusqlite::Connection::open(db.path()).expect("open db");
5635
5636 conn.execute(
5638 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5639 VALUES ('rA', 'node-a', 'KindA', '{\"alpha\":\"xenoterm\"}', 100)",
5640 [],
5641 )
5642 .expect("insert KindA node");
5643 let table_for_a = fathomdb_schema::fts_kind_table_name("KindA");
5645 conn.execute(
5646 &format!(
5647 "INSERT INTO {table_for_a} (node_logical_id, text_content) \
5648 VALUES ('node-a', 'xenoterm')"
5649 ),
5650 [],
5651 )
5652 .expect("insert KindA fts row");
5653 conn.execute(
5654 "INSERT INTO fts_node_property_positions \
5655 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5656 VALUES ('node-a', 'KindA', 0, 8, '$.alpha')",
5657 [],
5658 )
5659 .expect("insert KindA position");
5660
5661 conn.execute(
5663 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5664 VALUES ('rB', 'node-b', 'KindB', '{\"beta\":\"xenoterm\"}', 100)",
5665 [],
5666 )
5667 .expect("insert KindB node");
5668 let table_for_b = fathomdb_schema::fts_kind_table_name("KindB");
5670 conn.execute(
5671 &format!(
5672 "INSERT INTO {table_for_b} (node_logical_id, text_content) \
5673 VALUES ('node-b', 'xenoterm')"
5674 ),
5675 [],
5676 )
5677 .expect("insert KindB fts row");
5678 conn.execute(
5679 "INSERT INTO fts_node_property_positions \
5680 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5681 VALUES ('node-b', 'KindB', 0, 8, '$.beta')",
5682 [],
5683 )
5684 .expect("insert KindB position");
5685
5686 let ast = QueryBuilder::nodes("").text_search("xenoterm", 10);
5688 let mut compiled = compile_search(ast.ast()).expect("compile search");
5689 compiled.attribution_requested = true;
5690
5691 let rows = coordinator
5692 .execute_compiled_search(&compiled)
5693 .expect("search");
5694
5695 assert!(
5697 rows.hits.len() >= 2,
5698 "expected hits for both kinds, got {}",
5699 rows.hits.len()
5700 );
5701
5702 for hit in &rows.hits {
5703 let att = hit
5704 .attribution
5705 .as_ref()
5706 .expect("attribution must be Some when attribution_requested");
5707 match hit.node.kind.as_str() {
5708 "KindA" => {
5709 assert_eq!(
5710 att.matched_paths,
5711 vec!["$.alpha".to_owned()],
5712 "KindA hit must have matched_paths=['$.alpha'], got {:?}",
5713 att.matched_paths,
5714 );
5715 }
5716 "KindB" => {
5717 assert_eq!(
5718 att.matched_paths,
5719 vec!["$.beta".to_owned()],
5720 "KindB hit must have matched_paths=['$.beta'], got {:?}",
5721 att.matched_paths,
5722 );
5723 }
5724 other => {
5725 assert_eq!(other, "KindA", "unexpected kind in result: {other}");
5727 }
5728 }
5729 }
5730 }
5731
5732 #[test]
5735 fn tokenizer_strategy_from_str() {
5736 use super::TokenizerStrategy;
5737 assert_eq!(
5738 TokenizerStrategy::from_str("porter unicode61 remove_diacritics 2"),
5739 TokenizerStrategy::RecallOptimizedEnglish,
5740 );
5741 assert_eq!(
5742 TokenizerStrategy::from_str("unicode61 remove_diacritics 2"),
5743 TokenizerStrategy::PrecisionOptimized,
5744 );
5745 assert_eq!(
5746 TokenizerStrategy::from_str("trigram"),
5747 TokenizerStrategy::SubstringTrigram,
5748 );
5749 assert_eq!(
5750 TokenizerStrategy::from_str("icu"),
5751 TokenizerStrategy::GlobalCjk,
5752 );
5753 assert_eq!(
5754 TokenizerStrategy::from_str("unicode61 tokenchars '.+-'"),
5755 TokenizerStrategy::SourceCode,
5756 );
5757 assert_eq!(
5759 TokenizerStrategy::from_str("unicode61 tokenchars '._-$@'"),
5760 TokenizerStrategy::SourceCode,
5761 );
5762 assert_eq!(
5763 TokenizerStrategy::from_str("my_custom_tokenizer"),
5764 TokenizerStrategy::Custom("my_custom_tokenizer".to_owned()),
5765 );
5766 }
5767
5768 #[test]
5769 fn trigram_short_query_returns_empty() {
5770 use fathomdb_query::compile_search;
5771
5772 let db = NamedTempFile::new().expect("temporary db");
5773 let schema_manager = Arc::new(SchemaManager::new());
5774
5775 {
5777 let bootstrap = ExecutionCoordinator::open(
5778 db.path(),
5779 Arc::clone(&schema_manager),
5780 None,
5781 1,
5782 Arc::new(TelemetryCounters::default()),
5783 None,
5784 )
5785 .expect("bootstrap coordinator");
5786 drop(bootstrap);
5787 }
5788
5789 {
5791 let conn = rusqlite::Connection::open(db.path()).expect("open db");
5792 conn.execute_batch(
5793 "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
5794 VALUES ('Snippet', 'fts', '{\"tokenizer\":\"trigram\"}');",
5795 )
5796 .expect("insert profile");
5797 }
5798
5799 let coordinator = ExecutionCoordinator::open(
5801 db.path(),
5802 Arc::clone(&schema_manager),
5803 None,
5804 1,
5805 Arc::new(TelemetryCounters::default()),
5806 None,
5807 )
5808 .expect("coordinator reopen");
5809
5810 let ast = QueryBuilder::nodes("Snippet").text_search("ab", 10);
5812 let compiled = compile_search(ast.ast()).expect("compile search");
5813 let rows = coordinator
5814 .execute_compiled_search(&compiled)
5815 .expect("short trigram query must not error");
5816 assert!(
5817 rows.hits.is_empty(),
5818 "2-char trigram query must return empty"
5819 );
5820 }
5821
5822 #[test]
5823 fn source_code_strategy_does_not_corrupt_fts5_syntax() {
5824 use fathomdb_query::compile_search;
5834
5835 let db = NamedTempFile::new().expect("temporary db");
5836 let schema_manager = Arc::new(SchemaManager::new());
5837
5838 {
5840 let bootstrap = ExecutionCoordinator::open(
5841 db.path(),
5842 Arc::clone(&schema_manager),
5843 None,
5844 1,
5845 Arc::new(TelemetryCounters::default()),
5846 None,
5847 )
5848 .expect("bootstrap coordinator");
5849 drop(bootstrap);
5850 }
5851
5852 {
5854 let conn = rusqlite::Connection::open(db.path()).expect("open db");
5855 conn.execute(
5856 "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
5857 VALUES ('Symbol', 'fts', json_object('tokenizer', 'unicode61 tokenchars ''._-$@'''))",
5858 [],
5859 )
5860 .expect("insert profile");
5861 conn.execute_batch(
5862 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5863 VALUES ('row-sym-1', 'logical-sym-1', 'Symbol', '{}', 1); \
5864 INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
5865 VALUES ('chunk-sym-1', 'logical-sym-1', 'std.io is a rust crate', 1); \
5866 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
5867 VALUES ('chunk-sym-1', 'logical-sym-1', 'Symbol', 'std.io is a rust crate');",
5868 )
5869 .expect("insert node and fts row");
5870 }
5871
5872 let coordinator = ExecutionCoordinator::open(
5874 db.path(),
5875 Arc::clone(&schema_manager),
5876 None,
5877 1,
5878 Arc::new(TelemetryCounters::default()),
5879 None,
5880 )
5881 .expect("coordinator reopen");
5882
5883 let ast = QueryBuilder::nodes("Symbol").text_search("std.io", 10);
5885 let compiled = compile_search(ast.ast()).expect("compile search");
5886 let rows = coordinator
5887 .execute_compiled_search(&compiled)
5888 .expect("source code search must not error");
5889 assert!(
5890 !rows.hits.is_empty(),
5891 "SourceCode strategy search for 'std.io' must return the document; \
5892 got empty — FTS5 expression was likely corrupted by post-render escaping"
5893 );
5894 }
5895
    /// Deterministic `QueryEmbedder` test double: reports a configurable
    /// model identity/dimension and embeds every query as an all-zero vector.
    #[derive(Debug)]
    struct StubEmbedder {
        // Value reported as the embedder's `model_identity`.
        model_identity: String,
        // Length of every embedding vector this stub produces, and the
        // `dimension` it reports in its identity.
        dimension: usize,
    }
5903
5904 impl StubEmbedder {
5905 fn new(model_identity: &str, dimension: usize) -> Self {
5906 Self {
5907 model_identity: model_identity.to_owned(),
5908 dimension,
5909 }
5910 }
5911 }
5912
    impl crate::embedder::QueryEmbedder for StubEmbedder {
        // Ignores the input text and returns an all-zero vector of the
        // configured dimension, so results are fully deterministic.
        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, crate::embedder::EmbedderError> {
            Ok(vec![0.0; self.dimension])
        }
        // Identity consumed by the open-time vec profile compatibility check;
        // only `model_identity` and `dimension` vary between test stubs.
        fn identity(&self) -> crate::embedder::QueryEmbedderIdentity {
            crate::embedder::QueryEmbedderIdentity {
                model_identity: self.model_identity.clone(),
                model_version: "1.0".to_owned(),
                dimension: self.dimension,
                normalization_policy: "l2".to_owned(),
            }
        }
        // Fixed token budget; large enough for any query used in these tests.
        fn max_tokens(&self) -> usize {
            512
        }
    }
5929
5930 fn make_in_memory_db_with_projection_profiles() -> rusqlite::Connection {
5931 let conn = rusqlite::Connection::open_in_memory().expect("in-memory db");
5932 conn.execute_batch(
5933 "CREATE TABLE IF NOT EXISTS projection_profiles (
5934 kind TEXT NOT NULL,
5935 facet TEXT NOT NULL,
5936 config_json TEXT NOT NULL,
5937 active_at INTEGER,
5938 created_at INTEGER,
5939 PRIMARY KEY (kind, facet)
5940 );",
5941 )
5942 .expect("create projection_profiles");
5943 conn
5944 }
5945
5946 #[test]
5947 fn check_vec_identity_no_profile_no_panic() {
5948 let conn = make_in_memory_db_with_projection_profiles();
5949 let embedder = StubEmbedder::new("bge-small", 384);
5950 let result = super::check_vec_identity_at_open(&conn, &embedder);
5951 assert!(result.is_ok(), "no profile row must return Ok(())");
5952 }
5953
5954 #[test]
5955 fn check_vec_identity_matching_identity_ok() {
5956 let conn = make_in_memory_db_with_projection_profiles();
5957 conn.execute(
5958 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5959 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5960 [],
5961 )
5962 .expect("insert profile");
5963 let embedder = StubEmbedder::new("bge-small", 384);
5964 let result = super::check_vec_identity_at_open(&conn, &embedder);
5965 assert!(result.is_ok(), "matching profile must return Ok(())");
5966 }
5967
5968 #[test]
5969 fn check_vec_identity_mismatched_dimensions_ok() {
5970 let conn = make_in_memory_db_with_projection_profiles();
5971 conn.execute(
5972 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5973 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5974 [],
5975 )
5976 .expect("insert profile");
5977 let embedder = StubEmbedder::new("bge-small", 768);
5979 let result = super::check_vec_identity_at_open(&conn, &embedder);
5980 assert!(
5981 result.is_ok(),
5982 "dimension mismatch must warn and return Ok(())"
5983 );
5984 }
5985
5986 #[test]
5987 fn custom_tokenizer_passthrough() {
5988 use super::TokenizerStrategy;
5989 let strategy = TokenizerStrategy::Custom("my_tok".to_owned());
5990 assert_eq!(strategy, TokenizerStrategy::Custom("my_tok".to_owned()));
5992 assert_ne!(strategy, TokenizerStrategy::SubstringTrigram);
5994 assert_ne!(strategy, TokenizerStrategy::SourceCode);
5995 }
5996
5997 #[test]
5998 fn check_vec_identity_mismatched_model_ok() {
5999 let conn = make_in_memory_db_with_projection_profiles();
6000 conn.execute(
6001 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
6002 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
6003 [],
6004 )
6005 .expect("insert profile");
6006 let embedder = StubEmbedder::new("bge-large", 384);
6008 let result = super::check_vec_identity_at_open(&conn, &embedder);
6009 assert!(
6010 result.is_ok(),
6011 "model_identity mismatch must warn and return Ok(())"
6012 );
6013 }
6014}