1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8 BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9 CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, ExpansionSlot,
10 FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch,
11 SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
12};
13use fathomdb_schema::SchemaManager;
14use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
15
16use crate::embedder::QueryEmbedder;
17use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
18use crate::{EngineError, sqlite};
19
/// Upper bound on entries in the per-coordinator shape → SQL cache
/// (`shape_sql_map`). When the cache reaches this size it is cleared
/// wholesale rather than evicted entry-by-entry (see `execute_compiled_read`).
const MAX_SHAPE_CACHE_SIZE: usize = 4096;

/// Number of items processed per chunk in batched operations.
/// NOTE(review): not referenced in this portion of the file — presumably used
/// by batch helpers elsewhere; confirm before changing.
const BATCH_CHUNK_SIZE: usize = 200;
31fn compile_expansion_filter(
44 filter: Option<&Predicate>,
45 first_param: usize,
46) -> (String, Vec<Value>) {
47 let Some(predicate) = filter else {
48 return (String::new(), vec![]);
49 };
50 let p = first_param;
51 match predicate {
52 Predicate::JsonPathEq { path, value } => {
53 let val = match value {
54 ScalarValue::Text(t) => Value::Text(t.clone()),
55 ScalarValue::Integer(i) => Value::Integer(*i),
56 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
57 };
58 (
59 format!(
60 "\n AND json_extract(n.properties, ?{p}) = ?{}",
61 p + 1
62 ),
63 vec![Value::Text(path.clone()), val],
64 )
65 }
66 Predicate::JsonPathCompare { path, op, value } => {
67 let val = match value {
68 ScalarValue::Text(t) => Value::Text(t.clone()),
69 ScalarValue::Integer(i) => Value::Integer(*i),
70 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
71 };
72 let operator = match op {
73 ComparisonOp::Gt => ">",
74 ComparisonOp::Gte => ">=",
75 ComparisonOp::Lt => "<",
76 ComparisonOp::Lte => "<=",
77 };
78 (
79 format!(
80 "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
81 p + 1
82 ),
83 vec![Value::Text(path.clone()), val],
84 )
85 }
86 Predicate::JsonPathFusedEq { path, value } => (
87 format!(
88 "\n AND json_extract(n.properties, ?{p}) = ?{}",
89 p + 1
90 ),
91 vec![Value::Text(path.clone()), Value::Text(value.clone())],
92 ),
93 Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
94 let operator = match op {
95 ComparisonOp::Gt => ">",
96 ComparisonOp::Gte => ">=",
97 ComparisonOp::Lt => "<",
98 ComparisonOp::Lte => "<=",
99 };
100 (
101 format!(
102 "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
103 p + 1
104 ),
105 vec![Value::Text(path.clone()), Value::Integer(*value)],
106 )
107 }
108 Predicate::KindEq(kind) => (
109 format!("\n AND n.kind = ?{p}"),
110 vec![Value::Text(kind.clone())],
111 ),
112 Predicate::LogicalIdEq(logical_id) => (
113 format!("\n AND n.logical_id = ?{p}"),
114 vec![Value::Text(logical_id.clone())],
115 ),
116 Predicate::SourceRefEq(source_ref) => (
117 format!("\n AND n.source_ref = ?{p}"),
118 vec![Value::Text(source_ref.clone())],
119 ),
120 Predicate::ContentRefEq(uri) => (
121 format!("\n AND n.content_ref = ?{p}"),
122 vec![Value::Text(uri.clone())],
123 ),
124 Predicate::ContentRefNotNull => (
125 "\n AND n.content_ref IS NOT NULL".to_owned(),
126 vec![],
127 ),
128 }
129}
130
/// Classification of an FTS5 tokenizer configuration string into the
/// strategies this engine knows how to special-case.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TokenizerStrategy {
    RecallOptimizedEnglish,
    PrecisionOptimized,
    SubstringTrigram,
    GlobalCjk,
    SourceCode,
    Custom(String),
}

impl TokenizerStrategy {
    /// Map a raw tokenizer string (as stored in the projection profile
    /// config) to a known strategy, falling back to `Custom` for anything
    /// unrecognised.
    pub fn from_str(s: &str) -> Self {
        if s == "porter unicode61 remove_diacritics 2" {
            Self::RecallOptimizedEnglish
        } else if s == "unicode61 remove_diacritics 2" {
            Self::PrecisionOptimized
        } else if s == "trigram" {
            Self::SubstringTrigram
        } else if s == "icu" {
            Self::GlobalCjk
        } else if s.starts_with("unicode61 tokenchars") {
            Self::SourceCode
        } else {
            Self::Custom(s.to_string())
        }
    }
}
165
/// A fixed-size pool of read-only SQLite connections; each slot is guarded
/// by its own mutex so readers can proceed independently.
struct ReadPool {
    // One independently lockable connection per pool slot.
    connections: Vec<Mutex<Connection>>,
}
173
174impl fmt::Debug for ReadPool {
175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176 f.debug_struct("ReadPool")
177 .field("size", &self.connections.len())
178 .finish()
179 }
180}
181
182impl ReadPool {
183 fn new(
194 db_path: &Path,
195 pool_size: usize,
196 schema_manager: &SchemaManager,
197 vector_enabled: bool,
198 ) -> Result<Self, EngineError> {
199 let mut connections = Vec::with_capacity(pool_size);
200 for _ in 0..pool_size {
201 let conn = if vector_enabled {
202 #[cfg(feature = "sqlite-vec")]
203 {
204 sqlite::open_readonly_connection_with_vec(db_path)?
205 }
206 #[cfg(not(feature = "sqlite-vec"))]
207 {
208 sqlite::open_readonly_connection(db_path)?
209 }
210 } else {
211 sqlite::open_readonly_connection(db_path)?
212 };
213 schema_manager
214 .initialize_reader_connection(&conn)
215 .map_err(EngineError::Schema)?;
216 connections.push(Mutex::new(conn));
217 }
218 Ok(Self { connections })
219 }
220
221 fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
230 for conn in &self.connections {
232 if let Ok(guard) = conn.try_lock() {
233 return Ok(guard);
234 }
235 }
236 self.connections[0].lock().map_err(|_| {
238 trace_error!("read pool: connection mutex poisoned");
239 EngineError::Bridge("connection mutex poisoned".to_owned())
240 })
241 }
242
243 #[cfg(test)]
245 fn size(&self) -> usize {
246 self.connections.len()
247 }
248}
249
/// A fully rendered plan for a compiled query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// Final SQL text to execute.
    pub sql: String,
    /// Number of bind parameters the SQL expects.
    pub bind_count: usize,
    /// Table the query is driven from.
    pub driving_table: DrivingTable,
    /// Structural hash of the query shape, used as the plan-cache key.
    pub shape_hash: ShapeHash,
    /// Whether this plan was served from the shape cache.
    pub cache_hit: bool,
}
261
/// A row projected from the `nodes` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row identifier.
    pub row_id: String,
    /// Logical node identifier.
    pub logical_id: String,
    /// Node kind (also selects the per-kind FTS/vector tables).
    pub kind: String,
    /// JSON-encoded node properties.
    pub properties: String,
    /// Optional reference to external content.
    pub content_ref: Option<String>,
    /// Last-access timestamp, when tracked.
    pub last_accessed_at: Option<i64>,
}

/// A row projected from the runs table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Run identifier.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Run status.
    pub status: String,
    /// JSON-encoded run properties.
    pub properties: String,
}

/// A row projected from the steps table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Step identifier.
    pub id: String,
    /// Identifier of the owning run.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Step status.
    pub status: String,
    /// JSON-encoded step properties.
    pub properties: String,
}

/// A row projected from the actions table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Action identifier.
    pub id: String,
    /// Identifier of the owning step.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Action status.
    pub status: String,
    /// JSON-encoded action properties.
    pub properties: String,
}

/// A provenance event record.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Event identifier.
    pub id: String,
    /// Event type tag.
    pub event_type: String,
    /// Subject the event refers to.
    pub subject: String,
    /// Optional source reference.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation timestamp.
    pub created_at: i64,
}

/// Result rows of a compiled read query, split by entity table.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Matching node rows.
    pub nodes: Vec<NodeRow>,
    /// Matching run rows.
    pub runs: Vec<RunRow>,
    /// Matching step rows.
    pub steps: Vec<StepRow>,
    /// Matching action rows.
    pub actions: Vec<ActionRow>,
    /// True when the result was degraded (e.g. a missing vector table
    /// forced an empty result instead of an error).
    pub was_degraded: bool,
}

/// Expansion rows grouped under a single root node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical id of the root the expansion hangs off.
    pub root_logical_id: String,
    /// Expanded node rows for this root.
    pub nodes: Vec<NodeRow>,
}

/// Expansion rows for one named expansion slot.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot.
    pub slot: String,
    /// Per-root expansion results.
    pub roots: Vec<ExpansionRootRows>,
}

/// Result of a grouped query: root rows plus their expansions.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// The matched root node rows.
    pub roots: Vec<NodeRow>,
    /// Expansion results, one entry per slot.
    pub expansions: Vec<ExpansionSlotRows>,
    /// True when any branch of the query was degraded.
    pub was_degraded: bool,
}
377
/// Executes compiled read, search, and retrieval plans against a pool of
/// read-only SQLite connections.
pub struct ExecutionCoordinator {
    /// Path of the SQLite database file.
    database_path: PathBuf,
    /// Shared schema manager used to bootstrap and initialize connections.
    schema_manager: Arc<SchemaManager>,
    /// Pool of read-only connections used by all query execution.
    pool: ReadPool,
    /// Generated row SQL keyed by query shape hash, bounded by
    /// `MAX_SHAPE_CACHE_SIZE` (cleared wholesale when full).
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// Whether vector search is available for this database.
    vector_enabled: bool,
    /// Ensures the "vector table absent" degradation warning is logged
    /// only once.
    vec_degradation_warned: AtomicBool,
    /// Query/error counters.
    telemetry: Arc<TelemetryCounters>,
    /// Optional embedder used to build vector branches from raw query text.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
    /// Per-kind FTS tokenizer strategies, snapshotted at open time from
    /// `projection_profiles`.
    fts_strategies: HashMap<String, TokenizerStrategy>,
}
406
407impl fmt::Debug for ExecutionCoordinator {
408 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
409 f.debug_struct("ExecutionCoordinator")
410 .field("database_path", &self.database_path)
411 .finish_non_exhaustive()
412 }
413}
414
415impl ExecutionCoordinator {
    /// Open the database at `path`, bootstrap the schema, and construct the
    /// coordinator with a read pool of `pool_size` connections.
    ///
    /// Also runs open-time FTS guards, determines whether vector search is
    /// enabled (only possible with the `sqlite-vec` feature compiled in),
    /// verifies the query embedder's identity against the database when one
    /// is supplied, and snapshots per-kind FTS tokenizer strategies from
    /// `projection_profiles`.
    ///
    /// # Errors
    ///
    /// Returns an error when the database cannot be opened, schema
    /// bootstrap or the FTS guards fail, the embedder identity check
    /// fails, or the read pool cannot be built.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap connection: load the vector extension only when a
        // vector dimension was requested and the feature is compiled in.
        #[cfg(feature = "sqlite-vec")]
        let mut conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let mut conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        run_open_time_fts_guards(&mut conn)?;

        // Without the sqlite-vec feature, vector support is always off.
        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            let _ = &report;
            false
        };

        // An explicit vector dimension forces vector support on (feature
        // permitting), regardless of the bootstrap report.
        if vector_dimension.is_some() {
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        if let Some(ref emb) = query_embedder {
            check_vec_identity_at_open(&conn, emb.as_ref())?;
        }

        // Snapshot the tokenizer strategy for every kind that has an FTS
        // facet so search execution can special-case (e.g. trigram) later
        // without re-reading the profiles table.
        let fts_strategies: HashMap<String, TokenizerStrategy> = {
            let mut map = HashMap::new();
            let mut stmt = conn
                .prepare("SELECT kind, config_json FROM projection_profiles WHERE facet='fts'")?;
            let rows = stmt.query_map([], |row| {
                let kind: String = row.get(0)?;
                let config_json: String = row.get(1)?;
                Ok((kind, config_json))
            })?;
            for row in rows.flatten() {
                let (kind, config_json) = row;
                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&config_json)
                    && let Some(tok) = v["tokenizer"].as_str()
                {
                    map.insert(kind, TokenizerStrategy::from_str(tok));
                }
            }
            map
        };

        // The bootstrap connection is not pooled; readers are opened fresh.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
            fts_strategies,
        })
    }
521
    /// Path of the underlying SQLite database file.
    pub fn database_path(&self) -> &Path {
        &self.database_path
    }

    /// Whether vector search was enabled at open time.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }

    /// The optional query embedder configured for this coordinator.
    #[must_use]
    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
        self.query_embedder.as_ref()
    }

    /// Check a read connection out of the pool.
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
547
548 #[must_use]
554 pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
555 let mut total = SqliteCacheStatus::default();
556 for conn_mutex in &self.pool.connections {
557 if let Ok(conn) = conn_mutex.try_lock() {
558 total.add(&read_db_cache_status(&conn));
559 }
560 }
561 total
562 }
563
    /// Execute a compiled read query and return the matching node rows.
    ///
    /// FTS-driven queries may be served by a table-scan fallback (presumably
    /// covering the window right after a kind's first registration — see
    /// `scan_fallback_if_first_registration`) or rewritten against the
    /// per-kind FTS tables; vector-driven queries are redirected to the
    /// per-kind vector table. The projected row SQL is memoised by shape
    /// hash. A missing vector table degrades to an empty `was_degraded`
    /// result instead of an error.
    #[allow(clippy::expect_used, clippy::too_many_lines)]
    pub fn execute_compiled_read(
        &self,
        compiled: &CompiledQuery,
    ) -> Result<QueryRows, EngineError> {
        // Bind slot 1 carries the root kind for FTS-driven queries.
        if compiled.driving_table == DrivingTable::FtsNodes
            && let Some(BindValue::Text(root_kind)) = compiled.binds.get(1)
            && let Some(nodes) = self.scan_fallback_if_first_registration(root_kind)?
        {
            self.telemetry.increment_queries();
            return Ok(QueryRows {
                nodes,
                runs: Vec::new(),
                steps: Vec::new(),
                actions: Vec::new(),
                was_degraded: false,
            });
        }

        // Adapt the compiled SQL to the per-kind projection tables.
        let (adapted_sql, adapted_binds) = if compiled.driving_table == DrivingTable::FtsNodes {
            let conn_check = match self.lock_connection() {
                Ok(g) => g,
                Err(e) => {
                    self.telemetry.increment_errors();
                    return Err(e);
                }
            };
            let result = adapt_fts_nodes_sql_for_per_kind_tables(compiled, &conn_check);
            // Release the connection before the main execution below.
            drop(conn_check);
            result?
        } else if compiled.driving_table == DrivingTable::VecNodes {
            let root_kind = compiled
                .binds
                .get(1)
                .and_then(|b| {
                    if let BindValue::Text(k) = b {
                        Some(k.as_str())
                    } else {
                        None
                    }
                })
                .unwrap_or("");
            // "vec__unknown" intentionally names a non-existent table so the
            // absent-table degradation path below kicks in.
            let vec_table = if root_kind.is_empty() {
                "vec__unknown".to_owned()
            } else {
                fathomdb_schema::vec_kind_table_name(root_kind)
            };
            let new_sql = compiled.sql.replace("vec_nodes_active", &vec_table);
            (new_sql, compiled.binds.clone())
        } else {
            (compiled.sql.clone(), compiled.binds.clone())
        };

        let row_sql = wrap_node_row_projection_sql(&adapted_sql);
        {
            // Memoise the projected SQL by shape; the cache is cleared
            // wholesale once it reaches MAX_SHAPE_CACHE_SIZE. A poisoned
            // lock is recovered since the cache holds only plain strings.
            let mut cache = self
                .shape_sql_map
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
                cache.clear();
            }
            cache.insert(compiled.shape_hash, row_sql.clone());
        }

        let bind_values = adapted_binds
            .iter()
            .map(bind_value_to_sql)
            .collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&row_sql) {
            Ok(stmt) => stmt,
            // Vector table not created yet: degrade to an empty result and
            // warn once instead of failing the query.
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading to non-vector query");
                }
                return Ok(QueryRows {
                    was_degraded: true,
                    ..Default::default()
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        let nodes = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .and_then(Iterator::collect)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        self.telemetry.increment_queries();
        Ok(QueryRows {
            nodes,
            runs: Vec::new(),
            steps: Vec::new(),
            actions: Vec::new(),
            was_degraded: false,
        })
    }
711
712 pub fn execute_compiled_search(
727 &self,
728 compiled: &CompiledSearch,
729 ) -> Result<SearchRows, EngineError> {
730 let (relaxed_query, was_degraded_at_plan_time) =
737 fathomdb_query::derive_relaxed(&compiled.text_query);
738 let relaxed = relaxed_query.map(|q| CompiledSearch {
739 root_kind: compiled.root_kind.clone(),
740 text_query: q,
741 limit: compiled.limit,
742 fusable_filters: compiled.fusable_filters.clone(),
743 residual_filters: compiled.residual_filters.clone(),
744 attribution_requested: compiled.attribution_requested,
745 });
746 let plan = CompiledSearchPlan {
747 strict: compiled.clone(),
748 relaxed,
749 was_degraded_at_plan_time,
750 };
751 self.execute_compiled_search_plan(&plan)
752 }
753
    /// Execute a pre-derived strict/relaxed text search plan.
    ///
    /// The strict branch always runs; the relaxed branch runs only when the
    /// strict branch produced fewer than `FALLBACK_TRIGGER_K` hits (capped
    /// at the requested limit). Branch results are merged up to `limit`,
    /// and per-hit attribution is populated when requested.
    pub fn execute_compiled_search_plan(
        &self,
        plan: &CompiledSearchPlan,
    ) -> Result<SearchRows, EngineError> {
        let strict = &plan.strict;
        let limit = strict.limit;
        let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;

        // Only fall back when the strict branch is under-filled.
        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;

        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            // Plan-time degradation only matters if the relaxed branch ran.
            was_degraded = plan.was_degraded_at_plan_time;
        }

        let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
        if strict.attribution_requested {
            let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }
        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        // Text-only plan: no vector branch runs here.
        let vector_hit_count = 0;

        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
828
    /// Execute a compiled vector (nearest-neighbour) search.
    ///
    /// Takes the top `limit` chunk candidates from the per-kind sqlite-vec
    /// table via `embedding MATCH`, joins them back to their active source
    /// nodes, applies fusable predicates inside the candidate CTE and
    /// residual JSON-path predicates outside it, and returns hits ordered
    /// by ascending distance (score is the negated distance, so higher is
    /// closer).
    ///
    /// A missing vector table — at prepare or at query time — degrades to
    /// an empty `was_degraded` result instead of an error.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // A zero limit can never return rows; skip the query entirely.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        let filter_by_kind = !compiled.root_kind.is_empty();
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // Fusable predicates run inside the CTE against the source node
        // rows (`src`), shrinking the candidate set early.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses
                        .push_str("\n AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                // Plain JSON-path predicates are residual: applied outside
                // the CTE below.
                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
                }
            }
        }

        // Residual predicates run against the joined hit rows (`h`).
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                // Fusable predicates were already applied inside the CTE.
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. } => {
                }
            }
        }

        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        // Candidate-pool size for the inner KNN probe. Interpolated (not
        // bound) into the SQL; it is a usize, so this is not injectable.
        let base_limit = limit;
        let kind_clause = if filter_by_kind {
            "\n AND src.kind = ?2"
        } else {
            ""
        };

        // "vec__unknown" intentionally names a non-existent table so the
        // absent-table degradation path below kicks in.
        let vec_table = if compiled.root_kind.is_empty() {
            "vec__unknown".to_owned()
        } else {
            fathomdb_schema::vec_kind_table_name(&compiled.root_kind)
        };

        let sql = format!(
            "WITH vector_hits AS (
 SELECT
 src.row_id AS row_id,
 src.logical_id AS logical_id,
 src.kind AS kind,
 src.properties AS properties,
 src.source_ref AS source_ref,
 src.content_ref AS content_ref,
 src.created_at AS created_at,
 vc.distance AS distance,
 vc.chunk_id AS chunk_id
 FROM (
 SELECT chunk_id, distance
 FROM {vec_table}
 WHERE embedding MATCH ?1
 LIMIT {base_limit}
 ) vc
 JOIN chunks c ON c.id = vc.chunk_id
 JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
 WHERE 1 = 1{kind_clause}{fused_clauses}
 )
 SELECT
 h.row_id,
 h.logical_id,
 h.kind,
 h.properties,
 h.content_ref,
 am.last_accessed_at,
 h.created_at,
 h.distance,
 h.chunk_id
 FROM vector_hits h
 LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
 WHERE 1 = 1{filter_clauses}
 ORDER BY h.distance ASC
 LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            // Missing vector table at prepare time: degrade, warn once.
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Negate so a higher score means a closer match.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    // Match modes describe text branches only.
                    match_mode: None,
                    snippet: None,
                    // The matching chunk id doubles as the projection row id.
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // Missing vector table can also surface at query time.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
1173
    /// Execute a full retrieval plan: strict text, relaxed-text fallback,
    /// and a vector branch used only when both text branches come up empty.
    ///
    /// `raw_query` is the original query text; it is embedded on demand to
    /// build a vector branch when the plan does not already carry one.
    pub fn execute_retrieval_plan(
        &self,
        plan: &CompiledRetrievalPlan,
        raw_query: &str,
    ) -> Result<SearchRows, EngineError> {
        // Cloned so the vector branch can be filled in lazily below.
        let mut plan = plan.clone();
        let limit = plan.text.strict.limit;

        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;

        // Relaxed fallback triggers only when strict is under-filled.
        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;
        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.text.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            was_degraded = plan.was_degraded_at_plan_time;
        }

        // Embed a vector branch only as a last resort: both text branches
        // empty and an embedder configured.
        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
        if text_branches_empty && self.query_embedder.is_some() {
            self.fill_vector_branch(&mut plan, raw_query);
        }

        let mut vector_hits: Vec<SearchHit> = Vec::new();
        if let Some(vector) = plan.vector.as_ref()
            && strict_hits.is_empty()
            && relaxed_hits.is_empty()
        {
            let vector_rows = self.execute_compiled_vector_search(vector)?;
            vector_hits = vector_rows.hits;
            if vector_rows.was_degraded {
                was_degraded = true;
            }
        }
        // A vector branch was wanted but could not be built (embedding
        // failed at plan time): surface that as degradation.
        if text_branches_empty
            && plan.was_degraded_at_plan_time
            && plan.vector.is_none()
            && self.query_embedder.is_some()
        {
            was_degraded = true;
        }

        let strict = &plan.text.strict;
        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
        if strict.attribution_requested {
            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }

        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        let vector_hit_count = merged
            .iter()
            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
            .count();

        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
1314
    /// Populate `plan.vector` by embedding `raw_query`, when a query
    /// embedder is configured.
    ///
    /// On embedding or serialization failure the vector branch is skipped
    /// and the plan is marked degraded instead of surfacing an error.
    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
        let Some(embedder) = self.query_embedder.as_ref() else {
            return;
        };
        match embedder.embed_query(raw_query) {
            Ok(vec) => {
                // The embedding is serialized to a JSON array string, which
                // becomes `query_text` bound to `embedding MATCH ?1`.
                let literal = match serde_json::to_string(&vec) {
                    Ok(s) => s,
                    Err(err) => {
                        trace_warn!(
                            error = %err,
                            "query embedder vector serialization failed; skipping vector branch"
                        );
                        // `let _ = err;` presumably keeps `err` used when the
                        // tracing macro compiles to nothing — confirm.
                        let _ = err; plan.was_degraded_at_plan_time = true;
                        return;
                    }
                };
                let strict = &plan.text.strict;
                // The vector branch mirrors the strict search's scope.
                plan.vector = Some(CompiledVectorSearch {
                    root_kind: strict.root_kind.clone(),
                    query_text: literal,
                    limit: strict.limit,
                    fusable_filters: strict.fusable_filters.clone(),
                    residual_filters: strict.residual_filters.clone(),
                    attribution_requested: strict.attribution_requested,
                });
            }
            Err(err) => {
                trace_warn!(
                    error = %err,
                    "query embedder unavailable, skipping vector branch"
                );
                let _ = err; plan.was_degraded_at_plan_time = true;
            }
        }
    }
1373
1374 #[allow(clippy::too_many_lines)]
1383 fn run_search_branch(
1384 &self,
1385 compiled: &CompiledSearch,
1386 branch: SearchBranch,
1387 ) -> Result<Vec<SearchHit>, EngineError> {
1388 use std::fmt::Write as _;
1389 if matches!(
1401 compiled.text_query,
1402 fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
1403 ) {
1404 return Ok(Vec::new());
1405 }
1406 let rendered_base = render_text_query_fts5(&compiled.text_query);
1407 let strategy = self.fts_strategies.get(compiled.root_kind.as_str());
1420 if matches!(strategy, Some(TokenizerStrategy::SubstringTrigram))
1421 && rendered_base
1422 .chars()
1423 .filter(|c| c.is_alphanumeric())
1424 .count()
1425 < 3
1426 {
1427 return Ok(Vec::new());
1428 }
1429 let rendered = rendered_base;
1430 let filter_by_kind = !compiled.root_kind.is_empty();
1436
1437 let conn_guard = match self.lock_connection() {
1441 Ok(g) => g,
1442 Err(e) => {
1443 self.telemetry.increment_errors();
1444 return Err(e);
1445 }
1446 };
1447
1448 let prop_fts_tables: Vec<(String, String)> = if filter_by_kind {
1466 let kind = compiled.root_kind.clone();
1467 let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1468 let exists: bool = conn_guard
1469 .query_row(
1470 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1471 rusqlite::params![prop_table],
1472 |_| Ok(true),
1473 )
1474 .optional()
1475 .map_err(EngineError::Sqlite)?
1476 .unwrap_or(false);
1477 if exists {
1478 vec![(kind, prop_table)]
1479 } else {
1480 vec![]
1481 }
1482 } else {
1483 let kind_eq_values: Vec<String> = compiled
1488 .fusable_filters
1489 .iter()
1490 .filter_map(|p| match p {
1491 Predicate::KindEq(k) => Some(k.clone()),
1492 _ => None,
1493 })
1494 .collect();
1495 if kind_eq_values.len() == 1 {
1496 let kind = kind_eq_values[0].clone();
1497 let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1498 let exists: bool = conn_guard
1499 .query_row(
1500 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1501 rusqlite::params![prop_table],
1502 |_| Ok(true),
1503 )
1504 .optional()
1505 .map_err(EngineError::Sqlite)?
1506 .unwrap_or(false);
1507 if exists {
1508 vec![(kind, prop_table)]
1509 } else {
1510 vec![]
1511 }
1512 } else {
1513 let mut stmt = conn_guard
1517 .prepare("SELECT kind FROM fts_property_schemas")
1518 .map_err(EngineError::Sqlite)?;
1519 let all_kinds: Vec<String> = stmt
1520 .query_map([], |r| r.get::<_, String>(0))
1521 .map_err(EngineError::Sqlite)?
1522 .collect::<Result<Vec<_>, _>>()
1523 .map_err(EngineError::Sqlite)?;
1524 drop(stmt);
1525 let mut result = Vec::new();
1526 for kind in all_kinds {
1527 let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1528 let exists: bool = conn_guard
1529 .query_row(
1530 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1531 rusqlite::params![prop_table],
1532 |_| Ok(true),
1533 )
1534 .optional()
1535 .map_err(EngineError::Sqlite)?
1536 .unwrap_or(false);
1537 if exists {
1538 result.push((kind, prop_table));
1539 }
1540 }
1541 result
1542 }
1543 };
1544 let use_prop_fts = !prop_fts_tables.is_empty();
1545
1546 let mut binds: Vec<BindValue> = if filter_by_kind {
1552 if use_prop_fts {
1553 vec![
1554 BindValue::Text(rendered.clone()),
1555 BindValue::Text(compiled.root_kind.clone()),
1556 BindValue::Text(rendered),
1557 ]
1558 } else {
1559 vec![
1560 BindValue::Text(rendered.clone()),
1561 BindValue::Text(compiled.root_kind.clone()),
1562 ]
1563 }
1564 } else if use_prop_fts {
1565 vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
1567 } else {
1568 vec![BindValue::Text(rendered)]
1569 };
1570
1571 let mut fused_clauses = String::new();
1580 for predicate in &compiled.fusable_filters {
1581 match predicate {
1582 Predicate::KindEq(kind) => {
1583 binds.push(BindValue::Text(kind.clone()));
1584 let idx = binds.len();
1585 let _ = write!(fused_clauses, "\n AND u.kind = ?{idx}");
1586 }
1587 Predicate::LogicalIdEq(logical_id) => {
1588 binds.push(BindValue::Text(logical_id.clone()));
1589 let idx = binds.len();
1590 let _ = write!(
1591 fused_clauses,
1592 "\n AND u.logical_id = ?{idx}"
1593 );
1594 }
1595 Predicate::SourceRefEq(source_ref) => {
1596 binds.push(BindValue::Text(source_ref.clone()));
1597 let idx = binds.len();
1598 let _ = write!(
1599 fused_clauses,
1600 "\n AND u.source_ref = ?{idx}"
1601 );
1602 }
1603 Predicate::ContentRefEq(uri) => {
1604 binds.push(BindValue::Text(uri.clone()));
1605 let idx = binds.len();
1606 let _ = write!(
1607 fused_clauses,
1608 "\n AND u.content_ref = ?{idx}"
1609 );
1610 }
1611 Predicate::ContentRefNotNull => {
1612 fused_clauses.push_str("\n AND u.content_ref IS NOT NULL");
1613 }
1614 Predicate::JsonPathFusedEq { path, value } => {
1615 binds.push(BindValue::Text(path.clone()));
1616 let path_idx = binds.len();
1617 binds.push(BindValue::Text(value.clone()));
1618 let value_idx = binds.len();
1619 let _ = write!(
1620 fused_clauses,
1621 "\n AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
1622 );
1623 }
1624 Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
1625 binds.push(BindValue::Text(path.clone()));
1626 let path_idx = binds.len();
1627 binds.push(BindValue::Integer(*value));
1628 let value_idx = binds.len();
1629 let operator = match op {
1630 ComparisonOp::Gt => ">",
1631 ComparisonOp::Gte => ">=",
1632 ComparisonOp::Lt => "<",
1633 ComparisonOp::Lte => "<=",
1634 };
1635 let _ = write!(
1636 fused_clauses,
1637 "\n AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
1638 );
1639 }
1640 Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
1641 }
1644 }
1645 }
1646
1647 let mut filter_clauses = String::new();
1648 for predicate in &compiled.residual_filters {
1649 match predicate {
1650 Predicate::JsonPathEq { path, value } => {
1651 binds.push(BindValue::Text(path.clone()));
1652 let path_idx = binds.len();
1653 binds.push(scalar_to_bind(value));
1654 let value_idx = binds.len();
1655 let _ = write!(
1656 filter_clauses,
1657 "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
1658 );
1659 }
1660 Predicate::JsonPathCompare { path, op, value } => {
1661 binds.push(BindValue::Text(path.clone()));
1662 let path_idx = binds.len();
1663 binds.push(scalar_to_bind(value));
1664 let value_idx = binds.len();
1665 let operator = match op {
1666 ComparisonOp::Gt => ">",
1667 ComparisonOp::Gte => ">=",
1668 ComparisonOp::Lt => "<",
1669 ComparisonOp::Lte => "<=",
1670 };
1671 let _ = write!(
1672 filter_clauses,
1673 "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
1674 );
1675 }
1676 Predicate::KindEq(_)
1677 | Predicate::LogicalIdEq(_)
1678 | Predicate::SourceRefEq(_)
1679 | Predicate::ContentRefEq(_)
1680 | Predicate::ContentRefNotNull
1681 | Predicate::JsonPathFusedEq { .. }
1682 | Predicate::JsonPathFusedTimestampCmp { .. } => {
1683 }
1686 }
1687 }
1688
1689 let limit = compiled.limit;
1696 binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
1697 let limit_idx = binds.len();
1698 let prop_bind_idx: usize = if filter_by_kind { 3 } else { 2 };
1714 let prop_arm_sql: String = if use_prop_fts {
1715 prop_fts_tables.iter().fold(String::new(), |mut acc, (kind, prop_table)| {
1716 let bm25_expr = conn_guard
1718 .query_row(
1719 "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
1720 rusqlite::params![kind],
1721 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1722 )
1723 .ok()
1724 .map_or_else(
1725 || format!("bm25({prop_table})"),
1726 |(json, sep)| build_bm25_expr(prop_table, &json, &sep),
1727 );
1728 let is_weighted = bm25_expr != format!("bm25({prop_table})");
1731 let snippet_expr = if is_weighted {
1732 "'' AS snippet".to_owned()
1733 } else {
1734 "substr(fp.text_content, 1, 200) AS snippet".to_owned()
1735 };
1736 let _ = write!(
1737 acc,
1738 "
1739 UNION ALL
1740 SELECT
1741 src.row_id AS row_id,
1742 fp.node_logical_id AS logical_id,
1743 src.kind AS kind,
1744 src.properties AS properties,
1745 src.source_ref AS source_ref,
1746 src.content_ref AS content_ref,
1747 src.created_at AS created_at,
1748 -{bm25_expr} AS score,
1749 'property' AS source,
1750 {snippet_expr},
1751 CAST(fp.rowid AS TEXT) AS projection_row_id
1752 FROM {prop_table} fp
1753 JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
1754 WHERE {prop_table} MATCH ?{prop_bind_idx}"
1755 );
1756 acc
1757 })
1758 } else {
1759 String::new()
1760 };
1761 let (chunk_fts_bind, chunk_kind_clause) = if filter_by_kind {
1762 ("?1", "\n AND src.kind = ?2")
1763 } else {
1764 ("?1", "")
1765 };
1766 let sql = format!(
1767 "WITH search_hits AS (
1768 SELECT
1769 u.row_id AS row_id,
1770 u.logical_id AS logical_id,
1771 u.kind AS kind,
1772 u.properties AS properties,
1773 u.source_ref AS source_ref,
1774 u.content_ref AS content_ref,
1775 u.created_at AS created_at,
1776 u.score AS score,
1777 u.source AS source,
1778 u.snippet AS snippet,
1779 u.projection_row_id AS projection_row_id
1780 FROM (
1781 SELECT
1782 src.row_id AS row_id,
1783 c.node_logical_id AS logical_id,
1784 src.kind AS kind,
1785 src.properties AS properties,
1786 src.source_ref AS source_ref,
1787 src.content_ref AS content_ref,
1788 src.created_at AS created_at,
1789 -bm25(fts_nodes) AS score,
1790 'chunk' AS source,
1791 snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
1792 f.chunk_id AS projection_row_id
1793 FROM fts_nodes f
1794 JOIN chunks c ON c.id = f.chunk_id
1795 JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
1796 WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}{prop_arm_sql}
1797 ) u
1798 WHERE 1 = 1{fused_clauses}
1799 ORDER BY u.score DESC
1800 LIMIT ?{limit_idx}
1801 )
1802 SELECT
1803 h.row_id,
1804 h.logical_id,
1805 h.kind,
1806 h.properties,
1807 h.content_ref,
1808 am.last_accessed_at,
1809 h.created_at,
1810 h.score,
1811 h.source,
1812 h.snippet,
1813 h.projection_row_id
1814 FROM search_hits h
1815 LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
1816 WHERE 1 = 1{filter_clauses}
1817 ORDER BY h.score DESC"
1818 );
1819
1820 let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
1821
1822 let mut statement = match conn_guard.prepare_cached(&sql) {
1823 Ok(stmt) => stmt,
1824 Err(e) => {
1825 self.telemetry.increment_errors();
1826 return Err(EngineError::Sqlite(e));
1827 }
1828 };
1829
1830 let hits = match statement
1831 .query_map(params_from_iter(bind_values.iter()), |row| {
1832 let source_str: String = row.get(8)?;
1833 let source = if source_str == "property" {
1838 SearchHitSource::Property
1839 } else {
1840 SearchHitSource::Chunk
1841 };
1842 let match_mode = match branch {
1843 SearchBranch::Strict => SearchMatchMode::Strict,
1844 SearchBranch::Relaxed => SearchMatchMode::Relaxed,
1845 };
1846 Ok(SearchHit {
1847 node: fathomdb_query::NodeRowLite {
1848 row_id: row.get(0)?,
1849 logical_id: row.get(1)?,
1850 kind: row.get(2)?,
1851 properties: row.get(3)?,
1852 content_ref: row.get(4)?,
1853 last_accessed_at: row.get(5)?,
1854 },
1855 written_at: row.get(6)?,
1856 score: row.get(7)?,
1857 modality: RetrievalModality::Text,
1859 source,
1860 match_mode: Some(match_mode),
1861 snippet: row.get(9)?,
1862 projection_row_id: row.get(10)?,
1863 vector_distance: None,
1864 attribution: None,
1865 })
1866 })
1867 .and_then(Iterator::collect::<Result<Vec<_>, _>>)
1868 {
1869 Ok(rows) => rows,
1870 Err(e) => {
1871 self.telemetry.increment_errors();
1872 return Err(EngineError::Sqlite(e));
1873 }
1874 };
1875
1876 drop(statement);
1880 drop(conn_guard);
1881
1882 self.telemetry.increment_queries();
1883 Ok(hits)
1884 }
1885
1886 fn populate_attribution_for_hits(
1890 &self,
1891 hits: &mut [SearchHit],
1892 strict_text_query: &fathomdb_query::TextQuery,
1893 relaxed_text_query: Option<&fathomdb_query::TextQuery>,
1894 ) -> Result<(), EngineError> {
1895 let conn_guard = match self.lock_connection() {
1896 Ok(g) => g,
1897 Err(e) => {
1898 self.telemetry.increment_errors();
1899 return Err(e);
1900 }
1901 };
1902 let strict_expr = render_text_query_fts5(strict_text_query);
1903 let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
1904 for hit in hits.iter_mut() {
1905 let match_expr = match hit.match_mode {
1910 Some(SearchMatchMode::Strict) => strict_expr.as_str(),
1911 Some(SearchMatchMode::Relaxed) => {
1912 relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
1913 }
1914 None => continue,
1915 };
1916 match resolve_hit_attribution(&conn_guard, hit, match_expr) {
1917 Ok(att) => hit.attribution = Some(att),
1918 Err(e) => {
1919 self.telemetry.increment_errors();
1920 return Err(e);
1921 }
1922 }
1923 }
1924 Ok(())
1925 }
1926
1927 pub fn execute_compiled_grouped_read(
1931 &self,
1932 compiled: &CompiledGroupedQuery,
1933 ) -> Result<GroupedQueryRows, EngineError> {
1934 let root_rows = self.execute_compiled_read(&compiled.root)?;
1935 if root_rows.was_degraded {
1936 return Ok(GroupedQueryRows {
1937 roots: Vec::new(),
1938 expansions: Vec::new(),
1939 was_degraded: true,
1940 });
1941 }
1942
1943 let roots = root_rows.nodes;
1944 let mut expansions = Vec::with_capacity(compiled.expansions.len());
1945 for expansion in &compiled.expansions {
1946 let slot_rows = if roots.is_empty() {
1947 Vec::new()
1948 } else {
1949 self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
1950 };
1951 expansions.push(ExpansionSlotRows {
1952 slot: expansion.slot.clone(),
1953 roots: slot_rows,
1954 });
1955 }
1956
1957 Ok(GroupedQueryRows {
1958 roots,
1959 expansions,
1960 was_degraded: false,
1961 })
1962 }
1963
1964 fn read_expansion_nodes_chunked(
1970 &self,
1971 roots: &[NodeRow],
1972 expansion: &ExpansionSlot,
1973 hard_limit: usize,
1974 ) -> Result<Vec<ExpansionRootRows>, EngineError> {
1975 if roots.len() <= BATCH_CHUNK_SIZE {
1976 return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
1977 }
1978
1979 let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
1982 for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
1983 for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
1984 per_root
1985 .entry(group.root_logical_id)
1986 .or_default()
1987 .extend(group.nodes);
1988 }
1989 }
1990
1991 Ok(roots
1992 .iter()
1993 .map(|root| ExpansionRootRows {
1994 root_logical_id: root.logical_id.clone(),
1995 nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
1996 })
1997 .collect())
1998 }
1999
    /// Expands one traversal slot for a batch of roots in a single
    /// recursive-CTE query, returning per-root groups in the caller's root
    /// order (roots with no expansion rows get an empty group).
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Direction picks which edge endpoint joins the current frontier and
        // which endpoint becomes the next node to visit.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // Fused filters require every reachable target kind to have a
        // property-FTS schema; reject early with a descriptive error.
        if expansion.filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. } | Predicate::JsonPathFusedTimestampCmp { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }

        // Positional-parameter layout: ?1..?N are the N root ids, ?(N+1) is
        // the edge kind, and filter binds (if any) start at ?(N+2).
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        let edge_kind_param = root_ids.len() + 1;
        let filter_param_start = root_ids.len() + 2;

        let (filter_sql, filter_binds) =
            compile_expansion_filter(expansion.filter.as_ref(), filter_param_start);

        // The recursive CTE carries a ',id,'-delimited `visited` string per
        // root for cycle avoidance and an `emitted` counter to bound work;
        // `numbered` then caps rows per root with ROW_NUMBER. `max_depth` and
        // `hard_limit` are trusted numeric values interpolated into the SQL.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted) AS (
            SELECT rid, rid, 0, printf(',%s,', rid), 0
            FROM root_ids
            UNION ALL
            SELECT
            t.root_id,
            {next_logical_id},
            t.depth + 1,
            t.visited || {next_logical_id} || ',',
            t.emitted + 1
            FROM traversed t
            JOIN edges e ON {join_condition}
            AND e.kind = ?{edge_kind_param}
            AND e.superseded_at IS NULL
            WHERE t.depth < {max_depth}
            AND t.emitted < {hard_limit}
            AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
            SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
            , n.content_ref, am.last_accessed_at
            , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
            FROM traversed t
            JOIN nodes n ON n.logical_id = t.logical_id
            AND n.superseded_at IS NULL
            LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
            WHERE t.depth > 0{filter_sql}
            )
            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, logical_id",
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind order must mirror the parameter layout computed above.
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(filter_binds);

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    // Column 0 is the originating root id, used for regrouping.
                    row.get::<_, String>(0)?, NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Bucket the flat row stream by root id.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        // Emit groups in input-root order so empty expansions stay explicit.
        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
2139
2140 fn validate_fused_filter_for_edge_label(&self, edge_label: &str) -> Result<(), EngineError> {
2154 let conn = self.lock_connection()?;
2155 let mut stmt = conn
2157 .prepare_cached(
2158 "SELECT DISTINCT n.kind \
2159 FROM edges e \
2160 JOIN nodes n ON n.logical_id = e.target_logical_id \
2161 WHERE e.kind = ?1 AND e.superseded_at IS NULL",
2162 )
2163 .map_err(EngineError::Sqlite)?;
2164 let target_kinds: Vec<String> = stmt
2165 .query_map(rusqlite::params![edge_label], |row| row.get(0))
2166 .map_err(EngineError::Sqlite)?
2167 .collect::<Result<Vec<_>, _>>()
2168 .map_err(EngineError::Sqlite)?;
2169
2170 for kind in &target_kinds {
2171 let has_schema: bool = conn
2172 .query_row(
2173 "SELECT COUNT(*) > 0 FROM fts_property_schemas WHERE kind = ?1",
2174 rusqlite::params![kind],
2175 |row| row.get(0),
2176 )
2177 .map_err(EngineError::Sqlite)?;
2178 if !has_schema {
2179 return Err(EngineError::InvalidConfig(format!(
2180 "kind {kind:?} has no registered property-FTS schema; register one with \
2181 admin.register_fts_property_schema(..) before using fused filters on \
2182 expansion slots, or use JsonPathEq for non-fused semantics \
2183 (expand slot uses edge label {edge_label:?})"
2184 )));
2185 }
2186 }
2187 Ok(())
2188 }
2189
2190 pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
2196 let conn = self.lock_connection()?;
2197 conn.query_row(
2198 "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
2199 rusqlite::params![id],
2200 |row| {
2201 Ok(RunRow {
2202 id: row.get(0)?,
2203 kind: row.get(1)?,
2204 status: row.get(2)?,
2205 properties: row.get(3)?,
2206 })
2207 },
2208 )
2209 .optional()
2210 .map_err(EngineError::Sqlite)
2211 }
2212
2213 pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
2219 let conn = self.lock_connection()?;
2220 conn.query_row(
2221 "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
2222 rusqlite::params![id],
2223 |row| {
2224 Ok(StepRow {
2225 id: row.get(0)?,
2226 run_id: row.get(1)?,
2227 kind: row.get(2)?,
2228 status: row.get(3)?,
2229 properties: row.get(4)?,
2230 })
2231 },
2232 )
2233 .optional()
2234 .map_err(EngineError::Sqlite)
2235 }
2236
2237 pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
2243 let conn = self.lock_connection()?;
2244 conn.query_row(
2245 "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
2246 rusqlite::params![id],
2247 |row| {
2248 Ok(ActionRow {
2249 id: row.get(0)?,
2250 step_id: row.get(1)?,
2251 kind: row.get(2)?,
2252 status: row.get(3)?,
2253 properties: row.get(4)?,
2254 })
2255 },
2256 )
2257 .optional()
2258 .map_err(EngineError::Sqlite)
2259 }
2260
2261 pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
2267 let conn = self.lock_connection()?;
2268 let mut stmt = conn
2269 .prepare_cached(
2270 "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
2271 )
2272 .map_err(EngineError::Sqlite)?;
2273 let rows = stmt
2274 .query_map([], |row| {
2275 Ok(RunRow {
2276 id: row.get(0)?,
2277 kind: row.get(1)?,
2278 status: row.get(2)?,
2279 properties: row.get(3)?,
2280 })
2281 })
2282 .map_err(EngineError::Sqlite)?
2283 .collect::<Result<Vec<_>, _>>()
2284 .map_err(EngineError::Sqlite)?;
2285 Ok(rows)
2286 }
2287
2288 #[must_use]
2298 #[allow(clippy::expect_used)]
2299 pub fn shape_sql_count(&self) -> usize {
2300 self.shape_sql_map
2301 .lock()
2302 .unwrap_or_else(PoisonError::into_inner)
2303 .len()
2304 }
2305
2306 #[must_use]
2308 pub fn schema_manager(&self) -> Arc<SchemaManager> {
2309 Arc::clone(&self.schema_manager)
2310 }
2311
2312 #[must_use]
2321 pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
2322 let cache_hit = self
2323 .shape_sql_map
2324 .lock()
2325 .unwrap_or_else(PoisonError::into_inner)
2326 .contains_key(&compiled.shape_hash);
2327 QueryPlan {
2328 sql: wrap_node_row_projection_sql(&compiled.sql),
2329 bind_count: compiled.binds.len(),
2330 driving_table: compiled.driving_table,
2331 shape_hash: compiled.shape_hash,
2332 cache_hit,
2333 }
2334 }
2335
2336 #[doc(hidden)]
2343 pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
2344 let conn = self.lock_connection()?;
2345 let result = conn
2346 .query_row(&format!("PRAGMA {name}"), [], |row| {
2347 row.get::<_, rusqlite::types::Value>(0)
2349 })
2350 .map_err(EngineError::Sqlite)?;
2351 let s = match result {
2352 rusqlite::types::Value::Text(t) => t,
2353 rusqlite::types::Value::Integer(i) => i.to_string(),
2354 rusqlite::types::Value::Real(f) => f.to_string(),
2355 rusqlite::types::Value::Blob(_) => {
2356 return Err(EngineError::InvalidWrite(format!(
2357 "PRAGMA {name} returned an unexpected BLOB value"
2358 )));
2359 }
2360 rusqlite::types::Value::Null => String::new(),
2361 };
2362 Ok(s)
2363 }
2364
2365 pub fn query_provenance_events(
2374 &self,
2375 subject: &str,
2376 ) -> Result<Vec<ProvenanceEvent>, EngineError> {
2377 let conn = self.lock_connection()?;
2378 let mut stmt = conn
2379 .prepare_cached(
2380 "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
2381 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
2382 )
2383 .map_err(EngineError::Sqlite)?;
2384 let events = stmt
2385 .query_map(rusqlite::params![subject], |row| {
2386 Ok(ProvenanceEvent {
2387 id: row.get(0)?,
2388 event_type: row.get(1)?,
2389 subject: row.get(2)?,
2390 source_ref: row.get(3)?,
2391 metadata_json: row.get(4)?,
2392 created_at: row.get(5)?,
2393 })
2394 })
2395 .map_err(EngineError::Sqlite)?
2396 .collect::<Result<Vec<_>, _>>()
2397 .map_err(EngineError::Sqlite)?;
2398 Ok(events)
2399 }
2400
2401 fn scan_fallback_if_first_registration(
2407 &self,
2408 kind: &str,
2409 ) -> Result<Option<Vec<NodeRow>>, EngineError> {
2410 let conn = self.lock_connection()?;
2411
2412 let prop_table = fathomdb_schema::fts_kind_table_name(kind);
2415 let table_exists: bool = conn
2417 .query_row(
2418 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
2419 rusqlite::params![prop_table],
2420 |_| Ok(true),
2421 )
2422 .optional()?
2423 .unwrap_or(false);
2424 let prop_empty = if table_exists {
2425 let cnt: i64 =
2426 conn.query_row(&format!("SELECT COUNT(*) FROM {prop_table}"), [], |r| {
2427 r.get(0)
2428 })?;
2429 cnt == 0
2430 } else {
2431 true
2432 };
2433 let needs_scan: bool = if prop_empty {
2434 conn.query_row(
2435 "SELECT 1 FROM fts_property_rebuild_state \
2436 WHERE kind = ?1 AND is_first_registration = 1 \
2437 AND state IN ('PENDING','BUILDING','SWAPPING') \
2438 LIMIT 1",
2439 rusqlite::params![kind],
2440 |_| Ok(true),
2441 )
2442 .optional()?
2443 .unwrap_or(false)
2444 } else {
2445 false
2446 };
2447
2448 if !needs_scan {
2449 return Ok(None);
2450 }
2451
2452 let mut stmt = conn
2455 .prepare_cached(
2456 "SELECT n.row_id, n.logical_id, n.kind, n.properties, n.content_ref, \
2457 am.last_accessed_at \
2458 FROM nodes n \
2459 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id \
2460 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
2461 )
2462 .map_err(EngineError::Sqlite)?;
2463
2464 let nodes = stmt
2465 .query_map(rusqlite::params![kind], |row| {
2466 Ok(NodeRow {
2467 row_id: row.get(0)?,
2468 logical_id: row.get(1)?,
2469 kind: row.get(2)?,
2470 properties: row.get(3)?,
2471 content_ref: row.get(4)?,
2472 last_accessed_at: row.get(5)?,
2473 })
2474 })
2475 .map_err(EngineError::Sqlite)?
2476 .collect::<Result<Vec<_>, _>>()
2477 .map_err(EngineError::Sqlite)?;
2478
2479 Ok(Some(nodes))
2480 }
2481
2482 pub fn get_property_fts_rebuild_progress(
2488 &self,
2489 kind: &str,
2490 ) -> Result<Option<crate::rebuild_actor::RebuildProgress>, EngineError> {
2491 let conn = self.lock_connection()?;
2492 let row = conn
2493 .query_row(
2494 "SELECT state, rows_total, rows_done, started_at, last_progress_at, error_message \
2495 FROM fts_property_rebuild_state WHERE kind = ?1",
2496 rusqlite::params![kind],
2497 |r| {
2498 Ok(crate::rebuild_actor::RebuildProgress {
2499 state: r.get(0)?,
2500 rows_total: r.get(1)?,
2501 rows_done: r.get(2)?,
2502 started_at: r.get(3)?,
2503 last_progress_at: r.get(4)?,
2504 error_message: r.get(5)?,
2505 })
2506 },
2507 )
2508 .optional()?;
2509 Ok(row)
2510 }
2511}
2512
/// Rewrites a compiled `fts_nodes` query to target the per-kind property-FTS
/// table for the query's root kind, dropping binds the rewritten SQL no
/// longer references and renumbering the remaining `?N` placeholders.
fn adapt_fts_nodes_sql_for_per_kind_tables(
    compiled: &CompiledQuery,
    conn: &rusqlite::Connection,
) -> Result<(String, Vec<BindValue>), EngineError> {
    // Bind-layout convention for compiled FTS queries: index 1 is the root
    // kind. An absent or non-text bind degrades to "" rather than erroring.
    let root_kind = compiled
        .binds
        .get(1)
        .and_then(|b| {
            if let BindValue::Text(k) = b {
                Some(k.as_str())
            } else {
                None
            }
        })
        .unwrap_or("");
    let prop_table = fathomdb_schema::fts_kind_table_name(root_kind);
    let prop_table_exists: bool = conn
        .query_row(
            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
            rusqlite::params![prop_table],
            |_| Ok(true),
        )
        .optional()
        .map_err(EngineError::Sqlite)?
        .unwrap_or(false);

    // Two rewrite shapes:
    //  - table exists: retarget the SQL at the per-kind table; the
    //    `fp.kind = ?4` guard becomes redundant, so SQL param 4 is removed
    //    (bind index 3, 0-based).
    //  - table missing: strip the whole property-FTS UNION arm, removing SQL
    //    params 3 and 4 (bind indices 2 and 3).
    // `renumber_sql_params` takes the 1-based `?N` numbers; the vec holds the
    // 0-based positions into `compiled.binds`.
    let (new_sql, removed_bind_positions) = if prop_table_exists {
        let s = compiled
            .sql
            .replace("fts_node_properties", &prop_table)
            .replace("\n AND fp.kind = ?4", "");
        (renumber_sql_params(&s, &[4]), vec![3usize])
    } else {
        let s = strip_prop_fts_union_arm(&compiled.sql);
        (renumber_sql_params(&s, &[3, 4]), vec![2usize, 3])
    };

    // Keep only the binds the rewritten SQL still references, in order.
    let new_binds: Vec<BindValue> = compiled
        .binds
        .iter()
        .enumerate()
        .filter(|(i, _)| !removed_bind_positions.contains(i))
        .map(|(_, b)| b.clone())
        .collect();

    Ok((new_sql, new_binds))
}
2569
2570#[allow(clippy::unnecessary_wraps)]
2576fn check_vec_identity_at_open(
2577 conn: &rusqlite::Connection,
2578 embedder: &dyn QueryEmbedder,
2579) -> Result<(), EngineError> {
2580 let row: Option<String> = conn
2581 .query_row(
2582 "SELECT config_json FROM projection_profiles WHERE kind='*' AND facet='vec'",
2583 [],
2584 |row| row.get(0),
2585 )
2586 .optional()
2587 .unwrap_or(None);
2588
2589 let Some(config_json) = row else {
2590 return Ok(());
2591 };
2592
2593 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&config_json) else {
2595 return Ok(());
2596 };
2597
2598 let identity = embedder.identity();
2599
2600 if let Some(stored_model) = parsed
2601 .get("model_identity")
2602 .and_then(serde_json::Value::as_str)
2603 && stored_model != identity.model_identity
2604 {
2605 trace_warn!(
2606 stored_model_identity = stored_model,
2607 embedder_model_identity = %identity.model_identity,
2608 "vec identity mismatch at open: model_identity differs"
2609 );
2610 }
2611
2612 if let Some(stored_dim) = parsed.get("dimensions").and_then(serde_json::Value::as_u64) {
2613 let stored_dim = usize::try_from(stored_dim).unwrap_or(usize::MAX);
2614 if stored_dim != identity.dimension {
2615 trace_warn!(
2616 stored_dimensions = stored_dim,
2617 embedder_dimensions = identity.dimension,
2618 "vec identity mismatch at open: dimensions differ"
2619 );
2620 }
2621 }
2622
2623 Ok(())
2624}
2625
/// Open-time self-heal for property-FTS projections: when registered schemas
/// exist but their projected data is missing, wipe and repopulate the
/// projections inside one immediate transaction.
fn run_open_time_fts_guards(conn: &mut rusqlite::Connection) -> Result<(), EngineError> {
    // No registered schemas means nothing to guard.
    let schema_count: i64 = conn
        .query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
            row.get(0)
        })
        .map_err(EngineError::Sqlite)?;
    if schema_count == 0 {
        return Ok(());
    }

    // A full FTS rebuild subsumes the position backfill, so only probe for
    // the cheaper backfill when a rebuild is not already required.
    let needs_fts_rebuild = open_guard_check_fts_empty(conn)?;
    let needs_position_backfill = if needs_fts_rebuild {
        false
    } else {
        open_guard_check_positions_empty(conn)?
    };

    if needs_fts_rebuild || needs_position_backfill {
        // Enumerate the per-kind virtual tables before opening the
        // transaction: the statement must be dropped before `conn` can be
        // borrowed mutably for `transaction_with_behavior`.
        let per_kind_tables: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT name FROM sqlite_master \
                     WHERE type='table' AND name LIKE 'fts_props_%' \
                     AND sql LIKE 'CREATE VIRTUAL TABLE%'",
                )
                .map_err(EngineError::Sqlite)?;
            stmt.query_map([], |r| r.get::<_, String>(0))
                .map_err(EngineError::Sqlite)?
                .collect::<Result<Vec<_>, _>>()
                .map_err(EngineError::Sqlite)?
        };
        // IMMEDIATE so the wipe + repopulate runs atomically under a write lock.
        let tx = conn
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(EngineError::Sqlite)?;
        for table in &per_kind_tables {
            tx.execute_batch(&format!("DELETE FROM {table}"))
                .map_err(EngineError::Sqlite)?;
        }
        tx.execute("DELETE FROM fts_node_property_positions", [])
            .map_err(EngineError::Sqlite)?;
        // NOTE(review): the node-source query carries a ?1 kind placeholder;
        // presumably insert_property_fts_rows binds it once per registered
        // kind — confirm against crate::projection.
        crate::projection::insert_property_fts_rows(
            &tx,
            "SELECT logical_id, properties FROM nodes \
             WHERE kind = ?1 AND superseded_at IS NULL",
        )
        .map_err(EngineError::Sqlite)?;
        tx.commit().map_err(EngineError::Sqlite)?;
    }
    Ok(())
}
2687
2688fn open_guard_check_fts_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
2689 let kinds: Vec<String> = {
2690 let mut stmt = conn
2691 .prepare("SELECT kind FROM fts_property_schemas")
2692 .map_err(EngineError::Sqlite)?;
2693 stmt.query_map([], |row| row.get::<_, String>(0))
2694 .map_err(EngineError::Sqlite)?
2695 .collect::<Result<Vec<_>, _>>()
2696 .map_err(EngineError::Sqlite)?
2697 };
2698 for kind in &kinds {
2699 let table = fathomdb_schema::fts_kind_table_name(kind);
2700 let table_exists: bool = conn
2701 .query_row(
2702 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
2703 rusqlite::params![table],
2704 |_| Ok(true),
2705 )
2706 .optional()
2707 .map_err(EngineError::Sqlite)?
2708 .unwrap_or(false);
2709 let fts_count: i64 = if table_exists {
2710 conn.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
2711 row.get(0)
2712 })
2713 .map_err(EngineError::Sqlite)?
2714 } else {
2715 0
2716 };
2717 if fts_count == 0 {
2718 let node_count: i64 = conn
2719 .query_row(
2720 "SELECT COUNT(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
2721 rusqlite::params![kind],
2722 |row| row.get(0),
2723 )
2724 .map_err(EngineError::Sqlite)?;
2725 if node_count > 0 {
2726 return Ok(true);
2727 }
2728 }
2729 }
2730 Ok(false)
2731}
2732
2733fn open_guard_check_positions_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
2734 let recursive_count: i64 = conn
2735 .query_row(
2736 "SELECT COUNT(*) FROM fts_property_schemas \
2737 WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
2738 [],
2739 |row| row.get(0),
2740 )
2741 .map_err(EngineError::Sqlite)?;
2742 if recursive_count == 0 {
2743 return Ok(false);
2744 }
2745 let pos_count: i64 = conn
2746 .query_row(
2747 "SELECT COUNT(*) FROM fts_node_property_positions",
2748 [],
2749 |row| row.get(0),
2750 )
2751 .map_err(EngineError::Sqlite)?;
2752 Ok(pos_count == 0)
2753}
2754
/// Renumbers positional `?N` placeholders in `sql` after the parameters in
/// `removed` (1-based numbers, assumed already absent from the SQL) were
/// dropped: each surviving `?N` is shifted down by the count of removed
/// parameters below it. A bare `?` or an unparsable digit run is left as-is.
///
/// Fix: the previous implementation rebuilt the output with
/// `result.push(bytes[i] as char)`, which re-encodes every non-ASCII UTF-8
/// byte as a separate Latin-1 character and corrupts any non-ASCII text in
/// the SQL (e.g. inside string literals). This version slices on `char`-safe
/// boundaries ('?' and ASCII digits are single bytes, so all indices used
/// here are valid boundaries).
fn renumber_sql_params(sql: &str, removed: &[usize]) -> String {
    let mut result = String::with_capacity(sql.len());
    let mut rest = sql;
    while let Some(q) = rest.find('?') {
        // Copy everything up to and including the '?'.
        result.push_str(&rest[..=q]);
        rest = &rest[q + 1..];
        // Length of the decimal run immediately after the '?'.
        let digits = rest
            .find(|c: char| !c.is_ascii_digit())
            .unwrap_or(rest.len());
        if digits > 0 {
            if let Ok(n) = rest[..digits].parse::<usize>() {
                let shift = removed.iter().filter(|&&r| r < n).count();
                result.push_str(&(n - shift).to_string());
                rest = &rest[digits..];
            }
            // On parse failure (absurdly long run) the digits stay in `rest`
            // and are copied verbatim by a later iteration or the final push.
        }
    }
    result.push_str(rest);
    result
}
2797
/// Wraps a node-producing query so its rows are projected into the standard
/// node-row column set, left-joined against access metadata.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut wrapped = String::with_capacity(base_sql.len() + 160);
    wrapped.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, am.last_accessed_at FROM (",
    );
    wrapped.push_str(base_sql);
    wrapped.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    wrapped
}
2805
/// Removes the property-FTS UNION arm from a compiled search query's SQL,
/// splicing out everything from the UNION marker up to (but excluding) the
/// closing of the inner subquery; the SQL is returned unchanged when either
/// marker is absent.
fn strip_prop_fts_union_arm(sql: &str) -> String {
    let start = sql.find(" UNION\n SELECT fp.node_logical_id");
    let stripped = start.and_then(|s| {
        sql[s..].find("\n ) u").map(|rel| {
            let mut out = String::with_capacity(sql.len());
            out.push_str(&sql[..s]);
            out.push_str(&sql[s + rel..]);
            out
        })
    });
    stripped.unwrap_or_else(|| sql.to_owned())
}
2845
2846pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
2854 match err {
2855 rusqlite::Error::SqliteFailure(_, Some(msg)) => {
2856 (msg.contains("no such table: vec_") && !msg.contains("vec_embedding"))
2858 || msg.contains("no such module: vec0")
2859 }
2860 _ => false,
2861 }
2862}
2863
2864fn scalar_to_bind(value: &ScalarValue) -> BindValue {
2865 match value {
2866 ScalarValue::Text(text) => BindValue::Text(text.clone()),
2867 ScalarValue::Integer(integer) => BindValue::Integer(*integer),
2868 ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
2869 }
2870}
2871
2872fn merge_search_branches(
2890 strict: Vec<SearchHit>,
2891 relaxed: Vec<SearchHit>,
2892 limit: usize,
2893) -> Vec<SearchHit> {
2894 merge_search_branches_three(strict, relaxed, Vec::new(), limit)
2895}
2896
2897fn merge_search_branches_three(
2909 strict: Vec<SearchHit>,
2910 relaxed: Vec<SearchHit>,
2911 vector: Vec<SearchHit>,
2912 limit: usize,
2913) -> Vec<SearchHit> {
2914 let strict_block = dedup_branch_hits(strict);
2915 let relaxed_block = dedup_branch_hits(relaxed);
2916 let vector_block = dedup_branch_hits(vector);
2917
2918 let mut seen: std::collections::HashSet<String> = strict_block
2919 .iter()
2920 .map(|h| h.node.logical_id.clone())
2921 .collect();
2922
2923 let mut merged = strict_block;
2924 for hit in relaxed_block {
2925 if seen.insert(hit.node.logical_id.clone()) {
2926 merged.push(hit);
2927 }
2928 }
2929 for hit in vector_block {
2930 if seen.insert(hit.node.logical_id.clone()) {
2931 merged.push(hit);
2932 }
2933 }
2934
2935 if merged.len() > limit {
2936 merged.truncate(limit);
2937 }
2938 merged
2939}
2940
2941fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
2945 hits.sort_by(|a, b| {
2946 b.score
2947 .partial_cmp(&a.score)
2948 .unwrap_or(std::cmp::Ordering::Equal)
2949 .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
2950 .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
2951 });
2952
2953 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2954 hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
2955 hits
2956}
2957
2958fn source_priority(source: SearchHitSource) -> u8 {
2959 match source {
2962 SearchHitSource::Chunk => 0,
2963 SearchHitSource::Property => 1,
2964 SearchHitSource::Vector => 2,
2965 }
2966}
2967
/// Sentinel handed to FTS5 `highlight()` to open a match span — a control
/// byte assumed not to occur in indexed text (TODO confirm indexer strips
/// control characters).
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
/// Closing counterpart to [`ATTRIBUTION_HIGHLIGHT_OPEN`].
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
2987
2988fn load_position_map(
2992 conn: &Connection,
2993 logical_id: &str,
2994 kind: &str,
2995) -> Result<Vec<(usize, usize, String)>, EngineError> {
2996 let mut stmt = conn
2997 .prepare_cached(
2998 "SELECT start_offset, end_offset, leaf_path \
2999 FROM fts_node_property_positions \
3000 WHERE node_logical_id = ?1 AND kind = ?2 \
3001 ORDER BY start_offset ASC",
3002 )
3003 .map_err(EngineError::Sqlite)?;
3004 let rows = stmt
3005 .query_map(rusqlite::params![logical_id, kind], |row| {
3006 let start: i64 = row.get(0)?;
3007 let end: i64 = row.get(1)?;
3008 let path: String = row.get(2)?;
3009 let start = usize::try_from(start).unwrap_or(0);
3013 let end = usize::try_from(end).unwrap_or(0);
3014 Ok((start, end, path))
3015 })
3016 .map_err(EngineError::Sqlite)?;
3017 let mut out = Vec::new();
3018 for row in rows {
3019 out.push(row.map_err(EngineError::Sqlite)?);
3020 }
3021 Ok(out)
3022}
3023
/// Translates FTS5 `highlight()` output back into byte offsets.
///
/// `wrapped` is the highlighted text containing `open`/`close` marker
/// sequences. For each `open` marker this records the byte offset at which
/// the match starts in the *original* (marker-free) text, by subtracting the
/// marker bytes encountered so far.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    let text = wrapped.as_bytes();
    let open_marker = open.as_bytes();
    let close_marker = close.as_bytes();

    let mut offsets = Vec::new();
    let mut cursor = 0usize;
    let mut marker_bytes = 0usize;
    while cursor < text.len() {
        let rest = &text[cursor..];
        if rest.starts_with(open_marker) {
            // Offset in the unwrapped text = position minus marker bytes seen.
            offsets.push(cursor - marker_bytes);
            cursor += open_marker.len();
            marker_bytes += open_marker.len();
        } else if rest.starts_with(close_marker) {
            cursor += close_marker.len();
            marker_bytes += close_marker.len();
        } else {
            cursor += 1;
        }
    }
    offsets
}
3056
/// Maps a byte `offset` in the flattened property text to the JSON leaf path
/// covering it, using the start-sorted `(start, end, path)` position table.
///
/// Returns `None` when the offset falls before the first span or into a gap
/// between spans.
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    // Locate the last span whose start is <= offset.
    let candidate = match positions.binary_search_by(|&(start, _, _)| start.cmp(&offset)) {
        Ok(exact) => exact,
        Err(0) => return None,
        Err(insertion) => insertion - 1,
    };
    let (start, end, path) = &positions[candidate];
    (*start..*end).contains(&offset).then(|| path.as_str())
}
3073
3074fn resolve_hit_attribution(
3083 conn: &Connection,
3084 hit: &SearchHit,
3085 match_expr: &str,
3086) -> Result<HitAttribution, EngineError> {
3087 if !matches!(hit.source, SearchHitSource::Property) {
3088 return Ok(HitAttribution {
3089 matched_paths: Vec::new(),
3090 });
3091 }
3092 let Some(rowid_str) = hit.projection_row_id.as_deref() else {
3093 return Ok(HitAttribution {
3094 matched_paths: Vec::new(),
3095 });
3096 };
3097 let rowid: i64 = match rowid_str.parse() {
3098 Ok(v) => v,
3099 Err(_) => {
3100 return Ok(HitAttribution {
3101 matched_paths: Vec::new(),
3102 });
3103 }
3104 };
3105
3106 let prop_table = fathomdb_schema::fts_kind_table_name(&hit.node.kind);
3112 let highlight_sql = format!(
3113 "SELECT highlight({prop_table}, 1, ?1, ?2) \
3114 FROM {prop_table} \
3115 WHERE rowid = ?3 AND {prop_table} MATCH ?4"
3116 );
3117 let mut stmt = conn.prepare(&highlight_sql).map_err(EngineError::Sqlite)?;
3118 let wrapped: Option<String> = stmt
3119 .query_row(
3120 rusqlite::params![
3121 ATTRIBUTION_HIGHLIGHT_OPEN,
3122 ATTRIBUTION_HIGHLIGHT_CLOSE,
3123 rowid,
3124 match_expr,
3125 ],
3126 |row| row.get(0),
3127 )
3128 .optional()
3129 .map_err(EngineError::Sqlite)?;
3130 let Some(wrapped) = wrapped else {
3131 return Ok(HitAttribution {
3132 matched_paths: Vec::new(),
3133 });
3134 };
3135
3136 let offsets = parse_highlight_offsets(
3137 &wrapped,
3138 ATTRIBUTION_HIGHLIGHT_OPEN,
3139 ATTRIBUTION_HIGHLIGHT_CLOSE,
3140 );
3141 if offsets.is_empty() {
3142 return Ok(HitAttribution {
3143 matched_paths: Vec::new(),
3144 });
3145 }
3146
3147 let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
3148 if positions.is_empty() {
3149 return Ok(HitAttribution {
3152 matched_paths: Vec::new(),
3153 });
3154 }
3155
3156 let mut matched_paths: Vec<String> = Vec::new();
3157 for offset in offsets {
3158 if let Some(path) = find_leaf_for_offset(&positions, offset)
3159 && !matched_paths.iter().any(|p| p == path)
3160 {
3161 matched_paths.push(path.to_owned());
3162 }
3163 }
3164 Ok(HitAttribution { matched_paths })
3165}
3166
3167fn build_bm25_expr(table: &str, schema_json: &str, sep: &str) -> String {
3174 let schema = crate::writer::parse_property_schema_json(schema_json, sep);
3175 let any_weighted = schema.paths.iter().any(|p| p.weight.is_some());
3176 if !any_weighted {
3177 return format!("bm25({table})");
3178 }
3179 let weights: Vec<String> = std::iter::once("0.0".to_owned())
3181 .chain(
3182 schema
3183 .paths
3184 .iter()
3185 .map(|p| format!("{:.1}", p.weight.unwrap_or(1.0))),
3186 )
3187 .collect();
3188 format!("bm25({table}, {})", weights.join(", "))
3189}
3190
3191fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
3192 match value {
3193 fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
3194 fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
3195 fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
3196 }
3197}
3198
3199#[cfg(test)]
3200#[allow(clippy::expect_used)]
3201mod tests {
3202 use std::panic::{AssertUnwindSafe, catch_unwind};
3203 use std::sync::Arc;
3204
3205 use fathomdb_query::{BindValue, QueryBuilder};
3206 use fathomdb_schema::SchemaManager;
3207 use rusqlite::types::Value;
3208 use tempfile::NamedTempFile;
3209
3210 use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
3211
3212 use fathomdb_query::{
3213 NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
3214 };
3215
3216 use super::{
3217 bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
3218 wrap_node_row_projection_sql,
3219 };
3220
3221 fn mk_hit(
3222 logical_id: &str,
3223 score: f64,
3224 match_mode: SearchMatchMode,
3225 source: SearchHitSource,
3226 ) -> SearchHit {
3227 SearchHit {
3228 node: NodeRowLite {
3229 row_id: format!("{logical_id}-row"),
3230 logical_id: logical_id.to_owned(),
3231 kind: "Goal".to_owned(),
3232 properties: "{}".to_owned(),
3233 content_ref: None,
3234 last_accessed_at: None,
3235 },
3236 score,
3237 modality: RetrievalModality::Text,
3238 source,
3239 match_mode: Some(match_mode),
3240 snippet: None,
3241 written_at: 0,
3242 projection_row_id: None,
3243 vector_distance: None,
3244 attribution: None,
3245 }
3246 }
3247
3248 #[test]
3249 fn merge_places_strict_block_before_relaxed_regardless_of_score() {
3250 let strict = vec![mk_hit(
3251 "a",
3252 1.0,
3253 SearchMatchMode::Strict,
3254 SearchHitSource::Chunk,
3255 )];
3256 let relaxed = vec![mk_hit(
3258 "b",
3259 9.9,
3260 SearchMatchMode::Relaxed,
3261 SearchHitSource::Chunk,
3262 )];
3263 let merged = merge_search_branches(strict, relaxed, 10);
3264 assert_eq!(merged.len(), 2);
3265 assert_eq!(merged[0].node.logical_id, "a");
3266 assert!(matches!(
3267 merged[0].match_mode,
3268 Some(SearchMatchMode::Strict)
3269 ));
3270 assert_eq!(merged[1].node.logical_id, "b");
3271 assert!(matches!(
3272 merged[1].match_mode,
3273 Some(SearchMatchMode::Relaxed)
3274 ));
3275 }
3276
3277 #[test]
3278 fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
3279 let strict = vec![mk_hit(
3280 "shared",
3281 1.0,
3282 SearchMatchMode::Strict,
3283 SearchHitSource::Chunk,
3284 )];
3285 let relaxed = vec![
3286 mk_hit(
3287 "shared",
3288 9.9,
3289 SearchMatchMode::Relaxed,
3290 SearchHitSource::Chunk,
3291 ),
3292 mk_hit(
3293 "other",
3294 2.0,
3295 SearchMatchMode::Relaxed,
3296 SearchHitSource::Chunk,
3297 ),
3298 ];
3299 let merged = merge_search_branches(strict, relaxed, 10);
3300 assert_eq!(merged.len(), 2);
3301 assert_eq!(merged[0].node.logical_id, "shared");
3302 assert!(matches!(
3303 merged[0].match_mode,
3304 Some(SearchMatchMode::Strict)
3305 ));
3306 assert_eq!(merged[1].node.logical_id, "other");
3307 assert!(matches!(
3308 merged[1].match_mode,
3309 Some(SearchMatchMode::Relaxed)
3310 ));
3311 }
3312
3313 #[test]
3314 fn merge_sorts_within_block_by_score_desc_then_logical_id() {
3315 let strict = vec![
3316 mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3317 mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3318 mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3319 ];
3320 let merged = merge_search_branches(strict, vec![], 10);
3321 assert_eq!(
3322 merged
3323 .iter()
3324 .map(|h| &h.node.logical_id)
3325 .collect::<Vec<_>>(),
3326 vec!["a", "c", "b"]
3327 );
3328 }
3329
3330 #[test]
3331 fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
3332 let strict = vec![
3333 mk_hit(
3334 "shared",
3335 1.0,
3336 SearchMatchMode::Strict,
3337 SearchHitSource::Property,
3338 ),
3339 mk_hit(
3340 "shared",
3341 1.0,
3342 SearchMatchMode::Strict,
3343 SearchHitSource::Chunk,
3344 ),
3345 ];
3346 let merged = merge_search_branches(strict, vec![], 10);
3347 assert_eq!(merged.len(), 1);
3348 assert!(matches!(merged[0].source, SearchHitSource::Chunk));
3349 }
3350
3351 #[test]
3352 fn merge_truncates_to_limit_after_block_merge() {
3353 let strict = vec![
3354 mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3355 mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3356 ];
3357 let relaxed = vec![mk_hit(
3358 "c",
3359 9.0,
3360 SearchMatchMode::Relaxed,
3361 SearchHitSource::Chunk,
3362 )];
3363 let merged = merge_search_branches(strict, relaxed, 2);
3364 assert_eq!(merged.len(), 2);
3365 assert_eq!(merged[0].node.logical_id, "a");
3366 assert_eq!(merged[1].node.logical_id, "b");
3367 }
3368
    #[test]
    fn search_architecturally_supports_three_branch_fusion() {
        // Scenario 1: three distinct ids — block order strict > relaxed >
        // vector must hold regardless of raw scores.
        let strict = vec![mk_hit(
            "alpha",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![mk_hit(
            "bravo",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vector_hit = mk_hit(
            "charlie",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        // Vector hits carry no match_mode and the Vector modality.
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;
        let vector = vec![vector_hit];

        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
        assert_eq!(merged.len(), 3);
        assert_eq!(merged[0].node.logical_id, "alpha");
        assert_eq!(merged[1].node.logical_id, "bravo");
        assert_eq!(merged[2].node.logical_id, "charlie");
        assert!(matches!(merged[2].source, SearchHitSource::Vector));

        // Scenario 2: all three branches carry the same logical id — the
        // strict (chunk) copy must win dedup across blocks.
        let strict2 = vec![mk_hit(
            "shared",
            0.5,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed2 = vec![mk_hit(
            "shared",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vshared = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared.match_mode = None;
        vshared.modality = RetrievalModality::Vector;
        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
        assert!(matches!(
            merged2[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));

        // Scenario 3: no strict branch — relaxed beats vector for the same id.
        let mut vshared2 = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared2.match_mode = None;
        vshared2.modality = RetrievalModality::Vector;
        let merged3 = merge_search_branches_three(
            vec![],
            vec![mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            )],
            vec![vshared2],
            10,
        );
        assert_eq!(merged3.len(), 1);
        assert!(matches!(
            merged3[0].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
3470
3471 #[test]
3485 fn merge_search_branches_three_vector_only_preserves_vector_block() {
3486 let mut vector_hit = mk_hit(
3487 "solo",
3488 0.75,
3489 SearchMatchMode::Strict,
3490 SearchHitSource::Vector,
3491 );
3492 vector_hit.match_mode = None;
3493 vector_hit.modality = RetrievalModality::Vector;
3494
3495 let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);
3496
3497 assert_eq!(merged.len(), 1);
3498 assert_eq!(merged[0].node.logical_id, "solo");
3499 assert!(matches!(merged[0].source, SearchHitSource::Vector));
3500 assert!(matches!(merged[0].modality, RetrievalModality::Vector));
3501 assert!(
3502 merged[0].match_mode.is_none(),
3503 "vector hits carry match_mode=None per addendum 1"
3504 );
3505 }
3506
3507 #[test]
3519 fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
3520 let strict = vec![
3521 mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3522 mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3523 mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3524 ];
3525 let relaxed = vec![mk_hit(
3526 "d",
3527 9.0,
3528 SearchMatchMode::Relaxed,
3529 SearchHitSource::Chunk,
3530 )];
3531 let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
3532 vector_hit.match_mode = None;
3533 vector_hit.modality = RetrievalModality::Vector;
3534 let vector = vec![vector_hit];
3535
3536 let merged = merge_search_branches_three(strict, relaxed, vector, 2);
3537
3538 assert_eq!(merged.len(), 2);
3539 assert_eq!(merged[0].node.logical_id, "a");
3540 assert_eq!(merged[1].node.logical_id, "b");
3541 assert!(
3543 merged
3544 .iter()
3545 .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
3546 "strict block must win limit contention against higher-scored relaxed/vector hits"
3547 );
3548 assert!(
3549 merged
3550 .iter()
3551 .all(|h| matches!(h.source, SearchHitSource::Chunk)),
3552 "no vector source hits should leak past the limit"
3553 );
3554 }
3555
3556 #[test]
3557 fn is_vec_table_absent_matches_known_error_messages() {
3558 use rusqlite::ffi;
3559 fn make_err(msg: &str) -> rusqlite::Error {
3560 rusqlite::Error::SqliteFailure(
3561 ffi::Error {
3562 code: ffi::ErrorCode::Unknown,
3563 extended_code: 1,
3564 },
3565 Some(msg.to_owned()),
3566 )
3567 }
3568 assert!(is_vec_table_absent(&make_err(
3569 "no such table: vec_nodes_active"
3570 )));
3571 assert!(is_vec_table_absent(&make_err("no such module: vec0")));
3572 assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
3573 assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
3574 assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
3575 }
3576
    #[test]
    fn vector_search_uses_per_kind_table_and_degrades_when_table_absent() {
        // A fresh db has no vec_* virtual table for "MyKind": the vector read
        // must flag degradation and return empty nodes instead of erroring.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("MyKind")
            .vector_search("some query", 5)
            .compile()
            .expect("vector query compiles");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("degraded read must succeed");
        assert!(
            rows.was_degraded,
            "must degrade when vec_mykind table does not exist"
        );
        assert!(
            rows.nodes.is_empty(),
            "degraded result must return empty nodes"
        );
    }
3611
3612 #[test]
3613 fn bind_value_text_maps_to_sql_text() {
3614 let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
3615 assert_eq!(val, Value::Text("hello".to_owned()));
3616 }
3617
3618 #[test]
3619 fn bind_value_integer_maps_to_sql_integer() {
3620 let val = bind_value_to_sql(&BindValue::Integer(42));
3621 assert_eq!(val, Value::Integer(42));
3622 }
3623
3624 #[test]
3625 fn bind_value_bool_true_maps_to_integer_one() {
3626 let val = bind_value_to_sql(&BindValue::Bool(true));
3627 assert_eq!(val, Value::Integer(1));
3628 }
3629
3630 #[test]
3631 fn bind_value_bool_false_maps_to_integer_zero() {
3632 let val = bind_value_to_sql(&BindValue::Bool(false));
3633 assert_eq!(val, Value::Integer(0));
3634 }
3635
    #[test]
    fn same_shape_queries_share_one_cache_entry() {
        // Two queries differing only in bind values ("budget" vs "standup")
        // must hash to the same structural shape and share one SQL cache slot.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled_a = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(10)
            .compile()
            .expect("compiled a");
        let compiled_b = QueryBuilder::nodes("Meeting")
            .text_search("standup", 5)
            .limit(10)
            .compile()
            .expect("compiled b");

        coordinator
            .execute_compiled_read(&compiled_a)
            .expect("read a");
        coordinator
            .execute_compiled_read(&compiled_b)
            .expect("read b");

        assert_eq!(
            compiled_a.shape_hash, compiled_b.shape_hash,
            "different bind values, same structural shape → same hash"
        );
        assert_eq!(coordinator.shape_sql_count(), 1);
    }
3673
    #[test]
    fn vector_read_degrades_gracefully_when_vec_table_absent() {
        // Same degradation contract as the per-kind test, against the default
        // vec_nodes_active table name.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .vector_search("budget embeddings", 5)
            .compile()
            .expect("vector query compiles");

        let result = coordinator.execute_compiled_read(&compiled);
        let rows = result.expect("degraded read must succeed, not error");
        assert!(
            rows.was_degraded,
            "result must be flagged as degraded when vec_nodes_active is absent"
        );
        assert!(
            rows.nodes.is_empty(),
            "degraded result must return empty nodes"
        );
    }
3703
    #[test]
    fn coordinator_caches_by_shape_hash() {
        // A single executed read must leave exactly one shape-keyed SQL entry.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute compiled read");
        assert_eq!(coordinator.shape_sql_count(), 1);
    }
3727
    #[test]
    fn explain_returns_correct_sql() {
        // The plan must expose the same wrapped projection SQL the executor
        // would run.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
    }
3752
    #[test]
    fn explain_returns_correct_driving_table() {
        use fathomdb_query::DrivingTable;

        // A text search drives off the fts_nodes index.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
    }
3777
    #[test]
    fn explain_reports_cache_miss_then_hit() {
        // explain before any execute reports a miss; after one execute the
        // same shape reports a hit.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan_before = coordinator.explain_compiled_read(&compiled);
        assert!(
            !plan_before.cache_hit,
            "cache miss expected before first execute"
        );

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        let plan_after = coordinator.explain_compiled_read(&compiled);
        assert!(
            plan_after.cache_hit,
            "cache hit expected after first execute"
        );
    }
3815
    #[test]
    fn explain_does_not_execute_query() {
        // explain only describes the plan: it must surface SQL text and bind
        // count without running the query.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("anything", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
        assert_eq!(plan.bind_count, compiled.binds.len());
    }
3844
    #[test]
    fn coordinator_executes_compiled_read() {
        // End-to-end: seed one node + chunk + fts row, then a text search
        // must return exactly that node.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
3884
    #[test]
    fn text_search_finds_structured_only_node_via_property_fts() {
        // A node with no chunks, indexed only in the per-kind property FTS
        // table, must still be found by text search.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
                 node_logical_id UNINDEXED, text_content, \
                 tokenize = 'porter unicode61 remove_diacritics 2'\
             )",
        )
        .expect("create per-kind fts table");
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
            INSERT INTO fts_props_goal (node_logical_id, text_content)
            VALUES ('goal-1', 'Ship v2');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Goal")
            .text_search("Ship", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "goal-1");
    }
3931
    #[test]
    fn text_search_returns_both_chunk_and_property_backed_hits() {
        // One node backed by chunk text, another only by the per-kind property
        // index — a single query must surface both.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
            ",
        )
        .expect("seed chunk-backed node");

        conn.execute_batch(
            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_meeting USING fts5(\
                 node_logical_id UNINDEXED, text_content, \
                 tokenize = 'porter unicode61 remove_diacritics 2'\
             )",
        )
        .expect("create per-kind fts table");
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
            INSERT INTO fts_props_meeting (node_logical_id, text_content)
            VALUES ('meeting-2', 'quarterly sync');
            "#,
        )
        .expect("seed property-backed node");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("quarterly", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
        ids.sort_unstable();
        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
    }
3992
    #[test]
    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
        // The word "not" must be treated as a literal search token here, not
        // as a boolean operator.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
            ",
        )
        .expect("seed chunk-backed node");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("not a ship", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
4032
    #[test]
    fn capability_gate_reports_false_without_feature() {
        // Opened with no vector dimension, the coordinator must report the
        // vector capability as disabled.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        assert!(
            !coordinator.vector_enabled(),
            "vector_enabled must be false when no dimension is requested"
        );
    }
4054
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn capability_gate_reports_true_when_feature_enabled() {
        // With the sqlite-vec feature compiled in and a dimension requested,
        // the vector capability must report enabled.
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            Some(128),
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        assert!(
            coordinator.vector_enabled(),
            "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
        );
    }
4073
    #[test]
    fn read_run_returns_inserted_run() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        // Insert one run through the writer actor, then read it back through
        // a fresh coordinator on the same db file.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
4130
    #[test]
    fn read_step_returns_inserted_step() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        // Insert a run plus one step under it, then read the step back and
        // verify its run linkage.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
4194
    /// Round-trip: an action written via `WriterActor` (with its parent step
    /// and run) must come back through `ExecutionCoordinator::read_action`.
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // Seed the full ancestry chain in one write: run -> step -> action.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
4270
    /// Two runs are written, the second superseding the first; only the
    /// superseding run may be returned by `read_active_runs`.
    #[test]
    fn read_active_runs_excludes_superseded() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First version: plain insert of run-v1.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Second version: run-v2 upserts and declares run-v1 superseded.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
4350
4351 #[allow(clippy::panic)]
4352 fn poison_connection(coordinator: &ExecutionCoordinator) {
4353 let result = catch_unwind(AssertUnwindSafe(|| {
4354 let _guard = coordinator.pool.connections[0]
4355 .lock()
4356 .expect("poison test lock");
4357 panic!("poison coordinator connection mutex");
4358 }));
4359 assert!(
4360 result.is_err(),
4361 "poison test must unwind while holding the connection mutex"
4362 );
4363 }
4364
4365 #[allow(clippy::panic)]
4366 fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
4367 where
4368 F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
4369 {
4370 match op(coordinator) {
4371 Err(EngineError::Bridge(message)) => {
4372 assert_eq!(message, "connection mutex poisoned");
4373 }
4374 Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
4375 Err(error) => panic!("expected poisoned connection error, got {error:?}"),
4376 }
4377 }
4378
    /// Every read helper must surface a poisoned pool mutex as
    /// `EngineError::Bridge("connection mutex poisoned")` instead of panicking.
    #[test]
    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Pool size is 1, so poisoning connection 0 affects every helper path.
        poison_connection(&coordinator);

        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
        assert_poisoned_connection_error(
            &coordinator,
            super::ExecutionCoordinator::read_active_runs,
        );
        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
    }
4404
    /// Overfilling the shape->SQL cache past `MAX_SHAPE_CACHE_SIZE` and then
    /// executing a read must leave the cache at or below the bound.
    #[test]
    fn shape_cache_stays_bounded() {
        use fathomdb_query::ShapeHash;

        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        {
            // Insert MAX_SHAPE_CACHE_SIZE + 1 entries (0..=MAX is inclusive)
            // directly into the cache to push it past the limit.
            let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
            for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
                cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
            }
        }
        // Any compiled read exercises the cache-maintenance path.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(10)
            .compile()
            .expect("compiled query");

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert!(
            coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
            "shape cache must stay bounded: got {} entries, max {}",
            coordinator.shape_sql_count(),
            super::MAX_SHAPE_CACHE_SIZE
        );
    }
4450
4451 #[test]
4454 fn read_pool_size_configurable() {
4455 let db = NamedTempFile::new().expect("temporary db");
4456 let coordinator = ExecutionCoordinator::open(
4457 db.path(),
4458 Arc::new(SchemaManager::new()),
4459 None,
4460 2,
4461 Arc::new(TelemetryCounters::default()),
4462 None,
4463 )
4464 .expect("coordinator with pool_size=2");
4465
4466 assert_eq!(coordinator.pool.size(), 2);
4467
4468 let compiled = QueryBuilder::nodes("Meeting")
4470 .text_search("budget", 5)
4471 .limit(10)
4472 .compile()
4473 .expect("compiled query");
4474
4475 let result = coordinator.execute_compiled_read(&compiled);
4476 assert!(result.is_ok(), "read through pool must succeed");
4477 }
4478
    /// Grouped read with one expansion: 10 seeded Meeting roots, each with two
    /// Task children over HAS_TASK edges, must yield 10 roots and a "tasks"
    /// expansion slot holding exactly 2 nodes per root.
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        let db = NamedTempFile::new().expect("temporary db");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        {
            // Seed directly over a second connection: per meeting, one node +
            // chunk + FTS row, two task nodes, and two HAS_TASK edges.
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1, None)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Every root was seeded with exactly two HAS_TASK children.
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
4558
4559 #[test]
4562 fn build_bm25_expr_no_weights() {
4563 let schema_json = r#"["$.title","$.body"]"#;
4564 let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4565 assert_eq!(result, "bm25(fts_props_testkind)");
4566 }
4567
4568 #[test]
4569 fn build_bm25_expr_with_weights() {
4570 let schema_json = r#"[{"path":"$.title","mode":"scalar","weight":10.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
4571 let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4572 assert_eq!(result, "bm25(fts_props_testkind, 0.0, 10.0, 1.0)");
4573 }
4574
    /// End-to-end BM25 weighting: with $.title weighted 10x over $.body, the
    /// node matching "rust" in its title must rank above the one matching in
    /// its body.
    #[test]
    #[allow(clippy::too_many_lines)]
    fn weighted_schema_bm25_orders_title_match_above_body_match() {
        use crate::{
            AdminService, FtsPropertyPathSpec, NodeInsert, ProvenanceMode, WriteRequest,
            WriterActor, writer::ChunkPolicy,
        };
        use fathomdb_schema::fts_column_name;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        {
            // Register the weighted property schema before any writes so the
            // per-kind FTS table is built eagerly.
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Article",
                    &[
                        FtsPropertyPathSpec::scalar("$.title").with_weight(10.0),
                        FtsPropertyPathSpec::scalar("$.body").with_weight(1.0),
                    ],
                    None,
                    &[],
                    crate::rebuild_actor::RebuildMode::Eager,
                )
                .expect("register schema with weights");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // article-a: "rust" in the heavily weighted title.
        writer
            .submit(WriteRequest {
                label: "insert-a".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-a".to_owned(),
                    logical_id: "article-a".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"rust","body":"other"}"#.to_owned(),
                    source_ref: Some("src-a".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node A");

        // article-b: "rust" only in the lightly weighted body.
        writer
            .submit(WriteRequest {
                label: "insert-b".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-b".to_owned(),
                    logical_id: "article-b".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"other","body":"rust"}"#.to_owned(),
                    source_ref: Some("src-b".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node B");

        drop(writer);

        {
            // Sanity-check the projection before querying: both rows present,
            // with the expected values in the per-path FTS columns.
            let title_col = fts_column_name("$.title", false);
            let body_col = fts_column_name("$.body", false);
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            let count: i64 = conn
                .query_row("SELECT count(*) FROM fts_props_article", [], |r| r.get(0))
                .expect("count fts rows");
            assert_eq!(count, 2, "both nodes must have FTS rows in per-kind table");
            let (title_a, body_a): (String, String) = conn
                .query_row(
                    &format!(
                        "SELECT {title_col}, {body_col} FROM fts_props_article \
                         WHERE node_logical_id = 'article-a'"
                    ),
                    [],
                    |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
                )
                .expect("select article-a");
            assert_eq!(
                title_a, "rust",
                "article-a must have 'rust' in title column"
            );
            assert_eq!(
                body_a, "other",
                "article-a must have 'other' in body column"
            );
        }

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = fathomdb_query::QueryBuilder::nodes("Article")
            .text_search("rust", 5)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 2, "both nodes must be returned");
        assert_eq!(
            rows.nodes[0].logical_id, "article-a",
            "article-a (title match, weight 10) must rank above article-b (body match, weight 1)"
        );
    }
4727
    /// Attribution for property-FTS hits is derived from the stored position
    /// table: with "searchterm" located only inside the `$.title` offset
    /// range, `matched_paths` must contain `$.title` and not `$.body`.
    #[test]
    fn property_fts_hit_matched_paths_from_positions() {
        use crate::{AdminService, rebuild_actor::RebuildMode};
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        {
            // Register the property schema so fts_props_item exists.
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Item",
                    &[
                        crate::FtsPropertyPathSpec::scalar("$.body"),
                        crate::FtsPropertyPathSpec::scalar("$.title"),
                    ],
                    None,
                    &[],
                    RebuildMode::Eager,
                )
                .expect("register Item FTS schema");
        }

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // FTS blob layout: "other" (body, offsets 0..5), then LEAF_SEPARATOR,
        // then "searchterm" (title). With a 29-char separator the title span
        // is 34..44 — the hard-coded offsets below depend on that length.
        let blob = format!("other{}searchterm", crate::writer::LEAF_SEPARATOR);
        assert_eq!(
            crate::writer::LEAF_SEPARATOR.len(),
            29,
            "LEAF_SEPARATOR length changed; update position offsets"
        );

        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
             VALUES ('r1', 'item-1', 'Item', '{\"title\":\"searchterm\",\"body\":\"other\"}', 100)",
            [],
        )
        .expect("insert node");
        conn.execute(
            "INSERT INTO fts_props_item (node_logical_id, text_content) \
             VALUES ('item-1', ?1)",
            rusqlite::params![blob],
        )
        .expect("insert fts row");
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 0, 5, '$.body')",
            [],
        )
        .expect("insert body position");
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 34, 44, '$.title')",
            [],
        )
        .expect("insert title position");

        let ast = QueryBuilder::nodes("Item").text_search("searchterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected at least one hit");
        let hit = rows
            .hits
            .iter()
            .find(|h| h.node.logical_id == "item-1")
            .expect("item-1 must be in hits");

        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        assert!(
            att.matched_paths.contains(&"$.title".to_owned()),
            "matched_paths must contain '$.title', got {:?}",
            att.matched_paths,
        );
        assert!(
            !att.matched_paths.contains(&"$.body".to_owned()),
            "matched_paths must NOT contain '$.body', got {:?}",
            att.matched_paths,
        );
    }
4846
4847 #[test]
4855 fn vector_hit_has_no_attribution() {
4856 use fathomdb_query::compile_vector_search;
4857
4858 let db = NamedTempFile::new().expect("temporary db");
4859 let coordinator = ExecutionCoordinator::open(
4860 db.path(),
4861 Arc::new(SchemaManager::new()),
4862 None,
4863 1,
4864 Arc::new(TelemetryCounters::default()),
4865 None,
4866 )
4867 .expect("coordinator");
4868
4869 let ast = QueryBuilder::nodes("Document").vector_search("[1.0, 0.0]", 5);
4871 let mut compiled = compile_vector_search(ast.ast()).expect("compile vector search");
4872 compiled.attribution_requested = true;
4873
4874 let rows = coordinator
4877 .execute_compiled_vector_search(&compiled)
4878 .expect("vector search must not error");
4879
4880 assert!(
4881 rows.was_degraded,
4882 "vector search without vec table must degrade"
4883 );
4884 for hit in &rows.hits {
4885 assert!(
4886 hit.attribution.is_none(),
4887 "vector hits must carry attribution = None, got {:?}",
4888 hit.attribution
4889 );
4890 }
4891 }
4892
    /// Chunk-sourced hits still get an attribution object when requested, but
    /// with no matched property paths (placeholder behavior per the assert
    /// message below).
    #[test]
    fn chunk_hit_has_text_content_attribution() {
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Seed one node whose only searchable text lives in a chunk row.
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('r1', 'chunk-node', 'Goal', '{}', 100);
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('c1', 'chunk-node', 'uniquesentinelterm', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('c1', 'chunk-node', 'Goal', 'uniquesentinelterm');
            ",
        )
        .expect("seed chunk node");

        let ast = QueryBuilder::nodes("Goal").text_search("uniquesentinelterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected chunk hit");
        let hit = rows
            .hits
            .iter()
            .find(|h| matches!(h.source, SearchHitSource::Chunk))
            .expect("must have a Chunk hit");

        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        assert!(
            att.matched_paths.is_empty(),
            "placeholder: chunk matched_paths must be empty until integration \
             tests are updated; got {:?}",
            att.matched_paths,
        );
    }
4965
4966 #[test]
4973 #[allow(clippy::too_many_lines)]
4974 fn mixed_kind_results_get_per_kind_matched_paths() {
4975 use crate::{AdminService, rebuild_actor::RebuildMode};
4976 use fathomdb_query::compile_search;
4977
4978 let db = NamedTempFile::new().expect("temporary db");
4979 let schema_manager = Arc::new(SchemaManager::new());
4980
4981 {
4984 let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
4985 admin
4986 .register_fts_property_schema_with_entries(
4987 "KindA",
4988 &[crate::FtsPropertyPathSpec::scalar("$.alpha")],
4989 None,
4990 &[],
4991 RebuildMode::Eager,
4992 )
4993 .expect("register KindA FTS schema");
4994 admin
4995 .register_fts_property_schema_with_entries(
4996 "KindB",
4997 &[crate::FtsPropertyPathSpec::scalar("$.beta")],
4998 None,
4999 &[],
5000 RebuildMode::Eager,
5001 )
5002 .expect("register KindB FTS schema");
5003 }
5004
5005 let coordinator = ExecutionCoordinator::open(
5006 db.path(),
5007 Arc::clone(&schema_manager),
5008 None,
5009 1,
5010 Arc::new(TelemetryCounters::default()),
5011 None,
5012 )
5013 .expect("coordinator");
5014
5015 let conn = rusqlite::Connection::open(db.path()).expect("open db");
5016
5017 conn.execute(
5019 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5020 VALUES ('rA', 'node-a', 'KindA', '{\"alpha\":\"xenoterm\"}', 100)",
5021 [],
5022 )
5023 .expect("insert KindA node");
5024 conn.execute(
5026 "INSERT INTO fts_props_kinda (node_logical_id, text_content) \
5027 VALUES ('node-a', 'xenoterm')",
5028 [],
5029 )
5030 .expect("insert KindA fts row");
5031 conn.execute(
5032 "INSERT INTO fts_node_property_positions \
5033 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5034 VALUES ('node-a', 'KindA', 0, 8, '$.alpha')",
5035 [],
5036 )
5037 .expect("insert KindA position");
5038
5039 conn.execute(
5041 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5042 VALUES ('rB', 'node-b', 'KindB', '{\"beta\":\"xenoterm\"}', 100)",
5043 [],
5044 )
5045 .expect("insert KindB node");
5046 conn.execute(
5048 "INSERT INTO fts_props_kindb (node_logical_id, text_content) \
5049 VALUES ('node-b', 'xenoterm')",
5050 [],
5051 )
5052 .expect("insert KindB fts row");
5053 conn.execute(
5054 "INSERT INTO fts_node_property_positions \
5055 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5056 VALUES ('node-b', 'KindB', 0, 8, '$.beta')",
5057 [],
5058 )
5059 .expect("insert KindB position");
5060
5061 let ast = QueryBuilder::nodes("").text_search("xenoterm", 10);
5063 let mut compiled = compile_search(ast.ast()).expect("compile search");
5064 compiled.attribution_requested = true;
5065
5066 let rows = coordinator
5067 .execute_compiled_search(&compiled)
5068 .expect("search");
5069
5070 assert!(
5072 rows.hits.len() >= 2,
5073 "expected hits for both kinds, got {}",
5074 rows.hits.len()
5075 );
5076
5077 for hit in &rows.hits {
5078 let att = hit
5079 .attribution
5080 .as_ref()
5081 .expect("attribution must be Some when attribution_requested");
5082 match hit.node.kind.as_str() {
5083 "KindA" => {
5084 assert_eq!(
5085 att.matched_paths,
5086 vec!["$.alpha".to_owned()],
5087 "KindA hit must have matched_paths=['$.alpha'], got {:?}",
5088 att.matched_paths,
5089 );
5090 }
5091 "KindB" => {
5092 assert_eq!(
5093 att.matched_paths,
5094 vec!["$.beta".to_owned()],
5095 "KindB hit must have matched_paths=['$.beta'], got {:?}",
5096 att.matched_paths,
5097 );
5098 }
5099 other => {
5100 assert_eq!(other, "KindA", "unexpected kind in result: {other}");
5102 }
5103 }
5104 }
5105 }
5106
5107 #[test]
5110 fn tokenizer_strategy_from_str() {
5111 use super::TokenizerStrategy;
5112 assert_eq!(
5113 TokenizerStrategy::from_str("porter unicode61 remove_diacritics 2"),
5114 TokenizerStrategy::RecallOptimizedEnglish,
5115 );
5116 assert_eq!(
5117 TokenizerStrategy::from_str("unicode61 remove_diacritics 2"),
5118 TokenizerStrategy::PrecisionOptimized,
5119 );
5120 assert_eq!(
5121 TokenizerStrategy::from_str("trigram"),
5122 TokenizerStrategy::SubstringTrigram,
5123 );
5124 assert_eq!(
5125 TokenizerStrategy::from_str("icu"),
5126 TokenizerStrategy::GlobalCjk,
5127 );
5128 assert_eq!(
5129 TokenizerStrategy::from_str("unicode61 tokenchars '.+-'"),
5130 TokenizerStrategy::SourceCode,
5131 );
5132 assert_eq!(
5134 TokenizerStrategy::from_str("unicode61 tokenchars '._-$@'"),
5135 TokenizerStrategy::SourceCode,
5136 );
5137 assert_eq!(
5138 TokenizerStrategy::from_str("my_custom_tokenizer"),
5139 TokenizerStrategy::Custom("my_custom_tokenizer".to_owned()),
5140 );
5141 }
5142
    /// With a trigram tokenizer profile, a query shorter than three characters
    /// cannot form any trigram and must return empty rather than error.
    #[test]
    fn trigram_short_query_returns_empty() {
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        {
            // First open bootstraps the database schema, then is discarded.
            let bootstrap = ExecutionCoordinator::open(
                db.path(),
                Arc::clone(&schema_manager),
                None,
                1,
                Arc::new(TelemetryCounters::default()),
                None,
            )
            .expect("bootstrap coordinator");
            drop(bootstrap);
        }

        {
            // Configure the 'Snippet' kind to use the trigram tokenizer.
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            conn.execute_batch(
                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
                 VALUES ('Snippet', 'fts', '{\"tokenizer\":\"trigram\"}');",
            )
            .expect("insert profile");
        }

        // Reopen so the coordinator picks up the stored profile.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator reopen");

        let ast = QueryBuilder::nodes("Snippet").text_search("ab", 10);
        let compiled = compile_search(ast.ast()).expect("compile search");
        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("short trigram query must not error");
        assert!(
            rows.hits.is_empty(),
            "2-char trigram query must return empty"
        );
    }
5196
    /// A tokenizer with tokenchars like '._-$@' (SourceCode strategy) must
    /// not have its rendered FTS5 expression re-escaped: searching "std.io"
    /// has to find the seeded document.
    #[test]
    fn source_code_strategy_does_not_corrupt_fts5_syntax() {
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        {
            // First open bootstraps the database schema, then is discarded.
            let bootstrap = ExecutionCoordinator::open(
                db.path(),
                Arc::clone(&schema_manager),
                None,
                1,
                Arc::new(TelemetryCounters::default()),
                None,
            )
            .expect("bootstrap coordinator");
            drop(bootstrap);
        }

        {
            // Store a source-code tokenizer profile for 'Symbol' and seed one
            // node whose chunk text contains the dotted token "std.io".
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            conn.execute(
                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
                 VALUES ('Symbol', 'fts', json_object('tokenizer', 'unicode61 tokenchars ''._-$@'''))",
                [],
            )
            .expect("insert profile");
            conn.execute_batch(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
                 VALUES ('row-sym-1', 'logical-sym-1', 'Symbol', '{}', 1); \
                 INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'std.io is a rust crate', 1); \
                 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'Symbol', 'std.io is a rust crate');",
            )
            .expect("insert node and fts row");
        }

        // Reopen so the coordinator reads the stored tokenizer profile.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator reopen");

        let ast = QueryBuilder::nodes("Symbol").text_search("std.io", 10);
        let compiled = compile_search(ast.ast()).expect("compile search");
        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("source code search must not error");
        assert!(
            !rows.hits.is_empty(),
            "SourceCode strategy search for 'std.io' must return the document; \
             got empty — FTS5 expression was likely corrupted by post-render escaping"
        );
    }
5270
    /// Minimal `QueryEmbedder` test double: returns all-zero vectors and
    /// reports a configurable identity (see the trait impl below).
    #[derive(Debug)]
    struct StubEmbedder {
        // Model name reported through `identity()`, e.g. "bge-small".
        model_identity: String,
        // Length of the embedding vectors this stub produces and reports.
        dimension: usize,
    }
5278
5279 impl StubEmbedder {
5280 fn new(model_identity: &str, dimension: usize) -> Self {
5281 Self {
5282 model_identity: model_identity.to_owned(),
5283 dimension,
5284 }
5285 }
5286 }
5287
5288 impl crate::embedder::QueryEmbedder for StubEmbedder {
5289 fn embed_query(&self, _text: &str) -> Result<Vec<f32>, crate::embedder::EmbedderError> {
5290 Ok(vec![0.0; self.dimension])
5291 }
5292 fn identity(&self) -> crate::embedder::QueryEmbedderIdentity {
5293 crate::embedder::QueryEmbedderIdentity {
5294 model_identity: self.model_identity.clone(),
5295 model_version: "1.0".to_owned(),
5296 dimension: self.dimension,
5297 normalization_policy: "l2".to_owned(),
5298 }
5299 }
5300 fn max_tokens(&self) -> usize {
5301 512
5302 }
5303 }
5304
5305 fn make_in_memory_db_with_projection_profiles() -> rusqlite::Connection {
5306 let conn = rusqlite::Connection::open_in_memory().expect("in-memory db");
5307 conn.execute_batch(
5308 "CREATE TABLE IF NOT EXISTS projection_profiles (
5309 kind TEXT NOT NULL,
5310 facet TEXT NOT NULL,
5311 config_json TEXT NOT NULL,
5312 active_at INTEGER,
5313 created_at INTEGER,
5314 PRIMARY KEY (kind, facet)
5315 );",
5316 )
5317 .expect("create projection_profiles");
5318 conn
5319 }
5320
5321 #[test]
5322 fn check_vec_identity_no_profile_no_panic() {
5323 let conn = make_in_memory_db_with_projection_profiles();
5324 let embedder = StubEmbedder::new("bge-small", 384);
5325 let result = super::check_vec_identity_at_open(&conn, &embedder);
5326 assert!(result.is_ok(), "no profile row must return Ok(())");
5327 }
5328
5329 #[test]
5330 fn check_vec_identity_matching_identity_ok() {
5331 let conn = make_in_memory_db_with_projection_profiles();
5332 conn.execute(
5333 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5334 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5335 [],
5336 )
5337 .expect("insert profile");
5338 let embedder = StubEmbedder::new("bge-small", 384);
5339 let result = super::check_vec_identity_at_open(&conn, &embedder);
5340 assert!(result.is_ok(), "matching profile must return Ok(())");
5341 }
5342
5343 #[test]
5344 fn check_vec_identity_mismatched_dimensions_ok() {
5345 let conn = make_in_memory_db_with_projection_profiles();
5346 conn.execute(
5347 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5348 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5349 [],
5350 )
5351 .expect("insert profile");
5352 let embedder = StubEmbedder::new("bge-small", 768);
5354 let result = super::check_vec_identity_at_open(&conn, &embedder);
5355 assert!(
5356 result.is_ok(),
5357 "dimension mismatch must warn and return Ok(())"
5358 );
5359 }
5360
5361 #[test]
5362 fn custom_tokenizer_passthrough() {
5363 use super::TokenizerStrategy;
5364 let strategy = TokenizerStrategy::Custom("my_tok".to_owned());
5365 assert_eq!(strategy, TokenizerStrategy::Custom("my_tok".to_owned()));
5367 assert_ne!(strategy, TokenizerStrategy::SubstringTrigram);
5369 assert_ne!(strategy, TokenizerStrategy::SourceCode);
5370 }
5371
5372 #[test]
5373 fn check_vec_identity_mismatched_model_ok() {
5374 let conn = make_in_memory_db_with_projection_profiles();
5375 conn.execute(
5376 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5377 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5378 [],
5379 )
5380 .expect("insert profile");
5381 let embedder = StubEmbedder::new("bge-large", 384);
5383 let result = super::check_vec_identity_at_open(&conn, &embedder);
5384 assert!(
5385 result.is_ok(),
5386 "model_identity mismatch must warn and return Ok(())"
5387 );
5388 }
5389}