1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8 BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9 CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, ExpansionSlot,
10 FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch,
11 SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
12};
13use fathomdb_schema::SchemaManager;
14use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
15
16use crate::embedder::QueryEmbedder;
17use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
18use crate::{EngineError, sqlite};
19
/// Upper bound on cached shape-hash → SQL entries; once reached the cache is
/// cleared wholesale rather than evicted per-entry (see `execute_compiled_read`).
const MAX_SHAPE_CACHE_SIZE: usize = 4096;

/// Chunk size for batched operations.
// NOTE(review): not referenced in this chunk; presumably consumed by batch
// helpers elsewhere in the file — confirm before removing.
const BATCH_CHUNK_SIZE: usize = 200;
30
31fn compile_expansion_in_filter(
36 p: usize,
37 path: &str,
38 value_binds: Vec<Value>,
39) -> (String, Vec<Value>) {
40 let first_val = p + 1;
41 let placeholders = (0..value_binds.len())
42 .map(|i| format!("?{}", first_val + i))
43 .collect::<Vec<_>>()
44 .join(", ");
45 let mut binds = vec![Value::Text(path.to_owned())];
46 binds.extend(value_binds);
47 (
48 format!("\n AND json_extract(n.properties, ?{p}) IN ({placeholders})"),
49 binds,
50 )
51}
52
53#[allow(clippy::too_many_lines)]
68fn compile_expansion_filter(
69 filter: Option<&Predicate>,
70 first_param: usize,
71) -> (String, Vec<Value>) {
72 let Some(predicate) = filter else {
73 return (String::new(), vec![]);
74 };
75 let p = first_param;
76 match predicate {
77 Predicate::JsonPathEq { path, value } => {
78 let val = match value {
79 ScalarValue::Text(t) => Value::Text(t.clone()),
80 ScalarValue::Integer(i) => Value::Integer(*i),
81 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
82 };
83 (
84 format!(
85 "\n AND json_extract(n.properties, ?{p}) = ?{}",
86 p + 1
87 ),
88 vec![Value::Text(path.clone()), val],
89 )
90 }
91 Predicate::JsonPathCompare { path, op, value } => {
92 let val = match value {
93 ScalarValue::Text(t) => Value::Text(t.clone()),
94 ScalarValue::Integer(i) => Value::Integer(*i),
95 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
96 };
97 let operator = match op {
98 ComparisonOp::Gt => ">",
99 ComparisonOp::Gte => ">=",
100 ComparisonOp::Lt => "<",
101 ComparisonOp::Lte => "<=",
102 };
103 (
104 format!(
105 "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
106 p + 1
107 ),
108 vec![Value::Text(path.clone()), val],
109 )
110 }
111 Predicate::JsonPathFusedEq { path, value } => (
112 format!(
113 "\n AND json_extract(n.properties, ?{p}) = ?{}",
114 p + 1
115 ),
116 vec![Value::Text(path.clone()), Value::Text(value.clone())],
117 ),
118 Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
119 let operator = match op {
120 ComparisonOp::Gt => ">",
121 ComparisonOp::Gte => ">=",
122 ComparisonOp::Lt => "<",
123 ComparisonOp::Lte => "<=",
124 };
125 (
126 format!(
127 "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
128 p + 1
129 ),
130 vec![Value::Text(path.clone()), Value::Integer(*value)],
131 )
132 }
133 Predicate::JsonPathFusedBoolEq { path, value } => (
134 format!(
135 "\n AND json_extract(n.properties, ?{p}) = ?{}",
136 p + 1
137 ),
138 vec![Value::Text(path.clone()), Value::Integer(i64::from(*value))],
139 ),
140 Predicate::KindEq(kind) => (
141 format!("\n AND n.kind = ?{p}"),
142 vec![Value::Text(kind.clone())],
143 ),
144 Predicate::LogicalIdEq(logical_id) => (
145 format!("\n AND n.logical_id = ?{p}"),
146 vec![Value::Text(logical_id.clone())],
147 ),
148 Predicate::SourceRefEq(source_ref) => (
149 format!("\n AND n.source_ref = ?{p}"),
150 vec![Value::Text(source_ref.clone())],
151 ),
152 Predicate::ContentRefEq(uri) => (
153 format!("\n AND n.content_ref = ?{p}"),
154 vec![Value::Text(uri.clone())],
155 ),
156 Predicate::ContentRefNotNull => (
157 "\n AND n.content_ref IS NOT NULL".to_owned(),
158 vec![],
159 ),
160 Predicate::EdgePropertyEq { .. } | Predicate::EdgePropertyCompare { .. } => {
161 unreachable!(
162 "compile_expansion_filter: EdgeProperty* variants must use compile_edge_filter"
163 );
164 }
165 Predicate::JsonPathFusedIn { path, values } => compile_expansion_in_filter(
166 p,
167 path,
168 values.iter().map(|v| Value::Text(v.clone())).collect(),
169 ),
170 Predicate::JsonPathIn { path, values } => compile_expansion_in_filter(
171 p,
172 path,
173 values
174 .iter()
175 .map(|v| match v {
176 ScalarValue::Text(t) => Value::Text(t.clone()),
177 ScalarValue::Integer(i) => Value::Integer(*i),
178 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
179 })
180 .collect(),
181 ),
182 }
183}
184
185fn compile_edge_filter(filter: Option<&Predicate>, first_param: usize) -> (String, Vec<Value>) {
195 let Some(predicate) = filter else {
196 return (String::new(), vec![]);
197 };
198 let p = first_param;
199 match predicate {
200 Predicate::EdgePropertyEq { path, value } => {
201 let val = match value {
202 ScalarValue::Text(t) => Value::Text(t.clone()),
203 ScalarValue::Integer(i) => Value::Integer(*i),
204 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
205 };
206 (
207 format!(
208 "\n AND json_extract(e.properties, ?{p}) = ?{}",
209 p + 1
210 ),
211 vec![Value::Text(path.clone()), val],
212 )
213 }
214 Predicate::EdgePropertyCompare { path, op, value } => {
215 let val = match value {
216 ScalarValue::Text(t) => Value::Text(t.clone()),
217 ScalarValue::Integer(i) => Value::Integer(*i),
218 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
219 };
220 let operator = match op {
221 ComparisonOp::Gt => ">",
222 ComparisonOp::Gte => ">=",
223 ComparisonOp::Lt => "<",
224 ComparisonOp::Lte => "<=",
225 };
226 (
227 format!(
228 "\n AND json_extract(e.properties, ?{p}) {operator} ?{}",
229 p + 1
230 ),
231 vec![Value::Text(path.clone()), val],
232 )
233 }
234 _ => {
235 unreachable!("compile_edge_filter: non-edge predicate {predicate:?}");
236 }
237 }
238}
239
/// FTS5 tokenizer configuration classified by retrieval intent.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TokenizerStrategy {
    /// `porter unicode61 remove_diacritics 2` — stemming, recall-oriented English.
    RecallOptimizedEnglish,
    /// `unicode61 remove_diacritics 2` — no stemming, favors precise matches.
    PrecisionOptimized,
    /// `trigram` — substring matching.
    SubstringTrigram,
    /// `icu` — the ICU tokenizer (named here for CJK/global text).
    GlobalCjk,
    /// Any `unicode61 tokenchars …` configuration, treated as code-oriented.
    SourceCode,
    /// Any other tokenizer string, kept verbatim.
    Custom(String),
}

impl TokenizerStrategy {
    /// Classifies a raw FTS5 tokenizer string into a strategy.
    ///
    /// Kept as an infallible inherent method rather than `std::str::FromStr`
    /// (which would force a `Result`): every input maps to some variant.
    pub fn from_str(s: &str) -> Self {
        if s == "porter unicode61 remove_diacritics 2" {
            Self::RecallOptimizedEnglish
        } else if s == "unicode61 remove_diacritics 2" {
            Self::PrecisionOptimized
        } else if s == "trigram" {
            Self::SubstringTrigram
        } else if s == "icu" {
            Self::GlobalCjk
        } else if s.starts_with("unicode61 tokenchars") {
            Self::SourceCode
        } else {
            Self::Custom(s.to_string())
        }
    }
}
274
/// Fixed-size pool of read-only SQLite connections, one mutex per connection.
struct ReadPool {
    // `acquire` try-locks these in order and falls back to blocking on the first.
    connections: Vec<Mutex<Connection>>,
}
282
283impl fmt::Debug for ReadPool {
284 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285 f.debug_struct("ReadPool")
286 .field("size", &self.connections.len())
287 .finish()
288 }
289}
290
291impl ReadPool {
292 fn new(
303 db_path: &Path,
304 pool_size: usize,
305 schema_manager: &SchemaManager,
306 vector_enabled: bool,
307 ) -> Result<Self, EngineError> {
308 let mut connections = Vec::with_capacity(pool_size);
309 for _ in 0..pool_size {
310 let conn = if vector_enabled {
311 #[cfg(feature = "sqlite-vec")]
312 {
313 sqlite::open_readonly_connection_with_vec(db_path)?
314 }
315 #[cfg(not(feature = "sqlite-vec"))]
316 {
317 sqlite::open_readonly_connection(db_path)?
318 }
319 } else {
320 sqlite::open_readonly_connection(db_path)?
321 };
322 schema_manager
323 .initialize_reader_connection(&conn)
324 .map_err(EngineError::Schema)?;
325 connections.push(Mutex::new(conn));
326 }
327 Ok(Self { connections })
328 }
329
330 fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
339 for conn in &self.connections {
341 if let Ok(guard) = conn.try_lock() {
342 return Ok(guard);
343 }
344 }
345 self.connections[0].lock().map_err(|_| {
347 trace_error!("read pool: connection mutex poisoned");
348 EngineError::Bridge("connection mutex poisoned".to_owned())
349 })
350 }
351
352 #[cfg(test)]
354 fn size(&self) -> usize {
355 self.connections.len()
356 }
357}
358
/// Describes a compiled query as executed: final SQL, bind arity, driving
/// table, and cache provenance.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// Final SQL text submitted to SQLite.
    pub sql: String,
    /// Number of bind parameters the SQL expects.
    pub bind_count: usize,
    /// Table family the query is driven from (plain nodes, FTS, vector, …).
    pub driving_table: DrivingTable,
    /// Structural hash of the query shape; key into the shape→SQL cache.
    pub shape_hash: ShapeHash,
    /// True when the SQL was served from the shape cache.
    pub cache_hit: bool,
}
370
/// One materialized node row as returned by read queries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row identifier.
    pub row_id: String,
    /// Logical identifier of the node (stable across row versions; live
    /// versions are selected via `superseded_at IS NULL` in query SQL).
    pub logical_id: String,
    /// Node kind; also selects the per-kind FTS/vector projection tables.
    pub kind: String,
    /// Node properties as a JSON document (filtered via `json_extract`).
    pub properties: String,
    /// Optional reference to external content.
    pub content_ref: Option<String>,
    /// Last access timestamp, when tracked.
    // NOTE(review): unit (epoch seconds vs millis) not visible in this chunk.
    pub last_accessed_at: Option<i64>,
    /// Edge properties JSON when the row was reached via an edge expansion;
    /// plain node reads set this to `None`.
    pub edge_properties: Option<String>,
}
390
/// One run row as returned by read queries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Run identifier.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Run status.
    pub status: String,
    /// Run properties as a JSON string.
    pub properties: String,
}
403
/// One step row as returned by read queries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Step identifier.
    pub id: String,
    /// Identifier of the run this step belongs to.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Step status.
    pub status: String,
    /// Step properties as a JSON string.
    pub properties: String,
}
418
/// One action row as returned by read queries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Action identifier.
    pub id: String,
    /// Identifier of the step this action belongs to.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Action status.
    pub status: String,
    /// Action properties as a JSON string.
    pub properties: String,
}
433
/// A single provenance log entry.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Event identifier.
    pub id: String,
    /// Event type tag.
    pub event_type: String,
    /// Subject the event refers to.
    pub subject: String,
    /// Optional source reference.
    pub source_ref: Option<String>,
    /// Event metadata as a JSON string.
    pub metadata_json: String,
    /// Creation timestamp.
    // NOTE(review): unit (epoch seconds vs millis) not visible in this chunk.
    pub created_at: i64,
}
444
/// Row sets produced by compiled reads, bucketed by entity table.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Node rows.
    pub nodes: Vec<NodeRow>,
    /// Run rows.
    pub runs: Vec<RunRow>,
    /// Step rows.
    pub steps: Vec<StepRow>,
    /// Action rows.
    pub actions: Vec<ActionRow>,
    /// True when execution degraded (e.g. the vector table was absent) and the
    /// result may be empty/partial rather than an error.
    pub was_degraded: bool,
}
460
/// Expansion rows grouped under a single root node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical id of the root node these expansions belong to.
    pub root_logical_id: String,
    /// Nodes reached by expanding from the root.
    pub nodes: Vec<NodeRow>,
}
469
/// All expansion results for one named expansion slot.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot.
    pub slot: String,
    /// Per-root expansion results within this slot.
    pub roots: Vec<ExpansionRootRows>,
}
478
/// Result of a grouped query: root nodes plus per-slot expansions.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// Root node rows.
    pub roots: Vec<NodeRow>,
    /// Expansion results, one entry per expansion slot.
    pub expansions: Vec<ExpansionSlotRows>,
    /// True when execution degraded and results may be partial.
    pub was_degraded: bool,
}
489
/// Read-path executor: runs compiled queries/searches against a pool of
/// read-only SQLite connections, with optional FTS and vector retrieval.
pub struct ExecutionCoordinator {
    /// Path of the backing SQLite database file.
    database_path: PathBuf,
    /// Shared schema manager used to bootstrap and initialize connections.
    schema_manager: Arc<SchemaManager>,
    /// Pool of read-only connections.
    pool: ReadPool,
    /// Final row SQL keyed by compiled shape hash; bounded by
    /// `MAX_SHAPE_CACHE_SIZE` (cleared wholesale when full).
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// Whether vector (sqlite-vec) retrieval is available.
    vector_enabled: bool,
    /// Ensures the "vector table absent" degradation warning logs only once.
    vec_degradation_warned: AtomicBool,
    /// Query/error counters.
    telemetry: Arc<TelemetryCounters>,
    /// Optional embedder used to derive vector queries from raw query text.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
    /// Per-kind FTS tokenizer strategies loaded from `projection_profiles`
    /// at open time.
    fts_strategies: HashMap<String, TokenizerStrategy>,
}
518
519impl fmt::Debug for ExecutionCoordinator {
520 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
521 f.debug_struct("ExecutionCoordinator")
522 .field("database_path", &self.database_path)
523 .finish_non_exhaustive()
524 }
525}
526
527impl ExecutionCoordinator {
    /// Opens the database, bootstraps the schema, loads per-kind FTS tokenizer
    /// strategies, and builds the read-only connection pool.
    ///
    /// # Errors
    /// Fails if the database cannot be opened, schema bootstrap or the FTS
    /// guards fail, the embedder identity check fails, profile rows cannot be
    /// read, or the pool cannot be constructed.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap connection: load the sqlite-vec extension only when a
        // vector dimension was requested and the feature is compiled in.
        #[cfg(feature = "sqlite-vec")]
        let mut conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let mut conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        run_open_time_fts_guards(&mut conn)?;

        // Without the sqlite-vec feature, vector retrieval is always off and
        // the bootstrap report's vector flag is deliberately ignored.
        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            let _ = &report;
            false
        };

        if vector_dimension.is_some() {
            // An explicit dimension force-enables vector mode (feature permitting).
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        if let Some(ref emb) = query_embedder {
            // Validate the embedder against the database's stored vector
            // identity before accepting it (see `check_vec_identity_at_open`).
            check_vec_identity_at_open(&conn, emb.as_ref())?;
        }

        // Load per-kind FTS tokenizer configuration. Rows whose config JSON is
        // unparsable or lacks a `tokenizer` key are skipped silently.
        let fts_strategies: HashMap<String, TokenizerStrategy> = {
            let mut map = HashMap::new();
            let mut stmt = conn
                .prepare("SELECT kind, config_json FROM projection_profiles WHERE facet='fts'")?;
            let rows = stmt.query_map([], |row| {
                let kind: String = row.get(0)?;
                let config_json: String = row.get(1)?;
                Ok((kind, config_json))
            })?;
            for row in rows.flatten() {
                let (kind, config_json) = row;
                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&config_json)
                    && let Some(tok) = v["tokenizer"].as_str()
                {
                    map.insert(kind, TokenizerStrategy::from_str(tok));
                }
            }
            map
        };

        // The bootstrap connection is only needed for setup; the read pool
        // opens its own read-only connections.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
            fts_strategies,
        })
    }
633
634 pub fn database_path(&self) -> &Path {
636 &self.database_path
637 }
638
    /// Whether vector (sqlite-vec) retrieval is enabled for this coordinator.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
644
    /// The optional query embedder used to derive vector searches from raw text.
    #[must_use]
    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
        self.query_embedder.as_ref()
    }
655
    /// Acquires a read connection from the pool (prefers an uncontended one).
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
659
660 #[must_use]
666 pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
667 let mut total = SqliteCacheStatus::default();
668 for conn_mutex in &self.pool.connections {
669 if let Ok(conn) = conn_mutex.try_lock() {
670 total.add(&read_db_cache_status(&conn));
671 }
672 }
673 total
674 }
675
    /// Executes a compiled read query and returns the matching node rows.
    ///
    /// Performs three adaptations before touching SQLite:
    /// - FTS-driven queries may be served by a direct scan when the root
    ///   kind's FTS projection was only just registered
    ///   (`scan_fallback_if_first_registration`);
    /// - FTS/vector-driven SQL is rewritten to target the per-kind table;
    /// - the final row SQL is cached under the query's shape hash.
    ///
    /// # Errors
    /// Returns pool/SQLite errors; a missing vector table degrades to an
    /// empty result with `was_degraded = true` instead of erroring.
    #[allow(clippy::expect_used, clippy::too_many_lines)]
    pub fn execute_compiled_read(
        &self,
        compiled: &CompiledQuery,
    ) -> Result<QueryRows, EngineError> {
        // First-registration fallback: bind 1 carries the root kind for
        // FTS-driven queries (by compiler convention — see the plan builder).
        if compiled.driving_table == DrivingTable::FtsNodes
            && let Some(BindValue::Text(root_kind)) = compiled.binds.get(1)
            && let Some(nodes) = self.scan_fallback_if_first_registration(root_kind)?
        {
            self.telemetry.increment_queries();
            return Ok(QueryRows {
                nodes,
                runs: Vec::new(),
                steps: Vec::new(),
                actions: Vec::new(),
                was_degraded: false,
            });
        }

        // Rewrite the compiled SQL for per-kind FTS/vector tables when needed.
        let (adapted_sql, adapted_binds) = if compiled.driving_table == DrivingTable::FtsNodes {
            let conn_check = match self.lock_connection() {
                Ok(g) => g,
                Err(e) => {
                    self.telemetry.increment_errors();
                    return Err(e);
                }
            };
            let result = adapt_fts_nodes_sql_for_per_kind_tables(compiled, &conn_check);
            // Release the connection before the `?` below so the later
            // acquisition doesn't contend with this guard.
            drop(conn_check);
            result?
        } else if compiled.driving_table == DrivingTable::VecNodes {
            let root_kind = compiled
                .binds
                .get(1)
                .and_then(|b| {
                    if let BindValue::Text(k) = b {
                        Some(k.as_str())
                    } else {
                        None
                    }
                })
                .unwrap_or("");
            // "vec__unknown" is a deliberately nonexistent table; preparing it
            // triggers the graceful vector-absent degradation below.
            let vec_table = if root_kind.is_empty() {
                "vec__unknown".to_owned()
            } else {
                fathomdb_schema::vec_kind_table_name(root_kind)
            };
            let new_sql = compiled.sql.replace("vec_nodes_active", &vec_table);
            (new_sql, compiled.binds.clone())
        } else {
            (compiled.sql.clone(), compiled.binds.clone())
        };

        let row_sql = wrap_node_row_projection_sql(&adapted_sql);
        {
            // Cache the final row SQL by shape hash. A poisoned cache lock is
            // recovered (cache contents are only derived data).
            // NOTE(review): the SQL is cached before execution, so a failing
            // query still populates the cache — confirm that is intended.
            let mut cache = self
                .shape_sql_map
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
                // Bounded by wholesale clearing rather than per-entry eviction.
                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
                cache.clear();
            }
            cache.insert(compiled.shape_hash, row_sql.clone());
        }

        let bind_values = adapted_binds
            .iter()
            .map(bind_value_to_sql)
            .collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&row_sql) {
            Ok(stmt) => stmt,
            // Missing per-kind vector table: degrade to an empty result and
            // warn once per process lifetime.
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading to non-vector query");
                }
                return Ok(QueryRows {
                    was_degraded: true,
                    ..Default::default()
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        // Column order matches `wrap_node_row_projection_sql`'s projection.
        let nodes = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                    // Plain reads never carry edge properties.
                    edge_properties: None,
                })
            })
            .and_then(Iterator::collect)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        self.telemetry.increment_queries();
        Ok(QueryRows {
            nodes,
            runs: Vec::new(),
            steps: Vec::new(),
            actions: Vec::new(),
            was_degraded: false,
        })
    }
824
825 pub fn execute_compiled_search(
840 &self,
841 compiled: &CompiledSearch,
842 ) -> Result<SearchRows, EngineError> {
843 let (relaxed_query, was_degraded_at_plan_time) =
850 fathomdb_query::derive_relaxed(&compiled.text_query);
851 let relaxed = relaxed_query.map(|q| CompiledSearch {
852 root_kind: compiled.root_kind.clone(),
853 text_query: q,
854 limit: compiled.limit,
855 fusable_filters: compiled.fusable_filters.clone(),
856 residual_filters: compiled.residual_filters.clone(),
857 attribution_requested: compiled.attribution_requested,
858 });
859 let plan = CompiledSearchPlan {
860 strict: compiled.clone(),
861 relaxed,
862 was_degraded_at_plan_time,
863 };
864 self.execute_compiled_search_plan(&plan)
865 }
866
    /// Executes a two-branch text search plan: the strict branch always runs;
    /// the relaxed branch runs only when the strict branch returned fewer than
    /// `min(FALLBACK_TRIGGER_K, limit)` hits.
    ///
    /// Branch results are merged up to `limit`, attribution is populated on
    /// request, and per-mode hit counts are taken from the merged set. This
    /// path is text-only, so `vector_hit_count` is always 0.
    ///
    /// # Errors
    /// Propagates branch execution and attribution errors.
    pub fn execute_compiled_search_plan(
        &self,
        plan: &CompiledSearchPlan,
    ) -> Result<SearchRows, EngineError> {
        let strict = &plan.strict;
        let limit = strict.limit;
        let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;

        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;

        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            // Plan-time degradation is only surfaced when the relaxed branch
            // actually ran.
            was_degraded = plan.was_degraded_at_plan_time;
        }

        let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
        if strict.attribution_requested {
            let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }
        // Counts are computed post-merge so they reflect what is returned.
        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        // No vector branch participates in this text-only path.
        let vector_hit_count = 0;

        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
941
    /// Runs a compiled vector (nearest-neighbour) search.
    ///
    /// Builds a CTE that KNN-matches the per-kind vector table, joins the
    /// matched chunks back to their live (non-superseded) source nodes,
    /// applies fused filters inside the CTE (on `src`) and residual JSON-path
    /// filters outside it (on `h`), and orders by ascending distance.
    ///
    /// Degrades to an empty result with `was_degraded = true` when the
    /// per-kind vector table does not exist (detected either at prepare time
    /// or while stepping rows).
    ///
    /// # Errors
    /// Returns pool/SQLite errors other than the missing-table degradation.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // A zero limit can never produce hits; skip all SQL work.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        // Placeholder ?1 is always the query vector; ?2 is the root kind when
        // kind filtering applies. All later placeholders are appended in bind
        // order, so `binds.len()` after a push is that value's placeholder.
        let filter_by_kind = !compiled.root_kind.is_empty();
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // Fused filters: evaluated inside the CTE against the source node row.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses
                        .push_str("\n AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedBoolEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    // Bools are stored as 0/1 integers.
                    binds.push(BindValue::Integer(i64::from(*value)));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    // Path sits at `first_param`; the IN-list values follow it.
                    let first_param = binds.len();
                    for v in values {
                        binds.push(BindValue::Text(v.clone()));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                // Non-fused variants are handled in the residual pass below;
                // edge predicates do not apply to vector search.
                Predicate::JsonPathEq { .. }
                | Predicate::JsonPathCompare { .. }
                | Predicate::JsonPathIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                }
            }
        }

        // Residual filters: evaluated outside the CTE against the hit rows.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(scalar_to_bind(v));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                // Fused/identity variants were already applied inside the CTE;
                // edge predicates do not apply to vector search.
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. }
                | Predicate::JsonPathFusedBoolEq { .. }
                | Predicate::JsonPathFusedIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                }
            }
        }

        // Outer LIMIT is bound as the final placeholder.
        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        // Inner KNN LIMIT is interpolated directly (a usize, so injection-safe).
        // NOTE(review): the KNN limit equals the outer limit, so rows removed
        // by fused/residual filters are not backfilled — confirm intended.
        let base_limit = limit;
        let kind_clause = if filter_by_kind {
            "\n AND src.kind = ?2"
        } else {
            ""
        };

        // "vec__unknown" is deliberately nonexistent so preparing it hits the
        // graceful missing-table degradation path.
        let vec_table = if compiled.root_kind.is_empty() {
            "vec__unknown".to_owned()
        } else {
            fathomdb_schema::vec_kind_table_name(&compiled.root_kind)
        };

        let sql = format!(
            "WITH vector_hits AS (
 SELECT
 src.row_id AS row_id,
 src.logical_id AS logical_id,
 src.kind AS kind,
 src.properties AS properties,
 src.source_ref AS source_ref,
 src.content_ref AS content_ref,
 src.created_at AS created_at,
 vc.distance AS distance,
 vc.chunk_id AS chunk_id
 FROM (
 SELECT chunk_id, distance
 FROM {vec_table}
 WHERE embedding MATCH ?1
 LIMIT {base_limit}
 ) vc
 JOIN chunks c ON c.id = vc.chunk_id
 JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
 WHERE 1 = 1{kind_clause}{fused_clauses}
 )
 SELECT
 h.row_id,
 h.logical_id,
 h.kind,
 h.properties,
 h.content_ref,
 am.last_accessed_at,
 h.created_at,
 h.distance,
 h.chunk_id
 FROM vector_hits h
 LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
 WHERE 1 = 1{filter_clauses}
 ORDER BY h.distance ASC
 LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            // Missing vector table at prepare time: degrade, warn once.
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Lower distance is a better match; negate so higher score wins
                // when merged with other modalities.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    // Match modes are a text-search concept.
                    match_mode: None,
                    snippet: None,
                    // Column 8 is the matched chunk id.
                    // NOTE(review): read as Option<String> — assumes chunk ids
                    // are TEXT; confirm against the chunks schema.
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // The missing-table error can also surface while stepping rows.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
1336
    /// Executes a full retrieval plan: strict text branch, optional relaxed
    /// text fallback, and — only when both text branches come back empty and a
    /// query embedder is configured — a vector branch derived from `raw_query`.
    ///
    /// The three branches are merged up to the strict branch's limit, with
    /// attribution populated on request and per-modality counts taken from
    /// the merged set.
    ///
    /// # Errors
    /// Propagates branch execution and attribution errors.
    pub fn execute_retrieval_plan(
        &self,
        plan: &CompiledRetrievalPlan,
        raw_query: &str,
    ) -> Result<SearchRows, EngineError> {
        // Cloned because `fill_vector_branch` may attach a vector branch and
        // mark plan-time degradation.
        let mut plan = plan.clone();
        let limit = plan.text.strict.limit;

        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;

        // Relaxed fallback mirrors `execute_compiled_search_plan`.
        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;
        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.text.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            was_degraded = plan.was_degraded_at_plan_time;
        }

        // Vector retrieval is a last resort: derive the branch lazily, and
        // only when text produced nothing at all.
        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
        if text_branches_empty && self.query_embedder.is_some() {
            self.fill_vector_branch(&mut plan, raw_query);
        }

        let mut vector_hits: Vec<SearchHit> = Vec::new();
        if let Some(vector) = plan.vector.as_ref()
            && strict_hits.is_empty()
            && relaxed_hits.is_empty()
        {
            let vector_rows = self.execute_compiled_vector_search(vector)?;
            vector_hits = vector_rows.hits;
            if vector_rows.was_degraded {
                was_degraded = true;
            }
        }
        // Embedder configured but no vector branch materialized (embedding or
        // serialization failed, flagged at plan time): surface the degradation.
        if text_branches_empty
            && plan.was_degraded_at_plan_time
            && plan.vector.is_none()
            && self.query_embedder.is_some()
        {
            was_degraded = true;
        }

        let strict = &plan.text.strict;
        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
        if strict.attribution_requested {
            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }

        // Counts are computed post-merge so they reflect what is returned.
        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        let vector_hit_count = merged
            .iter()
            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
            .count();

        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
1477
    /// Embeds `raw_query` and attaches a vector branch to `plan`, mirroring
    /// the strict text branch's kind, limit, and filters.
    ///
    /// On embedder or serialization failure the plan is marked degraded and
    /// the vector branch is simply skipped — retrieval proceeds text-only.
    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
        let Some(embedder) = self.query_embedder.as_ref() else {
            return;
        };
        match embedder.embed_query(raw_query) {
            Ok(vec) => {
                // The embedding is serialized as a JSON array literal; it is
                // bound to `embedding MATCH ?1` in
                // `execute_compiled_vector_search`.
                let literal = match serde_json::to_string(&vec) {
                    Ok(s) => s,
                    Err(err) => {
                        trace_warn!(
                            error = %err,
                            "query embedder vector serialization failed; skipping vector branch"
                        );
                        // `let _ = err;` keeps `err` "used" when trace_warn!
                        // compiles to nothing.
                        let _ = err; plan.was_degraded_at_plan_time = true;
                        return;
                    }
                };
                let strict = &plan.text.strict;
                plan.vector = Some(CompiledVectorSearch {
                    root_kind: strict.root_kind.clone(),
                    query_text: literal,
                    limit: strict.limit,
                    fusable_filters: strict.fusable_filters.clone(),
                    residual_filters: strict.residual_filters.clone(),
                    attribution_requested: strict.attribution_requested,
                });
            }
            Err(err) => {
                trace_warn!(
                    error = %err,
                    "query embedder unavailable, skipping vector branch"
                );
                let _ = err; plan.was_degraded_at_plan_time = true;
            }
        }
    }
1536
    /// Executes one text branch (strict or relaxed) of a compiled search.
    ///
    /// Builds a single SQL statement that UNIONs the chunk-level `fts_nodes`
    /// index with every applicable per-kind property-FTS table, applies the
    /// fusable filters inside that union (before the LIMIT) and the residual
    /// filters on the final hit set, and tags each returned hit with `branch`
    /// as its match mode.
    ///
    /// # Errors
    /// Propagates SQLite failures as `EngineError::Sqlite`; every error path
    /// also bumps the telemetry error counter.
    #[allow(clippy::too_many_lines)]
    fn run_search_branch(
        &self,
        compiled: &CompiledSearch,
        branch: SearchBranch,
    ) -> Result<Vec<SearchHit>, EngineError> {
        use std::fmt::Write as _;
        // Empty and negation-only queries can never match anything in FTS5;
        // skip the database round-trip entirely.
        if matches!(
            compiled.text_query,
            fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
        ) {
            return Ok(Vec::new());
        }
        let rendered_base = render_text_query_fts5(&compiled.text_query);
        let strategy = self.fts_strategies.get(compiled.root_kind.as_str());
        // Trigram tokenizers need at least 3 alphanumeric characters to form
        // a single token; shorter queries would always come back empty.
        if matches!(strategy, Some(TokenizerStrategy::SubstringTrigram))
            && rendered_base
                .chars()
                .filter(|c| c.is_alphanumeric())
                .count()
                < 3
        {
            return Ok(Vec::new());
        }
        let rendered = rendered_base;
        let filter_by_kind = !compiled.root_kind.is_empty();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };

        // Resolve which per-kind property-FTS tables participate:
        //  - explicit root kind: just that kind's table (if it exists);
        //  - exactly one fusable KindEq filter: that kind's table;
        //  - otherwise: every table registered in fts_property_schemas.
        let prop_fts_tables: Vec<(String, String)> = if filter_by_kind {
            let kind = compiled.root_kind.clone();
            let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
            let exists: bool = conn_guard
                .query_row(
                    "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                    rusqlite::params![prop_table],
                    |_| Ok(true),
                )
                .optional()
                .map_err(EngineError::Sqlite)?
                .unwrap_or(false);
            if exists {
                vec![(kind, prop_table)]
            } else {
                vec![]
            }
        } else {
            let kind_eq_values: Vec<String> = compiled
                .fusable_filters
                .iter()
                .filter_map(|p| match p {
                    Predicate::KindEq(k) => Some(k.clone()),
                    _ => None,
                })
                .collect();
            if kind_eq_values.len() == 1 {
                let kind = kind_eq_values[0].clone();
                let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
                let exists: bool = conn_guard
                    .query_row(
                        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                        rusqlite::params![prop_table],
                        |_| Ok(true),
                    )
                    .optional()
                    .map_err(EngineError::Sqlite)?
                    .unwrap_or(false);
                if exists {
                    vec![(kind, prop_table)]
                } else {
                    vec![]
                }
            } else {
                // No single kind can be inferred: fan out to every kind with
                // a registered property schema whose table actually exists.
                let mut stmt = conn_guard
                    .prepare("SELECT kind FROM fts_property_schemas")
                    .map_err(EngineError::Sqlite)?;
                let all_kinds: Vec<String> = stmt
                    .query_map([], |r| r.get::<_, String>(0))
                    .map_err(EngineError::Sqlite)?
                    .collect::<Result<Vec<_>, _>>()
                    .map_err(EngineError::Sqlite)?;
                drop(stmt);
                let mut result = Vec::new();
                for kind in all_kinds {
                    let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
                    let exists: bool = conn_guard
                        .query_row(
                            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                            rusqlite::params![prop_table],
                            |_| Ok(true),
                        )
                        .optional()
                        .map_err(EngineError::Sqlite)?
                        .unwrap_or(false);
                    if exists {
                        result.push((kind, prop_table));
                    }
                }
                result
            }
        };
        let use_prop_fts = !prop_fts_tables.is_empty();

        // Leading positional binds, referenced by index in the SQL below:
        // ?1 = chunk MATCH text; ?2 = root kind (only when filter_by_kind);
        // plus a second copy of the MATCH text for the property arm when
        // property-FTS tables participate (see `prop_bind_idx` below).
        let mut binds: Vec<BindValue> = if filter_by_kind {
            if use_prop_fts {
                vec![
                    BindValue::Text(rendered.clone()),
                    BindValue::Text(compiled.root_kind.clone()),
                    BindValue::Text(rendered),
                ]
            } else {
                vec![
                    BindValue::Text(rendered.clone()),
                    BindValue::Text(compiled.root_kind.clone()),
                ]
            }
        } else if use_prop_fts {
            vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
        } else {
            vec![BindValue::Text(rendered)]
        };

        // Fusable filters run inside the union (alias `u`), before LIMIT, so
        // they narrow the candidate set early. Each bind's placeholder index
        // is `binds.len()` immediately after the push. `write!` into a String
        // cannot fail, hence the discarded results.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(fused_clauses, "\n AND u.kind = ?{idx}");
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND u.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND u.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND u.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses.push_str("\n AND u.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedBoolEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(i64::from(*value)));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedIn { path, values } => {
                    // ?{first_param} is the JSON path; the IN-list values
                    // occupy the following placeholders.
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(BindValue::Text(v.clone()));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(u.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                // Non-fusable predicates are handled in the residual pass.
                Predicate::JsonPathEq { .. }
                | Predicate::JsonPathCompare { .. }
                | Predicate::JsonPathIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                }
            }
        }

        // Residual filters run against the final hit rows (alias `h`), i.e.
        // after the union has been limited and ordered.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(scalar_to_bind(v));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                // These were already handled as fused clauses (or do not
                // apply to node hits at all).
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. }
                | Predicate::JsonPathFusedBoolEq { .. }
                | Predicate::JsonPathFusedIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                }
            }
        }

        // LIMIT is the last appended bind; a limit that does not fit in i64
        // saturates to i64::MAX.
        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();
        // The property arm reuses the MATCH text bound up front: ?3 when a
        // kind bind precedes it, ?2 otherwise (see `binds` construction).
        let prop_bind_idx: usize = if filter_by_kind { 3 } else { 2 };
        let prop_arm_sql: String = if use_prop_fts {
            prop_fts_tables.iter().fold(String::new(), |mut acc, (kind, prop_table)| {
                // Use a column-weighted bm25 expression when the kind has a
                // registered property schema; otherwise plain bm25.
                let bm25_expr = conn_guard
                    .query_row(
                        "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
                        rusqlite::params![kind],
                        |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
                    )
                    .ok()
                    .map_or_else(
                        || format!("bm25({prop_table})"),
                        |(json, sep)| build_bm25_expr(prop_table, &json, &sep),
                    );
                let is_weighted = bm25_expr != format!("bm25({prop_table})");
                // Weighted tables skip the raw-text snippet and return an
                // empty string instead.
                let snippet_expr = if is_weighted {
                    "'' AS snippet".to_owned()
                } else {
                    "substr(fp.text_content, 1, 200) AS snippet".to_owned()
                };
                let _ = write!(
                    acc,
                    "
    UNION ALL
    SELECT
        src.row_id AS row_id,
        fp.node_logical_id AS logical_id,
        src.kind AS kind,
        src.properties AS properties,
        src.source_ref AS source_ref,
        src.content_ref AS content_ref,
        src.created_at AS created_at,
        -{bm25_expr} AS score,
        'property' AS source,
        {snippet_expr},
        CAST(fp.rowid AS TEXT) AS projection_row_id
    FROM {prop_table} fp
    JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
    WHERE {prop_table} MATCH ?{prop_bind_idx}"
                );
                acc
            })
        } else {
            String::new()
        };
        let (chunk_fts_bind, chunk_kind_clause) = if filter_by_kind {
            ("?1", "\n AND src.kind = ?2")
        } else {
            ("?1", "")
        };
        // Final statement: chunk arm UNIONed with the property arms, fused
        // filters + LIMIT inside the CTE, residual filters outside it.
        // bm25 scores are negated so that higher = better and ORDER BY DESC
        // works uniformly across arms.
        let sql = format!(
            "WITH search_hits AS (
    SELECT
        u.row_id AS row_id,
        u.logical_id AS logical_id,
        u.kind AS kind,
        u.properties AS properties,
        u.source_ref AS source_ref,
        u.content_ref AS content_ref,
        u.created_at AS created_at,
        u.score AS score,
        u.source AS source,
        u.snippet AS snippet,
        u.projection_row_id AS projection_row_id
    FROM (
    SELECT
        src.row_id AS row_id,
        c.node_logical_id AS logical_id,
        src.kind AS kind,
        src.properties AS properties,
        src.source_ref AS source_ref,
        src.content_ref AS content_ref,
        src.created_at AS created_at,
        -bm25(fts_nodes) AS score,
        'chunk' AS source,
        snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
        f.chunk_id AS projection_row_id
    FROM fts_nodes f
    JOIN chunks c ON c.id = f.chunk_id
    JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
    WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}{prop_arm_sql}
    ) u
    WHERE 1 = 1{fused_clauses}
    ORDER BY u.score DESC
    LIMIT ?{limit_idx}
)
SELECT
    h.row_id,
    h.logical_id,
    h.kind,
    h.properties,
    h.content_ref,
    am.last_accessed_at,
    h.created_at,
    h.score,
    h.source,
    h.snippet,
    h.projection_row_id
FROM search_hits h
LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
WHERE 1 = 1{filter_clauses}
ORDER BY h.score DESC"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        // Map each row into a SearchHit; the branch argument becomes the
        // hit's match mode, and the `source` column distinguishes chunk vs
        // property matches.
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let source_str: String = row.get(8)?;
                let source = if source_str == "property" {
                    SearchHitSource::Property
                } else {
                    SearchHitSource::Chunk
                };
                let match_mode = match branch {
                    SearchBranch::Strict => SearchMatchMode::Strict,
                    SearchBranch::Relaxed => SearchMatchMode::Relaxed,
                };
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score: row.get(7)?,
                    modality: RetrievalModality::Text,
                    source,
                    match_mode: Some(match_mode),
                    snippet: row.get(9)?,
                    projection_row_id: row.get(10)?,
                    vector_distance: None,
                    attribution: None,
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        // Release the statement before the connection guard.
        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        Ok(hits)
    }
2098
2099 fn populate_attribution_for_hits(
2103 &self,
2104 hits: &mut [SearchHit],
2105 strict_text_query: &fathomdb_query::TextQuery,
2106 relaxed_text_query: Option<&fathomdb_query::TextQuery>,
2107 ) -> Result<(), EngineError> {
2108 let conn_guard = match self.lock_connection() {
2109 Ok(g) => g,
2110 Err(e) => {
2111 self.telemetry.increment_errors();
2112 return Err(e);
2113 }
2114 };
2115 let strict_expr = render_text_query_fts5(strict_text_query);
2116 let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
2117 for hit in hits.iter_mut() {
2118 let match_expr = match hit.match_mode {
2123 Some(SearchMatchMode::Strict) => strict_expr.as_str(),
2124 Some(SearchMatchMode::Relaxed) => {
2125 relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
2126 }
2127 None => continue,
2128 };
2129 match resolve_hit_attribution(&conn_guard, hit, match_expr) {
2130 Ok(att) => hit.attribution = Some(att),
2131 Err(e) => {
2132 self.telemetry.increment_errors();
2133 return Err(e);
2134 }
2135 }
2136 }
2137 Ok(())
2138 }
2139
2140 pub fn execute_compiled_grouped_read(
2144 &self,
2145 compiled: &CompiledGroupedQuery,
2146 ) -> Result<GroupedQueryRows, EngineError> {
2147 let root_rows = self.execute_compiled_read(&compiled.root)?;
2148 if root_rows.was_degraded {
2149 return Ok(GroupedQueryRows {
2150 roots: Vec::new(),
2151 expansions: Vec::new(),
2152 was_degraded: true,
2153 });
2154 }
2155
2156 let roots = root_rows.nodes;
2157 let mut expansions = Vec::with_capacity(compiled.expansions.len());
2158 for expansion in &compiled.expansions {
2159 let slot_rows = if roots.is_empty() {
2160 Vec::new()
2161 } else {
2162 self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
2163 };
2164 expansions.push(ExpansionSlotRows {
2165 slot: expansion.slot.clone(),
2166 roots: slot_rows,
2167 });
2168 }
2169
2170 Ok(GroupedQueryRows {
2171 roots,
2172 expansions,
2173 was_degraded: false,
2174 })
2175 }
2176
2177 fn read_expansion_nodes_chunked(
2183 &self,
2184 roots: &[NodeRow],
2185 expansion: &ExpansionSlot,
2186 hard_limit: usize,
2187 ) -> Result<Vec<ExpansionRootRows>, EngineError> {
2188 if roots.len() <= BATCH_CHUNK_SIZE {
2189 return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
2190 }
2191
2192 let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
2195 for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
2196 for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
2197 per_root
2198 .entry(group.root_logical_id)
2199 .or_default()
2200 .extend(group.nodes);
2201 }
2202 }
2203
2204 Ok(roots
2205 .iter()
2206 .map(|root| ExpansionRootRows {
2207 root_logical_id: root.logical_id.clone(),
2208 nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
2209 })
2210 .collect())
2211 }
2212
    /// Expands one slot for a single batch of roots via a recursive CTE.
    ///
    /// Parameter layout: ?1..?N are the root logical ids, ?(N+1) is the edge
    /// kind, followed by any edge-filter binds and then node-filter binds.
    /// Cycle detection accumulates visited ids as a comma-delimited string
    /// per traversal path; `emitted` bounds per-path work, and ROW_NUMBER
    /// caps the rows returned per root at `hard_limit`.
    #[allow(clippy::too_many_lines)]
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Traversal direction decides which edge endpoint joins the current
        // frontier and which endpoint becomes the next node.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // Fused JSON-path filters require a property-FTS schema on every
        // reachable kind; fail fast with a configuration error otherwise.
        if expansion.filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. }
                    | Predicate::JsonPathFusedTimestampCmp { .. }
                    | Predicate::JsonPathFusedIn { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }

        // ?1..?N seed the root id set.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        // Remaining parameters follow the roots: edge kind first, then the
        // edge-filter binds, then the node-filter binds.
        let edge_kind_param = root_ids.len() + 1;
        let edge_filter_param_start = root_ids.len() + 2;

        let (edge_filter_sql, edge_filter_binds) =
            compile_edge_filter(expansion.edge_filter.as_ref(), edge_filter_param_start);

        let filter_param_start = edge_filter_param_start + edge_filter_binds.len();

        let (filter_sql, filter_binds) =
            compile_expansion_filter(expansion.filter.as_ref(), filter_param_start);

        // `visited` carries ',id,' markers so instr() can test membership
        // exactly; `numbered` joins back to live nodes and ranks rows per
        // root so the outer query can apply the per-root hard limit.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
    traversed(root_id, logical_id, depth, visited, emitted, edge_properties) AS (
    SELECT rid, rid, 0, printf(',%s,', rid), 0, NULL AS edge_properties
    FROM root_ids
    UNION ALL
    SELECT
        t.root_id,
        {next_logical_id},
        t.depth + 1,
        t.visited || {next_logical_id} || ',',
        t.emitted + 1,
        e.properties AS edge_properties
    FROM traversed t
    JOIN edges e ON {join_condition}
        AND e.kind = ?{edge_kind_param}
        AND e.superseded_at IS NULL{edge_filter_sql}
    WHERE t.depth < {max_depth}
        AND t.emitted < {hard_limit}
        AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
    ),
    numbered AS (
    SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
        , n.content_ref, am.last_accessed_at
        , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
        , t.edge_properties
    FROM traversed t
    JOIN nodes n ON n.logical_id = t.logical_id
        AND n.superseded_at IS NULL
    LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
    WHERE t.depth > 0{filter_sql}
    )
    SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
        , edge_properties
    FROM numbered
    WHERE rn <= {hard_limit}
    ORDER BY root_id, logical_id",
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind order must mirror the parameter layout documented above.
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(edge_filter_binds);
        bind_values.extend(filter_binds);

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                        edge_properties: row.get(7)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Regroup flat rows by root, then emit one entry per input root in
        // caller order (missing roots get an empty list).
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
2371
2372 fn validate_fused_filter_for_edge_label(&self, edge_label: &str) -> Result<(), EngineError> {
2386 let conn = self.lock_connection()?;
2387 let mut stmt = conn
2389 .prepare_cached(
2390 "SELECT DISTINCT n.kind \
2391 FROM edges e \
2392 JOIN nodes n ON n.logical_id = e.target_logical_id \
2393 WHERE e.kind = ?1 AND e.superseded_at IS NULL",
2394 )
2395 .map_err(EngineError::Sqlite)?;
2396 let target_kinds: Vec<String> = stmt
2397 .query_map(rusqlite::params![edge_label], |row| row.get(0))
2398 .map_err(EngineError::Sqlite)?
2399 .collect::<Result<Vec<_>, _>>()
2400 .map_err(EngineError::Sqlite)?;
2401
2402 for kind in &target_kinds {
2403 let has_schema: bool = conn
2404 .query_row(
2405 "SELECT COUNT(*) > 0 FROM fts_property_schemas WHERE kind = ?1",
2406 rusqlite::params![kind],
2407 |row| row.get(0),
2408 )
2409 .map_err(EngineError::Sqlite)?;
2410 if !has_schema {
2411 return Err(EngineError::InvalidConfig(format!(
2412 "kind {kind:?} has no registered property-FTS schema; register one with \
2413 admin.register_fts_property_schema(..) before using fused filters on \
2414 expansion slots, or use JsonPathEq for non-fused semantics \
2415 (expand slot uses edge label {edge_label:?})"
2416 )));
2417 }
2418 }
2419 Ok(())
2420 }
2421
2422 pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
2428 let conn = self.lock_connection()?;
2429 conn.query_row(
2430 "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
2431 rusqlite::params![id],
2432 |row| {
2433 Ok(RunRow {
2434 id: row.get(0)?,
2435 kind: row.get(1)?,
2436 status: row.get(2)?,
2437 properties: row.get(3)?,
2438 })
2439 },
2440 )
2441 .optional()
2442 .map_err(EngineError::Sqlite)
2443 }
2444
2445 pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
2451 let conn = self.lock_connection()?;
2452 conn.query_row(
2453 "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
2454 rusqlite::params![id],
2455 |row| {
2456 Ok(StepRow {
2457 id: row.get(0)?,
2458 run_id: row.get(1)?,
2459 kind: row.get(2)?,
2460 status: row.get(3)?,
2461 properties: row.get(4)?,
2462 })
2463 },
2464 )
2465 .optional()
2466 .map_err(EngineError::Sqlite)
2467 }
2468
2469 pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
2475 let conn = self.lock_connection()?;
2476 conn.query_row(
2477 "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
2478 rusqlite::params![id],
2479 |row| {
2480 Ok(ActionRow {
2481 id: row.get(0)?,
2482 step_id: row.get(1)?,
2483 kind: row.get(2)?,
2484 status: row.get(3)?,
2485 properties: row.get(4)?,
2486 })
2487 },
2488 )
2489 .optional()
2490 .map_err(EngineError::Sqlite)
2491 }
2492
2493 pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
2499 let conn = self.lock_connection()?;
2500 let mut stmt = conn
2501 .prepare_cached(
2502 "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
2503 )
2504 .map_err(EngineError::Sqlite)?;
2505 let rows = stmt
2506 .query_map([], |row| {
2507 Ok(RunRow {
2508 id: row.get(0)?,
2509 kind: row.get(1)?,
2510 status: row.get(2)?,
2511 properties: row.get(3)?,
2512 })
2513 })
2514 .map_err(EngineError::Sqlite)?
2515 .collect::<Result<Vec<_>, _>>()
2516 .map_err(EngineError::Sqlite)?;
2517 Ok(rows)
2518 }
2519
2520 #[must_use]
2530 #[allow(clippy::expect_used)]
2531 pub fn shape_sql_count(&self) -> usize {
2532 self.shape_sql_map
2533 .lock()
2534 .unwrap_or_else(PoisonError::into_inner)
2535 .len()
2536 }
2537
2538 #[must_use]
2540 pub fn schema_manager(&self) -> Arc<SchemaManager> {
2541 Arc::clone(&self.schema_manager)
2542 }
2543
2544 #[must_use]
2553 pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
2554 let cache_hit = self
2555 .shape_sql_map
2556 .lock()
2557 .unwrap_or_else(PoisonError::into_inner)
2558 .contains_key(&compiled.shape_hash);
2559 QueryPlan {
2560 sql: wrap_node_row_projection_sql(&compiled.sql),
2561 bind_count: compiled.binds.len(),
2562 driving_table: compiled.driving_table,
2563 shape_hash: compiled.shape_hash,
2564 cache_hit,
2565 }
2566 }
2567
2568 #[doc(hidden)]
2575 pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
2576 let conn = self.lock_connection()?;
2577 let result = conn
2578 .query_row(&format!("PRAGMA {name}"), [], |row| {
2579 row.get::<_, rusqlite::types::Value>(0)
2581 })
2582 .map_err(EngineError::Sqlite)?;
2583 let s = match result {
2584 rusqlite::types::Value::Text(t) => t,
2585 rusqlite::types::Value::Integer(i) => i.to_string(),
2586 rusqlite::types::Value::Real(f) => f.to_string(),
2587 rusqlite::types::Value::Blob(_) => {
2588 return Err(EngineError::InvalidWrite(format!(
2589 "PRAGMA {name} returned an unexpected BLOB value"
2590 )));
2591 }
2592 rusqlite::types::Value::Null => String::new(),
2593 };
2594 Ok(s)
2595 }
2596
2597 pub fn query_provenance_events(
2606 &self,
2607 subject: &str,
2608 ) -> Result<Vec<ProvenanceEvent>, EngineError> {
2609 let conn = self.lock_connection()?;
2610 let mut stmt = conn
2611 .prepare_cached(
2612 "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
2613 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
2614 )
2615 .map_err(EngineError::Sqlite)?;
2616 let events = stmt
2617 .query_map(rusqlite::params![subject], |row| {
2618 Ok(ProvenanceEvent {
2619 id: row.get(0)?,
2620 event_type: row.get(1)?,
2621 subject: row.get(2)?,
2622 source_ref: row.get(3)?,
2623 metadata_json: row.get(4)?,
2624 created_at: row.get(5)?,
2625 })
2626 })
2627 .map_err(EngineError::Sqlite)?
2628 .collect::<Result<Vec<_>, _>>()
2629 .map_err(EngineError::Sqlite)?;
2630 Ok(events)
2631 }
2632
    /// Decides whether a query over `kind` must fall back to a direct node
    /// scan because the kind's property-FTS projection is still being built
    /// for its first registration; returns the scanned live nodes when so.
    ///
    /// Returns `Ok(None)` when the projection is usable (table populated, or
    /// no in-flight first registration) and the normal FTS path should run.
    fn scan_fallback_if_first_registration(
        &self,
        kind: &str,
    ) -> Result<Option<Vec<NodeRow>>, EngineError> {
        let conn = self.lock_connection()?;

        // A missing per-kind table counts as empty.
        // NOTE(review): `prop_table` comes from fts_kind_table_name, not from
        // user input; interpolating it into SQL is assumed safe — confirm
        // that helper sanitizes the kind.
        let prop_table = fathomdb_schema::fts_kind_table_name(kind);
        let table_exists: bool = conn
            .query_row(
                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                rusqlite::params![prop_table],
                |_| Ok(true),
            )
            .optional()?
            .unwrap_or(false);
        let prop_empty = if table_exists {
            let cnt: i64 =
                conn.query_row(&format!("SELECT COUNT(*) FROM {prop_table}"), [], |r| {
                    r.get(0)
                })?;
            cnt == 0
        } else {
            true
        };
        // Only fall back while the very first registration for this kind is
        // still in flight (PENDING/BUILDING/SWAPPING); an empty table for any
        // other reason takes the normal path.
        let needs_scan: bool = if prop_empty {
            conn.query_row(
                "SELECT 1 FROM fts_property_rebuild_state \
                 WHERE kind = ?1 AND is_first_registration = 1 \
                 AND state IN ('PENDING','BUILDING','SWAPPING') \
                 LIMIT 1",
                rusqlite::params![kind],
                |_| Ok(true),
            )
            .optional()?
            .unwrap_or(false)
        } else {
            false
        };

        if !needs_scan {
            return Ok(None);
        }

        // Fallback: read every live node of this kind directly, joined with
        // its access metadata.
        let mut stmt = conn
            .prepare_cached(
                "SELECT n.row_id, n.logical_id, n.kind, n.properties, n.content_ref, \
                 am.last_accessed_at \
                 FROM nodes n \
                 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id \
                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
            )
            .map_err(EngineError::Sqlite)?;

        let nodes = stmt
            .query_map(rusqlite::params![kind], |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                    // No edge context in a direct scan.
                    edge_properties: None,
                })
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        Ok(Some(nodes))
    }
2714
2715 pub fn get_property_fts_rebuild_progress(
2721 &self,
2722 kind: &str,
2723 ) -> Result<Option<crate::rebuild_actor::RebuildProgress>, EngineError> {
2724 let conn = self.lock_connection()?;
2725 let row = conn
2726 .query_row(
2727 "SELECT state, rows_total, rows_done, started_at, last_progress_at, error_message \
2728 FROM fts_property_rebuild_state WHERE kind = ?1",
2729 rusqlite::params![kind],
2730 |r| {
2731 Ok(crate::rebuild_actor::RebuildProgress {
2732 state: r.get(0)?,
2733 rows_total: r.get(1)?,
2734 rows_done: r.get(2)?,
2735 started_at: r.get(3)?,
2736 last_progress_at: r.get(4)?,
2737 error_message: r.get(5)?,
2738 })
2739 },
2740 )
2741 .optional()?;
2742 Ok(row)
2743 }
2744}
2745
2746fn adapt_fts_nodes_sql_for_per_kind_tables(
2756 compiled: &CompiledQuery,
2757 conn: &rusqlite::Connection,
2758) -> Result<(String, Vec<BindValue>), EngineError> {
2759 let root_kind = compiled
2760 .binds
2761 .get(1)
2762 .and_then(|b| {
2763 if let BindValue::Text(k) = b {
2764 Some(k.as_str())
2765 } else {
2766 None
2767 }
2768 })
2769 .unwrap_or("");
2770 let prop_table = fathomdb_schema::fts_kind_table_name(root_kind);
2771 let prop_table_exists: bool = conn
2772 .query_row(
2773 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
2774 rusqlite::params![prop_table],
2775 |_| Ok(true),
2776 )
2777 .optional()
2778 .map_err(EngineError::Sqlite)?
2779 .unwrap_or(false);
2780
2781 Ok(compiled.adapt_fts_for_kind(prop_table_exists, &prop_table))
2782}
2783
2784#[allow(clippy::unnecessary_wraps)]
2790fn check_vec_identity_at_open(
2791 conn: &rusqlite::Connection,
2792 embedder: &dyn QueryEmbedder,
2793) -> Result<(), EngineError> {
2794 let row: Option<String> = conn
2795 .query_row(
2796 "SELECT config_json FROM projection_profiles WHERE kind='*' AND facet='vec'",
2797 [],
2798 |row| row.get(0),
2799 )
2800 .optional()
2801 .unwrap_or(None);
2802
2803 let Some(config_json) = row else {
2804 return Ok(());
2805 };
2806
2807 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&config_json) else {
2809 return Ok(());
2810 };
2811
2812 let identity = embedder.identity();
2813
2814 if let Some(stored_model) = parsed
2815 .get("model_identity")
2816 .and_then(serde_json::Value::as_str)
2817 && stored_model != identity.model_identity
2818 {
2819 trace_warn!(
2820 stored_model_identity = stored_model,
2821 embedder_model_identity = %identity.model_identity,
2822 "vec identity mismatch at open: model_identity differs"
2823 );
2824 }
2825
2826 if let Some(stored_dim) = parsed.get("dimensions").and_then(serde_json::Value::as_u64) {
2827 let stored_dim = usize::try_from(stored_dim).unwrap_or(usize::MAX);
2828 if stored_dim != identity.dimension {
2829 trace_warn!(
2830 stored_dimensions = stored_dim,
2831 embedder_dimensions = identity.dimension,
2832 "vec identity mismatch at open: dimensions differ"
2833 );
2834 }
2835 }
2836
2837 Ok(())
2838}
2839
/// Open-time self-heal for property-FTS projections.
///
/// When property schemas are registered but the per-kind FTS tables (or the
/// position side-table) turn out to be empty, wipes every `fts_props_%`
/// virtual table plus `fts_node_property_positions` and repopulates them
/// from live nodes, all inside a single IMMEDIATE transaction.
fn run_open_time_fts_guards(conn: &mut rusqlite::Connection) -> Result<(), EngineError> {
    // Nothing to guard when no property schemas are registered.
    let schema_count: i64 = conn
        .query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
            row.get(0)
        })
        .map_err(EngineError::Sqlite)?;
    if schema_count == 0 {
        return Ok(());
    }

    let needs_fts_rebuild = open_guard_check_fts_empty(conn)?;
    // A full FTS rebuild already implies repopulating positions, so the
    // position check only runs when the FTS side looks healthy.
    let needs_position_backfill = if needs_fts_rebuild {
        false
    } else {
        open_guard_check_positions_empty(conn)?
    };

    if needs_fts_rebuild || needs_position_backfill {
        // Discover per-kind virtual tables by naming convention; the
        // `CREATE VIRTUAL TABLE` check guards against ordinary tables that
        // happen to share the prefix.
        let per_kind_tables: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT name FROM sqlite_master \
                     WHERE type='table' AND name LIKE 'fts_props_%' \
                     AND sql LIKE 'CREATE VIRTUAL TABLE%'",
                )
                .map_err(EngineError::Sqlite)?;
            stmt.query_map([], |r| r.get::<_, String>(0))
                .map_err(EngineError::Sqlite)?
                .collect::<Result<Vec<_>, _>>()
                .map_err(EngineError::Sqlite)?
        };
        // IMMEDIATE takes the write lock up front so the wipe + rebuild is
        // atomic with respect to other connections.
        let tx = conn
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(EngineError::Sqlite)?;
        for table in &per_kind_tables {
            tx.execute_batch(&format!("DELETE FROM {table}"))
                .map_err(EngineError::Sqlite)?;
        }
        tx.execute("DELETE FROM fts_node_property_positions", [])
            .map_err(EngineError::Sqlite)?;
        // NOTE(review): this SELECT carries a `?1` placeholder — presumably
        // insert_property_fts_rows binds each registered kind in turn;
        // confirm against that helper.
        crate::projection::insert_property_fts_rows(
            &tx,
            "SELECT logical_id, properties FROM nodes \
             WHERE kind = ?1 AND superseded_at IS NULL",
        )
        .map_err(EngineError::Sqlite)?;
        tx.commit().map_err(EngineError::Sqlite)?;
    }
    Ok(())
}
2901
/// Returns `true` when any registered property-FTS kind has live nodes but a
/// missing or empty per-kind FTS table — i.e. the index needs a full rebuild.
fn open_guard_check_fts_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
    let kinds: Vec<String> = {
        let mut stmt = conn
            .prepare("SELECT kind FROM fts_property_schemas")
            .map_err(EngineError::Sqlite)?;
        stmt.query_map([], |row| row.get::<_, String>(0))
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?
    };
    for kind in &kinds {
        let table = fathomdb_schema::fts_kind_table_name(kind);
        // A kind whose table has not been created yet counts as empty.
        let table_exists: bool = conn
            .query_row(
                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                rusqlite::params![table],
                |_| Ok(true),
            )
            .optional()
            .map_err(EngineError::Sqlite)?
            .unwrap_or(false);
        let fts_count: i64 = if table_exists {
            conn.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
                row.get(0)
            })
            .map_err(EngineError::Sqlite)?
        } else {
            0
        };
        if fts_count == 0 {
            // An empty index is only a problem if live (non-superseded)
            // nodes of this kind actually exist.
            let node_count: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
                    rusqlite::params![kind],
                    |row| row.get(0),
                )
                .map_err(EngineError::Sqlite)?;
            if node_count > 0 {
                return Ok(true);
            }
        }
    }
    Ok(false)
}
2946
2947fn open_guard_check_positions_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
2948 let recursive_count: i64 = conn
2949 .query_row(
2950 "SELECT COUNT(*) FROM fts_property_schemas \
2951 WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
2952 [],
2953 |row| row.get(0),
2954 )
2955 .map_err(EngineError::Sqlite)?;
2956 if recursive_count == 0 {
2957 return Ok(false);
2958 }
2959 let pos_count: i64 = conn
2960 .query_row(
2961 "SELECT COUNT(*) FROM fts_node_property_positions",
2962 [],
2963 |row| row.get(0),
2964 )
2965 .map_err(EngineError::Sqlite)?;
2966 Ok(pos_count == 0)
2967}
2968
/// Wraps a node-projection query so each row also carries the node's
/// `last_accessed_at` from `node_access_metadata` (NULL when absent).
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    // Build by concatenation; the inner query's shape is untouched.
    let mut sql = String::with_capacity(base_sql.len() + 192);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, am.last_accessed_at FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
2976
2977pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
2985 match err {
2986 rusqlite::Error::SqliteFailure(_, Some(msg)) => {
2987 (msg.contains("no such table: vec_") && !msg.contains("vec_embedding"))
2989 || msg.contains("no such module: vec0")
2990 }
2991 _ => false,
2992 }
2993}
2994
2995fn scalar_to_bind(value: &ScalarValue) -> BindValue {
2996 match value {
2997 ScalarValue::Text(text) => BindValue::Text(text.clone()),
2998 ScalarValue::Integer(integer) => BindValue::Integer(*integer),
2999 ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
3000 }
3001}
3002
3003fn merge_search_branches(
3021 strict: Vec<SearchHit>,
3022 relaxed: Vec<SearchHit>,
3023 limit: usize,
3024) -> Vec<SearchHit> {
3025 merge_search_branches_three(strict, relaxed, Vec::new(), limit)
3026}
3027
3028fn merge_search_branches_three(
3040 strict: Vec<SearchHit>,
3041 relaxed: Vec<SearchHit>,
3042 vector: Vec<SearchHit>,
3043 limit: usize,
3044) -> Vec<SearchHit> {
3045 let strict_block = dedup_branch_hits(strict);
3046 let relaxed_block = dedup_branch_hits(relaxed);
3047 let vector_block = dedup_branch_hits(vector);
3048
3049 let mut seen: std::collections::HashSet<String> = strict_block
3050 .iter()
3051 .map(|h| h.node.logical_id.clone())
3052 .collect();
3053
3054 let mut merged = strict_block;
3055 for hit in relaxed_block {
3056 if seen.insert(hit.node.logical_id.clone()) {
3057 merged.push(hit);
3058 }
3059 }
3060 for hit in vector_block {
3061 if seen.insert(hit.node.logical_id.clone()) {
3062 merged.push(hit);
3063 }
3064 }
3065
3066 if merged.len() > limit {
3067 merged.truncate(limit);
3068 }
3069 merged
3070}
3071
3072fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
3076 hits.sort_by(|a, b| {
3077 b.score
3078 .partial_cmp(&a.score)
3079 .unwrap_or(std::cmp::Ordering::Equal)
3080 .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
3081 .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
3082 });
3083
3084 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
3085 hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
3086 hits
3087}
3088
/// Tie-break priority used when deduping equal-scored hits: lower wins.
///
/// Chunk-backed hits beat property-backed hits, which beat vector hits
/// (see the sort order in `dedup_branch_hits`).
fn source_priority(source: SearchHitSource) -> u8 {
    match source {
        SearchHitSource::Chunk => 0,
        SearchHitSource::Property => 1,
        SearchHitSource::Vector => 2,
    }
}
3098
// Sentinel bytes passed to FTS5 highlight() as open/close markers so match
// positions can be located in the wrapped output. Control characters are
// used because they should not appear in real text.
// NOTE(review): assumes indexed property text never contains 0x01/0x02; a
// literal occurrence would skew offset attribution — confirm.
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
3118
3119fn load_position_map(
3123 conn: &Connection,
3124 logical_id: &str,
3125 kind: &str,
3126) -> Result<Vec<(usize, usize, String)>, EngineError> {
3127 let mut stmt = conn
3128 .prepare_cached(
3129 "SELECT start_offset, end_offset, leaf_path \
3130 FROM fts_node_property_positions \
3131 WHERE node_logical_id = ?1 AND kind = ?2 \
3132 ORDER BY start_offset ASC",
3133 )
3134 .map_err(EngineError::Sqlite)?;
3135 let rows = stmt
3136 .query_map(rusqlite::params![logical_id, kind], |row| {
3137 let start: i64 = row.get(0)?;
3138 let end: i64 = row.get(1)?;
3139 let path: String = row.get(2)?;
3140 let start = usize::try_from(start).unwrap_or(0);
3144 let end = usize::try_from(end).unwrap_or(0);
3145 Ok((start, end, path))
3146 })
3147 .map_err(EngineError::Sqlite)?;
3148 let mut out = Vec::new();
3149 for row in rows {
3150 out.push(row.map_err(EngineError::Sqlite)?);
3151 }
3152 Ok(out)
3153}
3154
/// Scans highlight()-wrapped text and returns, for each OPEN marker, the byte
/// offset where that match begins in the ORIGINAL (unwrapped) text.
///
/// `consumed_marker_bytes` counts marker bytes already passed, so the
/// original offset of an open marker is simply `cursor - consumed`.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    let bytes = wrapped.as_bytes();
    let open_marker = open.as_bytes();
    let close_marker = close.as_bytes();

    let mut original_offsets = Vec::new();
    let mut cursor = 0usize;
    let mut consumed_marker_bytes = 0usize;

    while cursor < bytes.len() {
        let rest = &bytes[cursor..];
        if rest.starts_with(open_marker) {
            original_offsets.push(cursor - consumed_marker_bytes);
            cursor += open_marker.len();
            consumed_marker_bytes += open_marker.len();
        } else if rest.starts_with(close_marker) {
            cursor += close_marker.len();
            consumed_marker_bytes += close_marker.len();
        } else {
            cursor += 1;
        }
    }
    original_offsets
}
3187
/// Maps a byte offset back to the leaf path whose span contains it.
///
/// `positions` is sorted by start offset (see `load_position_map`): locate
/// the last span starting at or before `offset`, then confirm the offset is
/// actually inside its half-open `[start, end)` range.
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    let candidate = match positions.binary_search_by_key(&offset, |entry| entry.0) {
        Ok(exact) => exact,
        // An insertion point of 0 means every span starts after `offset`.
        Err(insertion) => insertion.checked_sub(1)?,
    };
    let (start, end, leaf_path) = &positions[candidate];
    (offset >= *start && offset < *end).then(|| leaf_path.as_str())
}
3204
/// Resolves which JSON property paths of a node actually matched the FTS
/// query, for property-backed hits.
///
/// Chunk hits trivially attribute to `text_content`. Vector hits, hits
/// without a parseable projection rowid, and hits whose highlight lookup
/// returns nothing all yield an EMPTY attribution rather than an error —
/// attribution is best-effort decoration on top of the search result.
fn resolve_hit_attribution(
    conn: &Connection,
    hit: &SearchHit,
    match_expr: &str,
) -> Result<HitAttribution, EngineError> {
    // Chunk hits come from the chunk FTS index, whose single indexed column
    // is text_content.
    if matches!(hit.source, SearchHitSource::Chunk) {
        return Ok(HitAttribution {
            matched_paths: vec!["text_content".to_owned()],
        });
    }
    // Only property-backed hits can be attributed to JSON paths.
    if !matches!(hit.source, SearchHitSource::Property) {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }
    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };
    // projection_row_id is stored as a string; a non-numeric value means we
    // cannot address the FTS row, so give up quietly.
    let rowid: i64 = match rowid_str.parse() {
        Ok(v) => v,
        Err(_) => {
            return Ok(HitAttribution {
                matched_paths: Vec::new(),
            });
        }
    };

    // Re-run the match against the single FTS row and wrap each matched term
    // in sentinel markers. Column index 1 is the text_content column.
    // NOTE(review): prop_table is interpolated into SQL; fts_kind_table_name
    // presumably emits a sanitized identifier — confirm.
    let prop_table = fathomdb_schema::fts_kind_table_name(&hit.node.kind);
    let highlight_sql = format!(
        "SELECT highlight({prop_table}, 1, ?1, ?2) \
         FROM {prop_table} \
         WHERE rowid = ?3 AND {prop_table} MATCH ?4"
    );
    let mut stmt = conn.prepare(&highlight_sql).map_err(EngineError::Sqlite)?;
    let wrapped: Option<String> = stmt
        .query_row(
            rusqlite::params![
                ATTRIBUTION_HIGHLIGHT_OPEN,
                ATTRIBUTION_HIGHLIGHT_CLOSE,
                rowid,
                match_expr,
            ],
            |row| row.get(0),
        )
        .optional()
        .map_err(EngineError::Sqlite)?;
    // No row / no match → nothing to attribute.
    let Some(wrapped) = wrapped else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };

    // Translate each open marker back into an offset in the original text.
    let offsets = parse_highlight_offsets(
        &wrapped,
        ATTRIBUTION_HIGHLIGHT_OPEN,
        ATTRIBUTION_HIGHLIGHT_CLOSE,
    );
    if offsets.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    // Without a position map (e.g. no recursive paths indexed for this kind)
    // offsets cannot be mapped to leaf paths.
    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
    if positions.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    // Map offsets to leaf paths, deduplicating while preserving first-match
    // order.
    let mut matched_paths: Vec<String> = Vec::new();
    for offset in offsets {
        if let Some(path) = find_leaf_for_offset(&positions, offset)
            && !matched_paths.iter().any(|p| p == path)
        {
            matched_paths.push(path.to_owned());
        }
    }
    Ok(HitAttribution { matched_paths })
}
3302
3303fn build_bm25_expr(table: &str, schema_json: &str, sep: &str) -> String {
3310 let schema = crate::writer::parse_property_schema_json(schema_json, sep);
3311 let any_weighted = schema.paths.iter().any(|p| p.weight.is_some());
3312 if !any_weighted {
3313 return format!("bm25({table})");
3314 }
3315 let weights: Vec<String> = std::iter::once("0.0".to_owned())
3317 .chain(
3318 schema
3319 .paths
3320 .iter()
3321 .map(|p| format!("{:.1}", p.weight.unwrap_or(1.0))),
3322 )
3323 .collect();
3324 format!("bm25({table}, {})", weights.join(", "))
3325}
3326
3327fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
3328 match value {
3329 fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
3330 fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
3331 fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
3332 }
3333}
3334
3335#[cfg(test)]
3336#[allow(clippy::expect_used)]
3337mod tests {
3338 use std::panic::{AssertUnwindSafe, catch_unwind};
3339 use std::sync::Arc;
3340
3341 use fathomdb_query::{BindValue, QueryBuilder};
3342 use fathomdb_schema::SchemaManager;
3343 use rusqlite::types::Value;
3344 use tempfile::NamedTempFile;
3345
3346 use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
3347
3348 use fathomdb_query::{
3349 NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
3350 };
3351
3352 use super::{
3353 bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
3354 wrap_node_row_projection_sql,
3355 };
3356
    /// Builds a minimal `SearchHit` fixture for the merge/dedup tests.
    ///
    /// Only `logical_id`, `score`, `match_mode`, and `source` vary across
    /// tests; everything else is placeholder data.
    fn mk_hit(
        logical_id: &str,
        score: f64,
        match_mode: SearchMatchMode,
        source: SearchHitSource,
    ) -> SearchHit {
        SearchHit {
            node: NodeRowLite {
                row_id: format!("{logical_id}-row"),
                logical_id: logical_id.to_owned(),
                kind: "Goal".to_owned(),
                properties: "{}".to_owned(),
                content_ref: None,
                last_accessed_at: None,
            },
            score,
            modality: RetrievalModality::Text,
            source,
            match_mode: Some(match_mode),
            snippet: None,
            written_at: 0,
            projection_row_id: None,
            vector_distance: None,
            attribution: None,
        }
    }
3383
    // Block precedence: a low-scoring strict hit must still outrank a
    // high-scoring relaxed hit.
    #[test]
    fn merge_places_strict_block_before_relaxed_regardless_of_score() {
        let strict = vec![mk_hit(
            "a",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![mk_hit(
            "b",
            9.9,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "b");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }

    // Cross-block dedup: when both branches return the same logical_id, the
    // strict copy wins even with a lower score.
    #[test]
    fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
        let strict = vec![mk_hit(
            "shared",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![
            mk_hit(
                "shared",
                9.9,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
            mk_hit(
                "other",
                2.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "shared");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "other");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }

    // Within a block: score descending, logical_id ascending as tie-break.
    #[test]
    fn merge_sorts_within_block_by_score_desc_then_logical_id() {
        let strict = vec![
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(
            merged
                .iter()
                .map(|h| &h.node.logical_id)
                .collect::<Vec<_>>(),
            vec!["a", "c", "b"]
        );
    }

    // Within-branch dedup: at equal score, source priority prefers the
    // chunk-backed copy over the property-backed one.
    #[test]
    fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
        let strict = vec![
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Property,
            ),
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(merged.len(), 1);
        assert!(matches!(merged[0].source, SearchHitSource::Chunk));
    }

    // Truncation happens after block merge, so strict hits fill the limit
    // before any relaxed hit is considered.
    #[test]
    fn merge_truncates_to_limit_after_block_merge() {
        let strict = vec![
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let relaxed = vec![mk_hit(
            "c",
            9.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 2);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert_eq!(merged[1].node.logical_id, "b");
    }
3504
    // Three-branch fusion: strict → relaxed → vector ordering, plus dedup
    // precedence when the same logical_id appears in multiple branches.
    #[test]
    fn search_architecturally_supports_three_branch_fusion() {
        // Distinct ids in all three branches keep block order regardless of
        // score.
        let strict = vec![mk_hit(
            "alpha",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![mk_hit(
            "bravo",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vector_hit = mk_hit(
            "charlie",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        // Vector hits carry match_mode=None and vector modality.
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;
        let vector = vec![vector_hit];

        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
        assert_eq!(merged.len(), 3);
        assert_eq!(merged[0].node.logical_id, "alpha");
        assert_eq!(merged[1].node.logical_id, "bravo");
        assert_eq!(merged[2].node.logical_id, "charlie");
        assert!(matches!(merged[2].source, SearchHitSource::Vector));

        // Same id in all three branches: the strict copy wins the dedup.
        let strict2 = vec![mk_hit(
            "shared",
            0.5,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed2 = vec![mk_hit(
            "shared",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vshared = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared.match_mode = None;
        vshared.modality = RetrievalModality::Vector;
        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
        assert!(matches!(
            merged2[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));

        // No strict branch: relaxed beats vector for the shared id.
        let mut vshared2 = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared2.match_mode = None;
        vshared2.modality = RetrievalModality::Vector;
        let merged3 = merge_search_branches_three(
            vec![],
            vec![mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            )],
            vec![vshared2],
            10,
        );
        assert_eq!(merged3.len(), 1);
        assert!(matches!(
            merged3[0].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }

    // A lone vector hit survives fusion intact (source, modality, and
    // match_mode=None preserved).
    #[test]
    fn merge_search_branches_three_vector_only_preserves_vector_block() {
        let mut vector_hit = mk_hit(
            "solo",
            0.75,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;

        let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);

        assert_eq!(merged.len(), 1);
        assert_eq!(merged[0].node.logical_id, "solo");
        assert!(matches!(merged[0].source, SearchHitSource::Vector));
        assert!(matches!(merged[0].modality, RetrievalModality::Vector));
        assert!(
            merged[0].match_mode.is_none(),
            "vector hits carry match_mode=None per addendum 1"
        );
    }

    // Limit contention: the strict block fills the limit before any relaxed
    // or vector hit, even higher-scored ones.
    #[test]
    fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
        let strict = vec![
            mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let relaxed = vec![mk_hit(
            "d",
            9.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;
        let vector = vec![vector_hit];

        let merged = merge_search_branches_three(strict, relaxed, vector, 2);

        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert_eq!(merged[1].node.logical_id, "b");
        assert!(
            merged
                .iter()
                .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
            "strict block must win limit contention against higher-scored relaxed/vector hits"
        );
        assert!(
            merged
                .iter()
                .all(|h| matches!(h.source, SearchHitSource::Chunk)),
            "no vector source hits should leak past the limit"
        );
    }
3691
    // Message-level classification: only missing vec_<kind> tables and a
    // missing vec0 module count as "vector unavailable".
    #[test]
    fn is_vec_table_absent_matches_known_error_messages() {
        use rusqlite::ffi;
        // Helper: wrap a message in a SqliteFailure like rusqlite would.
        fn make_err(msg: &str) -> rusqlite::Error {
            rusqlite::Error::SqliteFailure(
                ffi::Error {
                    code: ffi::ErrorCode::Unknown,
                    extended_code: 1,
                },
                Some(msg.to_owned()),
            )
        }
        assert!(is_vec_table_absent(&make_err(
            "no such table: vec_nodes_active"
        )));
        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
    }

    // A vector query against a kind with no vec_<kind> table must degrade to
    // an empty, flagged result rather than error.
    #[test]
    fn vector_search_uses_per_kind_table_and_degrades_when_table_absent() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("MyKind")
            .vector_search("some query", 5)
            .compile()
            .expect("vector query compiles");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("degraded read must succeed");
        assert!(
            rows.was_degraded,
            "must degrade when vec_mykind table does not exist"
        );
        assert!(
            rows.nodes.is_empty(),
            "degraded result must return empty nodes"
        );
    }
3747
    // bind_value_to_sql: text passes through unchanged.
    #[test]
    fn bind_value_text_maps_to_sql_text() {
        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
        assert_eq!(val, Value::Text("hello".to_owned()));
    }

    // bind_value_to_sql: integers pass through unchanged.
    #[test]
    fn bind_value_integer_maps_to_sql_integer() {
        let val = bind_value_to_sql(&BindValue::Integer(42));
        assert_eq!(val, Value::Integer(42));
    }

    // bind_value_to_sql: SQLite has no boolean type — true becomes 1.
    #[test]
    fn bind_value_bool_true_maps_to_integer_one() {
        let val = bind_value_to_sql(&BindValue::Bool(true));
        assert_eq!(val, Value::Integer(1));
    }

    // bind_value_to_sql: false becomes 0.
    #[test]
    fn bind_value_bool_false_maps_to_integer_zero() {
        let val = bind_value_to_sql(&BindValue::Bool(false));
        assert_eq!(val, Value::Integer(0));
    }
3771
    // Structurally identical queries with different bind values must share a
    // single shape-hash cache entry.
    #[test]
    fn same_shape_queries_share_one_cache_entry() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Same shape (kind, text_search, limit), different search terms.
        let compiled_a = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(10)
            .compile()
            .expect("compiled a");
        let compiled_b = QueryBuilder::nodes("Meeting")
            .text_search("standup", 5)
            .limit(10)
            .compile()
            .expect("compiled b");

        coordinator
            .execute_compiled_read(&compiled_a)
            .expect("read a");
        coordinator
            .execute_compiled_read(&compiled_b)
            .expect("read b");

        assert_eq!(
            compiled_a.shape_hash, compiled_b.shape_hash,
            "different bind values, same structural shape → same hash"
        );
        assert_eq!(coordinator.shape_sql_count(), 1);
    }

    // Missing vec table must yield a flagged, empty result — not an error.
    #[test]
    fn vector_read_degrades_gracefully_when_vec_table_absent() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .vector_search("budget embeddings", 5)
            .compile()
            .expect("vector query compiles");

        let result = coordinator.execute_compiled_read(&compiled);
        let rows = result.expect("degraded read must succeed, not error");
        assert!(
            rows.was_degraded,
            "result must be flagged as degraded when vec_nodes_active is absent"
        );
        assert!(
            rows.nodes.is_empty(),
            "degraded result must return empty nodes"
        );
    }

    // Executing one compiled query populates exactly one shape-cache entry.
    #[test]
    fn coordinator_caches_by_shape_hash() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute compiled read");
        assert_eq!(coordinator.shape_sql_count(), 1);
    }
3863
    // explain must report the same wrapped projection SQL that execution
    // would run.
    #[test]
    fn explain_returns_correct_sql() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
    }

    // Text search plans drive off the chunk FTS table.
    #[test]
    fn explain_returns_correct_driving_table() {
        use fathomdb_query::DrivingTable;

        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
    }

    // Cache state in the plan flips from miss to hit after one execution.
    #[test]
    fn explain_reports_cache_miss_then_hit() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan_before = coordinator.explain_compiled_read(&compiled);
        assert!(
            !plan_before.cache_hit,
            "cache miss expected before first execute"
        );

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        let plan_after = coordinator.explain_compiled_read(&compiled);
        assert!(
            plan_after.cache_hit,
            "cache hit expected after first execute"
        );
    }

    // explain is side-effect free: it only describes the plan, carrying the
    // SQL text and bind count.
    #[test]
    fn explain_does_not_execute_query() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("anything", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
        assert_eq!(plan.bind_count, compiled.binds.len());
    }
3980
3981 #[test]
3982 fn coordinator_executes_compiled_read() {
3983 let db = NamedTempFile::new().expect("temporary db");
3984 let coordinator = ExecutionCoordinator::open(
3985 db.path(),
3986 Arc::new(SchemaManager::new()),
3987 None,
3988 1,
3989 Arc::new(TelemetryCounters::default()),
3990 None,
3991 )
3992 .expect("coordinator");
3993 let conn = rusqlite::Connection::open(db.path()).expect("open db");
3994
3995 conn.execute_batch(
3996 r#"
3997 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
3998 VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
3999 INSERT INTO chunks (id, node_logical_id, text_content, created_at)
4000 VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
4001 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
4002 VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
4003 "#,
4004 )
4005 .expect("seed data");
4006
4007 let compiled = QueryBuilder::nodes("Meeting")
4008 .text_search("budget", 5)
4009 .limit(5)
4010 .compile()
4011 .expect("compiled query");
4012
4013 let rows = coordinator
4014 .execute_compiled_read(&compiled)
4015 .expect("execute read");
4016
4017 assert_eq!(rows.nodes.len(), 1);
4018 assert_eq!(rows.nodes[0].logical_id, "meeting-1");
4019 }
4020
4021 #[test]
4022 fn text_search_finds_structured_only_node_via_property_fts() {
4023 let db = NamedTempFile::new().expect("temporary db");
4024 let coordinator = ExecutionCoordinator::open(
4025 db.path(),
4026 Arc::new(SchemaManager::new()),
4027 None,
4028 1,
4029 Arc::new(TelemetryCounters::default()),
4030 None,
4031 )
4032 .expect("coordinator");
4033 let conn = rusqlite::Connection::open(db.path()).expect("open db");
4034
4035 conn.execute_batch(
4038 "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
4039 node_logical_id UNINDEXED, text_content, \
4040 tokenize = 'porter unicode61 remove_diacritics 2'\
4041 )",
4042 )
4043 .expect("create per-kind fts table");
4044 conn.execute_batch(
4045 r#"
4046 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
4047 VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
4048 INSERT INTO fts_props_goal (node_logical_id, text_content)
4049 VALUES ('goal-1', 'Ship v2');
4050 "#,
4051 )
4052 .expect("seed data");
4053
4054 let compiled = QueryBuilder::nodes("Goal")
4055 .text_search("Ship", 5)
4056 .limit(5)
4057 .compile()
4058 .expect("compiled query");
4059
4060 let rows = coordinator
4061 .execute_compiled_read(&compiled)
4062 .expect("execute read");
4063
4064 assert_eq!(rows.nodes.len(), 1);
4065 assert_eq!(rows.nodes[0].logical_id, "goal-1");
4066 }
4067
    #[test]
    fn text_search_returns_both_chunk_and_property_backed_hits() {
        // A text search must merge hits from BOTH retrieval sources: the
        // shared chunk-backed `fts_nodes` table and a per-kind property FTS
        // table (`fts_props_<kind>`).
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Seed a node whose searchable text lives in a chunk, indexed through
        // the shared fts_nodes table.
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
            ",
        )
        .expect("seed chunk-backed node");

        // Create the per-kind property FTS table by hand (no schema
        // registration here) and seed a second node whose text lives only in
        // that table.
        conn.execute_batch(
            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_meeting USING fts5(\
             node_logical_id UNINDEXED, text_content, \
             tokenize = 'porter unicode61 remove_diacritics 2'\
             )",
        )
        .expect("create per-kind fts table");
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
            INSERT INTO fts_props_meeting (node_logical_id, text_content)
            VALUES ('meeting-2', 'quarterly sync');
            "#,
        )
        .expect("seed property-backed node");

        // 'quarterly' appears in both nodes' text, one per source.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("quarterly", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // Sort so the assertion is independent of ranking order.
        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
        ids.sort_unstable();
        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
    }
4128
    #[test]
    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
        // The lowercase word "not" inside a query string must be matched as a
        // literal token, not interpreted as the FTS5 NOT operator (which would
        // exclude the seeded row instead of matching it).
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Chunk text deliberately contains the word "not".
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
            ",
        )
        .expect("seed chunk-backed node");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("not a ship", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
4168
4169 #[test]
4172 fn capability_gate_reports_false_without_feature() {
4173 let db = NamedTempFile::new().expect("temporary db");
4174 let coordinator = ExecutionCoordinator::open(
4177 db.path(),
4178 Arc::new(SchemaManager::new()),
4179 None,
4180 1,
4181 Arc::new(TelemetryCounters::default()),
4182 None,
4183 )
4184 .expect("coordinator");
4185 assert!(
4186 !coordinator.vector_enabled(),
4187 "vector_enabled must be false when no dimension is requested"
4188 );
4189 }
4190
    // Only compiled when the sqlite-vec feature is enabled, since the gate
    // can never report true without it.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn capability_gate_reports_true_when_feature_enabled() {
        // With the sqlite-vec feature active AND a dimension requested
        // (Some(128)), the vector capability gate must report enabled.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            Some(128),
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        assert!(
            coordinator.vector_enabled(),
            "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
        );
    }
4209
    #[test]
    fn read_run_returns_inserted_run() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        // Insert one run through the writer actor, then verify the
        // coordinator's read_run helper round-trips its fields.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        // Fresh coordinator over the same db file reads the committed row.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
4266
    #[test]
    fn read_step_returns_inserted_step() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        // Insert a run plus one step under it, then verify read_step
        // round-trips the step's fields (including the run linkage).
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // The step references run-s1 created in the same request.
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
4330
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        // Insert the full run -> step -> action chain in one request, then
        // verify read_action round-trips the action (including step linkage).
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // The action references step-a1 created in the same request.
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
4406
    #[test]
    fn read_active_runs_excludes_superseded() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        // Two writes: run-v1 first, then run-v2 with supersedes_id pointing
        // at run-v1. read_active_runs must only return the superseding run.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First version of the run.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Second version supersedes the first (upsert + supersedes_id).
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
4486
    // Test helper: poisons the mutex guarding the coordinator's first pooled
    // connection by panicking while the lock is held. catch_unwind contains
    // the panic so the calling test keeps running against the now-poisoned
    // mutex.
    #[allow(clippy::panic)]
    fn poison_connection(coordinator: &ExecutionCoordinator) {
        let result = catch_unwind(AssertUnwindSafe(|| {
            // Acquire the lock, then unwind while holding it — this is what
            // marks the mutex as poisoned.
            let _guard = coordinator.pool.connections[0]
                .lock()
                .expect("poison test lock");
            panic!("poison coordinator connection mutex");
        }));
        assert!(
            result.is_err(),
            "poison test must unwind while holding the connection mutex"
        );
    }
4500
4501 #[allow(clippy::panic)]
4502 fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
4503 where
4504 F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
4505 {
4506 match op(coordinator) {
4507 Err(EngineError::Bridge(message)) => {
4508 assert_eq!(message, "connection mutex poisoned");
4509 }
4510 Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
4511 Err(error) => panic!("expected poisoned connection error, got {error:?}"),
4512 }
4513 }
4514
    #[test]
    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
        // After the pooled connection mutex is poisoned, every read helper
        // must surface EngineError::Bridge("connection mutex poisoned")
        // rather than panicking.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        poison_connection(&coordinator);

        // Exercise each read helper against the poisoned connection.
        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
        assert_poisoned_connection_error(
            &coordinator,
            super::ExecutionCoordinator::read_active_runs,
        );
        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
    }
4540
    #[test]
    fn shape_cache_stays_bounded() {
        use fathomdb_query::ShapeHash;

        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Overfill the shape->SQL cache to MAX_SHAPE_CACHE_SIZE + 1 entries
        // by writing through the mutex directly (inclusive range 0..=MAX).
        {
            let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
            for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
                cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
            }
        }
        // Executing any read must bring the cache back within its bound.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(10)
            .compile()
            .expect("compiled query");

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert!(
            coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
            "shape cache must stay bounded: got {} entries, max {}",
            coordinator.shape_sql_count(),
            super::MAX_SHAPE_CACHE_SIZE
        );
    }
4586
4587 #[test]
4590 fn read_pool_size_configurable() {
4591 let db = NamedTempFile::new().expect("temporary db");
4592 let coordinator = ExecutionCoordinator::open(
4593 db.path(),
4594 Arc::new(SchemaManager::new()),
4595 None,
4596 2,
4597 Arc::new(TelemetryCounters::default()),
4598 None,
4599 )
4600 .expect("coordinator with pool_size=2");
4601
4602 assert_eq!(coordinator.pool.size(), 2);
4603
4604 let compiled = QueryBuilder::nodes("Meeting")
4606 .text_search("budget", 5)
4607 .limit(10)
4608 .compile()
4609 .expect("compiled query");
4610
4611 let result = coordinator.execute_compiled_read(&compiled);
4612 assert!(result.is_ok(), "read through pool must succeed");
4613 }
4614
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        let db = NamedTempFile::new().expect("temporary db");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed 10 Meeting roots, each with a chunk-backed FTS row and two
        // Task children connected via HAS_TASK edges.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        // Grouped query: text search over roots with an outbound HAS_TASK
        // expansion into the "tasks" slot.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1, None, None)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Every root was seeded with exactly two HAS_TASK children.
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
4694
4695 #[test]
4698 fn build_bm25_expr_no_weights() {
4699 let schema_json = r#"["$.title","$.body"]"#;
4700 let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4701 assert_eq!(result, "bm25(fts_props_testkind)");
4702 }
4703
4704 #[test]
4705 fn build_bm25_expr_with_weights() {
4706 let schema_json = r#"[{"path":"$.title","mode":"scalar","weight":10.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
4707 let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4708 assert_eq!(result, "bm25(fts_props_testkind, 0.0, 10.0, 1.0)");
4709 }
4710
    #[test]
    #[allow(clippy::too_many_lines)]
    fn weighted_schema_bm25_orders_title_match_above_body_match() {
        use crate::{
            AdminService, FtsPropertyPathSpec, NodeInsert, ProvenanceMode, WriteRequest,
            WriterActor, writer::ChunkPolicy,
        };
        use fathomdb_schema::fts_column_name;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Register an Article FTS schema that weights title matches 10x
        // over body matches.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Article",
                    &[
                        FtsPropertyPathSpec::scalar("$.title").with_weight(10.0),
                        FtsPropertyPathSpec::scalar("$.body").with_weight(1.0),
                    ],
                    None,
                    &[],
                    crate::rebuild_actor::RebuildMode::Eager,
                )
                .expect("register schema with weights");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // article-a: "rust" in the (heavily weighted) title.
        writer
            .submit(WriteRequest {
                label: "insert-a".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-a".to_owned(),
                    logical_id: "article-a".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"rust","body":"other"}"#.to_owned(),
                    source_ref: Some("src-a".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node A");

        // article-b: "rust" only in the (lightly weighted) body.
        writer
            .submit(WriteRequest {
                label: "insert-b".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-b".to_owned(),
                    logical_id: "article-b".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"other","body":"rust"}"#.to_owned(),
                    source_ref: Some("src-b".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node B");

        drop(writer);

        // Sanity-check the per-kind FTS table was populated with one row per
        // node and that article-a's values landed in the right columns.
        {
            let title_col = fts_column_name("$.title", false);
            let body_col = fts_column_name("$.body", false);
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            let count: i64 = conn
                .query_row("SELECT count(*) FROM fts_props_article", [], |r| r.get(0))
                .expect("count fts rows");
            assert_eq!(count, 2, "both nodes must have FTS rows in per-kind table");
            let (title_a, body_a): (String, String) = conn
                .query_row(
                    &format!(
                        "SELECT {title_col}, {body_col} FROM fts_props_article \
                         WHERE node_logical_id = 'article-a'"
                    ),
                    [],
                    |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
                )
                .expect("select article-a");
            assert_eq!(
                title_a, "rust",
                "article-a must have 'rust' in title column"
            );
            assert_eq!(
                body_a, "other",
                "article-a must have 'other' in body column"
            );
        }

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = fathomdb_query::QueryBuilder::nodes("Article")
            .text_search("rust", 5)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // The 10x title weight must push article-a above article-b.
        assert_eq!(rows.nodes.len(), 2, "both nodes must be returned");
        assert_eq!(
            rows.nodes[0].logical_id, "article-a",
            "article-a (title match, weight 10) must rank above article-b (body match, weight 1)"
        );
    }
4863
    #[test]
    fn property_fts_hit_matched_paths_from_positions() {
        use crate::{AdminService, rebuild_actor::RebuildMode};
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Register an Item schema with two scalar FTS paths so the per-kind
        // fts_props_item table exists.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Item",
                    &[
                        crate::FtsPropertyPathSpec::scalar("$.body"),
                        crate::FtsPropertyPathSpec::scalar("$.title"),
                    ],
                    None,
                    &[],
                    RebuildMode::Eager,
                )
                .expect("register Item FTS schema");
        }

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Hand-built FTS blob: "other" ($.body) + separator + "searchterm"
        // ($.title). The position offsets below are computed from this
        // layout, so guard against the separator changing length.
        let blob = format!("other{}searchterm", crate::writer::LEAF_SEPARATOR);
        assert_eq!(
            crate::writer::LEAF_SEPARATOR.len(),
            29,
            "LEAF_SEPARATOR length changed; update position offsets"
        );

        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
             VALUES ('r1', 'item-1', 'Item', '{\"title\":\"searchterm\",\"body\":\"other\"}', 100)",
            [],
        )
        .expect("insert node");
        conn.execute(
            "INSERT INTO fts_props_item (node_logical_id, text_content) \
             VALUES ('item-1', ?1)",
            rusqlite::params![blob],
        )
        .expect("insert fts row");
        // $.body covers bytes [0, 5) = "other".
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 0, 5, '$.body')",
            [],
        )
        .expect("insert body position");
        // $.title covers bytes [34, 44) = "searchterm" (5 + 29 separator).
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 34, 44, '$.title')",
            [],
        )
        .expect("insert title position");

        // Search "searchterm" with attribution enabled; only the $.title
        // span overlaps the match.
        let ast = QueryBuilder::nodes("Item").text_search("searchterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected at least one hit");
        let hit = rows
            .hits
            .iter()
            .find(|h| h.node.logical_id == "item-1")
            .expect("item-1 must be in hits");

        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        assert!(
            att.matched_paths.contains(&"$.title".to_owned()),
            "matched_paths must contain '$.title', got {:?}",
            att.matched_paths,
        );
        assert!(
            !att.matched_paths.contains(&"$.body".to_owned()),
            "matched_paths must NOT contain '$.body', got {:?}",
            att.matched_paths,
        );
    }
4982
    #[test]
    fn vector_hit_has_no_attribution() {
        use fathomdb_query::compile_vector_search;

        // No vector dimension is configured, so the vec table is absent and
        // the search must degrade gracefully rather than error; any hits it
        // does return must carry attribution = None even when requested.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let ast = QueryBuilder::nodes("Document").vector_search("[1.0, 0.0]", 5);
        let mut compiled = compile_vector_search(ast.ast()).expect("compile vector search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_vector_search(&compiled)
            .expect("vector search must not error");

        assert!(
            rows.was_degraded,
            "vector search without vec table must degrade"
        );
        for hit in &rows.hits {
            assert!(
                hit.attribution.is_none(),
                "vector hits must carry attribution = None, got {:?}",
                hit.attribution
            );
        }
    }
5028
    #[test]
    fn chunk_hit_has_text_content_attribution() {
        use fathomdb_query::compile_search;

        // A hit sourced from a chunk (fts_nodes) has no JSON path to point
        // at; its attribution must use the sentinel path "text_content".
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Seed a chunk-backed node with a unique token to search for.
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('r1', 'chunk-node', 'Goal', '{}', 100);
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('c1', 'chunk-node', 'uniquesentinelterm', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('c1', 'chunk-node', 'Goal', 'uniquesentinelterm');
            ",
        )
        .expect("seed chunk node");

        let ast = QueryBuilder::nodes("Goal").text_search("uniquesentinelterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected chunk hit");
        let hit = rows
            .hits
            .iter()
            .find(|h| matches!(h.source, SearchHitSource::Chunk))
            .expect("must have a Chunk hit");

        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        assert_eq!(
            att.matched_paths,
            vec!["text_content".to_owned()],
            "chunk matched_paths must be [\"text_content\"], got {:?}",
            att.matched_paths,
        );
    }
5097
5098 #[test]
5105 #[allow(clippy::too_many_lines)]
5106 fn mixed_kind_results_get_per_kind_matched_paths() {
5107 use crate::{AdminService, rebuild_actor::RebuildMode};
5108 use fathomdb_query::compile_search;
5109
5110 let db = NamedTempFile::new().expect("temporary db");
5111 let schema_manager = Arc::new(SchemaManager::new());
5112
5113 {
5116 let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
5117 admin
5118 .register_fts_property_schema_with_entries(
5119 "KindA",
5120 &[crate::FtsPropertyPathSpec::scalar("$.alpha")],
5121 None,
5122 &[],
5123 RebuildMode::Eager,
5124 )
5125 .expect("register KindA FTS schema");
5126 admin
5127 .register_fts_property_schema_with_entries(
5128 "KindB",
5129 &[crate::FtsPropertyPathSpec::scalar("$.beta")],
5130 None,
5131 &[],
5132 RebuildMode::Eager,
5133 )
5134 .expect("register KindB FTS schema");
5135 }
5136
5137 let coordinator = ExecutionCoordinator::open(
5138 db.path(),
5139 Arc::clone(&schema_manager),
5140 None,
5141 1,
5142 Arc::new(TelemetryCounters::default()),
5143 None,
5144 )
5145 .expect("coordinator");
5146
5147 let conn = rusqlite::Connection::open(db.path()).expect("open db");
5148
5149 conn.execute(
5151 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5152 VALUES ('rA', 'node-a', 'KindA', '{\"alpha\":\"xenoterm\"}', 100)",
5153 [],
5154 )
5155 .expect("insert KindA node");
5156 conn.execute(
5158 "INSERT INTO fts_props_kinda (node_logical_id, text_content) \
5159 VALUES ('node-a', 'xenoterm')",
5160 [],
5161 )
5162 .expect("insert KindA fts row");
5163 conn.execute(
5164 "INSERT INTO fts_node_property_positions \
5165 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5166 VALUES ('node-a', 'KindA', 0, 8, '$.alpha')",
5167 [],
5168 )
5169 .expect("insert KindA position");
5170
5171 conn.execute(
5173 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5174 VALUES ('rB', 'node-b', 'KindB', '{\"beta\":\"xenoterm\"}', 100)",
5175 [],
5176 )
5177 .expect("insert KindB node");
5178 conn.execute(
5180 "INSERT INTO fts_props_kindb (node_logical_id, text_content) \
5181 VALUES ('node-b', 'xenoterm')",
5182 [],
5183 )
5184 .expect("insert KindB fts row");
5185 conn.execute(
5186 "INSERT INTO fts_node_property_positions \
5187 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5188 VALUES ('node-b', 'KindB', 0, 8, '$.beta')",
5189 [],
5190 )
5191 .expect("insert KindB position");
5192
5193 let ast = QueryBuilder::nodes("").text_search("xenoterm", 10);
5195 let mut compiled = compile_search(ast.ast()).expect("compile search");
5196 compiled.attribution_requested = true;
5197
5198 let rows = coordinator
5199 .execute_compiled_search(&compiled)
5200 .expect("search");
5201
5202 assert!(
5204 rows.hits.len() >= 2,
5205 "expected hits for both kinds, got {}",
5206 rows.hits.len()
5207 );
5208
5209 for hit in &rows.hits {
5210 let att = hit
5211 .attribution
5212 .as_ref()
5213 .expect("attribution must be Some when attribution_requested");
5214 match hit.node.kind.as_str() {
5215 "KindA" => {
5216 assert_eq!(
5217 att.matched_paths,
5218 vec!["$.alpha".to_owned()],
5219 "KindA hit must have matched_paths=['$.alpha'], got {:?}",
5220 att.matched_paths,
5221 );
5222 }
5223 "KindB" => {
5224 assert_eq!(
5225 att.matched_paths,
5226 vec!["$.beta".to_owned()],
5227 "KindB hit must have matched_paths=['$.beta'], got {:?}",
5228 att.matched_paths,
5229 );
5230 }
5231 other => {
5232 assert_eq!(other, "KindA", "unexpected kind in result: {other}");
5234 }
5235 }
5236 }
5237 }
5238
5239 #[test]
5242 fn tokenizer_strategy_from_str() {
5243 use super::TokenizerStrategy;
5244 assert_eq!(
5245 TokenizerStrategy::from_str("porter unicode61 remove_diacritics 2"),
5246 TokenizerStrategy::RecallOptimizedEnglish,
5247 );
5248 assert_eq!(
5249 TokenizerStrategy::from_str("unicode61 remove_diacritics 2"),
5250 TokenizerStrategy::PrecisionOptimized,
5251 );
5252 assert_eq!(
5253 TokenizerStrategy::from_str("trigram"),
5254 TokenizerStrategy::SubstringTrigram,
5255 );
5256 assert_eq!(
5257 TokenizerStrategy::from_str("icu"),
5258 TokenizerStrategy::GlobalCjk,
5259 );
5260 assert_eq!(
5261 TokenizerStrategy::from_str("unicode61 tokenchars '.+-'"),
5262 TokenizerStrategy::SourceCode,
5263 );
5264 assert_eq!(
5266 TokenizerStrategy::from_str("unicode61 tokenchars '._-$@'"),
5267 TokenizerStrategy::SourceCode,
5268 );
5269 assert_eq!(
5270 TokenizerStrategy::from_str("my_custom_tokenizer"),
5271 TokenizerStrategy::Custom("my_custom_tokenizer".to_owned()),
5272 );
5273 }
5274
    #[test]
    fn trigram_short_query_returns_empty() {
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // First open bootstraps the database schema, then is dropped so we
        // can write the projection profile row directly.
        {
            let bootstrap = ExecutionCoordinator::open(
                db.path(),
                Arc::clone(&schema_manager),
                None,
                1,
                Arc::new(TelemetryCounters::default()),
                None,
            )
            .expect("bootstrap coordinator");
            drop(bootstrap);
        }

        // Configure the Snippet kind to use the trigram tokenizer.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            conn.execute_batch(
                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
                 VALUES ('Snippet', 'fts', '{\"tokenizer\":\"trigram\"}');",
            )
            .expect("insert profile");
        }

        // Reopen so the coordinator picks up the stored profile.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator reopen");

        // "ab" is shorter than one trigram; the search must short-circuit
        // to an empty result instead of erroring.
        let ast = QueryBuilder::nodes("Snippet").text_search("ab", 10);
        let compiled = compile_search(ast.ast()).expect("compile search");
        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("short trigram query must not error");
        assert!(
            rows.hits.is_empty(),
            "2-char trigram query must return empty"
        );
    }
5328
5329 #[test]
5330 fn source_code_strategy_does_not_corrupt_fts5_syntax() {
5331 use fathomdb_query::compile_search;
5341
5342 let db = NamedTempFile::new().expect("temporary db");
5343 let schema_manager = Arc::new(SchemaManager::new());
5344
5345 {
5347 let bootstrap = ExecutionCoordinator::open(
5348 db.path(),
5349 Arc::clone(&schema_manager),
5350 None,
5351 1,
5352 Arc::new(TelemetryCounters::default()),
5353 None,
5354 )
5355 .expect("bootstrap coordinator");
5356 drop(bootstrap);
5357 }
5358
5359 {
5361 let conn = rusqlite::Connection::open(db.path()).expect("open db");
5362 conn.execute(
5363 "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
5364 VALUES ('Symbol', 'fts', json_object('tokenizer', 'unicode61 tokenchars ''._-$@'''))",
5365 [],
5366 )
5367 .expect("insert profile");
5368 conn.execute_batch(
5369 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5370 VALUES ('row-sym-1', 'logical-sym-1', 'Symbol', '{}', 1); \
5371 INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
5372 VALUES ('chunk-sym-1', 'logical-sym-1', 'std.io is a rust crate', 1); \
5373 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
5374 VALUES ('chunk-sym-1', 'logical-sym-1', 'Symbol', 'std.io is a rust crate');",
5375 )
5376 .expect("insert node and fts row");
5377 }
5378
5379 let coordinator = ExecutionCoordinator::open(
5381 db.path(),
5382 Arc::clone(&schema_manager),
5383 None,
5384 1,
5385 Arc::new(TelemetryCounters::default()),
5386 None,
5387 )
5388 .expect("coordinator reopen");
5389
5390 let ast = QueryBuilder::nodes("Symbol").text_search("std.io", 10);
5392 let compiled = compile_search(ast.ast()).expect("compile search");
5393 let rows = coordinator
5394 .execute_compiled_search(&compiled)
5395 .expect("source code search must not error");
5396 assert!(
5397 !rows.hits.is_empty(),
5398 "SourceCode strategy search for 'std.io' must return the document; \
5399 got empty — FTS5 expression was likely corrupted by post-render escaping"
5400 );
5401 }
5402
    /// Deterministic `QueryEmbedder` test double: embeds every query as an
    /// all-zero vector and reports a fixed, configurable identity.
    #[derive(Debug)]
    struct StubEmbedder {
        // Model name reported via `identity()`.
        model_identity: String,
        // Length of the vectors produced by `embed_query` (also reported
        // as the identity's dimension).
        dimension: usize,
    }
5410
5411 impl StubEmbedder {
5412 fn new(model_identity: &str, dimension: usize) -> Self {
5413 Self {
5414 model_identity: model_identity.to_owned(),
5415 dimension,
5416 }
5417 }
5418 }
5419
5420 impl crate::embedder::QueryEmbedder for StubEmbedder {
5421 fn embed_query(&self, _text: &str) -> Result<Vec<f32>, crate::embedder::EmbedderError> {
5422 Ok(vec![0.0; self.dimension])
5423 }
5424 fn identity(&self) -> crate::embedder::QueryEmbedderIdentity {
5425 crate::embedder::QueryEmbedderIdentity {
5426 model_identity: self.model_identity.clone(),
5427 model_version: "1.0".to_owned(),
5428 dimension: self.dimension,
5429 normalization_policy: "l2".to_owned(),
5430 }
5431 }
5432 fn max_tokens(&self) -> usize {
5433 512
5434 }
5435 }
5436
5437 fn make_in_memory_db_with_projection_profiles() -> rusqlite::Connection {
5438 let conn = rusqlite::Connection::open_in_memory().expect("in-memory db");
5439 conn.execute_batch(
5440 "CREATE TABLE IF NOT EXISTS projection_profiles (
5441 kind TEXT NOT NULL,
5442 facet TEXT NOT NULL,
5443 config_json TEXT NOT NULL,
5444 active_at INTEGER,
5445 created_at INTEGER,
5446 PRIMARY KEY (kind, facet)
5447 );",
5448 )
5449 .expect("create projection_profiles");
5450 conn
5451 }
5452
5453 #[test]
5454 fn check_vec_identity_no_profile_no_panic() {
5455 let conn = make_in_memory_db_with_projection_profiles();
5456 let embedder = StubEmbedder::new("bge-small", 384);
5457 let result = super::check_vec_identity_at_open(&conn, &embedder);
5458 assert!(result.is_ok(), "no profile row must return Ok(())");
5459 }
5460
5461 #[test]
5462 fn check_vec_identity_matching_identity_ok() {
5463 let conn = make_in_memory_db_with_projection_profiles();
5464 conn.execute(
5465 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5466 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5467 [],
5468 )
5469 .expect("insert profile");
5470 let embedder = StubEmbedder::new("bge-small", 384);
5471 let result = super::check_vec_identity_at_open(&conn, &embedder);
5472 assert!(result.is_ok(), "matching profile must return Ok(())");
5473 }
5474
5475 #[test]
5476 fn check_vec_identity_mismatched_dimensions_ok() {
5477 let conn = make_in_memory_db_with_projection_profiles();
5478 conn.execute(
5479 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5480 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5481 [],
5482 )
5483 .expect("insert profile");
5484 let embedder = StubEmbedder::new("bge-small", 768);
5486 let result = super::check_vec_identity_at_open(&conn, &embedder);
5487 assert!(
5488 result.is_ok(),
5489 "dimension mismatch must warn and return Ok(())"
5490 );
5491 }
5492
5493 #[test]
5494 fn custom_tokenizer_passthrough() {
5495 use super::TokenizerStrategy;
5496 let strategy = TokenizerStrategy::Custom("my_tok".to_owned());
5497 assert_eq!(strategy, TokenizerStrategy::Custom("my_tok".to_owned()));
5499 assert_ne!(strategy, TokenizerStrategy::SubstringTrigram);
5501 assert_ne!(strategy, TokenizerStrategy::SourceCode);
5502 }
5503
5504 #[test]
5505 fn check_vec_identity_mismatched_model_ok() {
5506 let conn = make_in_memory_db_with_projection_profiles();
5507 conn.execute(
5508 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5509 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5510 [],
5511 )
5512 .expect("insert profile");
5513 let embedder = StubEmbedder::new("bge-large", 384);
5515 let result = super::check_vec_identity_at_open(&conn, &embedder);
5516 assert!(
5517 result.is_ok(),
5518 "model_identity mismatch must warn and return Ok(())"
5519 );
5520 }
5521}