1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8 BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9 CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, ExpansionSlot,
10 FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch,
11 SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
12};
13use fathomdb_schema::SchemaManager;
14use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
15
16use crate::embedder::QueryEmbedder;
17use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
18use crate::{EngineError, sqlite};
19
/// Upper bound on entries in the shape-hash -> rendered-SQL cache; once
/// reached the whole map is cleared (no LRU eviction — see
/// `execute_compiled_read`).
const MAX_SHAPE_CACHE_SIZE: usize = 4096;

/// Chunk size for batched operations.
// NOTE(review): not referenced in the visible portion of this file — confirm
// where batching consumes this before changing it.
const BATCH_CHUNK_SIZE: usize = 200;
31fn compile_expansion_filter(
44 filter: Option<&Predicate>,
45 first_param: usize,
46) -> (String, Vec<Value>) {
47 let Some(predicate) = filter else {
48 return (String::new(), vec![]);
49 };
50 let p = first_param;
51 match predicate {
52 Predicate::JsonPathEq { path, value } => {
53 let val = match value {
54 ScalarValue::Text(t) => Value::Text(t.clone()),
55 ScalarValue::Integer(i) => Value::Integer(*i),
56 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
57 };
58 (
59 format!(
60 "\n AND json_extract(n.properties, ?{p}) = ?{}",
61 p + 1
62 ),
63 vec![Value::Text(path.clone()), val],
64 )
65 }
66 Predicate::JsonPathCompare { path, op, value } => {
67 let val = match value {
68 ScalarValue::Text(t) => Value::Text(t.clone()),
69 ScalarValue::Integer(i) => Value::Integer(*i),
70 ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
71 };
72 let operator = match op {
73 ComparisonOp::Gt => ">",
74 ComparisonOp::Gte => ">=",
75 ComparisonOp::Lt => "<",
76 ComparisonOp::Lte => "<=",
77 };
78 (
79 format!(
80 "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
81 p + 1
82 ),
83 vec![Value::Text(path.clone()), val],
84 )
85 }
86 Predicate::JsonPathFusedEq { path, value } => (
87 format!(
88 "\n AND json_extract(n.properties, ?{p}) = ?{}",
89 p + 1
90 ),
91 vec![Value::Text(path.clone()), Value::Text(value.clone())],
92 ),
93 Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
94 let operator = match op {
95 ComparisonOp::Gt => ">",
96 ComparisonOp::Gte => ">=",
97 ComparisonOp::Lt => "<",
98 ComparisonOp::Lte => "<=",
99 };
100 (
101 format!(
102 "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
103 p + 1
104 ),
105 vec![Value::Text(path.clone()), Value::Integer(*value)],
106 )
107 }
108 Predicate::KindEq(kind) => (
109 format!("\n AND n.kind = ?{p}"),
110 vec![Value::Text(kind.clone())],
111 ),
112 Predicate::LogicalIdEq(logical_id) => (
113 format!("\n AND n.logical_id = ?{p}"),
114 vec![Value::Text(logical_id.clone())],
115 ),
116 Predicate::SourceRefEq(source_ref) => (
117 format!("\n AND n.source_ref = ?{p}"),
118 vec![Value::Text(source_ref.clone())],
119 ),
120 Predicate::ContentRefEq(uri) => (
121 format!("\n AND n.content_ref = ?{p}"),
122 vec![Value::Text(uri.clone())],
123 ),
124 Predicate::ContentRefNotNull => (
125 "\n AND n.content_ref IS NOT NULL".to_owned(),
126 vec![],
127 ),
128 }
129}
130
/// Classification of the FTS5 tokenizer configured for a node kind.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TokenizerStrategy {
    /// Porter-stemming unicode61 tokenizer tuned for English recall.
    RecallOptimizedEnglish,
    /// Plain unicode61 (no stemming) for precision-oriented matching.
    PrecisionOptimized,
    /// Trigram tokenizer supporting substring matching.
    SubstringTrigram,
    /// ICU tokenizer for CJK and other global scripts.
    GlobalCjk,
    /// unicode61 with extra token characters (identifier-friendly).
    SourceCode,
    /// Any tokenizer configuration string not recognized above.
    Custom(String),
}

impl TokenizerStrategy {
    /// Map a raw FTS5 tokenizer configuration string to a strategy.
    ///
    /// Exact configurations are matched first; any `unicode61 tokenchars …`
    /// variant is treated as source-code oriented; everything else is kept
    /// verbatim as `Custom`.
    pub fn from_str(s: &str) -> Self {
        if s == "porter unicode61 remove_diacritics 2" {
            Self::RecallOptimizedEnglish
        } else if s == "unicode61 remove_diacritics 2" {
            Self::PrecisionOptimized
        } else if s == "trigram" {
            Self::SubstringTrigram
        } else if s == "icu" {
            Self::GlobalCjk
        } else if s.starts_with("unicode61 tokenchars") {
            Self::SourceCode
        } else {
            Self::Custom(s.to_string())
        }
    }
}
165
/// Fixed-size pool of read-only SQLite connections, one mutex per connection.
struct ReadPool {
    // `acquire` try-locks these in order, then blocks on the first as a
    // fallback when all are busy.
    connections: Vec<Mutex<Connection>>,
}
173
174impl fmt::Debug for ReadPool {
175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176 f.debug_struct("ReadPool")
177 .field("size", &self.connections.len())
178 .finish()
179 }
180}
181
182impl ReadPool {
183 fn new(
194 db_path: &Path,
195 pool_size: usize,
196 schema_manager: &SchemaManager,
197 vector_enabled: bool,
198 ) -> Result<Self, EngineError> {
199 let mut connections = Vec::with_capacity(pool_size);
200 for _ in 0..pool_size {
201 let conn = if vector_enabled {
202 #[cfg(feature = "sqlite-vec")]
203 {
204 sqlite::open_readonly_connection_with_vec(db_path)?
205 }
206 #[cfg(not(feature = "sqlite-vec"))]
207 {
208 sqlite::open_readonly_connection(db_path)?
209 }
210 } else {
211 sqlite::open_readonly_connection(db_path)?
212 };
213 schema_manager
214 .initialize_reader_connection(&conn)
215 .map_err(EngineError::Schema)?;
216 connections.push(Mutex::new(conn));
217 }
218 Ok(Self { connections })
219 }
220
221 fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
230 for conn in &self.connections {
232 if let Ok(guard) = conn.try_lock() {
233 return Ok(guard);
234 }
235 }
236 self.connections[0].lock().map_err(|_| {
238 trace_error!("read pool: connection mutex poisoned");
239 EngineError::Bridge("connection mutex poisoned".to_owned())
240 })
241 }
242
243 #[cfg(test)]
245 fn size(&self) -> usize {
246 self.connections.len()
247 }
248}
249
/// Description of how a compiled query will execute.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// Final SQL text to execute.
    pub sql: String,
    /// Number of positional bind parameters the SQL expects.
    pub bind_count: usize,
    /// Table family driving the query.
    pub driving_table: DrivingTable,
    /// Structural hash of the query shape (key into the shape-SQL cache).
    pub shape_hash: ShapeHash,
    /// Whether the shape cache already held this shape's SQL.
    pub cache_hit: bool,
}
261
/// A single node row as projected by read queries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row identifier.
    pub row_id: String,
    /// Stable logical identifier of the node.
    pub logical_id: String,
    /// Node kind (schema category).
    pub kind: String,
    /// JSON-encoded properties payload (queried via `json_extract`).
    pub properties: String,
    /// Optional reference to externally stored content.
    pub content_ref: Option<String>,
    /// Last access timestamp, when tracked in `node_access_metadata`.
    pub last_accessed_at: Option<i64>,
}
278
/// A run row returned by read queries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Run identifier.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Current status string.
    pub status: String,
    /// JSON-encoded properties payload.
    pub properties: String,
}
291
/// A step row returned by read queries; belongs to a run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Step identifier.
    pub id: String,
    /// Identifier of the owning run.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Current status string.
    pub status: String,
    /// JSON-encoded properties payload.
    pub properties: String,
}
306
/// An action row returned by read queries; belongs to a step.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Action identifier.
    pub id: String,
    /// Identifier of the owning step.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Current status string.
    pub status: String,
    /// JSON-encoded properties payload.
    pub properties: String,
}
321
/// A provenance (audit-trail) event row.
// NOTE(review): not produced in the visible portion of this file; field
// meanings below are inferred from names — confirm against the writer side.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Event identifier.
    pub id: String,
    /// Event type discriminator.
    pub event_type: String,
    /// Subject the event refers to.
    pub subject: String,
    /// Optional originating source reference.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation timestamp.
    pub created_at: i64,
}
332
/// Result set of a compiled read query, bucketed by entity table.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Node rows (the only bucket populated by `execute_compiled_read`).
    pub nodes: Vec<NodeRow>,
    /// Run rows.
    pub runs: Vec<RunRow>,
    /// Step rows.
    pub steps: Vec<StepRow>,
    /// Action rows.
    pub actions: Vec<ActionRow>,
    /// True when execution degraded (e.g. the vector table was absent) and
    /// the result may be incomplete.
    pub was_degraded: bool,
}
348
/// Expansion results for a single root node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical id of the root node these expansions belong to.
    pub root_logical_id: String,
    /// Expanded node rows for this root.
    pub nodes: Vec<NodeRow>,
}
357
/// Expansion results grouped under a named slot.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Slot name this group of expansions belongs to.
    pub slot: String,
    /// Per-root expansion results within this slot.
    pub roots: Vec<ExpansionRootRows>,
}
366
/// Result of a grouped query: root rows plus their slotted expansions.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// Root node rows.
    pub roots: Vec<NodeRow>,
    /// Expansion results keyed by slot.
    pub expansions: Vec<ExpansionSlotRows>,
    /// True when execution degraded and the result may be incomplete.
    pub was_degraded: bool,
}
377
/// Read-side query executor: owns a pool of read-only SQLite connections,
/// caches rendered SQL by query shape, and coordinates text and (optionally)
/// vector retrieval.
pub struct ExecutionCoordinator {
    /// Filesystem path of the SQLite database file.
    database_path: PathBuf,
    /// Schema bootstrapper; also initializes each reader connection.
    schema_manager: Arc<SchemaManager>,
    /// Pool of read-only connections.
    pool: ReadPool,
    /// Cache mapping query shape hashes to their rendered row SQL; cleared
    /// wholesale at `MAX_SHAPE_CACHE_SIZE`.
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// Whether vector search support is active for this database.
    vector_enabled: bool,
    /// Set once the first vector-table-absent degradation has been logged,
    /// so the warning fires at most once.
    vec_degradation_warned: AtomicBool,
    /// Shared query/error counters.
    telemetry: Arc<TelemetryCounters>,
    /// Optional embedder used to synthesize vector branches for retrieval.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
    /// Per-kind FTS tokenizer strategies loaded from `projection_profiles`
    /// at open time.
    fts_strategies: HashMap<String, TokenizerStrategy>,
}
406
407impl fmt::Debug for ExecutionCoordinator {
408 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
409 f.debug_struct("ExecutionCoordinator")
410 .field("database_path", &self.database_path)
411 .finish_non_exhaustive()
412 }
413}
414
415impl ExecutionCoordinator {
    /// Open an `ExecutionCoordinator` over the database at `path`.
    ///
    /// Bootstrap sequence: open a writable connection (with the vec extension
    /// when a vector dimension is requested and the `sqlite-vec` feature is
    /// compiled in), bootstrap the schema, run open-time FTS guards,
    /// optionally ensure the vector profile, validate the embedder, load
    /// per-kind FTS tokenizer strategies, then build the read pool.
    ///
    /// # Errors
    /// Returns `EngineError` if any bootstrap step or connection open fails.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        #[cfg(feature = "sqlite-vec")]
        let mut conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let mut conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        run_open_time_fts_guards(&mut conn)?;

        // Without the sqlite-vec feature, vector support is always off and
        // the bootstrap report's vector flag is deliberately ignored.
        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            let _ = &report;
            false
        };

        if let Some(dim) = vector_dimension {
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", dim)
                .map_err(EngineError::Schema)?;
            // An explicit dimension force-enables vectors (feature builds only).
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        // Validate the configured embedder against the database before
        // serving queries (see `check_vec_identity_at_open`).
        if let Some(ref emb) = query_embedder {
            check_vec_identity_at_open(&conn, emb.as_ref())?;
        }

        // Load each kind's FTS tokenizer strategy from its projection
        // profile; rows with unparsable config JSON are skipped silently.
        let fts_strategies: HashMap<String, TokenizerStrategy> = {
            let mut map = HashMap::new();
            let mut stmt = conn
                .prepare("SELECT kind, config_json FROM projection_profiles WHERE facet='fts'")?;
            let rows = stmt.query_map([], |row| {
                let kind: String = row.get(0)?;
                let config_json: String = row.get(1)?;
                Ok((kind, config_json))
            })?;
            for row in rows.flatten() {
                let (kind, config_json) = row;
                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&config_json)
                    && let Some(tok) = v["tokenizer"].as_str()
                {
                    map.insert(kind, TokenizerStrategy::from_str(tok));
                }
            }
            map
        };

        // Release the writable bootstrap connection before opening readers.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
            fts_strategies,
        })
    }
521
522 pub fn database_path(&self) -> &Path {
524 &self.database_path
525 }
526
    /// Whether vector search support is active for this coordinator.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
532
    /// The configured query embedder, if any.
    #[must_use]
    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
        self.query_embedder.as_ref()
    }
543
    /// Acquire a pooled read connection (delegates to `ReadPool::acquire`).
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
547
548 #[must_use]
554 pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
555 let mut total = SqliteCacheStatus::default();
556 for conn_mutex in &self.pool.connections {
557 if let Ok(conn) = conn_mutex.try_lock() {
558 total.add(&read_db_cache_status(&conn));
559 }
560 }
561 total
562 }
563
    /// Execute a compiled read-only query and collect the resulting node rows.
    ///
    /// Special cases:
    /// - FTS-driven plans may be answered by a scan fallback when the root
    ///   kind was only just registered (see
    ///   `scan_fallback_if_first_registration`).
    /// - FTS-driven SQL is adapted to the per-kind FTS table layout first.
    /// - A missing vector table degrades to an empty result flagged
    ///   `was_degraded` instead of erroring (warned once per coordinator).
    ///
    /// # Errors
    /// Returns pool or SQLite errors; error telemetry is incremented on each
    /// failure path, query telemetry on each success path.
    #[allow(clippy::expect_used)]
    pub fn execute_compiled_read(
        &self,
        compiled: &CompiledQuery,
    ) -> Result<QueryRows, EngineError> {
        // For FTS-driven plans, bind slot index 1 holds the root kind.
        if compiled.driving_table == DrivingTable::FtsNodes
            && let Some(BindValue::Text(root_kind)) = compiled.binds.get(1)
            && let Some(nodes) = self.scan_fallback_if_first_registration(root_kind)?
        {
            self.telemetry.increment_queries();
            return Ok(QueryRows {
                nodes,
                runs: Vec::new(),
                steps: Vec::new(),
                actions: Vec::new(),
                was_degraded: false,
            });
        }

        // FTS plans are rewritten against per-kind FTS tables; all other
        // plans run with the SQL and binds exactly as compiled.
        let (adapted_sql, adapted_binds) = if compiled.driving_table == DrivingTable::FtsNodes {
            let conn_check = match self.lock_connection() {
                Ok(g) => g,
                Err(e) => {
                    self.telemetry.increment_errors();
                    return Err(e);
                }
            };
            let result = adapt_fts_nodes_sql_for_per_kind_tables(compiled, &conn_check);
            // Release the connection before propagating any adaptation error.
            drop(conn_check);
            result?
        } else {
            (compiled.sql.clone(), compiled.binds.clone())
        };

        let row_sql = wrap_node_row_projection_sql(&adapted_sql);
        {
            // Record shape -> SQL; the cache is cleared wholesale once it
            // reaches MAX_SHAPE_CACHE_SIZE (no per-entry eviction). A
            // poisoned lock is recovered since the map holds plain data.
            let mut cache = self
                .shape_sql_map
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
                cache.clear();
            }
            cache.insert(compiled.shape_hash, row_sql.clone());
        }

        let bind_values = adapted_binds
            .iter()
            .map(bind_value_to_sql)
            .collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&row_sql) {
            Ok(stmt) => stmt,
            // A missing vector table is an expected degradation, not an
            // error; warn only the first time it is observed.
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading to non-vector query");
                }
                return Ok(QueryRows {
                    was_degraded: true,
                    ..Default::default()
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        // Column order must match `wrap_node_row_projection_sql`'s projection.
        let nodes = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .and_then(Iterator::collect)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        self.telemetry.increment_queries();
        Ok(QueryRows {
            nodes,
            runs: Vec::new(),
            steps: Vec::new(),
            actions: Vec::new(),
            was_degraded: false,
        })
    }
688
689 pub fn execute_compiled_search(
704 &self,
705 compiled: &CompiledSearch,
706 ) -> Result<SearchRows, EngineError> {
707 let (relaxed_query, was_degraded_at_plan_time) =
714 fathomdb_query::derive_relaxed(&compiled.text_query);
715 let relaxed = relaxed_query.map(|q| CompiledSearch {
716 root_kind: compiled.root_kind.clone(),
717 text_query: q,
718 limit: compiled.limit,
719 fusable_filters: compiled.fusable_filters.clone(),
720 residual_filters: compiled.residual_filters.clone(),
721 attribution_requested: compiled.attribution_requested,
722 });
723 let plan = CompiledSearchPlan {
724 strict: compiled.clone(),
725 relaxed,
726 was_degraded_at_plan_time,
727 };
728 self.execute_compiled_search_plan(&plan)
729 }
730
    /// Execute a two-branch (strict + relaxed) text search plan.
    ///
    /// The strict branch always runs; the relaxed branch runs only when the
    /// strict branch returned fewer than `min(FALLBACK_TRIGGER_K, limit)`
    /// hits. Results are merged up to `limit`, optionally attributed, and
    /// counted per match mode.
    ///
    /// # Errors
    /// Propagates failures from the underlying search branches or attribution.
    pub fn execute_compiled_search_plan(
        &self,
        plan: &CompiledSearchPlan,
    ) -> Result<SearchRows, EngineError> {
        let strict = &plan.strict;
        let limit = strict.limit;
        let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;

        // Fallback triggers when strict results underfill the smaller of the
        // global trigger constant and the caller's requested limit.
        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;

        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            // Plan-time degradation only surfaces when the relaxed branch ran.
            was_degraded = plan.was_degraded_at_plan_time;
        }

        let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
        if strict.attribution_requested {
            let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }
        // Counts are taken over the merged set so they reflect post-merge
        // truncation, not the raw per-branch outputs.
        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        // Text-only plans never consult the vector modality.
        let vector_hit_count = 0;

        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
805
    /// KNN vector search over `vec_nodes_active`, joined back to active nodes.
    ///
    /// SQL shape: an inner KNN subquery (`embedding MATCH ?1 LIMIT n`) feeds
    /// a `vector_hits` CTE that joins matching chunks to their non-superseded
    /// source nodes with kind and fusable filters applied; the outer query
    /// applies residual JSON filters and orders by ascending distance.
    ///
    /// Degrades to an empty result flagged `was_degraded` when the vector
    /// table is absent (warned once per coordinator).
    ///
    /// # Errors
    /// Returns pool errors or `EngineError::Sqlite` for other failures.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // LIMIT 0 can never match; skip touching the database entirely.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        let filter_by_kind = !compiled.root_kind.is_empty();
        // Bind slot 1: the query embedding. Bind slot 2 (optional): root kind.
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // Fusable filters run inside the CTE against the source node (`src`),
        // narrowing rows before the outer projection.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses
                        .push_str("\n AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                // Non-fused JSON predicates are handled below as residuals.
                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
                }
            }
        }

        // Residual filters run in the outer query against the hit rows (`h`).
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                // Fusable predicate shapes were already applied in the CTE.
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. } => {
                }
            }
        }

        let limit = compiled.limit;
        // The outer LIMIT is bound; saturate to i64::MAX rather than fail.
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        // The inner KNN LIMIT is inlined into the SQL text rather than bound.
        // NOTE(review): presumably required by the vec virtual table — confirm.
        let base_limit = limit;
        let kind_clause = if filter_by_kind {
            "\n AND src.kind = ?2"
        } else {
            ""
        };

        let sql = format!(
            "WITH vector_hits AS (
 SELECT
 src.row_id AS row_id,
 src.logical_id AS logical_id,
 src.kind AS kind,
 src.properties AS properties,
 src.source_ref AS source_ref,
 src.content_ref AS content_ref,
 src.created_at AS created_at,
 vc.distance AS distance,
 vc.chunk_id AS chunk_id
 FROM (
 SELECT chunk_id, distance
 FROM vec_nodes_active
 WHERE embedding MATCH ?1
 LIMIT {base_limit}
 ) vc
 JOIN chunks c ON c.id = vc.chunk_id
 JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
 WHERE 1 = 1{kind_clause}{fused_clauses}
 )
 SELECT
 h.row_id,
 h.logical_id,
 h.kind,
 h.properties,
 h.content_ref,
 am.last_accessed_at,
 h.created_at,
 h.distance,
 h.chunk_id
 FROM vector_hits h
 LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
 WHERE 1 = 1{filter_clauses}
 ORDER BY h.distance ASC
 LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            // Missing vector table at prepare time: degrade to an empty,
            // flagged result; warn only the first time.
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Lower distance = closer match; negate so a larger score is
                // a better match for downstream ranking.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    // Match modes describe text branches; vector hits get none.
                    match_mode: None,
                    snippet: None,
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // The absence can also be reported at query time; degrade
                // identically to the prepare-time path.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
1141
    /// Execute a multi-modality retrieval plan for `raw_query`.
    ///
    /// Branch order: strict text first; relaxed text when strict underfills
    /// `min(FALLBACK_TRIGGER_K, limit)`; a vector branch only when both text
    /// branches are empty and a query embedder is configured (the branch is
    /// synthesized lazily via `fill_vector_branch`). Results are merged up
    /// to `limit` and optionally attributed.
    ///
    /// # Errors
    /// Propagates search-branch, vector-search, and attribution failures.
    pub fn execute_retrieval_plan(
        &self,
        plan: &CompiledRetrievalPlan,
        raw_query: &str,
    ) -> Result<SearchRows, EngineError> {
        // Cloned so the vector branch can be filled in on demand below.
        let mut plan = plan.clone();
        let limit = plan.text.strict.limit;

        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;

        // Relaxed fallback mirrors `execute_compiled_search_plan`.
        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;
        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.text.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            was_degraded = plan.was_degraded_at_plan_time;
        }

        // Only pay for query embedding when text retrieval found nothing.
        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
        if text_branches_empty && self.query_embedder.is_some() {
            self.fill_vector_branch(&mut plan, raw_query);
        }

        let mut vector_hits: Vec<SearchHit> = Vec::new();
        if let Some(vector) = plan.vector.as_ref()
            && strict_hits.is_empty()
            && relaxed_hits.is_empty()
        {
            let vector_rows = self.execute_compiled_vector_search(vector)?;
            vector_hits = vector_rows.hits;
            if vector_rows.was_degraded {
                was_degraded = true;
            }
        }
        // An embedding failure leaves `plan.vector` unset but marks the plan
        // degraded; surface that degradation to the caller.
        if text_branches_empty
            && plan.was_degraded_at_plan_time
            && plan.vector.is_none()
            && self.query_embedder.is_some()
        {
            was_degraded = true;
        }

        let strict = &plan.text.strict;
        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
        if strict.attribution_requested {
            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }

        // Counts reflect the merged (post-truncation) result set.
        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        let vector_hit_count = merged
            .iter()
            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
            .count();

        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
1282
1283 fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
1299 let Some(embedder) = self.query_embedder.as_ref() else {
1300 return;
1301 };
1302 match embedder.embed_query(raw_query) {
1303 Ok(vec) => {
1304 let literal = match serde_json::to_string(&vec) {
1310 Ok(s) => s,
1311 Err(err) => {
1312 trace_warn!(
1313 error = %err,
1314 "query embedder vector serialization failed; skipping vector branch"
1315 );
1316 let _ = err; plan.was_degraded_at_plan_time = true;
1318 return;
1319 }
1320 };
1321 let strict = &plan.text.strict;
1322 plan.vector = Some(CompiledVectorSearch {
1323 root_kind: strict.root_kind.clone(),
1324 query_text: literal,
1325 limit: strict.limit,
1326 fusable_filters: strict.fusable_filters.clone(),
1327 residual_filters: strict.residual_filters.clone(),
1328 attribution_requested: strict.attribution_requested,
1329 });
1330 }
1331 Err(err) => {
1332 trace_warn!(
1333 error = %err,
1334 "query embedder unavailable, skipping vector branch"
1335 );
1336 let _ = err; plan.was_degraded_at_plan_time = true;
1338 }
1339 }
1340 }
1341
1342 #[allow(clippy::too_many_lines)]
1351 fn run_search_branch(
1352 &self,
1353 compiled: &CompiledSearch,
1354 branch: SearchBranch,
1355 ) -> Result<Vec<SearchHit>, EngineError> {
1356 use std::fmt::Write as _;
1357 if matches!(
1369 compiled.text_query,
1370 fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
1371 ) {
1372 return Ok(Vec::new());
1373 }
1374 let rendered_base = render_text_query_fts5(&compiled.text_query);
1375 let strategy = self.fts_strategies.get(compiled.root_kind.as_str());
1388 if matches!(strategy, Some(TokenizerStrategy::SubstringTrigram))
1389 && rendered_base
1390 .chars()
1391 .filter(|c| c.is_alphanumeric())
1392 .count()
1393 < 3
1394 {
1395 return Ok(Vec::new());
1396 }
1397 let rendered = rendered_base;
1398 let filter_by_kind = !compiled.root_kind.is_empty();
1404
1405 let conn_guard = match self.lock_connection() {
1409 Ok(g) => g,
1410 Err(e) => {
1411 self.telemetry.increment_errors();
1412 return Err(e);
1413 }
1414 };
1415
1416 let prop_fts_tables: Vec<(String, String)> = if filter_by_kind {
1434 let kind = compiled.root_kind.clone();
1435 let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1436 let exists: bool = conn_guard
1437 .query_row(
1438 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1439 rusqlite::params![prop_table],
1440 |_| Ok(true),
1441 )
1442 .optional()
1443 .map_err(EngineError::Sqlite)?
1444 .unwrap_or(false);
1445 if exists {
1446 vec![(kind, prop_table)]
1447 } else {
1448 vec![]
1449 }
1450 } else {
1451 let kind_eq_values: Vec<String> = compiled
1456 .fusable_filters
1457 .iter()
1458 .filter_map(|p| match p {
1459 Predicate::KindEq(k) => Some(k.clone()),
1460 _ => None,
1461 })
1462 .collect();
1463 if kind_eq_values.len() == 1 {
1464 let kind = kind_eq_values[0].clone();
1465 let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1466 let exists: bool = conn_guard
1467 .query_row(
1468 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1469 rusqlite::params![prop_table],
1470 |_| Ok(true),
1471 )
1472 .optional()
1473 .map_err(EngineError::Sqlite)?
1474 .unwrap_or(false);
1475 if exists {
1476 vec![(kind, prop_table)]
1477 } else {
1478 vec![]
1479 }
1480 } else {
1481 let mut stmt = conn_guard
1485 .prepare("SELECT kind FROM fts_property_schemas")
1486 .map_err(EngineError::Sqlite)?;
1487 let all_kinds: Vec<String> = stmt
1488 .query_map([], |r| r.get::<_, String>(0))
1489 .map_err(EngineError::Sqlite)?
1490 .collect::<Result<Vec<_>, _>>()
1491 .map_err(EngineError::Sqlite)?;
1492 drop(stmt);
1493 let mut result = Vec::new();
1494 for kind in all_kinds {
1495 let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
1496 let exists: bool = conn_guard
1497 .query_row(
1498 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
1499 rusqlite::params![prop_table],
1500 |_| Ok(true),
1501 )
1502 .optional()
1503 .map_err(EngineError::Sqlite)?
1504 .unwrap_or(false);
1505 if exists {
1506 result.push((kind, prop_table));
1507 }
1508 }
1509 result
1510 }
1511 };
1512 let use_prop_fts = !prop_fts_tables.is_empty();
1513
1514 let mut binds: Vec<BindValue> = if filter_by_kind {
1520 if use_prop_fts {
1521 vec![
1522 BindValue::Text(rendered.clone()),
1523 BindValue::Text(compiled.root_kind.clone()),
1524 BindValue::Text(rendered),
1525 ]
1526 } else {
1527 vec![
1528 BindValue::Text(rendered.clone()),
1529 BindValue::Text(compiled.root_kind.clone()),
1530 ]
1531 }
1532 } else if use_prop_fts {
1533 vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
1535 } else {
1536 vec![BindValue::Text(rendered)]
1537 };
1538
1539 let mut fused_clauses = String::new();
1548 for predicate in &compiled.fusable_filters {
1549 match predicate {
1550 Predicate::KindEq(kind) => {
1551 binds.push(BindValue::Text(kind.clone()));
1552 let idx = binds.len();
1553 let _ = write!(fused_clauses, "\n AND u.kind = ?{idx}");
1554 }
1555 Predicate::LogicalIdEq(logical_id) => {
1556 binds.push(BindValue::Text(logical_id.clone()));
1557 let idx = binds.len();
1558 let _ = write!(
1559 fused_clauses,
1560 "\n AND u.logical_id = ?{idx}"
1561 );
1562 }
1563 Predicate::SourceRefEq(source_ref) => {
1564 binds.push(BindValue::Text(source_ref.clone()));
1565 let idx = binds.len();
1566 let _ = write!(
1567 fused_clauses,
1568 "\n AND u.source_ref = ?{idx}"
1569 );
1570 }
1571 Predicate::ContentRefEq(uri) => {
1572 binds.push(BindValue::Text(uri.clone()));
1573 let idx = binds.len();
1574 let _ = write!(
1575 fused_clauses,
1576 "\n AND u.content_ref = ?{idx}"
1577 );
1578 }
1579 Predicate::ContentRefNotNull => {
1580 fused_clauses.push_str("\n AND u.content_ref IS NOT NULL");
1581 }
1582 Predicate::JsonPathFusedEq { path, value } => {
1583 binds.push(BindValue::Text(path.clone()));
1584 let path_idx = binds.len();
1585 binds.push(BindValue::Text(value.clone()));
1586 let value_idx = binds.len();
1587 let _ = write!(
1588 fused_clauses,
1589 "\n AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
1590 );
1591 }
1592 Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
1593 binds.push(BindValue::Text(path.clone()));
1594 let path_idx = binds.len();
1595 binds.push(BindValue::Integer(*value));
1596 let value_idx = binds.len();
1597 let operator = match op {
1598 ComparisonOp::Gt => ">",
1599 ComparisonOp::Gte => ">=",
1600 ComparisonOp::Lt => "<",
1601 ComparisonOp::Lte => "<=",
1602 };
1603 let _ = write!(
1604 fused_clauses,
1605 "\n AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
1606 );
1607 }
1608 Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
1609 }
1612 }
1613 }
1614
1615 let mut filter_clauses = String::new();
1616 for predicate in &compiled.residual_filters {
1617 match predicate {
1618 Predicate::JsonPathEq { path, value } => {
1619 binds.push(BindValue::Text(path.clone()));
1620 let path_idx = binds.len();
1621 binds.push(scalar_to_bind(value));
1622 let value_idx = binds.len();
1623 let _ = write!(
1624 filter_clauses,
1625 "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
1626 );
1627 }
1628 Predicate::JsonPathCompare { path, op, value } => {
1629 binds.push(BindValue::Text(path.clone()));
1630 let path_idx = binds.len();
1631 binds.push(scalar_to_bind(value));
1632 let value_idx = binds.len();
1633 let operator = match op {
1634 ComparisonOp::Gt => ">",
1635 ComparisonOp::Gte => ">=",
1636 ComparisonOp::Lt => "<",
1637 ComparisonOp::Lte => "<=",
1638 };
1639 let _ = write!(
1640 filter_clauses,
1641 "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
1642 );
1643 }
1644 Predicate::KindEq(_)
1645 | Predicate::LogicalIdEq(_)
1646 | Predicate::SourceRefEq(_)
1647 | Predicate::ContentRefEq(_)
1648 | Predicate::ContentRefNotNull
1649 | Predicate::JsonPathFusedEq { .. }
1650 | Predicate::JsonPathFusedTimestampCmp { .. } => {
1651 }
1654 }
1655 }
1656
1657 let limit = compiled.limit;
1664 binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
1665 let limit_idx = binds.len();
1666 let prop_bind_idx: usize = if filter_by_kind { 3 } else { 2 };
1682 let prop_arm_sql: String = if use_prop_fts {
1683 prop_fts_tables.iter().fold(String::new(), |mut acc, (kind, prop_table)| {
1684 let bm25_expr = conn_guard
1686 .query_row(
1687 "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
1688 rusqlite::params![kind],
1689 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1690 )
1691 .ok()
1692 .map_or_else(
1693 || format!("bm25({prop_table})"),
1694 |(json, sep)| build_bm25_expr(prop_table, &json, &sep),
1695 );
1696 let is_weighted = bm25_expr != format!("bm25({prop_table})");
1699 let snippet_expr = if is_weighted {
1700 "'' AS snippet".to_owned()
1701 } else {
1702 "substr(fp.text_content, 1, 200) AS snippet".to_owned()
1703 };
1704 let _ = write!(
1705 acc,
1706 "
1707 UNION ALL
1708 SELECT
1709 src.row_id AS row_id,
1710 fp.node_logical_id AS logical_id,
1711 src.kind AS kind,
1712 src.properties AS properties,
1713 src.source_ref AS source_ref,
1714 src.content_ref AS content_ref,
1715 src.created_at AS created_at,
1716 -{bm25_expr} AS score,
1717 'property' AS source,
1718 {snippet_expr},
1719 CAST(fp.rowid AS TEXT) AS projection_row_id
1720 FROM {prop_table} fp
1721 JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
1722 WHERE {prop_table} MATCH ?{prop_bind_idx}"
1723 );
1724 acc
1725 })
1726 } else {
1727 String::new()
1728 };
1729 let (chunk_fts_bind, chunk_kind_clause) = if filter_by_kind {
1730 ("?1", "\n AND src.kind = ?2")
1731 } else {
1732 ("?1", "")
1733 };
1734 let sql = format!(
1735 "WITH search_hits AS (
1736 SELECT
1737 u.row_id AS row_id,
1738 u.logical_id AS logical_id,
1739 u.kind AS kind,
1740 u.properties AS properties,
1741 u.source_ref AS source_ref,
1742 u.content_ref AS content_ref,
1743 u.created_at AS created_at,
1744 u.score AS score,
1745 u.source AS source,
1746 u.snippet AS snippet,
1747 u.projection_row_id AS projection_row_id
1748 FROM (
1749 SELECT
1750 src.row_id AS row_id,
1751 c.node_logical_id AS logical_id,
1752 src.kind AS kind,
1753 src.properties AS properties,
1754 src.source_ref AS source_ref,
1755 src.content_ref AS content_ref,
1756 src.created_at AS created_at,
1757 -bm25(fts_nodes) AS score,
1758 'chunk' AS source,
1759 snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
1760 f.chunk_id AS projection_row_id
1761 FROM fts_nodes f
1762 JOIN chunks c ON c.id = f.chunk_id
1763 JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
1764 WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}{prop_arm_sql}
1765 ) u
1766 WHERE 1 = 1{fused_clauses}
1767 ORDER BY u.score DESC
1768 LIMIT ?{limit_idx}
1769 )
1770 SELECT
1771 h.row_id,
1772 h.logical_id,
1773 h.kind,
1774 h.properties,
1775 h.content_ref,
1776 am.last_accessed_at,
1777 h.created_at,
1778 h.score,
1779 h.source,
1780 h.snippet,
1781 h.projection_row_id
1782 FROM search_hits h
1783 LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
1784 WHERE 1 = 1{filter_clauses}
1785 ORDER BY h.score DESC"
1786 );
1787
1788 let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
1789
1790 let mut statement = match conn_guard.prepare_cached(&sql) {
1791 Ok(stmt) => stmt,
1792 Err(e) => {
1793 self.telemetry.increment_errors();
1794 return Err(EngineError::Sqlite(e));
1795 }
1796 };
1797
1798 let hits = match statement
1799 .query_map(params_from_iter(bind_values.iter()), |row| {
1800 let source_str: String = row.get(8)?;
1801 let source = if source_str == "property" {
1806 SearchHitSource::Property
1807 } else {
1808 SearchHitSource::Chunk
1809 };
1810 let match_mode = match branch {
1811 SearchBranch::Strict => SearchMatchMode::Strict,
1812 SearchBranch::Relaxed => SearchMatchMode::Relaxed,
1813 };
1814 Ok(SearchHit {
1815 node: fathomdb_query::NodeRowLite {
1816 row_id: row.get(0)?,
1817 logical_id: row.get(1)?,
1818 kind: row.get(2)?,
1819 properties: row.get(3)?,
1820 content_ref: row.get(4)?,
1821 last_accessed_at: row.get(5)?,
1822 },
1823 written_at: row.get(6)?,
1824 score: row.get(7)?,
1825 modality: RetrievalModality::Text,
1827 source,
1828 match_mode: Some(match_mode),
1829 snippet: row.get(9)?,
1830 projection_row_id: row.get(10)?,
1831 vector_distance: None,
1832 attribution: None,
1833 })
1834 })
1835 .and_then(Iterator::collect::<Result<Vec<_>, _>>)
1836 {
1837 Ok(rows) => rows,
1838 Err(e) => {
1839 self.telemetry.increment_errors();
1840 return Err(EngineError::Sqlite(e));
1841 }
1842 };
1843
1844 drop(statement);
1848 drop(conn_guard);
1849
1850 self.telemetry.increment_queries();
1851 Ok(hits)
1852 }
1853
1854 fn populate_attribution_for_hits(
1858 &self,
1859 hits: &mut [SearchHit],
1860 strict_text_query: &fathomdb_query::TextQuery,
1861 relaxed_text_query: Option<&fathomdb_query::TextQuery>,
1862 ) -> Result<(), EngineError> {
1863 let conn_guard = match self.lock_connection() {
1864 Ok(g) => g,
1865 Err(e) => {
1866 self.telemetry.increment_errors();
1867 return Err(e);
1868 }
1869 };
1870 let strict_expr = render_text_query_fts5(strict_text_query);
1871 let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
1872 for hit in hits.iter_mut() {
1873 let match_expr = match hit.match_mode {
1878 Some(SearchMatchMode::Strict) => strict_expr.as_str(),
1879 Some(SearchMatchMode::Relaxed) => {
1880 relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
1881 }
1882 None => continue,
1883 };
1884 match resolve_hit_attribution(&conn_guard, hit, match_expr) {
1885 Ok(att) => hit.attribution = Some(att),
1886 Err(e) => {
1887 self.telemetry.increment_errors();
1888 return Err(e);
1889 }
1890 }
1891 }
1892 Ok(())
1893 }
1894
1895 pub fn execute_compiled_grouped_read(
1899 &self,
1900 compiled: &CompiledGroupedQuery,
1901 ) -> Result<GroupedQueryRows, EngineError> {
1902 let root_rows = self.execute_compiled_read(&compiled.root)?;
1903 if root_rows.was_degraded {
1904 return Ok(GroupedQueryRows {
1905 roots: Vec::new(),
1906 expansions: Vec::new(),
1907 was_degraded: true,
1908 });
1909 }
1910
1911 let roots = root_rows.nodes;
1912 let mut expansions = Vec::with_capacity(compiled.expansions.len());
1913 for expansion in &compiled.expansions {
1914 let slot_rows = if roots.is_empty() {
1915 Vec::new()
1916 } else {
1917 self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
1918 };
1919 expansions.push(ExpansionSlotRows {
1920 slot: expansion.slot.clone(),
1921 roots: slot_rows,
1922 });
1923 }
1924
1925 Ok(GroupedQueryRows {
1926 roots,
1927 expansions,
1928 was_degraded: false,
1929 })
1930 }
1931
1932 fn read_expansion_nodes_chunked(
1938 &self,
1939 roots: &[NodeRow],
1940 expansion: &ExpansionSlot,
1941 hard_limit: usize,
1942 ) -> Result<Vec<ExpansionRootRows>, EngineError> {
1943 if roots.len() <= BATCH_CHUNK_SIZE {
1944 return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
1945 }
1946
1947 let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
1950 for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
1951 for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
1952 per_root
1953 .entry(group.root_logical_id)
1954 .or_default()
1955 .extend(group.nodes);
1956 }
1957 }
1958
1959 Ok(roots
1960 .iter()
1961 .map(|root| ExpansionRootRows {
1962 root_logical_id: root.logical_id.clone(),
1963 nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
1964 })
1965 .collect())
1966 }
1967
    /// Expand a single batch of roots along `expansion` with one recursive
    /// CTE, returning the reached nodes grouped per root (one entry per input
    /// root, in input order; roots with no matches get an empty group).
    ///
    /// The traversal is depth-limited (`expansion.max_depth`), cycle-safe via
    /// a `visited` string of `,id,` segments checked with `instr`, and capped
    /// both while recursing (`emitted`) and per root in the final projection
    /// (`rn <= hard_limit`).
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Pick the edge column to join on and the column naming the node
        // reached by following the edge, based on traversal direction.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // Fused filters require a registered property-FTS schema on every
        // kind reachable through this edge label; fail fast if one is missing.
        if expansion.filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. } | Predicate::JsonPathFusedTimestampCmp { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }

        // Seed the CTE with one positional parameter per root id: ?1..?N.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        // Parameter layout after the N root seeds: ?N+1 is the edge kind,
        // ?N+2 onwards belong to the compiled expansion filter (if any).
        let edge_kind_param = root_ids.len() + 1;
        let filter_param_start = root_ids.len() + 2;

        let (filter_sql, filter_binds) =
            compile_expansion_filter(expansion.filter.as_ref(), filter_param_start);

        // max_depth / hard_limit are inlined (not bound) because they shape
        // the recursion itself; ROW_NUMBER re-applies hard_limit per root.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted) AS (
                SELECT rid, rid, 0, printf(',%s,', rid), 0
                FROM root_ids
                UNION ALL
                SELECT
                    t.root_id,
                    {next_logical_id},
                    t.depth + 1,
                    t.visited || {next_logical_id} || ',',
                    t.emitted + 1
                FROM traversed t
                JOIN edges e ON {join_condition}
                    AND e.kind = ?{edge_kind_param}
                    AND e.superseded_at IS NULL
                WHERE t.depth < {max_depth}
                    AND t.emitted < {hard_limit}
                    AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
                    , n.content_ref, am.last_accessed_at
                    , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
                FROM traversed t
                JOIN nodes n ON n.logical_id = t.logical_id
                    AND n.superseded_at IS NULL
                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
                WHERE t.depth > 0{filter_sql}
            )
            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, logical_id",
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind order must mirror the parameter layout above: roots, edge
        // kind, then the filter binds.
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(filter_binds);

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Group the flat (root_id, node) rows per root id.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        // Emit one group per input root, preserving caller order.
        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
2107
2108 fn validate_fused_filter_for_edge_label(&self, edge_label: &str) -> Result<(), EngineError> {
2122 let conn = self.lock_connection()?;
2123 let mut stmt = conn
2125 .prepare_cached(
2126 "SELECT DISTINCT n.kind \
2127 FROM edges e \
2128 JOIN nodes n ON n.logical_id = e.target_logical_id \
2129 WHERE e.kind = ?1 AND e.superseded_at IS NULL",
2130 )
2131 .map_err(EngineError::Sqlite)?;
2132 let target_kinds: Vec<String> = stmt
2133 .query_map(rusqlite::params![edge_label], |row| row.get(0))
2134 .map_err(EngineError::Sqlite)?
2135 .collect::<Result<Vec<_>, _>>()
2136 .map_err(EngineError::Sqlite)?;
2137
2138 for kind in &target_kinds {
2139 let has_schema: bool = conn
2140 .query_row(
2141 "SELECT COUNT(*) > 0 FROM fts_property_schemas WHERE kind = ?1",
2142 rusqlite::params![kind],
2143 |row| row.get(0),
2144 )
2145 .map_err(EngineError::Sqlite)?;
2146 if !has_schema {
2147 return Err(EngineError::InvalidConfig(format!(
2148 "kind {kind:?} has no registered property-FTS schema; register one with \
2149 admin.register_fts_property_schema(..) before using fused filters on \
2150 expansion slots, or use JsonPathEq for non-fused semantics \
2151 (expand slot uses edge label {edge_label:?})"
2152 )));
2153 }
2154 }
2155 Ok(())
2156 }
2157
2158 pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
2164 let conn = self.lock_connection()?;
2165 conn.query_row(
2166 "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
2167 rusqlite::params![id],
2168 |row| {
2169 Ok(RunRow {
2170 id: row.get(0)?,
2171 kind: row.get(1)?,
2172 status: row.get(2)?,
2173 properties: row.get(3)?,
2174 })
2175 },
2176 )
2177 .optional()
2178 .map_err(EngineError::Sqlite)
2179 }
2180
2181 pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
2187 let conn = self.lock_connection()?;
2188 conn.query_row(
2189 "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
2190 rusqlite::params![id],
2191 |row| {
2192 Ok(StepRow {
2193 id: row.get(0)?,
2194 run_id: row.get(1)?,
2195 kind: row.get(2)?,
2196 status: row.get(3)?,
2197 properties: row.get(4)?,
2198 })
2199 },
2200 )
2201 .optional()
2202 .map_err(EngineError::Sqlite)
2203 }
2204
2205 pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
2211 let conn = self.lock_connection()?;
2212 conn.query_row(
2213 "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
2214 rusqlite::params![id],
2215 |row| {
2216 Ok(ActionRow {
2217 id: row.get(0)?,
2218 step_id: row.get(1)?,
2219 kind: row.get(2)?,
2220 status: row.get(3)?,
2221 properties: row.get(4)?,
2222 })
2223 },
2224 )
2225 .optional()
2226 .map_err(EngineError::Sqlite)
2227 }
2228
2229 pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
2235 let conn = self.lock_connection()?;
2236 let mut stmt = conn
2237 .prepare_cached(
2238 "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
2239 )
2240 .map_err(EngineError::Sqlite)?;
2241 let rows = stmt
2242 .query_map([], |row| {
2243 Ok(RunRow {
2244 id: row.get(0)?,
2245 kind: row.get(1)?,
2246 status: row.get(2)?,
2247 properties: row.get(3)?,
2248 })
2249 })
2250 .map_err(EngineError::Sqlite)?
2251 .collect::<Result<Vec<_>, _>>()
2252 .map_err(EngineError::Sqlite)?;
2253 Ok(rows)
2254 }
2255
2256 #[must_use]
2266 #[allow(clippy::expect_used)]
2267 pub fn shape_sql_count(&self) -> usize {
2268 self.shape_sql_map
2269 .lock()
2270 .unwrap_or_else(PoisonError::into_inner)
2271 .len()
2272 }
2273
    /// Shared handle to the schema manager backing this engine (cheap
    /// refcount clone).
    #[must_use]
    pub fn schema_manager(&self) -> Arc<SchemaManager> {
        Arc::clone(&self.schema_manager)
    }
2279
2280 #[must_use]
2289 pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
2290 let cache_hit = self
2291 .shape_sql_map
2292 .lock()
2293 .unwrap_or_else(PoisonError::into_inner)
2294 .contains_key(&compiled.shape_hash);
2295 QueryPlan {
2296 sql: wrap_node_row_projection_sql(&compiled.sql),
2297 bind_count: compiled.binds.len(),
2298 driving_table: compiled.driving_table,
2299 shape_hash: compiled.shape_hash,
2300 cache_hit,
2301 }
2302 }
2303
2304 #[doc(hidden)]
2311 pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
2312 let conn = self.lock_connection()?;
2313 let result = conn
2314 .query_row(&format!("PRAGMA {name}"), [], |row| {
2315 row.get::<_, rusqlite::types::Value>(0)
2317 })
2318 .map_err(EngineError::Sqlite)?;
2319 let s = match result {
2320 rusqlite::types::Value::Text(t) => t,
2321 rusqlite::types::Value::Integer(i) => i.to_string(),
2322 rusqlite::types::Value::Real(f) => f.to_string(),
2323 rusqlite::types::Value::Blob(_) => {
2324 return Err(EngineError::InvalidWrite(format!(
2325 "PRAGMA {name} returned an unexpected BLOB value"
2326 )));
2327 }
2328 rusqlite::types::Value::Null => String::new(),
2329 };
2330 Ok(s)
2331 }
2332
2333 pub fn query_provenance_events(
2342 &self,
2343 subject: &str,
2344 ) -> Result<Vec<ProvenanceEvent>, EngineError> {
2345 let conn = self.lock_connection()?;
2346 let mut stmt = conn
2347 .prepare_cached(
2348 "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
2349 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
2350 )
2351 .map_err(EngineError::Sqlite)?;
2352 let events = stmt
2353 .query_map(rusqlite::params![subject], |row| {
2354 Ok(ProvenanceEvent {
2355 id: row.get(0)?,
2356 event_type: row.get(1)?,
2357 subject: row.get(2)?,
2358 source_ref: row.get(3)?,
2359 metadata_json: row.get(4)?,
2360 created_at: row.get(5)?,
2361 })
2362 })
2363 .map_err(EngineError::Sqlite)?
2364 .collect::<Result<Vec<_>, _>>()
2365 .map_err(EngineError::Sqlite)?;
2366 Ok(events)
2367 }
2368
    /// If `kind` just received its first property-FTS schema registration and
    /// the projection has not been built yet, return the full live node set
    /// for that kind so the caller can serve a linear-scan fallback.
    ///
    /// Returns `Ok(None)` when no fallback is needed: the projection already
    /// has rows, or no first-registration rebuild is in flight.
    fn scan_fallback_if_first_registration(
        &self,
        kind: &str,
    ) -> Result<Option<Vec<NodeRow>>, EngineError> {
        let conn = self.lock_connection()?;

        let prop_table = fathomdb_schema::fts_kind_table_name(kind);
        // Does the per-kind FTS table exist at all yet?
        let table_exists: bool = conn
            .query_row(
                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                rusqlite::params![prop_table],
                |_| Ok(true),
            )
            .optional()?
            .unwrap_or(false);
        // A missing table counts as an empty projection.
        let prop_empty = if table_exists {
            let cnt: i64 =
                conn.query_row(&format!("SELECT COUNT(*) FROM {prop_table}"), [], |r| {
                    r.get(0)
                })?;
            cnt == 0
        } else {
            true
        };
        // The fallback only applies while a first-registration rebuild is
        // still in flight (PENDING/BUILDING/SWAPPING) and the projection is
        // empty.
        let needs_scan: bool = if prop_empty {
            conn.query_row(
                "SELECT 1 FROM fts_property_rebuild_state \
                 WHERE kind = ?1 AND is_first_registration = 1 \
                 AND state IN ('PENDING','BUILDING','SWAPPING') \
                 LIMIT 1",
                rusqlite::params![kind],
                |_| Ok(true),
            )
            .optional()?
            .unwrap_or(false)
        } else {
            false
        };

        if !needs_scan {
            return Ok(None);
        }

        // Full scan of live nodes of this kind, with access metadata joined
        // so rows match the shape produced by the normal query path.
        let mut stmt = conn
            .prepare_cached(
                "SELECT n.row_id, n.logical_id, n.kind, n.properties, n.content_ref, \
                 am.last_accessed_at \
                 FROM nodes n \
                 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id \
                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
            )
            .map_err(EngineError::Sqlite)?;

        let nodes = stmt
            .query_map(rusqlite::params![kind], |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        Ok(Some(nodes))
    }
2449
2450 pub fn get_property_fts_rebuild_progress(
2456 &self,
2457 kind: &str,
2458 ) -> Result<Option<crate::rebuild_actor::RebuildProgress>, EngineError> {
2459 let conn = self.lock_connection()?;
2460 let row = conn
2461 .query_row(
2462 "SELECT state, rows_total, rows_done, started_at, last_progress_at, error_message \
2463 FROM fts_property_rebuild_state WHERE kind = ?1",
2464 rusqlite::params![kind],
2465 |r| {
2466 Ok(crate::rebuild_actor::RebuildProgress {
2467 state: r.get(0)?,
2468 rows_total: r.get(1)?,
2469 rows_done: r.get(2)?,
2470 started_at: r.get(3)?,
2471 last_progress_at: r.get(4)?,
2472 error_message: r.get(5)?,
2473 })
2474 },
2475 )
2476 .optional()?;
2477 Ok(row)
2478 }
2479}
2480
/// Rewrite a compiled query that references the legacy monolithic
/// `fts_node_properties` table so it targets the per-kind FTS table instead.
///
/// Bind layout assumption (from the query compiler): `binds[1]` carries the
/// root kind (SQL `?2`), and `?4`/`binds[3]` is the kind filter on the
/// property arm.
/// - Per-kind table exists: point the SQL at it, drop the now-redundant
///   `fp.kind = ?4` clause, and remove the matching bind.
/// - No per-kind table: remove the property-FTS UNION arm entirely together
///   with its two binds (`?3` match expression and `?4` kind).
/// Surviving `?N` parameters are renumbered to stay contiguous.
fn adapt_fts_nodes_sql_for_per_kind_tables(
    compiled: &CompiledQuery,
    conn: &rusqlite::Connection,
) -> Result<(String, Vec<BindValue>), EngineError> {
    // Root kind travels as the second bind; fall back to "" when absent or
    // not textual (yielding a nonexistent table name, i.e. the strip path).
    let root_kind = compiled
        .binds
        .get(1)
        .and_then(|b| {
            if let BindValue::Text(k) = b {
                Some(k.as_str())
            } else {
                None
            }
        })
        .unwrap_or("");
    let prop_table = fathomdb_schema::fts_kind_table_name(root_kind);
    let prop_table_exists: bool = conn
        .query_row(
            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
            rusqlite::params![prop_table],
            |_| Ok(true),
        )
        .optional()
        .map_err(EngineError::Sqlite)?
        .unwrap_or(false);

    // SQL parameter numbers are 1-based; bind vector positions are 0-based.
    let (new_sql, removed_bind_positions) = if prop_table_exists {
        let s = compiled
            .sql
            .replace("fts_node_properties", &prop_table)
            .replace("\n AND fp.kind = ?4", "");
        (renumber_sql_params(&s, &[4]), vec![3usize])
    } else {
        let s = strip_prop_fts_union_arm(&compiled.sql);
        (renumber_sql_params(&s, &[3, 4]), vec![2usize, 3])
    };

    // Keep only the binds whose positions survived, preserving order.
    let new_binds: Vec<BindValue> = compiled
        .binds
        .iter()
        .enumerate()
        .filter(|(i, _)| !removed_bind_positions.contains(i))
        .map(|(_, b)| b.clone())
        .collect();

    Ok((new_sql, new_binds))
}
2537
/// Compare the embedder's identity against the vector projection profile
/// persisted in `projection_profiles`, logging a warning on mismatch.
///
/// Advisory only — it never fails the open (hence the `unnecessary_wraps`
/// allow): a mismatched model identity or dimension count is logged so
/// operators can re-embed, but the engine still comes up.
#[allow(clippy::unnecessary_wraps)]
fn check_vec_identity_at_open(
    conn: &rusqlite::Connection,
    embedder: &dyn QueryEmbedder,
) -> Result<(), EngineError> {
    // Stored vec profile; any read error is treated as "no profile".
    let row: Option<String> = conn
        .query_row(
            "SELECT config_json FROM projection_profiles WHERE kind='*' AND facet='vec'",
            [],
            |row| row.get(0),
        )
        .optional()
        .unwrap_or(None);

    let Some(config_json) = row else {
        // No profile recorded yet: nothing to compare against.
        return Ok(());
    };

    // Unparseable stored JSON is ignored rather than treated as a mismatch.
    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&config_json) else {
        return Ok(());
    };

    let identity = embedder.identity();

    if let Some(stored_model) = parsed
        .get("model_identity")
        .and_then(serde_json::Value::as_str)
        && stored_model != identity.model_identity
    {
        trace_warn!(
            stored_model_identity = stored_model,
            embedder_model_identity = %identity.model_identity,
            "vec identity mismatch at open: model_identity differs"
        );
    }

    if let Some(stored_dim) = parsed.get("dimensions").and_then(serde_json::Value::as_u64) {
        // usize::MAX sentinel guarantees a mismatch if conversion overflows.
        let stored_dim = usize::try_from(stored_dim).unwrap_or(usize::MAX);
        if stored_dim != identity.dimension {
            trace_warn!(
                stored_dimensions = stored_dim,
                embedder_dimensions = identity.dimension,
                "vec identity mismatch at open: dimensions differ"
            );
        }
    }

    Ok(())
}
2593
/// Open-time self-heal for the property-FTS projection.
///
/// When any registered schema's FTS table is empty while live nodes exist,
/// or recursive-mode schemas lack position rows, every per-kind FTS table and
/// the position table are wiped and re-populated inside a single IMMEDIATE
/// transaction. A full FTS rebuild subsumes the position backfill, so that
/// check is skipped once a rebuild is already required.
fn run_open_time_fts_guards(conn: &mut rusqlite::Connection) -> Result<(), EngineError> {
    // Fast exit: no registered schemas means nothing to guard.
    let schema_count: i64 = conn
        .query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
            row.get(0)
        })
        .map_err(EngineError::Sqlite)?;
    if schema_count == 0 {
        return Ok(());
    }

    let needs_fts_rebuild = open_guard_check_fts_empty(conn)?;
    let needs_position_backfill = if needs_fts_rebuild {
        false
    } else {
        open_guard_check_positions_empty(conn)?
    };

    if needs_fts_rebuild || needs_position_backfill {
        // Discover per-kind virtual tables by naming convention.
        let per_kind_tables: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT name FROM sqlite_master \
                     WHERE type='table' AND name LIKE 'fts_props_%' \
                     AND sql LIKE 'CREATE VIRTUAL TABLE%'",
                )
                .map_err(EngineError::Sqlite)?;
            stmt.query_map([], |r| r.get::<_, String>(0))
                .map_err(EngineError::Sqlite)?
                .collect::<Result<Vec<_>, _>>()
                .map_err(EngineError::Sqlite)?
        };
        // IMMEDIATE: grab the write lock up front so wipe + rebuild commit
        // atomically with respect to other writers.
        let tx = conn
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(EngineError::Sqlite)?;
        for table in &per_kind_tables {
            tx.execute_batch(&format!("DELETE FROM {table}"))
                .map_err(EngineError::Sqlite)?;
        }
        tx.execute("DELETE FROM fts_node_property_positions", [])
            .map_err(EngineError::Sqlite)?;
        // NOTE(review): the SQL template carries a `?1` kind placeholder —
        // presumably `insert_property_fts_rows` binds each registered kind in
        // turn; confirm against its definition in `crate::projection`.
        crate::projection::insert_property_fts_rows(
            &tx,
            "SELECT logical_id, properties FROM nodes \
             WHERE kind = ?1 AND superseded_at IS NULL",
        )
        .map_err(EngineError::Sqlite)?;
        tx.commit().map_err(EngineError::Sqlite)?;
    }
    Ok(())
}
2655
2656fn open_guard_check_fts_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
2657 let kinds: Vec<String> = {
2658 let mut stmt = conn
2659 .prepare("SELECT kind FROM fts_property_schemas")
2660 .map_err(EngineError::Sqlite)?;
2661 stmt.query_map([], |row| row.get::<_, String>(0))
2662 .map_err(EngineError::Sqlite)?
2663 .collect::<Result<Vec<_>, _>>()
2664 .map_err(EngineError::Sqlite)?
2665 };
2666 for kind in &kinds {
2667 let table = fathomdb_schema::fts_kind_table_name(kind);
2668 let table_exists: bool = conn
2669 .query_row(
2670 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
2671 rusqlite::params![table],
2672 |_| Ok(true),
2673 )
2674 .optional()
2675 .map_err(EngineError::Sqlite)?
2676 .unwrap_or(false);
2677 let fts_count: i64 = if table_exists {
2678 conn.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
2679 row.get(0)
2680 })
2681 .map_err(EngineError::Sqlite)?
2682 } else {
2683 0
2684 };
2685 if fts_count == 0 {
2686 let node_count: i64 = conn
2687 .query_row(
2688 "SELECT COUNT(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
2689 rusqlite::params![kind],
2690 |row| row.get(0),
2691 )
2692 .map_err(EngineError::Sqlite)?;
2693 if node_count > 0 {
2694 return Ok(true);
2695 }
2696 }
2697 }
2698 Ok(false)
2699}
2700
2701fn open_guard_check_positions_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
2702 let recursive_count: i64 = conn
2703 .query_row(
2704 "SELECT COUNT(*) FROM fts_property_schemas \
2705 WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
2706 [],
2707 |row| row.get(0),
2708 )
2709 .map_err(EngineError::Sqlite)?;
2710 if recursive_count == 0 {
2711 return Ok(false);
2712 }
2713 let pos_count: i64 = conn
2714 .query_row(
2715 "SELECT COUNT(*) FROM fts_node_property_positions",
2716 [],
2717 |row| row.get(0),
2718 )
2719 .map_err(EngineError::Sqlite)?;
2720 Ok(pos_count == 0)
2721}
2722
/// Rewrite positional SQL parameters (`?N`) after some bind positions were
/// removed, so the surviving parameters are numbered contiguously.
///
/// `removed` lists the 1-based parameter numbers that no longer exist; every
/// surviving `?N` is shifted down by the count of removed positions below it.
/// A bare `?` (no digits) or an unparseable number is copied through as-is.
///
/// Non-parameter text is copied verbatim, one full character at a time. (The
/// previous implementation pushed raw bytes via `bytes[i] as char`, which
/// re-encoded every byte >= 0x80 as a two-byte code point and corrupted any
/// non-ASCII text embedded in the SQL, e.g. string literals.)
fn renumber_sql_params(sql: &str, removed: &[usize]) -> String {
    let mut result = String::with_capacity(sql.len());
    let bytes = sql.as_bytes();
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'?' {
            let num_start = i + 1;
            let mut j = num_start;
            while j < bytes.len() && bytes[j].is_ascii_digit() {
                j += 1;
            }
            if j > num_start {
                if let Ok(n) = sql[num_start..j].parse::<usize>() {
                    // Shift down by how many removed slots sit below n.
                    let offset = removed.iter().filter(|&&r| r < n).count();
                    result.push('?');
                    result.push_str(&(n - offset).to_string());
                    i = j;
                    continue;
                }
            }
        }
        // Copy one whole character (not one byte) so non-ASCII survives.
        // `i` is always on a char boundary here: we only advance past ASCII
        // ('?' and digits) or complete characters.
        if let Some(ch) = sql[i..].chars().next() {
            result.push(ch);
            i += ch.len_utf8();
        } else {
            break;
        }
    }
    result
}
2765
/// Wrap a node-producing query so each row also carries its access metadata.
///
/// The inner query must project `row_id, logical_id, kind, properties,
/// content_ref`; the wrapper left-joins `node_access_metadata` to append
/// `last_accessed_at` (NULL when no metadata row exists).
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut sql = String::with_capacity(base_sql.len() + 170);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, am.last_accessed_at FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
2773
/// Remove the property-FTS UNION arm from a generated search SQL statement.
///
/// Looks for the textual start of the arm (the `UNION` / `SELECT
/// fp.node_logical_id` header) and cuts everything up to — but not
/// including — the `) u` terminator of the enclosing subquery, which is kept.
/// If either marker is absent the SQL is returned unchanged.
///
/// NOTE(review): the marker whitespace must match the SQL generator that
/// produced `sql` byte-for-byte; keep the two in sync if either changes.
fn strip_prop_fts_union_arm(sql: &str) -> String {
    let union_marker =
        " UNION\n SELECT fp.node_logical_id";
    if let Some(start) = sql.find(union_marker) {
        let end_marker = "\n ) u";
        if let Some(rel_end) = sql[start..].find(end_marker) {
            let end = start + rel_end;
            // The terminator itself is retained (the slice starts at `end`).
            return format!("{}{}", &sql[..start], &sql[end..]);
        }
    }
    sql.to_owned()
}
2813
2814pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
2817 match err {
2818 rusqlite::Error::SqliteFailure(_, Some(msg)) => {
2819 msg.contains("vec_nodes_active") || msg.contains("no such module: vec0")
2820 }
2821 _ => false,
2822 }
2823}
2824
2825fn scalar_to_bind(value: &ScalarValue) -> BindValue {
2826 match value {
2827 ScalarValue::Text(text) => BindValue::Text(text.clone()),
2828 ScalarValue::Integer(integer) => BindValue::Integer(*integer),
2829 ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
2830 }
2831}
2832
2833fn merge_search_branches(
2851 strict: Vec<SearchHit>,
2852 relaxed: Vec<SearchHit>,
2853 limit: usize,
2854) -> Vec<SearchHit> {
2855 merge_search_branches_three(strict, relaxed, Vec::new(), limit)
2856}
2857
2858fn merge_search_branches_three(
2870 strict: Vec<SearchHit>,
2871 relaxed: Vec<SearchHit>,
2872 vector: Vec<SearchHit>,
2873 limit: usize,
2874) -> Vec<SearchHit> {
2875 let strict_block = dedup_branch_hits(strict);
2876 let relaxed_block = dedup_branch_hits(relaxed);
2877 let vector_block = dedup_branch_hits(vector);
2878
2879 let mut seen: std::collections::HashSet<String> = strict_block
2880 .iter()
2881 .map(|h| h.node.logical_id.clone())
2882 .collect();
2883
2884 let mut merged = strict_block;
2885 for hit in relaxed_block {
2886 if seen.insert(hit.node.logical_id.clone()) {
2887 merged.push(hit);
2888 }
2889 }
2890 for hit in vector_block {
2891 if seen.insert(hit.node.logical_id.clone()) {
2892 merged.push(hit);
2893 }
2894 }
2895
2896 if merged.len() > limit {
2897 merged.truncate(limit);
2898 }
2899 merged
2900}
2901
2902fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
2906 hits.sort_by(|a, b| {
2907 b.score
2908 .partial_cmp(&a.score)
2909 .unwrap_or(std::cmp::Ordering::Equal)
2910 .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
2911 .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
2912 });
2913
2914 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2915 hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
2916 hits
2917}
2918
/// Tie-break ranking for hit provenance: lower wins, so chunk matches outrank
/// property matches, which outrank vector matches, when score and logical id
/// compare equal (used by `dedup_branch_hits`).
fn source_priority(source: SearchHitSource) -> u8 {
    match source {
        SearchHitSource::Chunk => 0,
        SearchHitSource::Property => 1,
        SearchHitSource::Vector => 2,
    }
}
2928
// Non-printing sentinel strings passed to FTS5 `highlight()` as the
// open/close delimiters in `resolve_hit_attribution`;
// `parse_highlight_offsets` later strips them to recover match offsets
// in the original text. Control characters are presumably chosen because
// they are unlikely to appear in indexed property text — TODO confirm.
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
2948
2949fn load_position_map(
2953 conn: &Connection,
2954 logical_id: &str,
2955 kind: &str,
2956) -> Result<Vec<(usize, usize, String)>, EngineError> {
2957 let mut stmt = conn
2958 .prepare_cached(
2959 "SELECT start_offset, end_offset, leaf_path \
2960 FROM fts_node_property_positions \
2961 WHERE node_logical_id = ?1 AND kind = ?2 \
2962 ORDER BY start_offset ASC",
2963 )
2964 .map_err(EngineError::Sqlite)?;
2965 let rows = stmt
2966 .query_map(rusqlite::params![logical_id, kind], |row| {
2967 let start: i64 = row.get(0)?;
2968 let end: i64 = row.get(1)?;
2969 let path: String = row.get(2)?;
2970 let start = usize::try_from(start).unwrap_or(0);
2974 let end = usize::try_from(end).unwrap_or(0);
2975 Ok((start, end, path))
2976 })
2977 .map_err(EngineError::Sqlite)?;
2978 let mut out = Vec::new();
2979 for row in rows {
2980 out.push(row.map_err(EngineError::Sqlite)?);
2981 }
2982 Ok(out)
2983}
2984
/// Recovers the byte offsets (relative to the ORIGINAL, marker-free text)
/// at which FTS5 `highlight()` opened a match region.
///
/// `wrapped` is the text with `open`/`close` marker strings spliced in.
/// For every `open` marker, the offset pushed is its position minus all
/// marker bytes encountered so far; `close` markers are skipped but still
/// counted toward that discount.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    let data = wrapped.as_bytes();
    let open_marker = open.as_bytes();
    let close_marker = close.as_bytes();
    let mut offsets = Vec::new();
    let mut cursor = 0usize;
    // Total marker bytes passed so far; cursor minus this is the offset
    // within the original (unwrapped) text.
    let mut discount = 0usize;
    while cursor < data.len() {
        let rest = &data[cursor..];
        if rest.starts_with(open_marker) {
            offsets.push(cursor - discount);
            cursor += open_marker.len();
            discount += open_marker.len();
        } else if rest.starts_with(close_marker) {
            cursor += close_marker.len();
            discount += close_marker.len();
        } else {
            cursor += 1;
        }
    }
    offsets
}
3017
/// Maps a byte offset in a node's flattened property text back to the
/// leaf path whose `[start, end)` span contains it.
///
/// `positions` must be sorted ascending by start offset (as produced by
/// `load_position_map`). Returns `None` when the offset falls before the
/// first span or into a gap between spans.
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    // Locate the last span whose start is <= offset.
    let candidate = match positions.binary_search_by(|&(start, _, _)| start.cmp(&offset)) {
        Ok(exact) => exact,
        Err(0) => return None,
        Err(insertion) => insertion - 1,
    };
    let (start, end, path) = &positions[candidate];
    (*start..*end).contains(&offset).then(|| path.as_str())
}
3034
/// Resolves which property leaf paths produced a property-backed search
/// hit, by re-running FTS5 `highlight()` over the hit's projection row
/// and mapping the highlighted offsets back through the stored position
/// map.
///
/// Returns an empty `matched_paths` (never an error) whenever attribution
/// cannot be established: non-property hits, a missing or unparsable
/// `projection_row_id`, no matching FTS row, no highlight markers, or an
/// empty position map.
///
/// # Errors
/// Returns `EngineError::Sqlite` only for genuine SQL failures while
/// preparing/querying the highlight statement or loading the position
/// map.
fn resolve_hit_attribution(
    conn: &Connection,
    hit: &SearchHit,
    match_expr: &str,
) -> Result<HitAttribution, EngineError> {
    // Only property-backed hits have per-leaf attribution.
    if !matches!(hit.source, SearchHitSource::Property) {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }
    // The projection row id is the FTS table rowid, stored as a string.
    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };
    let rowid: i64 = match rowid_str.parse() {
        Ok(v) => v,
        Err(_) => {
            return Ok(HitAttribution {
                matched_paths: Vec::new(),
            });
        }
    };

    // Per-kind property FTS table; column index 1 is the text column.
    let prop_table = fathomdb_schema::fts_kind_table_name(&hit.node.kind);
    let highlight_sql = format!(
        "SELECT highlight({prop_table}, 1, ?1, ?2) \
         FROM {prop_table} \
         WHERE rowid = ?3 AND {prop_table} MATCH ?4"
    );
    let mut stmt = conn.prepare(&highlight_sql).map_err(EngineError::Sqlite)?;
    // Re-run the original MATCH so highlight() wraps the matched regions
    // with the sentinel delimiters.
    let wrapped: Option<String> = stmt
        .query_row(
            rusqlite::params![
                ATTRIBUTION_HIGHLIGHT_OPEN,
                ATTRIBUTION_HIGHLIGHT_CLOSE,
                rowid,
                match_expr,
            ],
            |row| row.get(0),
        )
        .optional()
        .map_err(EngineError::Sqlite)?;
    let Some(wrapped) = wrapped else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };

    // Strip the sentinels to get match offsets in the unwrapped text.
    let offsets = parse_highlight_offsets(
        &wrapped,
        ATTRIBUTION_HIGHLIGHT_OPEN,
        ATTRIBUTION_HIGHLIGHT_CLOSE,
    );
    if offsets.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
    if positions.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    // Map each highlight offset to its leaf path, de-duplicating while
    // preserving first-seen order.
    let mut matched_paths: Vec<String> = Vec::new();
    for offset in offsets {
        if let Some(path) = find_leaf_for_offset(&positions, offset)
            && !matched_paths.iter().any(|p| p == path)
        {
            matched_paths.push(path.to_owned());
        }
    }
    Ok(HitAttribution { matched_paths })
}
3127
3128fn build_bm25_expr(table: &str, schema_json: &str, sep: &str) -> String {
3135 let schema = crate::writer::parse_property_schema_json(schema_json, sep);
3136 let any_weighted = schema.paths.iter().any(|p| p.weight.is_some());
3137 if !any_weighted {
3138 return format!("bm25({table})");
3139 }
3140 let weights: Vec<String> = std::iter::once("0.0".to_owned())
3142 .chain(
3143 schema
3144 .paths
3145 .iter()
3146 .map(|p| format!("{:.1}", p.weight.unwrap_or(1.0))),
3147 )
3148 .collect();
3149 format!("bm25({table}, {})", weights.join(", "))
3150}
3151
3152fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
3153 match value {
3154 fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
3155 fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
3156 fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
3157 }
3158}
3159
3160#[cfg(test)]
3161#[allow(clippy::expect_used)]
3162mod tests {
3163 use std::panic::{AssertUnwindSafe, catch_unwind};
3164 use std::sync::Arc;
3165
3166 use fathomdb_query::{BindValue, QueryBuilder};
3167 use fathomdb_schema::SchemaManager;
3168 use rusqlite::types::Value;
3169 use tempfile::NamedTempFile;
3170
3171 use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
3172
3173 use fathomdb_query::{
3174 NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
3175 };
3176
3177 use super::{
3178 bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
3179 wrap_node_row_projection_sql,
3180 };
3181
3182 fn mk_hit(
3183 logical_id: &str,
3184 score: f64,
3185 match_mode: SearchMatchMode,
3186 source: SearchHitSource,
3187 ) -> SearchHit {
3188 SearchHit {
3189 node: NodeRowLite {
3190 row_id: format!("{logical_id}-row"),
3191 logical_id: logical_id.to_owned(),
3192 kind: "Goal".to_owned(),
3193 properties: "{}".to_owned(),
3194 content_ref: None,
3195 last_accessed_at: None,
3196 },
3197 score,
3198 modality: RetrievalModality::Text,
3199 source,
3200 match_mode: Some(match_mode),
3201 snippet: None,
3202 written_at: 0,
3203 projection_row_id: None,
3204 vector_distance: None,
3205 attribution: None,
3206 }
3207 }
3208
3209 #[test]
3210 fn merge_places_strict_block_before_relaxed_regardless_of_score() {
3211 let strict = vec![mk_hit(
3212 "a",
3213 1.0,
3214 SearchMatchMode::Strict,
3215 SearchHitSource::Chunk,
3216 )];
3217 let relaxed = vec![mk_hit(
3219 "b",
3220 9.9,
3221 SearchMatchMode::Relaxed,
3222 SearchHitSource::Chunk,
3223 )];
3224 let merged = merge_search_branches(strict, relaxed, 10);
3225 assert_eq!(merged.len(), 2);
3226 assert_eq!(merged[0].node.logical_id, "a");
3227 assert!(matches!(
3228 merged[0].match_mode,
3229 Some(SearchMatchMode::Strict)
3230 ));
3231 assert_eq!(merged[1].node.logical_id, "b");
3232 assert!(matches!(
3233 merged[1].match_mode,
3234 Some(SearchMatchMode::Relaxed)
3235 ));
3236 }
3237
3238 #[test]
3239 fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
3240 let strict = vec![mk_hit(
3241 "shared",
3242 1.0,
3243 SearchMatchMode::Strict,
3244 SearchHitSource::Chunk,
3245 )];
3246 let relaxed = vec![
3247 mk_hit(
3248 "shared",
3249 9.9,
3250 SearchMatchMode::Relaxed,
3251 SearchHitSource::Chunk,
3252 ),
3253 mk_hit(
3254 "other",
3255 2.0,
3256 SearchMatchMode::Relaxed,
3257 SearchHitSource::Chunk,
3258 ),
3259 ];
3260 let merged = merge_search_branches(strict, relaxed, 10);
3261 assert_eq!(merged.len(), 2);
3262 assert_eq!(merged[0].node.logical_id, "shared");
3263 assert!(matches!(
3264 merged[0].match_mode,
3265 Some(SearchMatchMode::Strict)
3266 ));
3267 assert_eq!(merged[1].node.logical_id, "other");
3268 assert!(matches!(
3269 merged[1].match_mode,
3270 Some(SearchMatchMode::Relaxed)
3271 ));
3272 }
3273
3274 #[test]
3275 fn merge_sorts_within_block_by_score_desc_then_logical_id() {
3276 let strict = vec![
3277 mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3278 mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3279 mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3280 ];
3281 let merged = merge_search_branches(strict, vec![], 10);
3282 assert_eq!(
3283 merged
3284 .iter()
3285 .map(|h| &h.node.logical_id)
3286 .collect::<Vec<_>>(),
3287 vec!["a", "c", "b"]
3288 );
3289 }
3290
3291 #[test]
3292 fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
3293 let strict = vec![
3294 mk_hit(
3295 "shared",
3296 1.0,
3297 SearchMatchMode::Strict,
3298 SearchHitSource::Property,
3299 ),
3300 mk_hit(
3301 "shared",
3302 1.0,
3303 SearchMatchMode::Strict,
3304 SearchHitSource::Chunk,
3305 ),
3306 ];
3307 let merged = merge_search_branches(strict, vec![], 10);
3308 assert_eq!(merged.len(), 1);
3309 assert!(matches!(merged[0].source, SearchHitSource::Chunk));
3310 }
3311
3312 #[test]
3313 fn merge_truncates_to_limit_after_block_merge() {
3314 let strict = vec![
3315 mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3316 mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3317 ];
3318 let relaxed = vec![mk_hit(
3319 "c",
3320 9.0,
3321 SearchMatchMode::Relaxed,
3322 SearchHitSource::Chunk,
3323 )];
3324 let merged = merge_search_branches(strict, relaxed, 2);
3325 assert_eq!(merged.len(), 2);
3326 assert_eq!(merged[0].node.logical_id, "a");
3327 assert_eq!(merged[1].node.logical_id, "b");
3328 }
3329
3330 #[test]
3339 fn search_architecturally_supports_three_branch_fusion() {
3340 let strict = vec![mk_hit(
3341 "alpha",
3342 1.0,
3343 SearchMatchMode::Strict,
3344 SearchHitSource::Chunk,
3345 )];
3346 let relaxed = vec![mk_hit(
3347 "bravo",
3348 5.0,
3349 SearchMatchMode::Relaxed,
3350 SearchHitSource::Chunk,
3351 )];
3352 let mut vector_hit = mk_hit(
3355 "charlie",
3356 9.9,
3357 SearchMatchMode::Strict,
3358 SearchHitSource::Vector,
3359 );
3360 vector_hit.match_mode = None;
3364 vector_hit.modality = RetrievalModality::Vector;
3365 let vector = vec![vector_hit];
3366
3367 let merged = merge_search_branches_three(strict, relaxed, vector, 10);
3368 assert_eq!(merged.len(), 3);
3369 assert_eq!(merged[0].node.logical_id, "alpha");
3370 assert_eq!(merged[1].node.logical_id, "bravo");
3371 assert_eq!(merged[2].node.logical_id, "charlie");
3372 assert!(matches!(merged[2].source, SearchHitSource::Vector));
3374
3375 let strict2 = vec![mk_hit(
3378 "shared",
3379 0.5,
3380 SearchMatchMode::Strict,
3381 SearchHitSource::Chunk,
3382 )];
3383 let relaxed2 = vec![mk_hit(
3384 "shared",
3385 5.0,
3386 SearchMatchMode::Relaxed,
3387 SearchHitSource::Chunk,
3388 )];
3389 let mut vshared = mk_hit(
3390 "shared",
3391 9.9,
3392 SearchMatchMode::Strict,
3393 SearchHitSource::Vector,
3394 );
3395 vshared.match_mode = None;
3396 vshared.modality = RetrievalModality::Vector;
3397 let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
3398 assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
3399 assert!(matches!(
3400 merged2[0].match_mode,
3401 Some(SearchMatchMode::Strict)
3402 ));
3403 assert!(matches!(merged2[0].source, SearchHitSource::Chunk));
3404
3405 let mut vshared2 = mk_hit(
3407 "shared",
3408 9.9,
3409 SearchMatchMode::Strict,
3410 SearchHitSource::Vector,
3411 );
3412 vshared2.match_mode = None;
3413 vshared2.modality = RetrievalModality::Vector;
3414 let merged3 = merge_search_branches_three(
3415 vec![],
3416 vec![mk_hit(
3417 "shared",
3418 1.0,
3419 SearchMatchMode::Relaxed,
3420 SearchHitSource::Chunk,
3421 )],
3422 vec![vshared2],
3423 10,
3424 );
3425 assert_eq!(merged3.len(), 1);
3426 assert!(matches!(
3427 merged3[0].match_mode,
3428 Some(SearchMatchMode::Relaxed)
3429 ));
3430 }
3431
3432 #[test]
3446 fn merge_search_branches_three_vector_only_preserves_vector_block() {
3447 let mut vector_hit = mk_hit(
3448 "solo",
3449 0.75,
3450 SearchMatchMode::Strict,
3451 SearchHitSource::Vector,
3452 );
3453 vector_hit.match_mode = None;
3454 vector_hit.modality = RetrievalModality::Vector;
3455
3456 let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);
3457
3458 assert_eq!(merged.len(), 1);
3459 assert_eq!(merged[0].node.logical_id, "solo");
3460 assert!(matches!(merged[0].source, SearchHitSource::Vector));
3461 assert!(matches!(merged[0].modality, RetrievalModality::Vector));
3462 assert!(
3463 merged[0].match_mode.is_none(),
3464 "vector hits carry match_mode=None per addendum 1"
3465 );
3466 }
3467
3468 #[test]
3480 fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
3481 let strict = vec![
3482 mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3483 mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3484 mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
3485 ];
3486 let relaxed = vec![mk_hit(
3487 "d",
3488 9.0,
3489 SearchMatchMode::Relaxed,
3490 SearchHitSource::Chunk,
3491 )];
3492 let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
3493 vector_hit.match_mode = None;
3494 vector_hit.modality = RetrievalModality::Vector;
3495 let vector = vec![vector_hit];
3496
3497 let merged = merge_search_branches_three(strict, relaxed, vector, 2);
3498
3499 assert_eq!(merged.len(), 2);
3500 assert_eq!(merged[0].node.logical_id, "a");
3501 assert_eq!(merged[1].node.logical_id, "b");
3502 assert!(
3504 merged
3505 .iter()
3506 .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
3507 "strict block must win limit contention against higher-scored relaxed/vector hits"
3508 );
3509 assert!(
3510 merged
3511 .iter()
3512 .all(|h| matches!(h.source, SearchHitSource::Chunk)),
3513 "no vector source hits should leak past the limit"
3514 );
3515 }
3516
3517 #[test]
3518 fn is_vec_table_absent_matches_known_error_messages() {
3519 use rusqlite::ffi;
3520 fn make_err(msg: &str) -> rusqlite::Error {
3521 rusqlite::Error::SqliteFailure(
3522 ffi::Error {
3523 code: ffi::ErrorCode::Unknown,
3524 extended_code: 1,
3525 },
3526 Some(msg.to_owned()),
3527 )
3528 }
3529 assert!(is_vec_table_absent(&make_err(
3530 "no such table: vec_nodes_active"
3531 )));
3532 assert!(is_vec_table_absent(&make_err("no such module: vec0")));
3533 assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
3534 assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
3535 assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
3536 }
3537
3538 #[test]
3539 fn bind_value_text_maps_to_sql_text() {
3540 let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
3541 assert_eq!(val, Value::Text("hello".to_owned()));
3542 }
3543
3544 #[test]
3545 fn bind_value_integer_maps_to_sql_integer() {
3546 let val = bind_value_to_sql(&BindValue::Integer(42));
3547 assert_eq!(val, Value::Integer(42));
3548 }
3549
3550 #[test]
3551 fn bind_value_bool_true_maps_to_integer_one() {
3552 let val = bind_value_to_sql(&BindValue::Bool(true));
3553 assert_eq!(val, Value::Integer(1));
3554 }
3555
3556 #[test]
3557 fn bind_value_bool_false_maps_to_integer_zero() {
3558 let val = bind_value_to_sql(&BindValue::Bool(false));
3559 assert_eq!(val, Value::Integer(0));
3560 }
3561
3562 #[test]
3563 fn same_shape_queries_share_one_cache_entry() {
3564 let db = NamedTempFile::new().expect("temporary db");
3565 let coordinator = ExecutionCoordinator::open(
3566 db.path(),
3567 Arc::new(SchemaManager::new()),
3568 None,
3569 1,
3570 Arc::new(TelemetryCounters::default()),
3571 None,
3572 )
3573 .expect("coordinator");
3574
3575 let compiled_a = QueryBuilder::nodes("Meeting")
3576 .text_search("budget", 5)
3577 .limit(10)
3578 .compile()
3579 .expect("compiled a");
3580 let compiled_b = QueryBuilder::nodes("Meeting")
3581 .text_search("standup", 5)
3582 .limit(10)
3583 .compile()
3584 .expect("compiled b");
3585
3586 coordinator
3587 .execute_compiled_read(&compiled_a)
3588 .expect("read a");
3589 coordinator
3590 .execute_compiled_read(&compiled_b)
3591 .expect("read b");
3592
3593 assert_eq!(
3594 compiled_a.shape_hash, compiled_b.shape_hash,
3595 "different bind values, same structural shape → same hash"
3596 );
3597 assert_eq!(coordinator.shape_sql_count(), 1);
3598 }
3599
3600 #[test]
3601 fn vector_read_degrades_gracefully_when_vec_table_absent() {
3602 let db = NamedTempFile::new().expect("temporary db");
3603 let coordinator = ExecutionCoordinator::open(
3604 db.path(),
3605 Arc::new(SchemaManager::new()),
3606 None,
3607 1,
3608 Arc::new(TelemetryCounters::default()),
3609 None,
3610 )
3611 .expect("coordinator");
3612
3613 let compiled = QueryBuilder::nodes("Meeting")
3614 .vector_search("budget embeddings", 5)
3615 .compile()
3616 .expect("vector query compiles");
3617
3618 let result = coordinator.execute_compiled_read(&compiled);
3619 let rows = result.expect("degraded read must succeed, not error");
3620 assert!(
3621 rows.was_degraded,
3622 "result must be flagged as degraded when vec_nodes_active is absent"
3623 );
3624 assert!(
3625 rows.nodes.is_empty(),
3626 "degraded result must return empty nodes"
3627 );
3628 }
3629
3630 #[test]
3631 fn coordinator_caches_by_shape_hash() {
3632 let db = NamedTempFile::new().expect("temporary db");
3633 let coordinator = ExecutionCoordinator::open(
3634 db.path(),
3635 Arc::new(SchemaManager::new()),
3636 None,
3637 1,
3638 Arc::new(TelemetryCounters::default()),
3639 None,
3640 )
3641 .expect("coordinator");
3642
3643 let compiled = QueryBuilder::nodes("Meeting")
3644 .text_search("budget", 5)
3645 .compile()
3646 .expect("compiled query");
3647
3648 coordinator
3649 .execute_compiled_read(&compiled)
3650 .expect("execute compiled read");
3651 assert_eq!(coordinator.shape_sql_count(), 1);
3652 }
3653
3654 #[test]
3657 fn explain_returns_correct_sql() {
3658 let db = NamedTempFile::new().expect("temporary db");
3659 let coordinator = ExecutionCoordinator::open(
3660 db.path(),
3661 Arc::new(SchemaManager::new()),
3662 None,
3663 1,
3664 Arc::new(TelemetryCounters::default()),
3665 None,
3666 )
3667 .expect("coordinator");
3668
3669 let compiled = QueryBuilder::nodes("Meeting")
3670 .text_search("budget", 5)
3671 .compile()
3672 .expect("compiled query");
3673
3674 let plan = coordinator.explain_compiled_read(&compiled);
3675
3676 assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
3677 }
3678
3679 #[test]
3680 fn explain_returns_correct_driving_table() {
3681 use fathomdb_query::DrivingTable;
3682
3683 let db = NamedTempFile::new().expect("temporary db");
3684 let coordinator = ExecutionCoordinator::open(
3685 db.path(),
3686 Arc::new(SchemaManager::new()),
3687 None,
3688 1,
3689 Arc::new(TelemetryCounters::default()),
3690 None,
3691 )
3692 .expect("coordinator");
3693
3694 let compiled = QueryBuilder::nodes("Meeting")
3695 .text_search("budget", 5)
3696 .compile()
3697 .expect("compiled query");
3698
3699 let plan = coordinator.explain_compiled_read(&compiled);
3700
3701 assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
3702 }
3703
3704 #[test]
3705 fn explain_reports_cache_miss_then_hit() {
3706 let db = NamedTempFile::new().expect("temporary db");
3707 let coordinator = ExecutionCoordinator::open(
3708 db.path(),
3709 Arc::new(SchemaManager::new()),
3710 None,
3711 1,
3712 Arc::new(TelemetryCounters::default()),
3713 None,
3714 )
3715 .expect("coordinator");
3716
3717 let compiled = QueryBuilder::nodes("Meeting")
3718 .text_search("budget", 5)
3719 .compile()
3720 .expect("compiled query");
3721
3722 let plan_before = coordinator.explain_compiled_read(&compiled);
3724 assert!(
3725 !plan_before.cache_hit,
3726 "cache miss expected before first execute"
3727 );
3728
3729 coordinator
3731 .execute_compiled_read(&compiled)
3732 .expect("execute read");
3733
3734 let plan_after = coordinator.explain_compiled_read(&compiled);
3736 assert!(
3737 plan_after.cache_hit,
3738 "cache hit expected after first execute"
3739 );
3740 }
3741
3742 #[test]
3743 fn explain_does_not_execute_query() {
3744 let db = NamedTempFile::new().expect("temporary db");
3749 let coordinator = ExecutionCoordinator::open(
3750 db.path(),
3751 Arc::new(SchemaManager::new()),
3752 None,
3753 1,
3754 Arc::new(TelemetryCounters::default()),
3755 None,
3756 )
3757 .expect("coordinator");
3758
3759 let compiled = QueryBuilder::nodes("Meeting")
3760 .text_search("anything", 5)
3761 .compile()
3762 .expect("compiled query");
3763
3764 let plan = coordinator.explain_compiled_read(&compiled);
3766
3767 assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
3768 assert_eq!(plan.bind_count, compiled.binds.len());
3769 }
3770
3771 #[test]
3772 fn coordinator_executes_compiled_read() {
3773 let db = NamedTempFile::new().expect("temporary db");
3774 let coordinator = ExecutionCoordinator::open(
3775 db.path(),
3776 Arc::new(SchemaManager::new()),
3777 None,
3778 1,
3779 Arc::new(TelemetryCounters::default()),
3780 None,
3781 )
3782 .expect("coordinator");
3783 let conn = rusqlite::Connection::open(db.path()).expect("open db");
3784
3785 conn.execute_batch(
3786 r#"
3787 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
3788 VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
3789 INSERT INTO chunks (id, node_logical_id, text_content, created_at)
3790 VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
3791 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
3792 VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
3793 "#,
3794 )
3795 .expect("seed data");
3796
3797 let compiled = QueryBuilder::nodes("Meeting")
3798 .text_search("budget", 5)
3799 .limit(5)
3800 .compile()
3801 .expect("compiled query");
3802
3803 let rows = coordinator
3804 .execute_compiled_read(&compiled)
3805 .expect("execute read");
3806
3807 assert_eq!(rows.nodes.len(), 1);
3808 assert_eq!(rows.nodes[0].logical_id, "meeting-1");
3809 }
3810
3811 #[test]
3812 fn text_search_finds_structured_only_node_via_property_fts() {
3813 let db = NamedTempFile::new().expect("temporary db");
3814 let coordinator = ExecutionCoordinator::open(
3815 db.path(),
3816 Arc::new(SchemaManager::new()),
3817 None,
3818 1,
3819 Arc::new(TelemetryCounters::default()),
3820 None,
3821 )
3822 .expect("coordinator");
3823 let conn = rusqlite::Connection::open(db.path()).expect("open db");
3824
3825 conn.execute_batch(
3828 "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
3829 node_logical_id UNINDEXED, text_content, \
3830 tokenize = 'porter unicode61 remove_diacritics 2'\
3831 )",
3832 )
3833 .expect("create per-kind fts table");
3834 conn.execute_batch(
3835 r#"
3836 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
3837 VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
3838 INSERT INTO fts_props_goal (node_logical_id, text_content)
3839 VALUES ('goal-1', 'Ship v2');
3840 "#,
3841 )
3842 .expect("seed data");
3843
3844 let compiled = QueryBuilder::nodes("Goal")
3845 .text_search("Ship", 5)
3846 .limit(5)
3847 .compile()
3848 .expect("compiled query");
3849
3850 let rows = coordinator
3851 .execute_compiled_read(&compiled)
3852 .expect("execute read");
3853
3854 assert_eq!(rows.nodes.len(), 1);
3855 assert_eq!(rows.nodes[0].logical_id, "goal-1");
3856 }
3857
3858 #[test]
3859 fn text_search_returns_both_chunk_and_property_backed_hits() {
3860 let db = NamedTempFile::new().expect("temporary db");
3861 let coordinator = ExecutionCoordinator::open(
3862 db.path(),
3863 Arc::new(SchemaManager::new()),
3864 None,
3865 1,
3866 Arc::new(TelemetryCounters::default()),
3867 None,
3868 )
3869 .expect("coordinator");
3870 let conn = rusqlite::Connection::open(db.path()).expect("open db");
3871
3872 conn.execute_batch(
3874 r"
3875 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
3876 VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
3877 INSERT INTO chunks (id, node_logical_id, text_content, created_at)
3878 VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
3879 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
3880 VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
3881 ",
3882 )
3883 .expect("seed chunk-backed node");
3884
3885 conn.execute_batch(
3888 "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_meeting USING fts5(\
3889 node_logical_id UNINDEXED, text_content, \
3890 tokenize = 'porter unicode61 remove_diacritics 2'\
3891 )",
3892 )
3893 .expect("create per-kind fts table");
3894 conn.execute_batch(
3895 r#"
3896 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
3897 VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
3898 INSERT INTO fts_props_meeting (node_logical_id, text_content)
3899 VALUES ('meeting-2', 'quarterly sync');
3900 "#,
3901 )
3902 .expect("seed property-backed node");
3903
3904 let compiled = QueryBuilder::nodes("Meeting")
3905 .text_search("quarterly", 10)
3906 .limit(10)
3907 .compile()
3908 .expect("compiled query");
3909
3910 let rows = coordinator
3911 .execute_compiled_read(&compiled)
3912 .expect("execute read");
3913
3914 let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
3915 ids.sort_unstable();
3916 assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
3917 }
3918
3919 #[test]
3920 fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
3921 let db = NamedTempFile::new().expect("temporary db");
3922 let coordinator = ExecutionCoordinator::open(
3923 db.path(),
3924 Arc::new(SchemaManager::new()),
3925 None,
3926 1,
3927 Arc::new(TelemetryCounters::default()),
3928 None,
3929 )
3930 .expect("coordinator");
3931 let conn = rusqlite::Connection::open(db.path()).expect("open db");
3932
3933 conn.execute_batch(
3934 r"
3935 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
3936 VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
3937 INSERT INTO chunks (id, node_logical_id, text_content, created_at)
3938 VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
3939 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
3940 VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
3941 ",
3942 )
3943 .expect("seed chunk-backed node");
3944
3945 let compiled = QueryBuilder::nodes("Meeting")
3946 .text_search("not a ship", 10)
3947 .limit(10)
3948 .compile()
3949 .expect("compiled query");
3950
3951 let rows = coordinator
3952 .execute_compiled_read(&compiled)
3953 .expect("execute read");
3954
3955 assert_eq!(rows.nodes.len(), 1);
3956 assert_eq!(rows.nodes[0].logical_id, "meeting-1");
3957 }
3958
3959 #[test]
3962 fn capability_gate_reports_false_without_feature() {
3963 let db = NamedTempFile::new().expect("temporary db");
3964 let coordinator = ExecutionCoordinator::open(
3967 db.path(),
3968 Arc::new(SchemaManager::new()),
3969 None,
3970 1,
3971 Arc::new(TelemetryCounters::default()),
3972 None,
3973 )
3974 .expect("coordinator");
3975 assert!(
3976 !coordinator.vector_enabled(),
3977 "vector_enabled must be false when no dimension is requested"
3978 );
3979 }
3980
3981 #[cfg(feature = "sqlite-vec")]
3982 #[test]
3983 fn capability_gate_reports_true_when_feature_enabled() {
3984 let db = NamedTempFile::new().expect("temporary db");
3985 let coordinator = ExecutionCoordinator::open(
3986 db.path(),
3987 Arc::new(SchemaManager::new()),
3988 Some(128),
3989 1,
3990 Arc::new(TelemetryCounters::default()),
3991 None,
3992 )
3993 .expect("coordinator");
3994 assert!(
3995 coordinator.vector_enabled(),
3996 "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
3997 );
3998 }
3999
4000 #[test]
4003 fn read_run_returns_inserted_run() {
4004 use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
4005
4006 let db = NamedTempFile::new().expect("temporary db");
4007 let writer = WriterActor::start(
4008 db.path(),
4009 Arc::new(SchemaManager::new()),
4010 ProvenanceMode::Warn,
4011 Arc::new(TelemetryCounters::default()),
4012 )
4013 .expect("writer");
4014 writer
4015 .submit(WriteRequest {
4016 label: "runtime".to_owned(),
4017 nodes: vec![],
4018 node_retires: vec![],
4019 edges: vec![],
4020 edge_retires: vec![],
4021 chunks: vec![],
4022 runs: vec![RunInsert {
4023 id: "run-r1".to_owned(),
4024 kind: "session".to_owned(),
4025 status: "active".to_owned(),
4026 properties: "{}".to_owned(),
4027 source_ref: Some("src-1".to_owned()),
4028 upsert: false,
4029 supersedes_id: None,
4030 }],
4031 steps: vec![],
4032 actions: vec![],
4033 optional_backfills: vec![],
4034 vec_inserts: vec![],
4035 operational_writes: vec![],
4036 })
4037 .expect("write run");
4038
4039 let coordinator = ExecutionCoordinator::open(
4040 db.path(),
4041 Arc::new(SchemaManager::new()),
4042 None,
4043 1,
4044 Arc::new(TelemetryCounters::default()),
4045 None,
4046 )
4047 .expect("coordinator");
4048 let row = coordinator
4049 .read_run("run-r1")
4050 .expect("read_run")
4051 .expect("row exists");
4052 assert_eq!(row.id, "run-r1");
4053 assert_eq!(row.kind, "session");
4054 assert_eq!(row.status, "active");
4055 }
4056
4057 #[test]
4058 fn read_step_returns_inserted_step() {
4059 use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};
4060
4061 let db = NamedTempFile::new().expect("temporary db");
4062 let writer = WriterActor::start(
4063 db.path(),
4064 Arc::new(SchemaManager::new()),
4065 ProvenanceMode::Warn,
4066 Arc::new(TelemetryCounters::default()),
4067 )
4068 .expect("writer");
4069 writer
4070 .submit(WriteRequest {
4071 label: "runtime".to_owned(),
4072 nodes: vec![],
4073 node_retires: vec![],
4074 edges: vec![],
4075 edge_retires: vec![],
4076 chunks: vec![],
4077 runs: vec![RunInsert {
4078 id: "run-s1".to_owned(),
4079 kind: "session".to_owned(),
4080 status: "active".to_owned(),
4081 properties: "{}".to_owned(),
4082 source_ref: Some("src-1".to_owned()),
4083 upsert: false,
4084 supersedes_id: None,
4085 }],
4086 steps: vec![StepInsert {
4087 id: "step-s1".to_owned(),
4088 run_id: "run-s1".to_owned(),
4089 kind: "llm".to_owned(),
4090 status: "completed".to_owned(),
4091 properties: "{}".to_owned(),
4092 source_ref: Some("src-1".to_owned()),
4093 upsert: false,
4094 supersedes_id: None,
4095 }],
4096 actions: vec![],
4097 optional_backfills: vec![],
4098 vec_inserts: vec![],
4099 operational_writes: vec![],
4100 })
4101 .expect("write step");
4102
4103 let coordinator = ExecutionCoordinator::open(
4104 db.path(),
4105 Arc::new(SchemaManager::new()),
4106 None,
4107 1,
4108 Arc::new(TelemetryCounters::default()),
4109 None,
4110 )
4111 .expect("coordinator");
4112 let row = coordinator
4113 .read_step("step-s1")
4114 .expect("read_step")
4115 .expect("row exists");
4116 assert_eq!(row.id, "step-s1");
4117 assert_eq!(row.run_id, "run-s1");
4118 assert_eq!(row.kind, "llm");
4119 }
4120
    /// An action inserted through the writer's full run -> step -> action
    /// chain must be readable back via `ExecutionCoordinator::read_action`.
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        let db = NamedTempFile::new().expect("temporary db");
        // Write through the real writer actor so the action lands via the
        // normal write path rather than raw SQL.
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // Single request carrying the whole chain: the action references
        // step-a1, which references run-a1.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        // Re-open the same db through the read-side coordinator (pool size 1,
        // no embedder, no cache hooks).
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        // Round-trip check on id, parent linkage, and kind.
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
4196
    /// `read_active_runs` must hide runs that a later write superseded,
    /// returning only the current head of each supersession chain.
    #[test]
    fn read_active_runs_excludes_superseded() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: an ordinary active run with no predecessor.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // v2: supersedes run-v1 (upsert + supersedes_id), so run-v1 should
        // disappear from the active view.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
4276
4277 #[allow(clippy::panic)]
4278 fn poison_connection(coordinator: &ExecutionCoordinator) {
4279 let result = catch_unwind(AssertUnwindSafe(|| {
4280 let _guard = coordinator.pool.connections[0]
4281 .lock()
4282 .expect("poison test lock");
4283 panic!("poison coordinator connection mutex");
4284 }));
4285 assert!(
4286 result.is_err(),
4287 "poison test must unwind while holding the connection mutex"
4288 );
4289 }
4290
4291 #[allow(clippy::panic)]
4292 fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
4293 where
4294 F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
4295 {
4296 match op(coordinator) {
4297 Err(EngineError::Bridge(message)) => {
4298 assert_eq!(message, "connection mutex poisoned");
4299 }
4300 Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
4301 Err(error) => panic!("expected poisoned connection error, got {error:?}"),
4302 }
4303 }
4304
4305 #[test]
4306 fn poisoned_connection_returns_bridge_error_for_read_helpers() {
4307 let db = NamedTempFile::new().expect("temporary db");
4308 let coordinator = ExecutionCoordinator::open(
4309 db.path(),
4310 Arc::new(SchemaManager::new()),
4311 None,
4312 1,
4313 Arc::new(TelemetryCounters::default()),
4314 None,
4315 )
4316 .expect("coordinator");
4317
4318 poison_connection(&coordinator);
4319
4320 assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
4321 assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
4322 assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
4323 assert_poisoned_connection_error(
4324 &coordinator,
4325 super::ExecutionCoordinator::read_active_runs,
4326 );
4327 assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
4328 assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
4329 }
4330
4331 #[test]
4334 fn shape_cache_stays_bounded() {
4335 use fathomdb_query::ShapeHash;
4336
4337 let db = NamedTempFile::new().expect("temporary db");
4338 let coordinator = ExecutionCoordinator::open(
4339 db.path(),
4340 Arc::new(SchemaManager::new()),
4341 None,
4342 1,
4343 Arc::new(TelemetryCounters::default()),
4344 None,
4345 )
4346 .expect("coordinator");
4347
4348 {
4350 let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
4351 for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
4352 cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
4353 }
4354 }
4355 let compiled = QueryBuilder::nodes("Meeting")
4360 .text_search("budget", 5)
4361 .limit(10)
4362 .compile()
4363 .expect("compiled query");
4364
4365 coordinator
4366 .execute_compiled_read(&compiled)
4367 .expect("execute read");
4368
4369 assert!(
4370 coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
4371 "shape cache must stay bounded: got {} entries, max {}",
4372 coordinator.shape_sql_count(),
4373 super::MAX_SHAPE_CACHE_SIZE
4374 );
4375 }
4376
4377 #[test]
4380 fn read_pool_size_configurable() {
4381 let db = NamedTempFile::new().expect("temporary db");
4382 let coordinator = ExecutionCoordinator::open(
4383 db.path(),
4384 Arc::new(SchemaManager::new()),
4385 None,
4386 2,
4387 Arc::new(TelemetryCounters::default()),
4388 None,
4389 )
4390 .expect("coordinator with pool_size=2");
4391
4392 assert_eq!(coordinator.pool.size(), 2);
4393
4394 let compiled = QueryBuilder::nodes("Meeting")
4396 .text_search("budget", 5)
4397 .limit(10)
4398 .compile()
4399 .expect("compiled query");
4400
4401 let result = coordinator.execute_compiled_read(&compiled);
4402 assert!(result.is_ok(), "read through pool must succeed");
4403 }
4404
    /// Grouped reads (root query + expansion slots) must return every root
    /// and every expansion edge that a row-by-row baseline would.
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        let db = NamedTempFile::new().expect("temporary db");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed 10 Meetings, each FTS-indexed and linked to exactly 2 Tasks
        // via HAS_TASK edges. Doubled braces are format! escapes producing
        // literal JSON braces.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        // Roots: all 10 meetings; one expansion slot "tasks" following
        // outbound HAS_TASK edges one hop, no filter.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1, None)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Every meeting was seeded with exactly two tasks.
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
4484
4485 #[test]
4488 fn build_bm25_expr_no_weights() {
4489 let schema_json = r#"["$.title","$.body"]"#;
4490 let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4491 assert_eq!(result, "bm25(fts_props_testkind)");
4492 }
4493
4494 #[test]
4495 fn build_bm25_expr_with_weights() {
4496 let schema_json = r#"[{"path":"$.title","mode":"scalar","weight":10.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
4497 let result = super::build_bm25_expr("fts_props_testkind", schema_json, " ");
4498 assert_eq!(result, "bm25(fts_props_testkind, 0.0, 10.0, 1.0)");
4499 }
4500
    /// With a 10x title weight vs 1x body weight, a title match must rank
    /// above a body match for the same query term.
    #[test]
    #[allow(clippy::too_many_lines)]
    fn weighted_schema_bm25_orders_title_match_above_body_match() {
        use crate::{
            AdminService, FtsPropertyPathSpec, NodeInsert, ProvenanceMode, WriteRequest,
            WriterActor, writer::ChunkPolicy,
        };
        use fathomdb_schema::fts_column_name;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Register a property-FTS schema for Article with weighted paths:
        // title 10.0, body 1.0. Eager rebuild so the table exists immediately.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Article",
                    &[
                        FtsPropertyPathSpec::scalar("$.title").with_weight(10.0),
                        FtsPropertyPathSpec::scalar("$.body").with_weight(1.0),
                    ],
                    None,
                    &[],
                    crate::rebuild_actor::RebuildMode::Eager,
                )
                .expect("register schema with weights");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // article-a: query term in the heavily weighted title.
        writer
            .submit(WriteRequest {
                label: "insert-a".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-a".to_owned(),
                    logical_id: "article-a".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"rust","body":"other"}"#.to_owned(),
                    source_ref: Some("src-a".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node A");

        // article-b: same term, but only in the lightly weighted body.
        writer
            .submit(WriteRequest {
                label: "insert-b".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-b".to_owned(),
                    logical_id: "article-b".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"other","body":"rust"}"#.to_owned(),
                    source_ref: Some("src-b".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write node B");

        // Shut the writer down so all writes are flushed before reading.
        drop(writer);

        // Sanity-check the per-kind FTS table directly: both rows exist and
        // article-a's values landed in the expected per-path columns.
        {
            let title_col = fts_column_name("$.title", false);
            let body_col = fts_column_name("$.body", false);
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            let count: i64 = conn
                .query_row("SELECT count(*) FROM fts_props_article", [], |r| r.get(0))
                .expect("count fts rows");
            assert_eq!(count, 2, "both nodes must have FTS rows in per-kind table");
            let (title_a, body_a): (String, String) = conn
                .query_row(
                    &format!(
                        "SELECT {title_col}, {body_col} FROM fts_props_article \
                         WHERE node_logical_id = 'article-a'"
                    ),
                    [],
                    |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
                )
                .expect("select article-a");
            assert_eq!(
                title_a, "rust",
                "article-a must have 'rust' in title column"
            );
            assert_eq!(
                body_a, "other",
                "article-a must have 'other' in body column"
            );
        }

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = fathomdb_query::QueryBuilder::nodes("Article")
            .text_search("rust", 5)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // Both match, but the weighted bm25 must order title-match first.
        assert_eq!(rows.nodes.len(), 2, "both nodes must be returned");
        assert_eq!(
            rows.nodes[0].logical_id, "article-a",
            "article-a (title match, weight 10) must rank above article-b (body match, weight 1)"
        );
    }
4653
    /// When attribution is requested, a property-FTS hit's `matched_paths`
    /// must be resolved from the stored byte-offset position rows: only the
    /// leaf path whose range contains the matched term is reported.
    #[test]
    fn property_fts_hit_matched_paths_from_positions() {
        use crate::{AdminService, rebuild_actor::RebuildMode};
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Register an Item schema with two scalar paths so the per-kind
        // fts_props_item table and position bookkeeping exist.
        {
            let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
            admin
                .register_fts_property_schema_with_entries(
                    "Item",
                    &[
                        crate::FtsPropertyPathSpec::scalar("$.body"),
                        crate::FtsPropertyPathSpec::scalar("$.title"),
                    ],
                    None,
                    &[],
                    RebuildMode::Eager,
                )
                .expect("register Item FTS schema");
        }

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Hand-build the concatenated FTS blob: "other" (body, 5 bytes) +
        // separator (29 bytes) + "searchterm" (title, starts at offset 34).
        let blob = format!("other{}searchterm", crate::writer::LEAF_SEPARATOR);
        // The hard-coded 0/5 and 34/44 offsets below depend on the
        // separator staying exactly 29 bytes long.
        assert_eq!(
            crate::writer::LEAF_SEPARATOR.len(),
            29,
            "LEAF_SEPARATOR length changed; update position offsets"
        );

        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
             VALUES ('r1', 'item-1', 'Item', '{\"title\":\"searchterm\",\"body\":\"other\"}', 100)",
            [],
        )
        .expect("insert node");
        conn.execute(
            "INSERT INTO fts_props_item (node_logical_id, text_content) \
             VALUES ('item-1', ?1)",
            rusqlite::params![blob],
        )
        .expect("insert fts row");
        // Position rows mapping blob byte ranges back to JSON leaf paths:
        // [0,5) is $.body, [34,44) is $.title.
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 0, 5, '$.body')",
            [],
        )
        .expect("insert body position");
        conn.execute(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES ('item-1', 'Item', 34, 44, '$.title')",
            [],
        )
        .expect("insert title position");

        // Search for the term that only lives in the $.title range.
        let ast = QueryBuilder::nodes("Item").text_search("searchterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected at least one hit");
        let hit = rows
            .hits
            .iter()
            .find(|h| h.node.logical_id == "item-1")
            .expect("item-1 must be in hits");

        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        // Only the containing range's path may be attributed.
        assert!(
            att.matched_paths.contains(&"$.title".to_owned()),
            "matched_paths must contain '$.title', got {:?}",
            att.matched_paths,
        );
        assert!(
            !att.matched_paths.contains(&"$.body".to_owned()),
            "matched_paths must NOT contain '$.body', got {:?}",
            att.matched_paths,
        );
    }
4772
4773 #[test]
4781 fn vector_hit_has_no_attribution() {
4782 use fathomdb_query::compile_vector_search;
4783
4784 let db = NamedTempFile::new().expect("temporary db");
4785 let coordinator = ExecutionCoordinator::open(
4786 db.path(),
4787 Arc::new(SchemaManager::new()),
4788 None,
4789 1,
4790 Arc::new(TelemetryCounters::default()),
4791 None,
4792 )
4793 .expect("coordinator");
4794
4795 let ast = QueryBuilder::nodes("Document").vector_search("[1.0, 0.0]", 5);
4797 let mut compiled = compile_vector_search(ast.ast()).expect("compile vector search");
4798 compiled.attribution_requested = true;
4799
4800 let rows = coordinator
4803 .execute_compiled_vector_search(&compiled)
4804 .expect("vector search must not error");
4805
4806 assert!(
4807 rows.was_degraded,
4808 "vector search without vec table must degrade"
4809 );
4810 for hit in &rows.hits {
4811 assert!(
4812 hit.attribution.is_none(),
4813 "vector hits must carry attribution = None, got {:?}",
4814 hit.attribution
4815 );
4816 }
4817 }
4818
    /// A chunk-sourced hit with attribution requested carries a Some
    /// attribution whose `matched_paths` is (for now) empty — chunk text has
    /// no JSON leaf paths to attribute.
    #[test]
    fn chunk_hit_has_text_content_attribution() {
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Seed one node with a chunk indexed in the shared fts_nodes table,
        // using a term that cannot collide with anything else.
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('r1', 'chunk-node', 'Goal', '{}', 100);
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('c1', 'chunk-node', 'uniquesentinelterm', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('c1', 'chunk-node', 'Goal', 'uniquesentinelterm');
            ",
        )
        .expect("seed chunk node");

        let ast = QueryBuilder::nodes("Goal").text_search("uniquesentinelterm", 10);
        let mut compiled = compile_search(ast.ast()).expect("compile search");
        compiled.attribution_requested = true;

        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("search");

        assert!(!rows.hits.is_empty(), "expected chunk hit");
        let hit = rows
            .hits
            .iter()
            .find(|h| matches!(h.source, SearchHitSource::Chunk))
            .expect("must have a Chunk hit");

        // Attribution object present, but with no path-level detail for
        // chunk hits (placeholder behavior, per the message below).
        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        assert!(
            att.matched_paths.is_empty(),
            "placeholder: chunk matched_paths must be empty until integration \
             tests are updated; got {:?}",
            att.matched_paths,
        );
    }
4891
4892 #[test]
4899 #[allow(clippy::too_many_lines)]
4900 fn mixed_kind_results_get_per_kind_matched_paths() {
4901 use crate::{AdminService, rebuild_actor::RebuildMode};
4902 use fathomdb_query::compile_search;
4903
4904 let db = NamedTempFile::new().expect("temporary db");
4905 let schema_manager = Arc::new(SchemaManager::new());
4906
4907 {
4910 let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
4911 admin
4912 .register_fts_property_schema_with_entries(
4913 "KindA",
4914 &[crate::FtsPropertyPathSpec::scalar("$.alpha")],
4915 None,
4916 &[],
4917 RebuildMode::Eager,
4918 )
4919 .expect("register KindA FTS schema");
4920 admin
4921 .register_fts_property_schema_with_entries(
4922 "KindB",
4923 &[crate::FtsPropertyPathSpec::scalar("$.beta")],
4924 None,
4925 &[],
4926 RebuildMode::Eager,
4927 )
4928 .expect("register KindB FTS schema");
4929 }
4930
4931 let coordinator = ExecutionCoordinator::open(
4932 db.path(),
4933 Arc::clone(&schema_manager),
4934 None,
4935 1,
4936 Arc::new(TelemetryCounters::default()),
4937 None,
4938 )
4939 .expect("coordinator");
4940
4941 let conn = rusqlite::Connection::open(db.path()).expect("open db");
4942
4943 conn.execute(
4945 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
4946 VALUES ('rA', 'node-a', 'KindA', '{\"alpha\":\"xenoterm\"}', 100)",
4947 [],
4948 )
4949 .expect("insert KindA node");
4950 conn.execute(
4952 "INSERT INTO fts_props_kinda (node_logical_id, text_content) \
4953 VALUES ('node-a', 'xenoterm')",
4954 [],
4955 )
4956 .expect("insert KindA fts row");
4957 conn.execute(
4958 "INSERT INTO fts_node_property_positions \
4959 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
4960 VALUES ('node-a', 'KindA', 0, 8, '$.alpha')",
4961 [],
4962 )
4963 .expect("insert KindA position");
4964
4965 conn.execute(
4967 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
4968 VALUES ('rB', 'node-b', 'KindB', '{\"beta\":\"xenoterm\"}', 100)",
4969 [],
4970 )
4971 .expect("insert KindB node");
4972 conn.execute(
4974 "INSERT INTO fts_props_kindb (node_logical_id, text_content) \
4975 VALUES ('node-b', 'xenoterm')",
4976 [],
4977 )
4978 .expect("insert KindB fts row");
4979 conn.execute(
4980 "INSERT INTO fts_node_property_positions \
4981 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
4982 VALUES ('node-b', 'KindB', 0, 8, '$.beta')",
4983 [],
4984 )
4985 .expect("insert KindB position");
4986
4987 let ast = QueryBuilder::nodes("").text_search("xenoterm", 10);
4989 let mut compiled = compile_search(ast.ast()).expect("compile search");
4990 compiled.attribution_requested = true;
4991
4992 let rows = coordinator
4993 .execute_compiled_search(&compiled)
4994 .expect("search");
4995
4996 assert!(
4998 rows.hits.len() >= 2,
4999 "expected hits for both kinds, got {}",
5000 rows.hits.len()
5001 );
5002
5003 for hit in &rows.hits {
5004 let att = hit
5005 .attribution
5006 .as_ref()
5007 .expect("attribution must be Some when attribution_requested");
5008 match hit.node.kind.as_str() {
5009 "KindA" => {
5010 assert_eq!(
5011 att.matched_paths,
5012 vec!["$.alpha".to_owned()],
5013 "KindA hit must have matched_paths=['$.alpha'], got {:?}",
5014 att.matched_paths,
5015 );
5016 }
5017 "KindB" => {
5018 assert_eq!(
5019 att.matched_paths,
5020 vec!["$.beta".to_owned()],
5021 "KindB hit must have matched_paths=['$.beta'], got {:?}",
5022 att.matched_paths,
5023 );
5024 }
5025 other => {
5026 assert_eq!(other, "KindA", "unexpected kind in result: {other}");
5028 }
5029 }
5030 }
5031 }
5032
5033 #[test]
5036 fn tokenizer_strategy_from_str() {
5037 use super::TokenizerStrategy;
5038 assert_eq!(
5039 TokenizerStrategy::from_str("porter unicode61 remove_diacritics 2"),
5040 TokenizerStrategy::RecallOptimizedEnglish,
5041 );
5042 assert_eq!(
5043 TokenizerStrategy::from_str("unicode61 remove_diacritics 2"),
5044 TokenizerStrategy::PrecisionOptimized,
5045 );
5046 assert_eq!(
5047 TokenizerStrategy::from_str("trigram"),
5048 TokenizerStrategy::SubstringTrigram,
5049 );
5050 assert_eq!(
5051 TokenizerStrategy::from_str("icu"),
5052 TokenizerStrategy::GlobalCjk,
5053 );
5054 assert_eq!(
5055 TokenizerStrategy::from_str("unicode61 tokenchars '.+-'"),
5056 TokenizerStrategy::SourceCode,
5057 );
5058 assert_eq!(
5060 TokenizerStrategy::from_str("unicode61 tokenchars '._-$@'"),
5061 TokenizerStrategy::SourceCode,
5062 );
5063 assert_eq!(
5064 TokenizerStrategy::from_str("my_custom_tokenizer"),
5065 TokenizerStrategy::Custom("my_custom_tokenizer".to_owned()),
5066 );
5067 }
5068
    /// With a trigram tokenizer profile, a query shorter than 3 characters
    /// cannot form a single trigram and must return empty instead of erroring.
    #[test]
    fn trigram_short_query_returns_empty() {
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // First open only bootstraps the schema/tables, then is dropped so
        // the profile insert below is picked up on reopen.
        {
            let bootstrap = ExecutionCoordinator::open(
                db.path(),
                Arc::clone(&schema_manager),
                None,
                1,
                Arc::new(TelemetryCounters::default()),
                None,
            )
            .expect("bootstrap coordinator");
            drop(bootstrap);
        }

        // Configure the Snippet kind to use the trigram tokenizer.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            conn.execute_batch(
                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
                 VALUES ('Snippet', 'fts', '{\"tokenizer\":\"trigram\"}');",
            )
            .expect("insert profile");
        }

        // Reopen so the coordinator reads the tokenizer profile at open time.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator reopen");

        // "ab" is 2 chars — below the trigram minimum.
        let ast = QueryBuilder::nodes("Snippet").text_search("ab", 10);
        let compiled = compile_search(ast.ast()).expect("compile search");
        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("short trigram query must not error");
        assert!(
            rows.hits.is_empty(),
            "2-char trigram query must return empty"
        );
    }
5122
    /// With a source-code tokenizer profile (tokenchars includes '.'), a
    /// query like "std.io" must survive FTS5 query rendering intact and
    /// match the indexed document — not be mangled by post-render escaping.
    #[test]
    fn source_code_strategy_does_not_corrupt_fts5_syntax() {
        use fathomdb_query::compile_search;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap-open to create the schema, then drop before seeding.
        {
            let bootstrap = ExecutionCoordinator::open(
                db.path(),
                Arc::clone(&schema_manager),
                None,
                1,
                Arc::new(TelemetryCounters::default()),
                None,
            )
            .expect("bootstrap coordinator");
            drop(bootstrap);
        }

        // Profile: Symbol uses "unicode61 tokenchars '._-$@'" (the doubled
        // '' are SQL escapes for a literal single quote). Then seed a node
        // whose chunk text contains the dotted identifier.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db");
            conn.execute(
                "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
                 VALUES ('Symbol', 'fts', json_object('tokenizer', 'unicode61 tokenchars ''._-$@'''))",
                [],
            )
            .expect("insert profile");
            conn.execute_batch(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
                 VALUES ('row-sym-1', 'logical-sym-1', 'Symbol', '{}', 1); \
                 INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'std.io is a rust crate', 1); \
                 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
                 VALUES ('chunk-sym-1', 'logical-sym-1', 'Symbol', 'std.io is a rust crate');",
            )
            .expect("insert node and fts row");
        }

        // Reopen so the tokenizer profile is in effect for query rendering.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator reopen");

        let ast = QueryBuilder::nodes("Symbol").text_search("std.io", 10);
        let compiled = compile_search(ast.ast()).expect("compile search");
        let rows = coordinator
            .execute_compiled_search(&compiled)
            .expect("source code search must not error");
        assert!(
            !rows.hits.is_empty(),
            "SourceCode strategy search for 'std.io' must return the document; \
             got empty — FTS5 expression was likely corrupted by post-render escaping"
        );
    }
5196
    /// Minimal `QueryEmbedder` stand-in for the open-time identity checks:
    /// reports a configurable model identity and dimension, and embeds every
    /// query as an all-zero vector.
    #[derive(Debug)]
    struct StubEmbedder {
        // Reported via `identity().model_identity`.
        model_identity: String,
        // Reported via `identity().dimension` and used as the zero-vector length.
        dimension: usize,
    }
5204
5205 impl StubEmbedder {
5206 fn new(model_identity: &str, dimension: usize) -> Self {
5207 Self {
5208 model_identity: model_identity.to_owned(),
5209 dimension,
5210 }
5211 }
5212 }
5213
    impl crate::embedder::QueryEmbedder for StubEmbedder {
        /// Always succeeds, ignoring the text and returning a zero vector of
        /// the configured width.
        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, crate::embedder::EmbedderError> {
            Ok(vec![0.0; self.dimension])
        }
        /// Reports the configured identity with fixed version ("1.0") and
        /// normalization policy ("l2") values.
        fn identity(&self) -> crate::embedder::QueryEmbedderIdentity {
            crate::embedder::QueryEmbedderIdentity {
                model_identity: self.model_identity.clone(),
                model_version: "1.0".to_owned(),
                dimension: self.dimension,
                normalization_policy: "l2".to_owned(),
            }
        }
    }
5227
    /// Open an in-memory SQLite database containing only the
    /// `projection_profiles` table — the single table
    /// `check_vec_identity_at_open` reads.
    fn make_in_memory_db_with_projection_profiles() -> rusqlite::Connection {
        let conn = rusqlite::Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS projection_profiles (
                kind TEXT NOT NULL,
                facet TEXT NOT NULL,
                config_json TEXT NOT NULL,
                active_at INTEGER,
                created_at INTEGER,
                PRIMARY KEY (kind, facet)
            );",
        )
        .expect("create projection_profiles");
        conn
    }
5243
5244 #[test]
5245 fn check_vec_identity_no_profile_no_panic() {
5246 let conn = make_in_memory_db_with_projection_profiles();
5247 let embedder = StubEmbedder::new("bge-small", 384);
5248 let result = super::check_vec_identity_at_open(&conn, &embedder);
5249 assert!(result.is_ok(), "no profile row must return Ok(())");
5250 }
5251
5252 #[test]
5253 fn check_vec_identity_matching_identity_ok() {
5254 let conn = make_in_memory_db_with_projection_profiles();
5255 conn.execute(
5256 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5257 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5258 [],
5259 )
5260 .expect("insert profile");
5261 let embedder = StubEmbedder::new("bge-small", 384);
5262 let result = super::check_vec_identity_at_open(&conn, &embedder);
5263 assert!(result.is_ok(), "matching profile must return Ok(())");
5264 }
5265
5266 #[test]
5267 fn check_vec_identity_mismatched_dimensions_ok() {
5268 let conn = make_in_memory_db_with_projection_profiles();
5269 conn.execute(
5270 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5271 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5272 [],
5273 )
5274 .expect("insert profile");
5275 let embedder = StubEmbedder::new("bge-small", 768);
5277 let result = super::check_vec_identity_at_open(&conn, &embedder);
5278 assert!(
5279 result.is_ok(),
5280 "dimension mismatch must warn and return Ok(())"
5281 );
5282 }
5283
5284 #[test]
5285 fn custom_tokenizer_passthrough() {
5286 use super::TokenizerStrategy;
5287 let strategy = TokenizerStrategy::Custom("my_tok".to_owned());
5288 assert_eq!(strategy, TokenizerStrategy::Custom("my_tok".to_owned()));
5290 assert_ne!(strategy, TokenizerStrategy::SubstringTrigram);
5292 assert_ne!(strategy, TokenizerStrategy::SourceCode);
5293 }
5294
5295 #[test]
5296 fn check_vec_identity_mismatched_model_ok() {
5297 let conn = make_in_memory_db_with_projection_profiles();
5298 conn.execute(
5299 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
5300 VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
5301 [],
5302 )
5303 .expect("insert profile");
5304 let embedder = StubEmbedder::new("bge-large", 384);
5306 let result = super::check_vec_identity_at_open(&conn, &embedder);
5307 assert!(
5308 result.is_ok(),
5309 "model_identity mismatch must warn and return Ok(())"
5310 );
5311 }
5312}