1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8 BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9 CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, EdgeExpansionSlot,
10 ExpansionSlot, FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue,
11 SearchBranch, SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash,
12 render_text_query_fts5,
13};
14use fathomdb_schema::SchemaManager;
15use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
16
17use crate::embedder::QueryEmbedder;
18use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
19use crate::{EngineError, sqlite};
20
21const MAX_SHAPE_CACHE_SIZE: usize = 4096;
25
26const BATCH_CHUNK_SIZE: usize = 200;
31
32fn compile_expansion_in_filter(
37 p: usize,
38 path: &str,
39 value_binds: Vec<Value>,
40) -> (String, Vec<Value>) {
41 let first_val = p + 1;
42 let placeholders = (0..value_binds.len())
43 .map(|i| format!("?{}", first_val + i))
44 .collect::<Vec<_>>()
45 .join(", ");
46 let mut binds = vec![Value::Text(path.to_owned())];
47 binds.extend(value_binds);
48 (
49 format!("\n AND json_extract(n.properties, ?{p}) IN ({placeholders})"),
50 binds,
51 )
52}
53
54#[allow(clippy::too_many_lines)]
69fn compile_expansion_filter(
70 filter: Option<&Predicate>,
71 first_param: usize,
72) -> (String, Vec<Value>) {
73 let Some(predicate) = filter else {
74 return (String::new(), vec![]);
75 };
76 let p = first_param;
77 match predicate {
78 Predicate::JsonPathEq { path, value } => {
79 let val = match value {
80 ScalarValue::Text(t) => Value::Text(t.clone()),
81 ScalarValue::Integer(i) => Value::Integer(*i),
82 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
83 };
84 (
85 format!(
86 "\n AND json_extract(n.properties, ?{p}) = ?{}",
87 p + 1
88 ),
89 vec![Value::Text(path.clone()), val],
90 )
91 }
92 Predicate::JsonPathCompare { path, op, value } => {
93 let val = match value {
94 ScalarValue::Text(t) => Value::Text(t.clone()),
95 ScalarValue::Integer(i) => Value::Integer(*i),
96 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
97 };
98 let operator = match op {
99 ComparisonOp::Gt => ">",
100 ComparisonOp::Gte => ">=",
101 ComparisonOp::Lt => "<",
102 ComparisonOp::Lte => "<=",
103 };
104 (
105 format!(
106 "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
107 p + 1
108 ),
109 vec![Value::Text(path.clone()), val],
110 )
111 }
112 Predicate::JsonPathFusedEq { path, value } => (
113 format!(
114 "\n AND json_extract(n.properties, ?{p}) = ?{}",
115 p + 1
116 ),
117 vec![Value::Text(path.clone()), Value::Text(value.clone())],
118 ),
119 Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
120 let operator = match op {
121 ComparisonOp::Gt => ">",
122 ComparisonOp::Gte => ">=",
123 ComparisonOp::Lt => "<",
124 ComparisonOp::Lte => "<=",
125 };
126 (
127 format!(
128 "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
129 p + 1
130 ),
131 vec![Value::Text(path.clone()), Value::Integer(*value)],
132 )
133 }
134 Predicate::JsonPathFusedBoolEq { path, value } => (
135 format!(
136 "\n AND json_extract(n.properties, ?{p}) = ?{}",
137 p + 1
138 ),
139 vec![Value::Text(path.clone()), Value::Integer(i64::from(*value))],
140 ),
141 Predicate::KindEq(kind) => (
142 format!("\n AND n.kind = ?{p}"),
143 vec![Value::Text(kind.clone())],
144 ),
145 Predicate::LogicalIdEq(logical_id) => (
146 format!("\n AND n.logical_id = ?{p}"),
147 vec![Value::Text(logical_id.clone())],
148 ),
149 Predicate::SourceRefEq(source_ref) => (
150 format!("\n AND n.source_ref = ?{p}"),
151 vec![Value::Text(source_ref.clone())],
152 ),
153 Predicate::ContentRefEq(uri) => (
154 format!("\n AND n.content_ref = ?{p}"),
155 vec![Value::Text(uri.clone())],
156 ),
157 Predicate::ContentRefNotNull => (
158 "\n AND n.content_ref IS NOT NULL".to_owned(),
159 vec![],
160 ),
161 Predicate::EdgePropertyEq { .. } | Predicate::EdgePropertyCompare { .. } => {
162 unreachable!(
163 "compile_expansion_filter: EdgeProperty* variants must use compile_edge_filter"
164 );
165 }
166 Predicate::JsonPathFusedIn { path, values } => compile_expansion_in_filter(
167 p,
168 path,
169 values.iter().map(|v| Value::Text(v.clone())).collect(),
170 ),
171 Predicate::JsonPathIn { path, values } => compile_expansion_in_filter(
172 p,
173 path,
174 values
175 .iter()
176 .map(|v| match v {
177 ScalarValue::Text(t) => Value::Text(t.clone()),
178 ScalarValue::Integer(i) => Value::Integer(*i),
179 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
180 })
181 .collect(),
182 ),
183 }
184}
185
186fn compile_edge_filter(filter: Option<&Predicate>, first_param: usize) -> (String, Vec<Value>) {
196 let Some(predicate) = filter else {
197 return (String::new(), vec![]);
198 };
199 let p = first_param;
200 match predicate {
201 Predicate::EdgePropertyEq { path, value } => {
202 let val = match value {
203 ScalarValue::Text(t) => Value::Text(t.clone()),
204 ScalarValue::Integer(i) => Value::Integer(*i),
205 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
206 };
207 (
208 format!(
209 "\n AND json_extract(e.properties, ?{p}) = ?{}",
210 p + 1
211 ),
212 vec![Value::Text(path.clone()), val],
213 )
214 }
215 Predicate::EdgePropertyCompare { path, op, value } => {
216 let val = match value {
217 ScalarValue::Text(t) => Value::Text(t.clone()),
218 ScalarValue::Integer(i) => Value::Integer(*i),
219 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
220 };
221 let operator = match op {
222 ComparisonOp::Gt => ">",
223 ComparisonOp::Gte => ">=",
224 ComparisonOp::Lt => "<",
225 ComparisonOp::Lte => "<=",
226 };
227 (
228 format!(
229 "\n AND json_extract(e.properties, ?{p}) {operator} ?{}",
230 p + 1
231 ),
232 vec![Value::Text(path.clone()), val],
233 )
234 }
235 _ => {
236 unreachable!("compile_edge_filter: non-edge predicate {predicate:?}");
237 }
238 }
239}
240
/// Tokenizer families recognized from the per-kind FTS configuration.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TokenizerStrategy {
    /// `porter unicode61 remove_diacritics 2` — stemming, recall-oriented.
    RecallOptimizedEnglish,
    /// `unicode61 remove_diacritics 2` — no stemming, precision-oriented.
    PrecisionOptimized,
    /// `trigram` — substring matching; needs at least three characters.
    SubstringTrigram,
    /// `icu` — ICU tokenization (CJK and other non-whitespace scripts).
    GlobalCjk,
    /// `unicode61 tokenchars ...` — extra token characters for identifiers.
    SourceCode,
    /// Any tokenizer string not matched above, carried verbatim.
    Custom(String),
}

impl TokenizerStrategy {
    /// Classifies a raw FTS5 tokenizer configuration string.
    pub fn from_str(s: &str) -> Self {
        if s == "porter unicode61 remove_diacritics 2" {
            Self::RecallOptimizedEnglish
        } else if s == "unicode61 remove_diacritics 2" {
            Self::PrecisionOptimized
        } else if s == "trigram" {
            Self::SubstringTrigram
        } else if s == "icu" {
            Self::GlobalCjk
        } else if s.starts_with("unicode61 tokenchars") {
            Self::SourceCode
        } else {
            Self::Custom(s.to_string())
        }
    }
}
275
/// A fixed-size pool of read-only SQLite connections, each behind its own
/// mutex so concurrent readers can proceed on different connections.
struct ReadPool {
    // One mutex per connection; `acquire` try-locks each in order.
    connections: Vec<Mutex<Connection>>,
}
283
284impl fmt::Debug for ReadPool {
285 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286 f.debug_struct("ReadPool")
287 .field("size", &self.connections.len())
288 .finish()
289 }
290}
291
292impl ReadPool {
293 fn new(
304 db_path: &Path,
305 pool_size: usize,
306 schema_manager: &SchemaManager,
307 vector_enabled: bool,
308 ) -> Result<Self, EngineError> {
309 let mut connections = Vec::with_capacity(pool_size);
310 for _ in 0..pool_size {
311 let conn = if vector_enabled {
312 #[cfg(feature = "sqlite-vec")]
313 {
314 sqlite::open_readonly_connection_with_vec(db_path)?
315 }
316 #[cfg(not(feature = "sqlite-vec"))]
317 {
318 sqlite::open_readonly_connection(db_path)?
319 }
320 } else {
321 sqlite::open_readonly_connection(db_path)?
322 };
323 schema_manager
324 .initialize_reader_connection(&conn)
325 .map_err(EngineError::Schema)?;
326 connections.push(Mutex::new(conn));
327 }
328 Ok(Self { connections })
329 }
330
331 fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
340 for conn in &self.connections {
342 if let Ok(guard) = conn.try_lock() {
343 return Ok(guard);
344 }
345 }
346 self.connections[0].lock().map_err(|_| {
348 trace_error!("read pool: connection mutex poisoned");
349 EngineError::Bridge("connection mutex poisoned".to_owned())
350 })
351 }
352
353 #[cfg(test)]
355 fn size(&self) -> usize {
356 self.connections.len()
357 }
358}
359
/// A compiled SQL plan for a read, plus plan-cache bookkeeping.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// Final SQL text to execute.
    pub sql: String,
    /// Number of bind parameters the SQL expects.
    pub bind_count: usize,
    /// Table family that drives the query (nodes, FTS, vec, ...).
    pub driving_table: DrivingTable,
    /// Hash of the query shape; used as the shape-cache key.
    pub shape_hash: ShapeHash,
    /// Whether the shape cache already held SQL for this shape
    /// (set by the planner — not visible in this file).
    pub cache_hit: bool,
}
371
/// A single node row as projected by read queries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row identifier.
    pub row_id: String,
    /// Stable logical identifier (presumably shared across row versions —
    /// see the `superseded_at IS NULL` join in the vector-search SQL).
    pub logical_id: String,
    /// Node kind.
    pub kind: String,
    /// JSON-encoded properties blob.
    pub properties: String,
    /// Optional reference to external content.
    pub content_ref: Option<String>,
    /// Last access timestamp from `node_access_metadata`, when tracked.
    pub last_accessed_at: Option<i64>,
}
388
/// A single edge row with both endpoint identities.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeRow {
    /// Physical row identifier.
    pub row_id: String,
    /// Stable logical identifier of the edge itself.
    pub logical_id: String,
    /// Logical id of the source node.
    pub source_logical_id: String,
    /// Logical id of the target node.
    pub target_logical_id: String,
    /// Edge kind.
    pub kind: String,
    /// JSON-encoded edge properties.
    pub properties: String,
    /// Optional provenance reference.
    pub source_ref: Option<String>,
    /// Optional confidence score; the `f64` is why this type derives
    /// `PartialEq` but not `Eq`.
    pub confidence: Option<f64>,
}
417
/// A run row (top level of the run → step → action hierarchy).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    pub id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties blob.
    pub properties: String,
}
430
/// A step row, owned by a run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    pub id: String,
    /// Id of the parent run.
    pub run_id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties blob.
    pub properties: String,
}
445
/// An action row, owned by a step.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    pub id: String,
    /// Id of the parent step.
    pub step_id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties blob.
    pub properties: String,
}
460
/// A provenance/audit event row.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    pub id: String,
    pub event_type: String,
    /// Identifier of the entity the event is about.
    pub subject: String,
    /// Optional provenance reference.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation timestamp (epoch units not established here — TODO confirm).
    pub created_at: i64,
}
471
/// Result rows of a compiled read, bucketed by entity family.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    pub nodes: Vec<NodeRow>,
    pub runs: Vec<RunRow>,
    pub steps: Vec<StepRow>,
    pub actions: Vec<ActionRow>,
    /// True when the query degraded instead of failing
    /// (e.g. the vector table was absent).
    pub was_degraded: bool,
}
487
/// Nodes reached by expanding from a single root node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical id of the root the expansion started from.
    pub root_logical_id: String,
    pub nodes: Vec<NodeRow>,
}
496
/// All per-root expansion results for one named expansion slot.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Slot name this expansion belongs to.
    pub slot: String,
    pub roots: Vec<ExpansionRootRows>,
}
505
/// Edge/node pairs reached by expanding edges from a single root node.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeExpansionRootRows {
    /// Logical id of the root the expansion started from.
    pub root_logical_id: String,
    /// Each traversed edge together with the node it leads to.
    pub pairs: Vec<(EdgeRow, NodeRow)>,
}
519
/// All per-root edge-expansion results for one named expansion slot.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeExpansionSlotRows {
    /// Slot name this expansion belongs to.
    pub slot: String,
    pub roots: Vec<EdgeExpansionRootRows>,
}
530
/// Result of a grouped query: root nodes plus their node and edge expansions.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GroupedQueryRows {
    /// Root nodes matched by the driving query.
    pub roots: Vec<NodeRow>,
    /// Node expansions, one entry per expansion slot.
    pub expansions: Vec<ExpansionSlotRows>,
    /// Edge expansions, one entry per edge-expansion slot.
    pub edge_expansions: Vec<EdgeExpansionSlotRows>,
    /// True when any part of the query degraded instead of failing.
    pub was_degraded: bool,
}
548
/// Read-side execution engine: owns a read-only connection pool, a
/// shape-hash → SQL cache, and optional vector/FTS capabilities.
pub struct ExecutionCoordinator {
    // Filesystem path of the SQLite database.
    database_path: PathBuf,
    schema_manager: Arc<SchemaManager>,
    pool: ReadPool,
    // Compiled row-projection SQL keyed by query shape hash; cleared
    // wholesale once it reaches MAX_SHAPE_CACHE_SIZE.
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    vector_enabled: bool,
    // Ensures the "vector table absent" degradation warning logs only once.
    vec_degradation_warned: AtomicBool,
    telemetry: Arc<TelemetryCounters>,
    // Optional embedder used to turn raw query text into an embedding for
    // the vector branch.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
    // Per-kind FTS tokenizer strategies loaded from `projection_profiles`.
    fts_strategies: HashMap<String, TokenizerStrategy>,
}
577
578impl fmt::Debug for ExecutionCoordinator {
579 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
580 f.debug_struct("ExecutionCoordinator")
581 .field("database_path", &self.database_path)
582 .finish_non_exhaustive()
583 }
584}
585
586impl ExecutionCoordinator {
    /// Opens the database at `path`, bootstraps the schema, runs open-time
    /// FTS guards, loads per-kind FTS tokenizer strategies, and builds the
    /// read pool.
    ///
    /// # Errors
    /// Propagates connection, schema-bootstrap, FTS-guard, profile-query,
    /// embedder-identity, and pool-construction failures.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap connection: vec-enabled only when a vector dimension was
        // requested AND the `sqlite-vec` feature is compiled in.
        #[cfg(feature = "sqlite-vec")]
        let mut conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let mut conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        run_open_time_fts_guards(&mut conn)?;

        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            // Without the feature the report is otherwise unused; keep it
            // consumed so both cfg branches compile cleanly.
            let _ = &report;
            false
        };

        if vector_dimension.is_some() {
            // An explicit dimension forces vector support on, but only in
            // feature builds — non-feature builds stay disabled.
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        if let Some(ref emb) = query_embedder {
            // Verify the configured embedder against the database at open
            // time (see `check_vec_identity_at_open`, defined elsewhere).
            check_vec_identity_at_open(&conn, emb.as_ref())?;
        }

        // Load per-kind tokenizer configs so search can adapt to each kind's
        // FTS setup (e.g. trigram minimum-length handling in search).
        let fts_strategies: HashMap<String, TokenizerStrategy> = {
            let mut map = HashMap::new();
            let mut stmt = conn
                .prepare("SELECT kind, config_json FROM projection_profiles WHERE facet='fts'")?;
            let rows = stmt.query_map([], |row| {
                let kind: String = row.get(0)?;
                let config_json: String = row.get(1)?;
                Ok((kind, config_json))
            })?;
            // Rows with unparsable JSON or no "tokenizer" key are skipped.
            for row in rows.flatten() {
                let (kind, config_json) = row;
                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&config_json)
                    && let Some(tok) = v["tokenizer"].as_str()
                {
                    map.insert(kind, TokenizerStrategy::from_str(tok));
                }
            }
            map
        };

        // Close the bootstrap connection before opening the read pool.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
            fts_strategies,
        })
    }
692
693 pub fn database_path(&self) -> &Path {
695 &self.database_path
696 }
697
    /// Whether vector search support is enabled for this coordinator.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
703
    /// The query embedder configured at open time, if any.
    #[must_use]
    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
        self.query_embedder.as_ref()
    }
714
    /// Acquires a pooled read connection (see `ReadPool::acquire`).
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
718
719 #[must_use]
725 pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
726 let mut total = SqliteCacheStatus::default();
727 for conn_mutex in &self.pool.connections {
728 if let Ok(conn) = conn_mutex.try_lock() {
729 total.add(&read_db_cache_status(&conn));
730 }
731 }
732 total
733 }
734
    /// Executes a compiled read query and returns the matching node rows.
    ///
    /// FTS-driven queries may be answered by a scan fallback (see
    /// `scan_fallback_if_first_registration`, defined elsewhere) or have
    /// their SQL adapted to per-kind FTS tables; vec-driven queries have the
    /// generic `vec_nodes_active` table name rewritten to the per-kind vec
    /// table. The final projection SQL is cached by shape hash.
    ///
    /// # Errors
    /// Returns connection and SQLite errors; a missing vec table degrades to
    /// an empty `QueryRows` with `was_degraded` set instead of failing.
    #[allow(clippy::expect_used, clippy::too_many_lines)]
    pub fn execute_compiled_read(
        &self,
        compiled: &CompiledQuery,
    ) -> Result<QueryRows, EngineError> {
        // FTS fast-path guard: bind slot ?2 carries the root kind; the scan
        // fallback (presumably for a kind's first FTS registration — named
        // so, body not visible here) short-circuits the whole query.
        if compiled.driving_table == DrivingTable::FtsNodes
            && let Some(BindValue::Text(root_kind)) = compiled.binds.get(1)
            && let Some(nodes) = self.scan_fallback_if_first_registration(root_kind)?
        {
            self.telemetry.increment_queries();
            return Ok(QueryRows {
                nodes,
                runs: Vec::new(),
                steps: Vec::new(),
                actions: Vec::new(),
                was_degraded: false,
            });
        }

        // Adapt the compiled SQL to the physical table layout.
        let (adapted_sql, adapted_binds) = if compiled.driving_table == DrivingTable::FtsNodes {
            let conn_check = match self.lock_connection() {
                Ok(g) => g,
                Err(e) => {
                    self.telemetry.increment_errors();
                    return Err(e);
                }
            };
            // Connection is released before executing the adapted SQL.
            let result = adapt_fts_nodes_sql_for_per_kind_tables(compiled, &conn_check);
            drop(conn_check);
            result?
        } else if compiled.driving_table == DrivingTable::VecNodes {
            // Vec queries target a per-kind virtual table; derive its name
            // from the root-kind bind (slot ?2).
            let root_kind = compiled
                .binds
                .get(1)
                .and_then(|b| {
                    if let BindValue::Text(k) = b {
                        Some(k.as_str())
                    } else {
                        None
                    }
                })
                .unwrap_or("");
            let vec_table = if root_kind.is_empty() {
                // Deliberately nonexistent table name; triggers the
                // degraded path below.
                "vec__unknown".to_owned()
            } else {
                fathomdb_schema::vec_kind_table_name(root_kind)
            };
            let new_sql = compiled.sql.replace("vec_nodes_active", &vec_table);
            (new_sql, compiled.binds.clone())
        } else {
            (compiled.sql.clone(), compiled.binds.clone())
        };

        let row_sql = wrap_node_row_projection_sql(&adapted_sql);
        {
            // Cache the projected SQL by shape hash; when full, clear the
            // whole cache rather than evicting piecemeal.
            let mut cache = self
                .shape_sql_map
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
                cache.clear();
            }
            cache.insert(compiled.shape_hash, row_sql.clone());
        }

        let bind_values = adapted_binds
            .iter()
            .map(bind_value_to_sql)
            .collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&row_sql) {
            Ok(stmt) => stmt,
            Err(e) if is_vec_table_absent(&e) => {
                // Missing vec table: degrade to empty rows; warn only once
                // per coordinator lifetime.
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading to non-vector query");
                }
                return Ok(QueryRows {
                    was_degraded: true,
                    ..Default::default()
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        // Column order matches `wrap_node_row_projection_sql`'s projection.
        let nodes = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .and_then(Iterator::collect)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        self.telemetry.increment_queries();
        Ok(QueryRows {
            nodes,
            runs: Vec::new(),
            steps: Vec::new(),
            actions: Vec::new(),
            was_degraded: false,
        })
    }
882
883 pub fn execute_compiled_search(
898 &self,
899 compiled: &CompiledSearch,
900 ) -> Result<SearchRows, EngineError> {
901 let (relaxed_query, was_degraded_at_plan_time) =
908 fathomdb_query::derive_relaxed(&compiled.text_query);
909 let relaxed = relaxed_query.map(|q| CompiledSearch {
910 root_kind: compiled.root_kind.clone(),
911 text_query: q,
912 limit: compiled.limit,
913 fusable_filters: compiled.fusable_filters.clone(),
914 residual_filters: compiled.residual_filters.clone(),
915 attribution_requested: compiled.attribution_requested,
916 });
917 let plan = CompiledSearchPlan {
918 strict: compiled.clone(),
919 relaxed,
920 was_degraded_at_plan_time,
921 };
922 self.execute_compiled_search_plan(&plan)
923 }
924
    /// Runs a strict/relaxed two-branch search plan.
    ///
    /// The strict branch always executes; the relaxed branch runs only when
    /// the strict branch returns fewer than `min(FALLBACK_TRIGGER_K, limit)`
    /// hits. `was_degraded` is surfaced only when the relaxed fallback
    /// actually ran and the plan was degraded at compile time.
    pub fn execute_compiled_search_plan(
        &self,
        plan: &CompiledSearchPlan,
    ) -> Result<SearchRows, EngineError> {
        let strict = &plan.strict;
        let limit = strict.limit;
        let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;

        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;

        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            was_degraded = plan.was_degraded_at_plan_time;
        }

        let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
        if strict.attribution_requested {
            let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }
        // Counts are taken from the merged list, so whatever de-duplication
        // or truncation `merge_search_branches` performs is reflected.
        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        // Text-only plan: there is no vector branch here.
        let vector_hit_count = 0;

        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
999
    /// Executes a compiled vector (nearest-neighbor) search against the
    /// per-kind vec table.
    ///
    /// Fusable filters are applied inside the vector-hit CTE (node alias
    /// `src`); residual JSON-path filters are applied on the outer projection
    /// (alias `h`). Because the inner KNN scan is capped at `limit` *before*
    /// filters run, filtered results can underfill the requested limit.
    ///
    /// # Errors
    /// Returns connection and SQLite errors; a missing vec table degrades to
    /// an empty `SearchRows` with `was_degraded` set instead of failing.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // A zero limit can never return rows; skip the query entirely.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        let filter_by_kind = !compiled.root_kind.is_empty();
        let mut binds: Vec<BindValue> = Vec::new();
        // ?1 is always the query embedding text; ?2 is the kind filter when
        // present. Later placeholders are appended positionally.
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // WHERE fragments for filters fused into the CTE (alias `src`).
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses
                        .push_str("\n AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedBoolEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(i64::from(*value)));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(BindValue::Text(v.clone()));
                    }
                    // Placeholders occupy the slots right after the path.
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                Predicate::JsonPathEq { .. }
                | Predicate::JsonPathCompare { .. }
                | Predicate::JsonPathIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                    // Not fusable into the CTE; intentionally ignored here.
                }
            }
        }

        // WHERE fragments for residual filters on the outer select (alias `h`).
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(scalar_to_bind(v));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. }
                | Predicate::JsonPathFusedBoolEq { .. }
                | Predicate::JsonPathFusedIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                    // Handled (if at all) by the fused pass above.
                }
            }
        }

        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        // The inner KNN scan uses the same limit, interpolated into the SQL
        // text rather than bound.
        let base_limit = limit;
        let kind_clause = if filter_by_kind {
            "\n AND src.kind = ?2"
        } else {
            ""
        };

        // Same per-kind table derivation as in `execute_compiled_read`;
        // "vec__unknown" intentionally triggers the degraded path.
        let vec_table = if compiled.root_kind.is_empty() {
            "vec__unknown".to_owned()
        } else {
            fathomdb_schema::vec_kind_table_name(&compiled.root_kind)
        };

        let sql = format!(
            "WITH vector_hits AS (
 SELECT
 src.row_id AS row_id,
 src.logical_id AS logical_id,
 src.kind AS kind,
 src.properties AS properties,
 src.source_ref AS source_ref,
 src.content_ref AS content_ref,
 src.created_at AS created_at,
 vc.distance AS distance,
 vc.chunk_id AS chunk_id
 FROM (
 SELECT chunk_id, distance
 FROM {vec_table}
 WHERE embedding MATCH ?1
 LIMIT {base_limit}
 ) vc
 JOIN chunks c ON c.id = vc.chunk_id
 JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
 WHERE 1 = 1{kind_clause}{fused_clauses}
 )
 SELECT
 h.row_id,
 h.logical_id,
 h.kind,
 h.properties,
 h.content_ref,
 am.last_accessed_at,
 h.created_at,
 h.distance,
 h.chunk_id
 FROM vector_hits h
 LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
 WHERE 1 = 1{filter_clauses}
 ORDER BY h.distance ASC
 LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            Err(e) if is_vec_table_absent(&e) => {
                // Missing vec table at prepare time: degrade to empty
                // results; warn only once per coordinator lifetime.
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Negate so smaller distances become larger scores —
                // presumably to merge with score-descending text hits.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    // Vector hits carry no strict/relaxed match mode.
                    match_mode: None,
                    snippet: None,
                    // Column 8 is the chunk id, surfaced as the projection row.
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // The table may also vanish between prepare and step;
                // degrade the same way as at prepare time.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
1394
1395 pub fn execute_retrieval_plan(
1427 &self,
1428 plan: &CompiledRetrievalPlan,
1429 raw_query: &str,
1430 ) -> Result<SearchRows, EngineError> {
1431 let mut plan = plan.clone();
1437 let limit = plan.text.strict.limit;
1438
1439 let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;
1441
1442 let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
1445 let strict_underfilled = strict_hits.len() < fallback_threshold;
1446 let mut relaxed_hits: Vec<SearchHit> = Vec::new();
1447 let mut fallback_used = false;
1448 let mut was_degraded = false;
1449 if let Some(relaxed) = plan.text.relaxed.as_ref()
1450 && strict_underfilled
1451 {
1452 relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
1453 fallback_used = true;
1454 was_degraded = plan.was_degraded_at_plan_time;
1455 }
1456
1457 let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
1464 if text_branches_empty && self.query_embedder.is_some() {
1465 self.fill_vector_branch(&mut plan, raw_query);
1466 }
1467
1468 let mut vector_hits: Vec<SearchHit> = Vec::new();
1473 if let Some(vector) = plan.vector.as_ref()
1474 && strict_hits.is_empty()
1475 && relaxed_hits.is_empty()
1476 {
1477 let vector_rows = self.execute_compiled_vector_search(vector)?;
1478 vector_hits = vector_rows.hits;
1483 if vector_rows.was_degraded {
1484 was_degraded = true;
1485 }
1486 }
1487 if text_branches_empty
1494 && plan.was_degraded_at_plan_time
1495 && plan.vector.is_none()
1496 && self.query_embedder.is_some()
1497 {
1498 was_degraded = true;
1499 }
1500
1501 let strict = &plan.text.strict;
1503 let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
1504 if strict.attribution_requested {
1505 let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
1506 self.populate_attribution_for_hits(
1507 &mut merged,
1508 &strict.text_query,
1509 relaxed_text_query,
1510 )?;
1511 }
1512
1513 let strict_hit_count = merged
1514 .iter()
1515 .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
1516 .count();
1517 let relaxed_hit_count = merged
1518 .iter()
1519 .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
1520 .count();
1521 let vector_hit_count = merged
1522 .iter()
1523 .filter(|h| matches!(h.modality, RetrievalModality::Vector))
1524 .count();
1525
1526 Ok(SearchRows {
1527 hits: merged,
1528 strict_hit_count,
1529 relaxed_hit_count,
1530 vector_hit_count,
1531 fallback_used,
1532 was_degraded,
1533 })
1534 }
1535
1536 fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
1552 let Some(embedder) = self.query_embedder.as_ref() else {
1553 return;
1554 };
1555 match embedder.embed_query(raw_query) {
1556 Ok(vec) => {
1557 let literal = match serde_json::to_string(&vec) {
1563 Ok(s) => s,
1564 Err(err) => {
1565 trace_warn!(
1566 error = %err,
1567 "query embedder vector serialization failed; skipping vector branch"
1568 );
1569 let _ = err; plan.was_degraded_at_plan_time = true;
1571 return;
1572 }
1573 };
1574 let strict = &plan.text.strict;
1575 plan.vector = Some(CompiledVectorSearch {
1576 root_kind: strict.root_kind.clone(),
1577 query_text: literal,
1578 limit: strict.limit,
1579 fusable_filters: strict.fusable_filters.clone(),
1580 residual_filters: strict.residual_filters.clone(),
1581 attribution_requested: strict.attribution_requested,
1582 });
1583 }
1584 Err(err) => {
1585 trace_warn!(
1586 error = %err,
1587 "query embedder unavailable, skipping vector branch"
1588 );
1589 let _ = err; plan.was_degraded_at_plan_time = true;
1591 }
1592 }
1593 }
1594
    /// Execute one text-search branch (strict or relaxed) of a compiled
    /// search against the chunk FTS index and, where present, per-kind
    /// property-FTS tables; returns scored hits tagged with `branch`.
    ///
    /// SQL and bind parameters are built together, so positional parameter
    /// numbering (`?1`, `?2`, …) is tightly coupled to the order in which
    /// values are pushed onto `binds` below.
    #[allow(clippy::too_many_lines)]
    fn run_search_branch(
        &self,
        compiled: &CompiledSearch,
        branch: SearchBranch,
    ) -> Result<Vec<SearchHit>, EngineError> {
        use std::fmt::Write as _;
        // Queries that render to nothing matchable (empty, or a pure
        // negation) cannot be expressed as an FTS5 MATCH; return no hits.
        if matches!(
            compiled.text_query,
            fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
        ) {
            return Ok(Vec::new());
        }
        let rendered_base = render_text_query_fts5(&compiled.text_query);
        let strategy = self.fts_strategies.get(compiled.root_kind.as_str());
        // Trigram tokenization needs at least 3 alphanumeric characters to
        // form a single trigram; shorter queries can never match.
        if matches!(strategy, Some(TokenizerStrategy::SubstringTrigram))
            && rendered_base
                .chars()
                .filter(|c| c.is_alphanumeric())
                .count()
                < 3
        {
            return Ok(Vec::new());
        }
        let rendered = rendered_base;
        let filter_by_kind = !compiled.root_kind.is_empty();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };

        // Decide which per-kind property-FTS tables (if any) contribute an
        // extra UNION ALL arm alongside the chunk index.
        let prop_fts_tables: Vec<(String, String)> = if filter_by_kind {
            // Root kind is fixed: at most its own property table applies.
            let kind = compiled.root_kind.clone();
            let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
            let exists: bool = conn_guard
                .query_row(
                    "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                    rusqlite::params![prop_table],
                    |_| Ok(true),
                )
                .optional()
                .map_err(EngineError::Sqlite)?
                .unwrap_or(false);
            if exists {
                vec![(kind, prop_table)]
            } else {
                vec![]
            }
        } else {
            // No root kind: a single fused KindEq filter still pins the kind;
            // otherwise every registered kind's table is searched.
            let kind_eq_values: Vec<String> = compiled
                .fusable_filters
                .iter()
                .filter_map(|p| match p {
                    Predicate::KindEq(k) => Some(k.clone()),
                    _ => None,
                })
                .collect();
            if kind_eq_values.len() == 1 {
                let kind = kind_eq_values[0].clone();
                let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
                let exists: bool = conn_guard
                    .query_row(
                        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                        rusqlite::params![prop_table],
                        |_| Ok(true),
                    )
                    .optional()
                    .map_err(EngineError::Sqlite)?
                    .unwrap_or(false);
                if exists {
                    vec![(kind, prop_table)]
                } else {
                    vec![]
                }
            } else {
                // Enumerate every registered schema and keep only kinds whose
                // FTS table actually exists.
                let mut stmt = conn_guard
                    .prepare("SELECT kind FROM fts_property_schemas")
                    .map_err(EngineError::Sqlite)?;
                let all_kinds: Vec<String> = stmt
                    .query_map([], |r| r.get::<_, String>(0))
                    .map_err(EngineError::Sqlite)?
                    .collect::<Result<Vec<_>, _>>()
                    .map_err(EngineError::Sqlite)?;
                drop(stmt);
                let mut result = Vec::new();
                for kind in all_kinds {
                    let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
                    let exists: bool = conn_guard
                        .query_row(
                            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                            rusqlite::params![prop_table],
                            |_| Ok(true),
                        )
                        .optional()
                        .map_err(EngineError::Sqlite)?
                        .unwrap_or(false);
                    if exists {
                        result.push((kind, prop_table));
                    }
                }
                result
            }
        };
        let use_prop_fts = !prop_fts_tables.is_empty();

        // Leading binds: ?1 = chunk MATCH text, then (optionally) ?2 = kind,
        // then (optionally) a second copy of the MATCH text for the property
        // arms. `prop_bind_idx` below must agree with this layout.
        let mut binds: Vec<BindValue> = if filter_by_kind {
            if use_prop_fts {
                vec![
                    BindValue::Text(rendered.clone()),
                    BindValue::Text(compiled.root_kind.clone()),
                    BindValue::Text(rendered),
                ]
            } else {
                vec![
                    BindValue::Text(rendered.clone()),
                    BindValue::Text(compiled.root_kind.clone()),
                ]
            }
        } else if use_prop_fts {
            vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
        } else {
            vec![BindValue::Text(rendered)]
        };

        // Fused predicates apply inside the union subquery (alias `u`), i.e.
        // BEFORE score ordering and LIMIT.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(fused_clauses, "\n AND u.kind = ?{idx}");
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND u.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND u.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND u.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses.push_str("\n AND u.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedBoolEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(i64::from(*value)));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(BindValue::Text(v.clone()));
                    }
                    // Value placeholders start at first_param + 1 (the path
                    // itself occupies ?first_param).
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(u.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                // Non-fusable predicates are handled as residual filters (or
                // elsewhere) — intentionally no clause here.
                Predicate::JsonPathEq { .. }
                | Predicate::JsonPathCompare { .. }
                | Predicate::JsonPathIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                }
            }
        }

        // Residual predicates apply to the outer query (alias `h`), i.e.
        // AFTER the per-branch LIMIT has already been taken.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(scalar_to_bind(v));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                // Fused / edge predicates are consumed above or elsewhere.
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. }
                | Predicate::JsonPathFusedBoolEq { .. }
                | Predicate::JsonPathFusedIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                }
            }
        }

        let limit = compiled.limit;
        // Saturate rather than fail if the usize limit exceeds i64::MAX.
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();
        // Index of the duplicated MATCH text for property arms; must match
        // the leading-binds layout built above.
        let prop_bind_idx: usize = if filter_by_kind { 3 } else { 2 };
        // One UNION ALL arm per property-FTS table. A per-kind bm25 weighting
        // expression is looked up from fts_property_schemas; when weighting
        // applies, no snippet is produced (empty string placeholder).
        let prop_arm_sql: String = if use_prop_fts {
            prop_fts_tables.iter().fold(String::new(), |mut acc, (kind, prop_table)| {
                let bm25_expr = conn_guard
                    .query_row(
                        "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
                        rusqlite::params![kind],
                        |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
                    )
                    .ok()
                    .map_or_else(
                        || format!("bm25({prop_table})"),
                        |(json, sep)| build_bm25_expr(prop_table, &json, &sep),
                    );
                // Weighted iff build_bm25_expr produced something other than
                // the plain default expression.
                let is_weighted = bm25_expr != format!("bm25({prop_table})");
                let snippet_expr = if is_weighted {
                    "'' AS snippet".to_owned()
                } else {
                    "substr(fp.text_content, 1, 200) AS snippet".to_owned()
                };
                let _ = write!(
                    acc,
                    "
                UNION ALL
                SELECT
                    src.row_id AS row_id,
                    fp.node_logical_id AS logical_id,
                    src.kind AS kind,
                    src.properties AS properties,
                    src.source_ref AS source_ref,
                    src.content_ref AS content_ref,
                    src.created_at AS created_at,
                    -{bm25_expr} AS score,
                    'property' AS source,
                    {snippet_expr},
                    CAST(fp.rowid AS TEXT) AS projection_row_id
                FROM {prop_table} fp
                JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
                WHERE {prop_table} MATCH ?{prop_bind_idx}"
                );
                acc
            })
        } else {
            String::new()
        };
        let (chunk_fts_bind, chunk_kind_clause) = if filter_by_kind {
            ("?1", "\n AND src.kind = ?2")
        } else {
            ("?1", "")
        };
        // bm25 returns lower-is-better; scores are negated so DESC ordering
        // puts the best matches first across chunk and property arms.
        let sql = format!(
            "WITH search_hits AS (
            SELECT
                u.row_id AS row_id,
                u.logical_id AS logical_id,
                u.kind AS kind,
                u.properties AS properties,
                u.source_ref AS source_ref,
                u.content_ref AS content_ref,
                u.created_at AS created_at,
                u.score AS score,
                u.source AS source,
                u.snippet AS snippet,
                u.projection_row_id AS projection_row_id
            FROM (
                SELECT
                    src.row_id AS row_id,
                    c.node_logical_id AS logical_id,
                    src.kind AS kind,
                    src.properties AS properties,
                    src.source_ref AS source_ref,
                    src.content_ref AS content_ref,
                    src.created_at AS created_at,
                    -bm25(fts_nodes) AS score,
                    'chunk' AS source,
                    snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
                    f.chunk_id AS projection_row_id
                FROM fts_nodes f
                JOIN chunks c ON c.id = f.chunk_id
                JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
                WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}{prop_arm_sql}
            ) u
            WHERE 1 = 1{fused_clauses}
            ORDER BY u.score DESC
            LIMIT ?{limit_idx}
        )
        SELECT
            h.row_id,
            h.logical_id,
            h.kind,
            h.properties,
            h.content_ref,
            am.last_accessed_at,
            h.created_at,
            h.score,
            h.source,
            h.snippet,
            h.projection_row_id
        FROM search_hits h
        LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
        WHERE 1 = 1{filter_clauses}
        ORDER BY h.score DESC"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                // Column 8 is the literal 'chunk' / 'property' source tag
                // produced by the SQL above.
                let source_str: String = row.get(8)?;
                let source = if source_str == "property" {
                    SearchHitSource::Property
                } else {
                    SearchHitSource::Chunk
                };
                let match_mode = match branch {
                    SearchBranch::Strict => SearchMatchMode::Strict,
                    SearchBranch::Relaxed => SearchMatchMode::Relaxed,
                };
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score: row.get(7)?,
                    modality: RetrievalModality::Text,
                    source,
                    match_mode: Some(match_mode),
                    snippet: row.get(9)?,
                    projection_row_id: row.get(10)?,
                    vector_distance: None,
                    attribution: None,
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        Ok(hits)
    }
2156
2157 fn populate_attribution_for_hits(
2161 &self,
2162 hits: &mut [SearchHit],
2163 strict_text_query: &fathomdb_query::TextQuery,
2164 relaxed_text_query: Option<&fathomdb_query::TextQuery>,
2165 ) -> Result<(), EngineError> {
2166 let conn_guard = match self.lock_connection() {
2167 Ok(g) => g,
2168 Err(e) => {
2169 self.telemetry.increment_errors();
2170 return Err(e);
2171 }
2172 };
2173 let strict_expr = render_text_query_fts5(strict_text_query);
2174 let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
2175 for hit in hits.iter_mut() {
2176 let match_expr = match hit.match_mode {
2181 Some(SearchMatchMode::Strict) => strict_expr.as_str(),
2182 Some(SearchMatchMode::Relaxed) => {
2183 relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
2184 }
2185 None => continue,
2186 };
2187 match resolve_hit_attribution(&conn_guard, hit, match_expr) {
2188 Ok(att) => hit.attribution = Some(att),
2189 Err(e) => {
2190 self.telemetry.increment_errors();
2191 return Err(e);
2192 }
2193 }
2194 }
2195 Ok(())
2196 }
2197
2198 pub fn execute_compiled_grouped_read(
2202 &self,
2203 compiled: &CompiledGroupedQuery,
2204 ) -> Result<GroupedQueryRows, EngineError> {
2205 let root_rows = self.execute_compiled_read(&compiled.root)?;
2206 if root_rows.was_degraded {
2207 return Ok(GroupedQueryRows {
2208 roots: Vec::new(),
2209 expansions: Vec::new(),
2210 edge_expansions: Vec::new(),
2211 was_degraded: true,
2212 });
2213 }
2214
2215 let roots = root_rows.nodes;
2216 let mut expansions = Vec::with_capacity(compiled.expansions.len());
2217 for expansion in &compiled.expansions {
2218 let slot_rows = if roots.is_empty() {
2219 Vec::new()
2220 } else {
2221 self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
2222 };
2223 expansions.push(ExpansionSlotRows {
2224 slot: expansion.slot.clone(),
2225 roots: slot_rows,
2226 });
2227 }
2228
2229 let mut edge_expansions = Vec::with_capacity(compiled.edge_expansions.len());
2230 for edge_expansion in &compiled.edge_expansions {
2231 let slot_rows = if roots.is_empty() {
2232 Vec::new()
2233 } else {
2234 self.read_edge_expansion_chunked(&roots, edge_expansion, compiled.hints.hard_limit)?
2235 };
2236 edge_expansions.push(EdgeExpansionSlotRows {
2237 slot: edge_expansion.slot.clone(),
2238 roots: slot_rows,
2239 });
2240 }
2241
2242 Ok(GroupedQueryRows {
2243 roots,
2244 expansions,
2245 edge_expansions,
2246 was_degraded: false,
2247 })
2248 }
2249
2250 fn read_expansion_nodes_chunked(
2256 &self,
2257 roots: &[NodeRow],
2258 expansion: &ExpansionSlot,
2259 hard_limit: usize,
2260 ) -> Result<Vec<ExpansionRootRows>, EngineError> {
2261 if roots.len() <= BATCH_CHUNK_SIZE {
2262 return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
2263 }
2264
2265 let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
2268 for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
2269 for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
2270 per_root
2271 .entry(group.root_logical_id)
2272 .or_default()
2273 .extend(group.nodes);
2274 }
2275 }
2276
2277 Ok(roots
2278 .iter()
2279 .map(|root| ExpansionRootRows {
2280 root_logical_id: root.logical_id.clone(),
2281 nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
2282 })
2283 .collect())
2284 }
2285
    /// Resolve one node-expansion slot for a single batch of roots via a
    /// recursive CTE over `edges`, returning one `ExpansionRootRows` per
    /// input root in input order (empty node list for roots with no matches).
    ///
    /// NOTE: an empty `roots` slice would render an empty seed union
    /// (invalid SQL); visible callers only invoke this with non-empty
    /// batches.
    #[allow(clippy::too_many_lines)]
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Direction picks which edge endpoint matches the current frontier
        // node and which endpoint becomes the next node to visit.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // Fused JSON-path filters require a registered property-FTS schema
        // for every kind reachable over this edge label; fail fast otherwise.
        if expansion.filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. }
                    | Predicate::JsonPathFusedTimestampCmp { .. }
                    | Predicate::JsonPathFusedIn { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }

        // Seed arm of the CTE: one "SELECT ?N" per root (params 1..=len).
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        // Parameter layout after the root ids:
        //   [edge kind][edge-filter binds][node-filter binds]
        let edge_kind_param = root_ids.len() + 1;
        let edge_filter_param_start = root_ids.len() + 2;

        let (edge_filter_sql, edge_filter_binds) =
            compile_edge_filter(expansion.edge_filter.as_ref(), edge_filter_param_start);

        let filter_param_start = edge_filter_param_start + edge_filter_binds.len();

        let (filter_sql, filter_binds) =
            compile_expansion_filter(expansion.filter.as_ref(), filter_param_start);

        // `visited` is a ',id,'-delimited trail checked with instr() to break
        // cycles; `emitted` bounds per-root growth during traversal, and
        // ROW_NUMBER re-applies the same cap after node-level filtering.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted) AS (
                SELECT rid, rid, 0, printf(',%s,', rid), 0
                FROM root_ids
                UNION ALL
                SELECT
                    t.root_id,
                    {next_logical_id},
                    t.depth + 1,
                    t.visited || {next_logical_id} || ',',
                    t.emitted + 1
                FROM traversed t
                JOIN edges e ON {join_condition}
                    AND e.kind = ?{edge_kind_param}
                    AND e.superseded_at IS NULL{edge_filter_sql}
                WHERE t.depth < {max_depth}
                    AND t.emitted < {hard_limit}
                    AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
                , n.content_ref, am.last_accessed_at
                , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
                FROM traversed t
                JOIN nodes n ON n.logical_id = t.logical_id
                    AND n.superseded_at IS NULL
                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
                WHERE t.depth > 0{filter_sql}
            )
            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, logical_id",
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind order must mirror the parameter layout documented above.
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(edge_filter_binds);
        bind_values.extend(filter_binds);

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Group by root id, then emit groups in the caller's root order.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
2438
2439 fn read_edge_expansion_chunked(
2445 &self,
2446 roots: &[NodeRow],
2447 expansion: &EdgeExpansionSlot,
2448 hard_limit: usize,
2449 ) -> Result<Vec<EdgeExpansionRootRows>, EngineError> {
2450 if roots.len() <= BATCH_CHUNK_SIZE {
2451 return self.read_edge_expansion_batched(roots, expansion, hard_limit);
2452 }
2453
2454 let mut per_root: HashMap<String, Vec<(EdgeRow, NodeRow)>> = HashMap::new();
2455 for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
2456 for group in self.read_edge_expansion_batched(chunk, expansion, hard_limit)? {
2457 per_root
2458 .entry(group.root_logical_id)
2459 .or_default()
2460 .extend(group.pairs);
2461 }
2462 }
2463
2464 Ok(roots
2465 .iter()
2466 .map(|root| EdgeExpansionRootRows {
2467 root_logical_id: root.logical_id.clone(),
2468 pairs: per_root.remove(&root.logical_id).unwrap_or_default(),
2469 })
2470 .collect())
2471 }
2472
    /// Resolve one edge-expansion slot for a single batch of roots via a
    /// recursive CTE over `edges`, returning, per root, the (edge, endpoint
    /// node) pairs encountered on the traversal — unlike the node variant,
    /// the traversed edge's row id is carried along and re-joined so the
    /// edge row itself can be returned.
    ///
    /// NOTE: an empty `roots` slice would render an empty seed union
    /// (invalid SQL); visible callers only invoke this with non-empty
    /// batches.
    #[allow(clippy::too_many_lines)]
    fn read_edge_expansion_batched(
        &self,
        roots: &[NodeRow],
        expansion: &EdgeExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<EdgeExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Direction picks which edge endpoint matches the current frontier
        // node and which endpoint becomes the next node to visit.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // Fused JSON-path endpoint filters require a registered property-FTS
        // schema for every reachable kind; fail fast otherwise.
        if expansion.endpoint_filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. }
                    | Predicate::JsonPathFusedTimestampCmp { .. }
                    | Predicate::JsonPathFusedIn { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }

        // Seed arm of the CTE: one "SELECT ?N" per root (params 1..=len).
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        // Parameter layout after the root ids:
        //   [edge kind][edge-filter binds][endpoint-filter binds]
        let edge_kind_param = root_ids.len() + 1;
        let edge_filter_param_start = root_ids.len() + 2;

        let (edge_filter_sql, edge_filter_binds) =
            compile_edge_filter(expansion.edge_filter.as_ref(), edge_filter_param_start);

        let endpoint_filter_param_start = edge_filter_param_start + edge_filter_binds.len();
        let (endpoint_filter_sql, endpoint_filter_binds) = compile_expansion_filter(
            expansion.endpoint_filter.as_ref(),
            endpoint_filter_param_start,
        );

        // `visited` is a ',id,'-delimited trail checked with instr() to break
        // cycles; `emitted` bounds per-root growth during traversal, and
        // ROW_NUMBER re-applies the same cap after endpoint filtering. The
        // seed rows carry a NULL edge_row_id and are excluded by depth > 0.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted, edge_row_id) AS (
                SELECT rid, rid, 0, printf(',%s,', rid), 0, NULL AS edge_row_id
                FROM root_ids
                UNION ALL
                SELECT
                    t.root_id,
                    {next_logical_id},
                    t.depth + 1,
                    t.visited || {next_logical_id} || ',',
                    t.emitted + 1,
                    e.row_id AS edge_row_id
                FROM traversed t
                JOIN edges e ON {join_condition}
                    AND e.kind = ?{edge_kind_param}
                    AND e.superseded_at IS NULL{edge_filter_sql}
                WHERE t.depth < {max_depth}
                    AND t.emitted < {hard_limit}
                    AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
                SELECT t.root_id,
                    e.row_id AS e_row_id,
                    e.logical_id AS e_logical_id,
                    e.source_logical_id AS e_source,
                    e.target_logical_id AS e_target,
                    e.kind AS e_kind,
                    e.properties AS e_properties,
                    e.source_ref AS e_source_ref,
                    e.confidence AS e_confidence,
                    n.row_id AS n_row_id,
                    n.logical_id AS n_logical_id,
                    n.kind AS n_kind,
                    n.properties AS n_properties,
                    n.content_ref AS n_content_ref,
                    am.last_accessed_at AS n_last_accessed_at,
                    ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id, e.row_id) AS rn
                FROM traversed t
                JOIN edges e ON e.row_id = t.edge_row_id
                JOIN nodes n ON n.logical_id = t.logical_id
                    AND n.superseded_at IS NULL
                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
                WHERE t.depth > 0{endpoint_filter_sql}
            )
            SELECT root_id,
                e_row_id, e_logical_id, e_source, e_target, e_kind, e_properties,
                e_source_ref, e_confidence,
                n_row_id, n_logical_id, n_kind, n_properties, n_content_ref,
                n_last_accessed_at
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, n_logical_id, e_row_id",
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind order must mirror the parameter layout documented above.
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(edge_filter_binds);
        bind_values.extend(endpoint_filter_binds);

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let root_id: String = row.get(0)?;
                let edge_row = EdgeRow {
                    row_id: row.get(1)?,
                    logical_id: row.get(2)?,
                    source_logical_id: row.get(3)?,
                    target_logical_id: row.get(4)?,
                    kind: row.get(5)?,
                    properties: row.get(6)?,
                    source_ref: row.get(7)?,
                    confidence: row.get(8)?,
                };
                let node_row = NodeRow {
                    row_id: row.get(9)?,
                    logical_id: row.get(10)?,
                    kind: row.get(11)?,
                    properties: row.get(12)?,
                    content_ref: row.get(13)?,
                    last_accessed_at: row.get(14)?,
                };
                Ok((root_id, edge_row, node_row))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Group by root id, then emit groups in the caller's root order.
        let mut per_root: HashMap<String, Vec<(EdgeRow, NodeRow)>> = HashMap::new();
        for (root_id, edge, node) in rows {
            per_root.entry(root_id).or_default().push((edge, node));
        }

        let root_groups = roots
            .iter()
            .map(|root| EdgeExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                pairs: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
2643
2644 fn validate_fused_filter_for_edge_label(&self, edge_label: &str) -> Result<(), EngineError> {
2658 let conn = self.lock_connection()?;
2659 let mut stmt = conn
2661 .prepare_cached(
2662 "SELECT DISTINCT n.kind \
2663 FROM edges e \
2664 JOIN nodes n ON n.logical_id = e.target_logical_id \
2665 WHERE e.kind = ?1 AND e.superseded_at IS NULL",
2666 )
2667 .map_err(EngineError::Sqlite)?;
2668 let target_kinds: Vec<String> = stmt
2669 .query_map(rusqlite::params![edge_label], |row| row.get(0))
2670 .map_err(EngineError::Sqlite)?
2671 .collect::<Result<Vec<_>, _>>()
2672 .map_err(EngineError::Sqlite)?;
2673
2674 for kind in &target_kinds {
2675 let has_schema: bool = conn
2676 .query_row(
2677 "SELECT COUNT(*) > 0 FROM fts_property_schemas WHERE kind = ?1",
2678 rusqlite::params![kind],
2679 |row| row.get(0),
2680 )
2681 .map_err(EngineError::Sqlite)?;
2682 if !has_schema {
2683 return Err(EngineError::InvalidConfig(format!(
2684 "kind {kind:?} has no registered property-FTS schema; register one with \
2685 admin.register_fts_property_schema(..) before using fused filters on \
2686 expansion slots, or use JsonPathEq for non-fused semantics \
2687 (expand slot uses edge label {edge_label:?})"
2688 )));
2689 }
2690 }
2691 Ok(())
2692 }
2693
2694 pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
2700 let conn = self.lock_connection()?;
2701 conn.query_row(
2702 "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
2703 rusqlite::params![id],
2704 |row| {
2705 Ok(RunRow {
2706 id: row.get(0)?,
2707 kind: row.get(1)?,
2708 status: row.get(2)?,
2709 properties: row.get(3)?,
2710 })
2711 },
2712 )
2713 .optional()
2714 .map_err(EngineError::Sqlite)
2715 }
2716
2717 pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
2723 let conn = self.lock_connection()?;
2724 conn.query_row(
2725 "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
2726 rusqlite::params![id],
2727 |row| {
2728 Ok(StepRow {
2729 id: row.get(0)?,
2730 run_id: row.get(1)?,
2731 kind: row.get(2)?,
2732 status: row.get(3)?,
2733 properties: row.get(4)?,
2734 })
2735 },
2736 )
2737 .optional()
2738 .map_err(EngineError::Sqlite)
2739 }
2740
2741 pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
2747 let conn = self.lock_connection()?;
2748 conn.query_row(
2749 "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
2750 rusqlite::params![id],
2751 |row| {
2752 Ok(ActionRow {
2753 id: row.get(0)?,
2754 step_id: row.get(1)?,
2755 kind: row.get(2)?,
2756 status: row.get(3)?,
2757 properties: row.get(4)?,
2758 })
2759 },
2760 )
2761 .optional()
2762 .map_err(EngineError::Sqlite)
2763 }
2764
2765 pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
2771 let conn = self.lock_connection()?;
2772 let mut stmt = conn
2773 .prepare_cached(
2774 "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
2775 )
2776 .map_err(EngineError::Sqlite)?;
2777 let rows = stmt
2778 .query_map([], |row| {
2779 Ok(RunRow {
2780 id: row.get(0)?,
2781 kind: row.get(1)?,
2782 status: row.get(2)?,
2783 properties: row.get(3)?,
2784 })
2785 })
2786 .map_err(EngineError::Sqlite)?
2787 .collect::<Result<Vec<_>, _>>()
2788 .map_err(EngineError::Sqlite)?;
2789 Ok(rows)
2790 }
2791
2792 #[must_use]
2802 #[allow(clippy::expect_used)]
2803 pub fn shape_sql_count(&self) -> usize {
2804 self.shape_sql_map
2805 .lock()
2806 .unwrap_or_else(PoisonError::into_inner)
2807 .len()
2808 }
2809
2810 #[must_use]
2812 pub fn schema_manager(&self) -> Arc<SchemaManager> {
2813 Arc::clone(&self.schema_manager)
2814 }
2815
2816 #[must_use]
2825 pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
2826 let cache_hit = self
2827 .shape_sql_map
2828 .lock()
2829 .unwrap_or_else(PoisonError::into_inner)
2830 .contains_key(&compiled.shape_hash);
2831 QueryPlan {
2832 sql: wrap_node_row_projection_sql(&compiled.sql),
2833 bind_count: compiled.binds.len(),
2834 driving_table: compiled.driving_table,
2835 shape_hash: compiled.shape_hash,
2836 cache_hit,
2837 }
2838 }
2839
2840 #[doc(hidden)]
2847 pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
2848 let conn = self.lock_connection()?;
2849 let result = conn
2850 .query_row(&format!("PRAGMA {name}"), [], |row| {
2851 row.get::<_, rusqlite::types::Value>(0)
2853 })
2854 .map_err(EngineError::Sqlite)?;
2855 let s = match result {
2856 rusqlite::types::Value::Text(t) => t,
2857 rusqlite::types::Value::Integer(i) => i.to_string(),
2858 rusqlite::types::Value::Real(f) => f.to_string(),
2859 rusqlite::types::Value::Blob(_) => {
2860 return Err(EngineError::InvalidWrite(format!(
2861 "PRAGMA {name} returned an unexpected BLOB value"
2862 )));
2863 }
2864 rusqlite::types::Value::Null => String::new(),
2865 };
2866 Ok(s)
2867 }
2868
2869 pub fn query_provenance_events(
2878 &self,
2879 subject: &str,
2880 ) -> Result<Vec<ProvenanceEvent>, EngineError> {
2881 let conn = self.lock_connection()?;
2882 let mut stmt = conn
2883 .prepare_cached(
2884 "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
2885 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
2886 )
2887 .map_err(EngineError::Sqlite)?;
2888 let events = stmt
2889 .query_map(rusqlite::params![subject], |row| {
2890 Ok(ProvenanceEvent {
2891 id: row.get(0)?,
2892 event_type: row.get(1)?,
2893 subject: row.get(2)?,
2894 source_ref: row.get(3)?,
2895 metadata_json: row.get(4)?,
2896 created_at: row.get(5)?,
2897 })
2898 })
2899 .map_err(EngineError::Sqlite)?
2900 .collect::<Result<Vec<_>, _>>()
2901 .map_err(EngineError::Sqlite)?;
2902 Ok(events)
2903 }
2904
    /// Returns a full table scan of live `kind` nodes while the kind's
    /// per-kind property-FTS table is still being built for the very
    /// first time; returns `Ok(None)` once the index is usable (or no
    /// first-registration rebuild is in flight).
    fn scan_fallback_if_first_registration(
        &self,
        kind: &str,
    ) -> Result<Option<Vec<NodeRow>>, EngineError> {
        let conn = self.lock_connection()?;

        // The per-kind FTS table may not exist yet, so probe
        // sqlite_master before counting its rows.
        let prop_table = fathomdb_schema::fts_kind_table_name(kind);
        let table_exists: bool = conn
            .query_row(
                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                rusqlite::params![prop_table],
                |_| Ok(true),
            )
            .optional()?
            .unwrap_or(false);
        let prop_empty = if table_exists {
            let cnt: i64 =
                conn.query_row(&format!("SELECT COUNT(*) FROM {prop_table}"), [], |r| {
                    r.get(0)
                })?;
            cnt == 0
        } else {
            true
        };
        // Scan only when the index is empty AND a first-registration
        // rebuild is actually in flight; an empty index with no rebuild
        // simply means there is nothing to find.
        let needs_scan: bool = if prop_empty {
            conn.query_row(
                "SELECT 1 FROM fts_property_rebuild_state \
                 WHERE kind = ?1 AND is_first_registration = 1 \
                 AND state IN ('PENDING','BUILDING','SWAPPING') \
                 LIMIT 1",
                rusqlite::params![kind],
                |_| Ok(true),
            )
            .optional()?
            .unwrap_or(false)
        } else {
            false
        };

        if !needs_scan {
            return Ok(None);
        }

        // Fallback path: read every live node of this kind directly,
        // joining access metadata the same way the indexed path does.
        let mut stmt = conn
            .prepare_cached(
                "SELECT n.row_id, n.logical_id, n.kind, n.properties, n.content_ref, \
                 am.last_accessed_at \
                 FROM nodes n \
                 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id \
                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
            )
            .map_err(EngineError::Sqlite)?;

        let nodes = stmt
            .query_map(rusqlite::params![kind], |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        Ok(Some(nodes))
    }
2985
2986 pub fn get_property_fts_rebuild_progress(
2992 &self,
2993 kind: &str,
2994 ) -> Result<Option<crate::rebuild_actor::RebuildProgress>, EngineError> {
2995 let conn = self.lock_connection()?;
2996 let row = conn
2997 .query_row(
2998 "SELECT state, rows_total, rows_done, started_at, last_progress_at, error_message \
2999 FROM fts_property_rebuild_state WHERE kind = ?1",
3000 rusqlite::params![kind],
3001 |r| {
3002 Ok(crate::rebuild_actor::RebuildProgress {
3003 state: r.get(0)?,
3004 rows_total: r.get(1)?,
3005 rows_done: r.get(2)?,
3006 started_at: r.get(3)?,
3007 last_progress_at: r.get(4)?,
3008 error_message: r.get(5)?,
3009 })
3010 },
3011 )
3012 .optional()?;
3013 Ok(row)
3014 }
3015}
3016
3017fn adapt_fts_nodes_sql_for_per_kind_tables(
3027 compiled: &CompiledQuery,
3028 conn: &rusqlite::Connection,
3029) -> Result<(String, Vec<BindValue>), EngineError> {
3030 let root_kind = compiled
3031 .binds
3032 .get(1)
3033 .and_then(|b| {
3034 if let BindValue::Text(k) = b {
3035 Some(k.as_str())
3036 } else {
3037 None
3038 }
3039 })
3040 .unwrap_or("");
3041 let prop_table = fathomdb_schema::fts_kind_table_name(root_kind);
3042 let prop_table_exists: bool = conn
3043 .query_row(
3044 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
3045 rusqlite::params![prop_table],
3046 |_| Ok(true),
3047 )
3048 .optional()
3049 .map_err(EngineError::Sqlite)?
3050 .unwrap_or(false);
3051
3052 Ok(compiled.adapt_fts_for_kind(prop_table_exists, &prop_table))
3053}
3054
/// Advisory open-time check that the stored vector projection profile
/// matches the active embedder's identity; mismatches are logged via
/// `trace_warn!` but never fail the open — hence the
/// `unnecessary_wraps` allow on the always-`Ok` return.
#[allow(clippy::unnecessary_wraps)]
fn check_vec_identity_at_open(
    conn: &rusqlite::Connection,
    embedder: &dyn QueryEmbedder,
) -> Result<(), EngineError> {
    // Query errors are swallowed (treated as "no profile"): the check
    // is best-effort only.
    let row: Option<String> = conn
        .query_row(
            "SELECT config_json FROM projection_profiles WHERE kind='*' AND facet='vec'",
            [],
            |row| row.get(0),
        )
        .optional()
        .unwrap_or(None);

    let Some(config_json) = row else {
        return Ok(());
    };

    // Unparseable profile JSON → nothing to compare against.
    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&config_json) else {
        return Ok(());
    };

    let identity = embedder.identity();

    if let Some(stored_model) = parsed
        .get("model_identity")
        .and_then(serde_json::Value::as_str)
        && stored_model != identity.model_identity
    {
        trace_warn!(
            stored_model_identity = stored_model,
            embedder_model_identity = %identity.model_identity,
            "vec identity mismatch at open: model_identity differs"
        );
    }

    if let Some(stored_dim) = parsed.get("dimensions").and_then(serde_json::Value::as_u64) {
        // usize::MAX guarantees a mismatch warning if the stored value
        // does not fit in usize.
        let stored_dim = usize::try_from(stored_dim).unwrap_or(usize::MAX);
        if stored_dim != identity.dimension {
            trace_warn!(
                stored_dimensions = stored_dim,
                embedder_dimensions = identity.dimension,
                "vec identity mismatch at open: dimensions differ"
            );
        }
    }

    Ok(())
}
3110
/// Open-time self-heal: when property-FTS schemas are registered but
/// the FTS content (or the positions side-table) is unexpectedly empty
/// while live nodes exist, wipe and repopulate the projections inside
/// one IMMEDIATE transaction.
fn run_open_time_fts_guards(conn: &mut rusqlite::Connection) -> Result<(), EngineError> {
    // No registered schemas → nothing to guard.
    let schema_count: i64 = conn
        .query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
            row.get(0)
        })
        .map_err(EngineError::Sqlite)?;
    if schema_count == 0 {
        return Ok(());
    }

    let needs_fts_rebuild = open_guard_check_fts_empty(conn)?;
    // A full FTS rebuild repopulates positions too, so only probe the
    // positions table when the FTS content itself is intact.
    let needs_position_backfill = if needs_fts_rebuild {
        false
    } else {
        open_guard_check_positions_empty(conn)?
    };

    if needs_fts_rebuild || needs_position_backfill {
        // Collect the per-kind virtual tables first; the statement's
        // borrow of `conn` must end before the transaction starts.
        let per_kind_tables: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT name FROM sqlite_master \
                     WHERE type='table' AND name LIKE 'fts_props_%' \
                     AND sql LIKE 'CREATE VIRTUAL TABLE%'",
                )
                .map_err(EngineError::Sqlite)?;
            stmt.query_map([], |r| r.get::<_, String>(0))
                .map_err(EngineError::Sqlite)?
                .collect::<Result<Vec<_>, _>>()
                .map_err(EngineError::Sqlite)?
        };
        let tx = conn
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(EngineError::Sqlite)?;
        for table in &per_kind_tables {
            tx.execute_batch(&format!("DELETE FROM {table}"))
                .map_err(EngineError::Sqlite)?;
        }
        tx.execute("DELETE FROM fts_node_property_positions", [])
            .map_err(EngineError::Sqlite)?;
        // NOTE(review): this SQL carries a `?1` kind placeholder —
        // presumably `insert_property_fts_rows` binds it once per
        // registered kind internally; confirm against that helper.
        crate::projection::insert_property_fts_rows(
            &tx,
            "SELECT logical_id, properties FROM nodes \
             WHERE kind = ?1 AND superseded_at IS NULL",
        )
        .map_err(EngineError::Sqlite)?;
        tx.commit().map_err(EngineError::Sqlite)?;
    }
    Ok(())
}
3172
3173fn open_guard_check_fts_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
3174 let kinds: Vec<String> = {
3175 let mut stmt = conn
3176 .prepare("SELECT kind FROM fts_property_schemas")
3177 .map_err(EngineError::Sqlite)?;
3178 stmt.query_map([], |row| row.get::<_, String>(0))
3179 .map_err(EngineError::Sqlite)?
3180 .collect::<Result<Vec<_>, _>>()
3181 .map_err(EngineError::Sqlite)?
3182 };
3183 for kind in &kinds {
3184 let table = fathomdb_schema::fts_kind_table_name(kind);
3185 let table_exists: bool = conn
3186 .query_row(
3187 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
3188 rusqlite::params![table],
3189 |_| Ok(true),
3190 )
3191 .optional()
3192 .map_err(EngineError::Sqlite)?
3193 .unwrap_or(false);
3194 let fts_count: i64 = if table_exists {
3195 conn.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
3196 row.get(0)
3197 })
3198 .map_err(EngineError::Sqlite)?
3199 } else {
3200 0
3201 };
3202 if fts_count == 0 {
3203 let node_count: i64 = conn
3204 .query_row(
3205 "SELECT COUNT(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
3206 rusqlite::params![kind],
3207 |row| row.get(0),
3208 )
3209 .map_err(EngineError::Sqlite)?;
3210 if node_count > 0 {
3211 return Ok(true);
3212 }
3213 }
3214 }
3215 Ok(false)
3216}
3217
3218fn open_guard_check_positions_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
3219 let recursive_count: i64 = conn
3220 .query_row(
3221 "SELECT COUNT(*) FROM fts_property_schemas \
3222 WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
3223 [],
3224 |row| row.get(0),
3225 )
3226 .map_err(EngineError::Sqlite)?;
3227 if recursive_count == 0 {
3228 return Ok(false);
3229 }
3230 let pos_count: i64 = conn
3231 .query_row(
3232 "SELECT COUNT(*) FROM fts_node_property_positions",
3233 [],
3234 |row| row.get(0),
3235 )
3236 .map_err(EngineError::Sqlite)?;
3237 Ok(pos_count == 0)
3238}
3239
/// Wraps a compiled node query so it projects the full `NodeRow` column
/// set, left-joining per-node access metadata onto the inner result.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut sql = String::with_capacity(base_sql.len() + 180);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, \
         am.last_accessed_at FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
3247
3248pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
3256 match err {
3257 rusqlite::Error::SqliteFailure(_, Some(msg)) => {
3258 (msg.contains("no such table: vec_") && !msg.contains("vec_embedding"))
3260 || msg.contains("no such module: vec0")
3261 }
3262 _ => false,
3263 }
3264}
3265
3266fn scalar_to_bind(value: &ScalarValue) -> BindValue {
3267 match value {
3268 ScalarValue::Text(text) => BindValue::Text(text.clone()),
3269 ScalarValue::Integer(integer) => BindValue::Integer(*integer),
3270 ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
3271 }
3272}
3273
/// Merges strict and relaxed search branches (no vector branch).
///
/// Convenience wrapper over [`merge_search_branches_three`] with an
/// empty vector block; see that function for ordering and dedup
/// semantics.
fn merge_search_branches(
    strict: Vec<SearchHit>,
    relaxed: Vec<SearchHit>,
    limit: usize,
) -> Vec<SearchHit> {
    merge_search_branches_three(strict, relaxed, Vec::new(), limit)
}
3298
3299fn merge_search_branches_three(
3311 strict: Vec<SearchHit>,
3312 relaxed: Vec<SearchHit>,
3313 vector: Vec<SearchHit>,
3314 limit: usize,
3315) -> Vec<SearchHit> {
3316 let strict_block = dedup_branch_hits(strict);
3317 let relaxed_block = dedup_branch_hits(relaxed);
3318 let vector_block = dedup_branch_hits(vector);
3319
3320 let mut seen: std::collections::HashSet<String> = strict_block
3321 .iter()
3322 .map(|h| h.node.logical_id.clone())
3323 .collect();
3324
3325 let mut merged = strict_block;
3326 for hit in relaxed_block {
3327 if seen.insert(hit.node.logical_id.clone()) {
3328 merged.push(hit);
3329 }
3330 }
3331 for hit in vector_block {
3332 if seen.insert(hit.node.logical_id.clone()) {
3333 merged.push(hit);
3334 }
3335 }
3336
3337 if merged.len() > limit {
3338 merged.truncate(limit);
3339 }
3340 merged
3341}
3342
3343fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
3347 hits.sort_by(|a, b| {
3348 b.score
3349 .partial_cmp(&a.score)
3350 .unwrap_or(std::cmp::Ordering::Equal)
3351 .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
3352 .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
3353 });
3354
3355 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
3356 hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
3357 hits
3358}
3359
/// Tie-break priority when deduplicating equal-score hits for the same
/// logical id: chunk hits win over property hits, which win over
/// vector hits (lower value = higher priority).
fn source_priority(source: SearchHitSource) -> u8 {
    match source {
        SearchHitSource::Chunk => 0,
        SearchHitSource::Property => 1,
        SearchHitSource::Vector => 2,
    }
}
3369
// Sentinel markers passed to FTS5 highlight() for hit attribution.
// Control bytes are used because they are vanishingly unlikely to occur
// in indexed text, so marker offsets map back to the original text
// unambiguously.
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
3389
3390fn load_position_map(
3394 conn: &Connection,
3395 logical_id: &str,
3396 kind: &str,
3397) -> Result<Vec<(usize, usize, String)>, EngineError> {
3398 let mut stmt = conn
3399 .prepare_cached(
3400 "SELECT start_offset, end_offset, leaf_path \
3401 FROM fts_node_property_positions \
3402 WHERE node_logical_id = ?1 AND kind = ?2 \
3403 ORDER BY start_offset ASC",
3404 )
3405 .map_err(EngineError::Sqlite)?;
3406 let rows = stmt
3407 .query_map(rusqlite::params![logical_id, kind], |row| {
3408 let start: i64 = row.get(0)?;
3409 let end: i64 = row.get(1)?;
3410 let path: String = row.get(2)?;
3411 let start = usize::try_from(start).unwrap_or(0);
3415 let end = usize::try_from(end).unwrap_or(0);
3416 Ok((start, end, path))
3417 })
3418 .map_err(EngineError::Sqlite)?;
3419 let mut out = Vec::new();
3420 for row in rows {
3421 out.push(row.map_err(EngineError::Sqlite)?);
3422 }
3423 Ok(out)
3424}
3425
/// Maps highlight markers in `wrapped` back to byte offsets in the
/// original (marker-free) text.
///
/// `wrapped` is FTS5 `highlight()` output: the source text with
/// `open`/`close` marker strings inserted around each match. For each
/// `open` marker this returns the byte offset at which that match
/// starts in the text without any markers.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    let bytes = wrapped.as_bytes();
    let (open_bytes, close_bytes) = (open.as_bytes(), close.as_bytes());
    let mut offsets = Vec::new();
    let mut pos = 0usize;
    // Total marker bytes passed so far; subtracting this converts a
    // position in `wrapped` into a position in the original text.
    let mut marker_bytes = 0usize;
    while pos < bytes.len() {
        let rest = &bytes[pos..];
        if rest.starts_with(open_bytes) {
            offsets.push(pos - marker_bytes);
            pos += open_bytes.len();
            marker_bytes += open_bytes.len();
        } else if rest.starts_with(close_bytes) {
            pos += close_bytes.len();
            marker_bytes += close_bytes.len();
        } else {
            pos += 1;
        }
    }
    offsets
}
3458
/// Finds the leaf path whose `[start, end)` byte range contains
/// `offset`; `positions` must be sorted by start offset ascending.
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    // Index of the first entry whose start is strictly greater than
    // `offset`; the candidate range is the one just before it.
    let upper = positions.partition_point(|entry| entry.0 <= offset);
    let (start, end, path) = positions.get(upper.checked_sub(1)?)?;
    ((*start..*end).contains(&offset)).then_some(path.as_str())
}
3475
/// Resolves which property paths produced a search hit.
///
/// Chunk hits always attribute to `text_content`; vector hits get no
/// attribution. For property hits, the matched FTS row is re-queried
/// with FTS5 `highlight()` using sentinel markers, and each marker's
/// byte offset is mapped back to a leaf path via the stored position
/// map. Any missing precondition (row id, highlight row, offsets,
/// positions) yields an empty attribution rather than an error.
fn resolve_hit_attribution(
    conn: &Connection,
    hit: &SearchHit,
    match_expr: &str,
) -> Result<HitAttribution, EngineError> {
    // Chunk hits always come from the node's text content.
    if matches!(hit.source, SearchHitSource::Chunk) {
        return Ok(HitAttribution {
            matched_paths: vec!["text_content".to_owned()],
        });
    }
    // Only property hits can be attributed further.
    if !matches!(hit.source, SearchHitSource::Property) {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }
    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };
    let rowid: i64 = match rowid_str.parse() {
        Ok(v) => v,
        Err(_) => {
            return Ok(HitAttribution {
                matched_paths: Vec::new(),
            });
        }
    };

    // Re-run highlight() on just this row: column 1 of the per-kind
    // table is the indexed property text; ?1/?2 are the sentinel
    // open/close markers.
    let prop_table = fathomdb_schema::fts_kind_table_name(&hit.node.kind);
    let highlight_sql = format!(
        "SELECT highlight({prop_table}, 1, ?1, ?2) \
         FROM {prop_table} \
         WHERE rowid = ?3 AND {prop_table} MATCH ?4"
    );
    let mut stmt = conn.prepare(&highlight_sql).map_err(EngineError::Sqlite)?;
    let wrapped: Option<String> = stmt
        .query_row(
            rusqlite::params![
                ATTRIBUTION_HIGHLIGHT_OPEN,
                ATTRIBUTION_HIGHLIGHT_CLOSE,
                rowid,
                match_expr,
            ],
            |row| row.get(0),
        )
        .optional()
        .map_err(EngineError::Sqlite)?;
    let Some(wrapped) = wrapped else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };

    let offsets = parse_highlight_offsets(
        &wrapped,
        ATTRIBUTION_HIGHLIGHT_OPEN,
        ATTRIBUTION_HIGHLIGHT_CLOSE,
    );
    if offsets.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    // No position map (e.g. no recursive-mode paths) → offsets cannot
    // be mapped back to leaf paths.
    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
    if positions.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    // Collect unique leaf paths in first-match order.
    let mut matched_paths: Vec<String> = Vec::new();
    for offset in offsets {
        if let Some(path) = find_leaf_for_offset(&positions, offset)
            && !matched_paths.iter().any(|p| p == path)
        {
            matched_paths.push(path.to_owned());
        }
    }
    Ok(HitAttribution { matched_paths })
}
3573
3574fn build_bm25_expr(table: &str, schema_json: &str, sep: &str) -> String {
3581 let schema = crate::writer::parse_property_schema_json(schema_json, sep);
3582 let any_weighted = schema.paths.iter().any(|p| p.weight.is_some());
3583 if !any_weighted {
3584 return format!("bm25({table})");
3585 }
3586 let weights: Vec<String> = std::iter::once("0.0".to_owned())
3588 .chain(
3589 schema
3590 .paths
3591 .iter()
3592 .map(|p| format!("{:.1}", p.weight.unwrap_or(1.0))),
3593 )
3594 .collect();
3595 format!("bm25({table}, {})", weights.join(", "))
3596}
3597
3598fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
3599 match value {
3600 fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
3601 fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
3602 fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
3603 }
3604}
3605
3606#[cfg(test)]
3607#[allow(clippy::expect_used)]
3608mod tests {
3609 use std::panic::{AssertUnwindSafe, catch_unwind};
3610 use std::sync::Arc;
3611
3612 use fathomdb_query::{BindValue, QueryBuilder};
3613 use fathomdb_schema::SchemaManager;
3614 use rusqlite::types::Value;
3615 use tempfile::NamedTempFile;
3616
3617 use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
3618
3619 use fathomdb_query::{
3620 NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
3621 };
3622
3623 use super::{
3624 bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
3625 wrap_node_row_projection_sql,
3626 };
3627
    /// Builds a minimal `SearchHit` fixture for merge/dedup tests.
    ///
    /// The node carries placeholder row/property data; only `logical_id`,
    /// `score`, `match_mode`, and `source` vary between cases.
    fn mk_hit(
        logical_id: &str,
        score: f64,
        match_mode: SearchMatchMode,
        source: SearchHitSource,
    ) -> SearchHit {
        SearchHit {
            node: NodeRowLite {
                row_id: format!("{logical_id}-row"),
                logical_id: logical_id.to_owned(),
                kind: "Goal".to_owned(),
                properties: "{}".to_owned(),
                content_ref: None,
                last_accessed_at: None,
            },
            score,
            modality: RetrievalModality::Text,
            source,
            match_mode: Some(match_mode),
            snippet: None,
            written_at: 0,
            projection_row_id: None,
            vector_distance: None,
            attribution: None,
        }
    }
3654
    // Strict hits must precede relaxed hits even when relaxed scores higher.
    #[test]
    fn merge_places_strict_block_before_relaxed_regardless_of_score() {
        let strict = vec![mk_hit(
            "a",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        // The relaxed hit out-scores the strict one; block order must still win.
        let relaxed = vec![mk_hit(
            "b",
            9.9,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "b");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
3683
    // A logical id present in both branches keeps its strict-branch entry.
    #[test]
    fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
        let strict = vec![mk_hit(
            "shared",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![
            mk_hit(
                "shared",
                9.9,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
            mk_hit(
                "other",
                2.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "shared");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "other");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
3719
    // Within a block: score descending, ties broken by logical id ascending.
    #[test]
    fn merge_sorts_within_block_by_score_desc_then_logical_id() {
        let strict = vec![
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(
            merged
                .iter()
                .map(|h| &h.node.logical_id)
                .collect::<Vec<_>>(),
            vec!["a", "c", "b"]
        );
    }
3736
    // Equal score + same logical id: the chunk-source hit wins dedup
    // over the property-source hit.
    #[test]
    fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
        let strict = vec![
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Property,
            ),
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(merged.len(), 1);
        assert!(matches!(merged[0].source, SearchHitSource::Chunk));
    }
3757
    // The limit is applied after the block merge, so strict hits fill it
    // first even against a higher-scored relaxed hit.
    #[test]
    fn merge_truncates_to_limit_after_block_merge() {
        let strict = vec![
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let relaxed = vec![mk_hit(
            "c",
            9.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 2);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert_eq!(merged[1].node.logical_id, "b");
    }
3775
    // End-to-end shape of three-branch fusion: strict < relaxed < vector
    // block order, cross-block dedup by logical id, and vector hits
    // carrying match_mode = None.
    #[test]
    fn search_architecturally_supports_three_branch_fusion() {
        let strict = vec![mk_hit(
            "alpha",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![mk_hit(
            "bravo",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        // Vector hits are modality Vector and carry no match mode.
        let mut vector_hit = mk_hit(
            "charlie",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;
        let vector = vec![vector_hit];

        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
        assert_eq!(merged.len(), 3);
        assert_eq!(merged[0].node.logical_id, "alpha");
        assert_eq!(merged[1].node.logical_id, "bravo");
        assert_eq!(merged[2].node.logical_id, "charlie");
        assert!(matches!(merged[2].source, SearchHitSource::Vector));

        // Same id in all three branches → the strict entry wins dedup.
        let strict2 = vec![mk_hit(
            "shared",
            0.5,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed2 = vec![mk_hit(
            "shared",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vshared = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared.match_mode = None;
        vshared.modality = RetrievalModality::Vector;
        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
        assert!(matches!(
            merged2[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));

        // With no strict branch, the relaxed entry beats the vector one.
        let mut vshared2 = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared2.match_mode = None;
        vshared2.modality = RetrievalModality::Vector;
        let merged3 = merge_search_branches_three(
            vec![],
            vec![mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            )],
            vec![vshared2],
            10,
        );
        assert_eq!(merged3.len(), 1);
        assert!(matches!(
            merged3[0].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }
3877
    // A vector-only result set survives fusion intact.
    #[test]
    fn merge_search_branches_three_vector_only_preserves_vector_block() {
        let mut vector_hit = mk_hit(
            "solo",
            0.75,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;

        let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);

        assert_eq!(merged.len(), 1);
        assert_eq!(merged[0].node.logical_id, "solo");
        assert!(matches!(merged[0].source, SearchHitSource::Vector));
        assert!(matches!(merged[0].modality, RetrievalModality::Vector));
        assert!(
            merged[0].match_mode.is_none(),
            "vector hits carry match_mode=None per addendum 1"
        );
    }
3913
    // Truncation keeps block precedence: strict hits consume the limit
    // even when relaxed/vector hits score higher.
    #[test]
    fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
        let strict = vec![
            mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let relaxed = vec![mk_hit(
            "d",
            9.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;
        let vector = vec![vector_hit];

        let merged = merge_search_branches_three(strict, relaxed, vector, 2);

        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert_eq!(merged[1].node.logical_id, "b");
        assert!(
            merged
                .iter()
                .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
            "strict block must win limit contention against higher-scored relaxed/vector hits"
        );
        assert!(
            merged
                .iter()
                .all(|h| matches!(h.source, SearchHitSource::Chunk)),
            "no vector source hits should leak past the limit"
        );
    }
3962
    // The vec-table-absent classifier matches exactly the two known
    // degradable message shapes and nothing else.
    #[test]
    fn is_vec_table_absent_matches_known_error_messages() {
        use rusqlite::ffi;
        fn make_err(msg: &str) -> rusqlite::Error {
            rusqlite::Error::SqliteFailure(
                ffi::Error {
                    code: ffi::ErrorCode::Unknown,
                    extended_code: 1,
                },
                Some(msg.to_owned()),
            )
        }
        assert!(is_vec_table_absent(&make_err(
            "no such table: vec_nodes_active"
        )));
        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
    }
3983
    // A vector search against a kind whose vec_* table does not exist
    // degrades to an empty, flagged result instead of erroring.
    #[test]
    fn vector_search_uses_per_kind_table_and_degrades_when_table_absent() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("MyKind")
            .vector_search("some query", 5)
            .compile()
            .expect("vector query compiles");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("degraded read must succeed");
        assert!(
            rows.was_degraded,
            "must degrade when vec_mykind table does not exist"
        );
        assert!(
            rows.nodes.is_empty(),
            "degraded result must return empty nodes"
        );
    }
4018
    // Text binds map straight through to SQL TEXT.
    #[test]
    fn bind_value_text_maps_to_sql_text() {
        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
        assert_eq!(val, Value::Text("hello".to_owned()));
    }
4024
    // Integer binds map straight through to SQL INTEGER.
    #[test]
    fn bind_value_integer_maps_to_sql_integer() {
        let val = bind_value_to_sql(&BindValue::Integer(42));
        assert_eq!(val, Value::Integer(42));
    }
4030
    // Boolean true is stored as INTEGER 1 (SQLite has no boolean type).
    #[test]
    fn bind_value_bool_true_maps_to_integer_one() {
        let val = bind_value_to_sql(&BindValue::Bool(true));
        assert_eq!(val, Value::Integer(1));
    }
4036
    // Boolean false is stored as INTEGER 0.
    #[test]
    fn bind_value_bool_false_maps_to_integer_zero() {
        let val = bind_value_to_sql(&BindValue::Bool(false));
        assert_eq!(val, Value::Integer(0));
    }
4042
    // Two queries with identical structure but different bind values
    // hash to the same shape and share one SQL cache entry.
    #[test]
    fn same_shape_queries_share_one_cache_entry() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled_a = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(10)
            .compile()
            .expect("compiled a");
        let compiled_b = QueryBuilder::nodes("Meeting")
            .text_search("standup", 5)
            .limit(10)
            .compile()
            .expect("compiled b");

        coordinator
            .execute_compiled_read(&compiled_a)
            .expect("read a");
        coordinator
            .execute_compiled_read(&compiled_b)
            .expect("read b");

        assert_eq!(
            compiled_a.shape_hash, compiled_b.shape_hash,
            "different bind values, same structural shape → same hash"
        );
        assert_eq!(coordinator.shape_sql_count(), 1);
    }
4080
    // Same degradation contract exercised through a fresh coordinator:
    // absent vec table → empty nodes with the degraded flag set.
    #[test]
    fn vector_read_degrades_gracefully_when_vec_table_absent() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .vector_search("budget embeddings", 5)
            .compile()
            .expect("vector query compiles");

        let result = coordinator.execute_compiled_read(&compiled);
        let rows = result.expect("degraded read must succeed, not error");
        assert!(
            rows.was_degraded,
            "result must be flagged as degraded when vec_nodes_active is absent"
        );
        assert!(
            rows.nodes.is_empty(),
            "degraded result must return empty nodes"
        );
    }
4110
    // Executing a compiled read populates the shape-hash SQL cache.
    #[test]
    fn coordinator_caches_by_shape_hash() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute compiled read");
        assert_eq!(coordinator.shape_sql_count(), 1);
    }
4134
    // explain() reports the node-row-projected SQL that execution would
    // actually run.
    #[test]
    fn explain_returns_correct_sql() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
    }
4159
4160 #[test]
4161 fn explain_returns_correct_driving_table() {
4162 use fathomdb_query::DrivingTable;
4163
4164 let db = NamedTempFile::new().expect("temporary db");
4165 let coordinator = ExecutionCoordinator::open(
4166 db.path(),
4167 Arc::new(SchemaManager::new()),
4168 None,
4169 1,
4170 Arc::new(TelemetryCounters::default()),
4171 None,
4172 )
4173 .expect("coordinator");
4174
4175 let compiled = QueryBuilder::nodes("Meeting")
4176 .text_search("budget", 5)
4177 .compile()
4178 .expect("compiled query");
4179
4180 let plan = coordinator.explain_compiled_read(&compiled);
4181
4182 assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
4183 }
4184
4185 #[test]
4186 fn explain_reports_cache_miss_then_hit() {
4187 let db = NamedTempFile::new().expect("temporary db");
4188 let coordinator = ExecutionCoordinator::open(
4189 db.path(),
4190 Arc::new(SchemaManager::new()),
4191 None,
4192 1,
4193 Arc::new(TelemetryCounters::default()),
4194 None,
4195 )
4196 .expect("coordinator");
4197
4198 let compiled = QueryBuilder::nodes("Meeting")
4199 .text_search("budget", 5)
4200 .compile()
4201 .expect("compiled query");
4202
4203 let plan_before = coordinator.explain_compiled_read(&compiled);
4205 assert!(
4206 !plan_before.cache_hit,
4207 "cache miss expected before first execute"
4208 );
4209
4210 coordinator
4212 .execute_compiled_read(&compiled)
4213 .expect("execute read");
4214
4215 let plan_after = coordinator.explain_compiled_read(&compiled);
4217 assert!(
4218 plan_after.cache_hit,
4219 "cache hit expected after first execute"
4220 );
4221 }
4222
4223 #[test]
4224 fn explain_does_not_execute_query() {
4225 let db = NamedTempFile::new().expect("temporary db");
4230 let coordinator = ExecutionCoordinator::open(
4231 db.path(),
4232 Arc::new(SchemaManager::new()),
4233 None,
4234 1,
4235 Arc::new(TelemetryCounters::default()),
4236 None,
4237 )
4238 .expect("coordinator");
4239
4240 let compiled = QueryBuilder::nodes("Meeting")
4241 .text_search("anything", 5)
4242 .compile()
4243 .expect("compiled query");
4244
4245 let plan = coordinator.explain_compiled_read(&compiled);
4247
4248 assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
4249 assert_eq!(plan.bind_count, compiled.binds.len());
4250 }
4251
    /// End-to-end happy path: seed one node with a chunk and an FTS row,
    /// then verify a compiled text-search read returns exactly that node.
    #[test]
    fn coordinator_executes_compiled_read() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Seed through a direct connection, bypassing the writer actor.
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // The single seeded node must be the single result.
        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
4291
    /// A node that has no chunks but does have a row in its per-kind
    /// property FTS table (`fts_props_goal`) must still be found by a
    /// plain text search.
    #[test]
    fn text_search_finds_structured_only_node_via_property_fts() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Hand-create the per-kind FTS table; in production the schema
        // registration path creates it.
        conn.execute_batch(
            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
                node_logical_id UNINDEXED, text_content, \
                tokenize = 'porter unicode61 remove_diacritics 2'\
            )",
        )
        .expect("create per-kind fts table");
        // Note: no chunks/fts_nodes rows — this node is property-backed only.
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
            INSERT INTO fts_props_goal (node_logical_id, text_content)
            VALUES ('goal-1', 'Ship v2');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Goal")
            .text_search("Ship", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "goal-1");
    }
4338
    /// One node matched via its chunk FTS row and another matched only via
    /// the per-kind property FTS table must both appear in a single
    /// text-search result set.
    #[test]
    fn text_search_returns_both_chunk_and_property_backed_hits() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // meeting-1: chunk-backed — matched through fts_nodes.
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
            ",
        )
        .expect("seed chunk-backed node");

        // meeting-2: property-backed — matched through fts_props_meeting only.
        conn.execute_batch(
            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_meeting USING fts5(\
                node_logical_id UNINDEXED, text_content, \
                tokenize = 'porter unicode61 remove_diacritics 2'\
            )",
        )
        .expect("create per-kind fts table");
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
            INSERT INTO fts_props_meeting (node_logical_id, text_content)
            VALUES ('meeting-2', 'quarterly sync');
            "#,
        )
        .expect("seed property-backed node");

        // "quarterly" appears in both the chunk text and the property text.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("quarterly", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // Sort so the assertion is independent of ranking order.
        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
        ids.sort_unstable();
        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
    }
4399
    /// The word "not" inside a query phrase must be treated as a literal
    /// term, not as the FTS5 NOT operator — "not a ship" should match a
    /// chunk containing that phrase.
    #[test]
    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
            ",
        )
        .expect("seed chunk-backed node");

        // If "not" were interpreted as an operator this query would
        // exclude documents containing "a ship" and find nothing.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("not a ship", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
4439
4440 #[test]
4443 fn capability_gate_reports_false_without_feature() {
4444 let db = NamedTempFile::new().expect("temporary db");
4445 let coordinator = ExecutionCoordinator::open(
4448 db.path(),
4449 Arc::new(SchemaManager::new()),
4450 None,
4451 1,
4452 Arc::new(TelemetryCounters::default()),
4453 None,
4454 )
4455 .expect("coordinator");
4456 assert!(
4457 !coordinator.vector_enabled(),
4458 "vector_enabled must be false when no dimension is requested"
4459 );
4460 }
4461
4462 #[cfg(feature = "sqlite-vec")]
4463 #[test]
4464 fn capability_gate_reports_true_when_feature_enabled() {
4465 let db = NamedTempFile::new().expect("temporary db");
4466 let coordinator = ExecutionCoordinator::open(
4467 db.path(),
4468 Arc::new(SchemaManager::new()),
4469 Some(128),
4470 1,
4471 Arc::new(TelemetryCounters::default()),
4472 None,
4473 )
4474 .expect("coordinator");
4475 assert!(
4476 coordinator.vector_enabled(),
4477 "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
4478 );
4479 }
4480
    /// Round trip: a run inserted through the writer actor must be
    /// readable back (id/kind/status intact) via `read_run`.
    #[test]
    fn read_run_returns_inserted_run() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // Submit a batch containing only the single run insert.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        // Open a fresh coordinator on the same file and read the run back.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
4537
    /// Round trip: a step inserted (with its parent run) through the
    /// writer actor must be readable back via `read_step`.
    #[test]
    fn read_step_returns_inserted_step() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // One batch carries both the parent run and the step that
        // references it via run_id.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
4601
    /// Round trip: an action inserted (with its parent run and step)
    /// through the writer actor must be readable back via `read_action`.
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // One batch carries the whole run → step → action chain.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
4677
    /// `read_active_runs` must only return the current head of a
    /// supersession chain: after run-v2 supersedes run-v1, only run-v2
    /// is "active" (in the versioning sense, independent of status).
    #[test]
    fn read_active_runs_excludes_superseded() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: the original run.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // v2: upserted replacement that supersedes run-v1.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
4757
    /// Test helper: deliberately poison the first pooled connection mutex
    /// by panicking inside a closure while its lock guard is held.
    /// `catch_unwind` contains the panic so the test itself keeps running;
    /// the mutex is left in the poisoned state for subsequent lock calls.
    #[allow(clippy::panic)]
    fn poison_connection(coordinator: &ExecutionCoordinator) {
        let result = catch_unwind(AssertUnwindSafe(|| {
            let _guard = coordinator.pool.connections[0]
                .lock()
                .expect("poison test lock");
            panic!("poison coordinator connection mutex");
        }));
        // Sanity check: the closure must have unwound (i.e. panicked),
        // otherwise the mutex was never poisoned.
        assert!(
            result.is_err(),
            "poison test must unwind while holding the connection mutex"
        );
    }
4771
4772 #[allow(clippy::panic)]
4773 fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
4774 where
4775 F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
4776 {
4777 match op(coordinator) {
4778 Err(EngineError::Bridge(message)) => {
4779 assert_eq!(message, "connection mutex poisoned");
4780 }
4781 Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
4782 Err(error) => panic!("expected poisoned connection error, got {error:?}"),
4783 }
4784 }
4785
    /// Every read helper that locks a pooled connection must surface a
    /// poisoned mutex as `EngineError::Bridge` rather than panicking.
    #[test]
    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
        let db = NamedTempFile::new().expect("temporary db");
        // pool_size = 1 so every helper is forced onto the poisoned mutex.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        poison_connection(&coordinator);

        // Each read helper must map the poisoned lock to the same Bridge error.
        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
        assert_poisoned_connection_error(
            &coordinator,
            super::ExecutionCoordinator::read_active_runs,
        );
        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
    }
4811
    /// Overfill the shape-hash → SQL cache past MAX_SHAPE_CACHE_SIZE, then
    /// run one more query: the cache must evict down to (at most) the
    /// configured bound instead of growing without limit.
    #[test]
    fn shape_cache_stays_bounded() {
        use fathomdb_query::ShapeHash;

        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Pre-fill the cache directly with MAX_SHAPE_CACHE_SIZE + 1
        // synthetic entries (inclusive range), bypassing the execute path.
        {
            let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
            for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
                cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
            }
        }
        // A real execution must trigger whatever eviction keeps the bound.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(10)
            .compile()
            .expect("compiled query");

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert!(
            coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
            "shape cache must stay bounded: got {} entries, max {}",
            coordinator.shape_sql_count(),
            super::MAX_SHAPE_CACHE_SIZE
        );
    }
4857
4858 #[test]
4861 fn read_pool_size_configurable() {
4862 let db = NamedTempFile::new().expect("temporary db");
4863 let coordinator = ExecutionCoordinator::open(
4864 db.path(),
4865 Arc::new(SchemaManager::new()),
4866 None,
4867 2,
4868 Arc::new(TelemetryCounters::default()),
4869 None,
4870 )
4871 .expect("coordinator with pool_size=2");
4872
4873 assert_eq!(coordinator.pool.size(), 2);
4874
4875 let compiled = QueryBuilder::nodes("Meeting")
4877 .text_search("budget", 5)
4878 .limit(10)
4879 .compile()
4880 .expect("compiled query");
4881
4882 let result = coordinator.execute_compiled_read(&compiled);
4883 assert!(result.is_ok(), "read through pool must succeed");
4884 }
4885
    /// Grouped read over 10 meetings, each with two HAS_TASK edges:
    /// the result must carry all 10 roots, one expansion slot ("tasks"),
    /// and exactly 2 expanded nodes per root.
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        let db = NamedTempFile::new().expect("temporary db");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed: 10 Meeting nodes (each text-searchable via a chunk +
        // fts_nodes row) with two Task children apiece, linked by
        // HAS_TASK edges. `{{ }}` escapes produce literal JSON braces
        // inside the format! template.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1, None, None)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Every meeting was seeded with exactly two tasks.
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
4965
4966 #[test]
4969 fn build_bm25_expr_no_weights() {
4970 let schema_json = r#"["$.title","$.body"]"#;
4971 let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4972 assert_eq!(result, "bm25(fts_props_testkind)");
4973 }
4974
4975 #[test]
4976 fn build_bm25_expr_with_weights() {
4977 let schema_json = r#"[{"path":"$.title","mode":"scalar","weight":10.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
4978 let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4979 assert_eq!(result, "bm25(fts_props_testkind, 0.0, 10.0, 1.0)");
4980 }
4981
    /// End-to-end weighting test: with $.title weighted 10x over $.body,
    /// an article matching "rust" in its title must rank above one
    /// matching "rust" only in its body.
    #[test]
    #[allow(clippy::too_many_lines)]
    fn weighted_schema_bm25_orders_title_match_above_body_match() {
        use crate::{
            AdminService, FtsPropertyPathSpec, NodeInsert, ProvenanceMode, WriteRequest,
            WriterActor, writer::ChunkPolicy,
        };
        use fathomdb_schema::fts_column_name;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Register the weighted FTS property schema for Article.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Article",
                    &[
                        FtsPropertyPathSpec::scalar("$.title").with_weight(10.0),
                        FtsPropertyPathSpec::scalar("$.body").with_weight(1.0),
                    ],
                    None,
                    &[],
                    crate::rebuild_actor::RebuildMode::Eager,
                )
                .expect("register schema with weights");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // article-a: "rust" in the heavily weighted title field.
        writer
            .submit(WriteRequest {
                label: "insert-a".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-a".to_owned(),
                    logical_id: "article-a".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"rust","body":"other"}"#.to_owned(),
                    source_ref: Some("src-a".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node A");

        // article-b: "rust" only in the lightly weighted body field.
        writer
            .submit(WriteRequest {
                label: "insert-b".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-b".to_owned(),
                    logical_id: "article-b".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"other","body":"rust"}"#.to_owned(),
                    source_ref: Some("src-b".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node B");

        // Drop the writer so its work is flushed before reading directly.
        drop(writer);

        // Sanity-check the per-kind FTS table was populated column-wise
        // before asserting anything about ranking.
        {
            let title_col = fts_column_name("$.title", false);
            let body_col = fts_column_name("$.body", false);
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            let count: i64 = conn
                .query_row("SELECT count(*) FROM fts_props_article", [], |r| r.get(0))
                .expect("count fts rows");
            assert_eq!(count, 2, "both nodes must have FTS rows in per-kind table");
            let (title_a, body_a): (String, String) = conn
                .query_row(
                    &format!(
                        "SELECT {title_col}, {body_col} FROM fts_props_article \
                         WHERE node_logical_id = 'article-a'"
                    ),
                    [],
                    |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
                )
                .expect("select article-a");
            assert_eq!(
                title_a, "rust",
                "article-a must have 'rust' in title column"
            );
            assert_eq!(
                body_a, "other",
                "article-a must have 'other' in body column"
            );
        }

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = fathomdb_query::QueryBuilder::nodes("Article")
            .text_search("rust", 5)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 2, "both nodes must be returned");
        // Ordering assertion: the weighted bm25 expression must put the
        // title match first.
        assert_eq!(
            rows.nodes[0].logical_id, "article-a",
            "article-a (title match, weight 10) must rank above article-b (body match, weight 1)"
        );
    }
5134
    /// Attribution from positions: when a property-FTS hit is resolved
    /// through the fts_node_property_positions table, matched_paths must
    /// contain only the leaf path whose offset range covers the matched
    /// term ("searchterm" lives in the $.title range, not $.body).
    #[test]
    fn property_fts_hit_matched_paths_from_positions() {
        use crate::{AdminService, rebuild_actor::RebuildMode};
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Register the Item schema so fts_props_item exists.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Item",
                    &[
                        crate::FtsPropertyPathSpec::scalar("$.body"),
                        crate::FtsPropertyPathSpec::scalar("$.title"),
                    ],
                    None,
                    &[],
                    RebuildMode::Eager,
                )
                .expect("register Item FTS schema");
        }

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Build the concatenated FTS blob by hand: body text, separator,
        // title text. The hard-coded offsets below (0..5 for "other",
        // 34..44 for "searchterm") assume the 29-byte separator; the
        // assertion guards against the constant drifting.
        let blob = format!("other{}searchterm", crate::writer::LEAF_SEPARATOR);
        assert_eq!(
            crate::writer::LEAF_SEPARATOR.len(),
            29,
            "LEAF_SEPARATOR length changed; update position offsets"
        );

        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
             VALUES ('r1', 'item-1', 'Item', '{\"title\":\"searchterm\",\"body\":\"other\"}', 100)",
            [],
        )
        .expect("insert node");
        conn.execute(
            "INSERT INTO fts_props_item (node_logical_id, text_content) \
             VALUES ('item-1', ?1)",
            rusqlite::params![blob],
        )
        .expect("insert fts row");
        // Position rows mapping blob byte ranges back to JSON leaf paths.
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 0, 5, '$.body')",
            [],
        )
        .expect("insert body position");
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 34, 44, '$.title')",
            [],
        )
        .expect("insert title position");

        // Request attribution explicitly on the compiled search.
        let ast = QueryBuilder::nodes("Item").text_search("searchterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected at least one hit");
        let hit = rows
            .hits
            .iter()
            .find(|h| h.node.logical_id == "item-1")
            .expect("item-1 must be in hits");

        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        assert!(
            att.matched_paths.contains(&"$.title".to_owned()),
            "matched_paths must contain '$.title', got {:?}",
            att.matched_paths,
        );
        assert!(
            !att.matched_paths.contains(&"$.body".to_owned()),
            "matched_paths must NOT contain '$.body', got {:?}",
            att.matched_paths,
        );
    }
5253
    /// Vector search hits carry no path-level attribution, even when
    /// attribution is requested; this also exercises the degraded path
    /// since no vec table exists in this database.
    #[test]
    fn vector_hit_has_no_attribution() {
        use fathomdb_query::compile_vector_search;

        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Request attribution explicitly; vector hits must ignore it.
        let ast = QueryBuilder::nodes("Document").vector_search("[1.0, 0.0]", 5);
        let mut compiled = compile_vector_search(ast.ast()).expect("compile vector search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_vector_search(&compiled)
            .expect("vector search must not error");

        assert!(
            rows.was_degraded,
            "vector search without vec table must degrade"
        );
        for hit in &rows.hits {
            assert!(
                hit.attribution.is_none(),
                "vector hits must carry attribution = None, got {:?}",
                hit.attribution
            );
        }
    }
5299
    /// A hit that comes from the chunk-level FTS index (not a per-kind
    /// property index) must attribute its match to the literal path
    /// "text_content" rather than a JSON leaf path.
    #[test]
    fn chunk_hit_has_text_content_attribution() {
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Seed one node with a chunk and a matching fts_nodes row so the
        // sentinel term is findable only via the chunk FTS table.
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('r1', 'chunk-node', 'Goal', '{}', 100);
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('c1', 'chunk-node', 'uniquesentinelterm', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('c1', 'chunk-node', 'Goal', 'uniquesentinelterm');
            ",
        )
        .expect("seed chunk node");

        let ast = QueryBuilder::nodes("Goal").text_search("uniquesentinelterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        // Opt in to attribution so each hit carries matched-path metadata.
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected chunk hit");
        // Only chunk-sourced hits are under test here.
        let hit = rows
            .hits
            .iter()
            .find(|h| matches!(h.source, SearchHitSource::Chunk))
            .expect("must have a Chunk hit");

        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        assert_eq!(
            att.matched_paths,
            vec!["text_content".to_owned()],
            "chunk matched_paths must be [\"text_content\"], got {:?}",
            att.matched_paths,
        );
    }
5368
5369 #[test]
5376 #[allow(clippy::too_many_lines)]
5377 fn mixed_kind_results_get_per_kind_matched_paths() {
5378 use crate::{AdminService, rebuild_actor::RebuildMode};
5379 use fathomdb_query::compile_search;
5380
5381 let db = NamedTempFile::new().expect("temporary db");
5382 let schema_manager = Arc::new(SchemaManager::new());
5383
5384 {
5387 let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
5388 admin
5389 .register_fts_property_schema_with_entries(
5390 "KindA",
5391 &[crate::FtsPropertyPathSpec::scalar("$.alpha")],
5392 None,
5393 &[],
5394 RebuildMode::Eager,
5395 )
5396 .expect("register KindA FTS schema");
5397 admin
5398 .register_fts_property_schema_with_entries(
5399 "KindB",
5400 &[crate::FtsPropertyPathSpec::scalar("$.beta")],
5401 None,
5402 &[],
5403 RebuildMode::Eager,
5404 )
5405 .expect("register KindB FTS schema");
5406 }
5407
5408 let coordinator = ExecutionCoordinator::open(
5409 db.path(),
5410 Arc::clone(&schema_manager),
5411 None,
5412 1,
5413 Arc::new(TelemetryCounters::default()),
5414 None,
5415 )
5416 .expect("coordinator");
5417
5418 let conn = rusqlite::Connection::open(db.path()).expect("open db");
5419
5420 conn.execute(
5422 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5423 VALUES ('rA', 'node-a', 'KindA', '{\"alpha\":\"xenoterm\"}', 100)",
5424 [],
5425 )
5426 .expect("insert KindA node");
5427 conn.execute(
5429 "INSERT INTO fts_props_kinda (node_logical_id, text_content) \
5430 VALUES ('node-a', 'xenoterm')",
5431 [],
5432 )
5433 .expect("insert KindA fts row");
5434 conn.execute(
5435 "INSERT INTO fts_node_property_positions \
5436 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5437 VALUES ('node-a', 'KindA', 0, 8, '$.alpha')",
5438 [],
5439 )
5440 .expect("insert KindA position");
5441
5442 conn.execute(
5444 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
5445 VALUES ('rB', 'node-b', 'KindB', '{\"beta\":\"xenoterm\"}', 100)",
5446 [],
5447 )
5448 .expect("insert KindB node");
5449 conn.execute(
5451 "INSERT INTO fts_props_kindb (node_logical_id, text_content) \
5452 VALUES ('node-b', 'xenoterm')",
5453 [],
5454 )
5455 .expect("insert KindB fts row");
5456 conn.execute(
5457 "INSERT INTO fts_node_property_positions \
5458 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
5459 VALUES ('node-b', 'KindB', 0, 8, '$.beta')",
5460 [],
5461 )
5462 .expect("insert KindB position");
5463
5464 let ast = QueryBuilder::nodes("").text_search("xenoterm", 10);
5466 let mut compiled = compile_search(ast.ast()).expect("compile search");
5467 compiled.attribution_requested = true;
5468
5469 let rows = coordinator
5470 .execute_compiled_search(&compiled)
5471 .expect("search");
5472
5473 assert!(
5475 rows.hits.len() >= 2,
5476 "expected hits for both kinds, got {}",
5477 rows.hits.len()
5478 );
5479
5480 for hit in &rows.hits {
5481 let att = hit
5482 .attribution
5483 .as_ref()
5484 .expect("attribution must be Some when attribution_requested");
5485 match hit.node.kind.as_str() {
5486 "KindA" => {
5487 assert_eq!(
5488 att.matched_paths,
5489 vec!["$.alpha".to_owned()],
5490 "KindA hit must have matched_paths=['$.alpha'], got {:?}",
5491 att.matched_paths,
5492 );
5493 }
5494 "KindB" => {
5495 assert_eq!(
5496 att.matched_paths,
5497 vec!["$.beta".to_owned()],
5498 "KindB hit must have matched_paths=['$.beta'], got {:?}",
5499 att.matched_paths,
5500 );
5501 }
5502 other => {
5503 assert_eq!(other, "KindA", "unexpected kind in result: {other}");
5505 }
5506 }
5507 }
5508 }
5509
5510 #[test]
5513 fn tokenizer_strategy_from_str() {
5514 use super::TokenizerStrategy;
5515 assert_eq!(
5516 TokenizerStrategy::from_str("porter unicode61 remove_diacritics 2"),
5517 TokenizerStrategy::RecallOptimizedEnglish,
5518 );
5519 assert_eq!(
5520 TokenizerStrategy::from_str("unicode61 remove_diacritics 2"),
5521 TokenizerStrategy::PrecisionOptimized,
5522 );
5523 assert_eq!(
5524 TokenizerStrategy::from_str("trigram"),
5525 TokenizerStrategy::SubstringTrigram,
5526 );
5527 assert_eq!(
5528 TokenizerStrategy::from_str("icu"),
5529 TokenizerStrategy::GlobalCjk,
5530 );
5531 assert_eq!(
5532 TokenizerStrategy::from_str("unicode61 tokenchars '.+-'"),
5533 TokenizerStrategy::SourceCode,
5534 );
5535 assert_eq!(
5537 TokenizerStrategy::from_str("unicode61 tokenchars '._-$@'"),
5538 TokenizerStrategy::SourceCode,
5539 );
5540 assert_eq!(
5541 TokenizerStrategy::from_str("my_custom_tokenizer"),
5542 TokenizerStrategy::Custom("my_custom_tokenizer".to_owned()),
5543 );
5544 }
5545
    /// With the trigram tokenizer configured, a query shorter than three
    /// characters must yield Ok with zero hits rather than an error.
    #[test]
    fn trigram_short_query_returns_empty() {
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // First open bootstraps the database schema, then is dropped so the
        // profile row below is inserted over a plain connection.
        {
            let bootstrap = ExecutionCoordinator::open(
                db.path(),
                Arc::clone(&schema_manager),
                None,
                1,
                Arc::new(TelemetryCounters::default()),
                None,
            )
            .expect("bootstrap coordinator");
            drop(bootstrap);
        }

        // Configure the 'Snippet' kind to use the trigram tokenizer.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            conn.execute_batch(
                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
                 VALUES ('Snippet', 'fts', '{\"tokenizer\":\"trigram\"}');",
            )
            .expect("insert profile");
        }

        // Reopen so the coordinator picks up the new tokenizer profile.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator reopen");

        // "ab" is 2 characters — too short for a trigram query.
        let ast = QueryBuilder::nodes("Snippet").text_search("ab", 10);
        let compiled = compile_search(ast.ast()).expect("compile search");
        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("short trigram query must not error");
        assert!(
            rows.hits.is_empty(),
            "2-char trigram query must return empty"
        );
    }
5599
    /// With a source-code tokenizer ('.' is a token character), searching for
    /// "std.io" must find the seeded document — if the rendered FTS5
    /// expression were escaped again after rendering, the query would break
    /// and return nothing.
    #[test]
    fn source_code_strategy_does_not_corrupt_fts5_syntax() {
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap the database schema, then drop the coordinator so seeding
        // below happens over a plain connection.
        {
            let bootstrap = ExecutionCoordinator::open(
                db.path(),
                Arc::clone(&schema_manager),
                None,
                1,
                Arc::new(TelemetryCounters::default()),
                None,
            )
            .expect("bootstrap coordinator");
            drop(bootstrap);
        }

        // Configure 'Symbol' with a tokenchars tokenizer (the doubled ''
        // inside the SQL literal is a single quote in the stored config),
        // then seed one node whose text contains the dotted term.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            conn.execute(
                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
                 VALUES ('Symbol', 'fts', json_object('tokenizer', 'unicode61 tokenchars ''._-$@'''))",
                [],
            )
            .expect("insert profile");
            conn.execute_batch(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
                 VALUES ('row-sym-1', 'logical-sym-1', 'Symbol', '{}', 1); \
                 INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'std.io is a rust crate', 1); \
                 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'Symbol', 'std.io is a rust crate');",
            )
            .expect("insert node and fts row");
        }

        // Reopen so the coordinator picks up the tokenizer profile.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator reopen");

        let ast = QueryBuilder::nodes("Symbol").text_search("std.io", 10);
        let compiled = compile_search(ast.ast()).expect("compile search");
        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("source code search must not error");
        assert!(
            !rows.hits.is_empty(),
            "SourceCode strategy search for 'std.io' must return the document; \
             got empty — FTS5 expression was likely corrupted by post-render escaping"
        );
    }
5673
    /// Deterministic `QueryEmbedder` test double: reports a fixed identity and
    /// embeds every query as an all-zero vector of `dimension` length.
    #[derive(Debug)]
    struct StubEmbedder {
        /// Value reported as `model_identity` by `identity()`.
        model_identity: String,
        /// Length of the vectors produced by `embed_query` and reported by
        /// `identity()`.
        dimension: usize,
    }
5681
5682 impl StubEmbedder {
5683 fn new(model_identity: &str, dimension: usize) -> Self {
5684 Self {
5685 model_identity: model_identity.to_owned(),
5686 dimension,
5687 }
5688 }
5689 }
5690
5691 impl crate::embedder::QueryEmbedder for StubEmbedder {
5692 fn embed_query(&self, _text: &str) -> Result<Vec<f32>, crate::embedder::EmbedderError> {
5693 Ok(vec![0.0; self.dimension])
5694 }
5695 fn identity(&self) -> crate::embedder::QueryEmbedderIdentity {
5696 crate::embedder::QueryEmbedderIdentity {
5697 model_identity: self.model_identity.clone(),
5698 model_version: "1.0".to_owned(),
5699 dimension: self.dimension,
5700 normalization_policy: "l2".to_owned(),
5701 }
5702 }
5703 fn max_tokens(&self) -> usize {
5704 512
5705 }
5706 }
5707
5708 fn make_in_memory_db_with_projection_profiles() -> rusqlite::Connection {
5709 let conn = rusqlite::Connection::open_in_memory().expect("in-memory db");
5710 conn.execute_batch(
5711 "CREATE TABLE IF NOT EXISTS projection_profiles (
5712 kind TEXT NOT NULL,
5713 facet TEXT NOT NULL,
5714 config_json TEXT NOT NULL,
5715 active_at INTEGER,
5716 created_at INTEGER,
5717 PRIMARY KEY (kind, facet)
5718 );",
5719 )
5720 .expect("create projection_profiles");
5721 conn
5722 }
5723
5724 #[test]
5725 fn check_vec_identity_no_profile_no_panic() {
5726 let conn = make_in_memory_db_with_projection_profiles();
5727 let embedder = StubEmbedder::new("bge-small", 384);
5728 let result = super::check_vec_identity_at_open(&conn, &embedder);
5729 assert!(result.is_ok(), "no profile row must return Ok(())");
5730 }
5731
5732 #[test]
5733 fn check_vec_identity_matching_identity_ok() {
5734 let conn = make_in_memory_db_with_projection_profiles();
5735 conn.execute(
5736 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5737 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5738 [],
5739 )
5740 .expect("insert profile");
5741 let embedder = StubEmbedder::new("bge-small", 384);
5742 let result = super::check_vec_identity_at_open(&conn, &embedder);
5743 assert!(result.is_ok(), "matching profile must return Ok(())");
5744 }
5745
5746 #[test]
5747 fn check_vec_identity_mismatched_dimensions_ok() {
5748 let conn = make_in_memory_db_with_projection_profiles();
5749 conn.execute(
5750 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5751 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5752 [],
5753 )
5754 .expect("insert profile");
5755 let embedder = StubEmbedder::new("bge-small", 768);
5757 let result = super::check_vec_identity_at_open(&conn, &embedder);
5758 assert!(
5759 result.is_ok(),
5760 "dimension mismatch must warn and return Ok(())"
5761 );
5762 }
5763
5764 #[test]
5765 fn custom_tokenizer_passthrough() {
5766 use super::TokenizerStrategy;
5767 let strategy = TokenizerStrategy::Custom("my_tok".to_owned());
5768 assert_eq!(strategy, TokenizerStrategy::Custom("my_tok".to_owned()));
5770 assert_ne!(strategy, TokenizerStrategy::SubstringTrigram);
5772 assert_ne!(strategy, TokenizerStrategy::SourceCode);
5773 }
5774
5775 #[test]
5776 fn check_vec_identity_mismatched_model_ok() {
5777 let conn = make_in_memory_db_with_projection_profiles();
5778 conn.execute(
5779 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5780 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5781 [],
5782 )
5783 .expect("insert profile");
5784 let embedder = StubEmbedder::new("bge-large", 384);
5786 let result = super::check_vec_identity_at_open(&conn, &embedder);
5787 assert!(
5788 result.is_ok(),
5789 "model_identity mismatch must warn and return Ok(())"
5790 );
5791 }
5792}