1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8 BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9 CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, ExpansionSlot,
10 FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch,
11 SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
12};
13use fathomdb_schema::SchemaManager;
14use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
15
16use crate::embedder::QueryEmbedder;
17use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
18use crate::{EngineError, sqlite};
19
/// Maximum number of shape-hash → SQL entries held in the shape cache before
/// it is cleared wholesale (see `execute_compiled_read`).
const MAX_SHAPE_CACHE_SIZE: usize = 4096;

/// Batch size for chunked reads. NOTE(review): presumably caps the number of
/// root ids bound per statement in `read_expansion_nodes_chunked` (SQLite has
/// a bind-parameter limit) — confirm against that helper.
const BATCH_CHUNK_SIZE: usize = 200;
30
/// A fixed-size pool of read-only SQLite connections. Each connection is
/// guarded by its own `Mutex`, so concurrent readers can use different
/// connections without blocking each other.
struct ReadPool {
    connections: Vec<Mutex<Connection>>,
}
38
39impl fmt::Debug for ReadPool {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 f.debug_struct("ReadPool")
42 .field("size", &self.connections.len())
43 .finish()
44 }
45}
46
47impl ReadPool {
48 fn new(
59 db_path: &Path,
60 pool_size: usize,
61 schema_manager: &SchemaManager,
62 vector_enabled: bool,
63 ) -> Result<Self, EngineError> {
64 let mut connections = Vec::with_capacity(pool_size);
65 for _ in 0..pool_size {
66 let conn = if vector_enabled {
67 #[cfg(feature = "sqlite-vec")]
68 {
69 sqlite::open_readonly_connection_with_vec(db_path)?
70 }
71 #[cfg(not(feature = "sqlite-vec"))]
72 {
73 sqlite::open_readonly_connection(db_path)?
74 }
75 } else {
76 sqlite::open_readonly_connection(db_path)?
77 };
78 schema_manager
79 .initialize_reader_connection(&conn)
80 .map_err(EngineError::Schema)?;
81 connections.push(Mutex::new(conn));
82 }
83 Ok(Self { connections })
84 }
85
86 fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
95 for conn in &self.connections {
97 if let Ok(guard) = conn.try_lock() {
98 return Ok(guard);
99 }
100 }
101 self.connections[0].lock().map_err(|_| {
103 trace_error!("read pool: connection mutex poisoned");
104 EngineError::Bridge("connection mutex poisoned".to_owned())
105 })
106 }
107
108 #[cfg(test)]
110 fn size(&self) -> usize {
111 self.connections.len()
112 }
113}
114
/// A compiled query as it will be executed: final SQL, bind arity, driving
/// table, cache key, and whether the SQL came from the shape cache.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// Final SQL text to execute.
    pub sql: String,
    /// Number of `?N` bind parameters the SQL expects.
    pub bind_count: usize,
    /// Table the planner chose to drive the scan.
    pub driving_table: DrivingTable,
    /// Structural hash of the query shape (shape-cache key).
    pub shape_hash: ShapeHash,
    /// True when the SQL was served from the shape cache.
    pub cache_hit: bool,
}
126
/// A materialized `nodes` row as returned by compiled reads.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row identifier.
    pub row_id: String,
    /// Stable logical identifier shared across row versions.
    pub logical_id: String,
    /// Node kind discriminator.
    pub kind: String,
    /// JSON-encoded properties blob.
    pub properties: String,
    /// Optional reference to externally stored content.
    pub content_ref: Option<String>,
    /// Last-access timestamp from `node_access_metadata`, if recorded.
    pub last_accessed_at: Option<i64>,
}
143
/// A materialized `runs` row.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Run identifier.
    pub id: String,
    /// Run kind discriminator.
    pub kind: String,
    /// Current run status.
    pub status: String,
    /// JSON-encoded properties blob.
    pub properties: String,
}
156
/// A materialized `steps` row, linked to its parent run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Step identifier.
    pub id: String,
    /// Identifier of the run this step belongs to.
    pub run_id: String,
    /// Step kind discriminator.
    pub kind: String,
    /// Current step status.
    pub status: String,
    /// JSON-encoded properties blob.
    pub properties: String,
}
171
/// A materialized `actions` row, linked to its parent step.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Action identifier.
    pub id: String,
    /// Identifier of the step this action belongs to.
    pub step_id: String,
    /// Action kind discriminator.
    pub kind: String,
    /// Current action status.
    pub status: String,
    /// JSON-encoded properties blob.
    pub properties: String,
}
186
/// A provenance log entry describing something that happened to a subject.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Event identifier.
    pub id: String,
    /// Event type discriminator.
    pub event_type: String,
    /// Identifier of the entity the event is about.
    pub subject: String,
    /// Optional reference to the originating source.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation timestamp.
    pub created_at: i64,
}
197
/// Result rows of a compiled read, bucketed by entity table.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Matching node rows.
    pub nodes: Vec<NodeRow>,
    /// Matching run rows.
    pub runs: Vec<RunRow>,
    /// Matching step rows.
    pub steps: Vec<StepRow>,
    /// Matching action rows.
    pub actions: Vec<ActionRow>,
    /// True when the query was degraded (e.g. the vector table was absent)
    /// and the result is empty-by-design rather than an error.
    pub was_degraded: bool,
}
213
/// Nodes expanded from a single root, keyed by the root's logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical id of the root node these expansions belong to.
    pub root_logical_id: String,
    /// Expanded node rows for that root.
    pub nodes: Vec<NodeRow>,
}
222
/// All expansion results for one named expansion slot.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Name of the expansion slot.
    pub slot: String,
    /// Per-root expansion results for this slot.
    pub roots: Vec<ExpansionRootRows>,
}
231
/// Result of a grouped read: root nodes plus their per-slot expansions.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// Root node rows matched by the grouped query.
    pub roots: Vec<NodeRow>,
    /// Expansion results, one entry per expansion slot.
    pub expansions: Vec<ExpansionSlotRows>,
    /// True when the underlying root read degraded (see `QueryRows`).
    pub was_degraded: bool,
}
242
/// Coordinates read execution against the SQLite database: owns the read
/// connection pool, the shape→SQL cache, telemetry counters, and the
/// optional query embedder used for the vector branch of retrieval plans.
pub struct ExecutionCoordinator {
    /// Path the database was opened from.
    database_path: PathBuf,
    /// Shared schema manager used to bootstrap and initialize connections.
    schema_manager: Arc<SchemaManager>,
    /// Pool of read-only connections.
    pool: ReadPool,
    /// Cache of shape hash → wrapped row-projection SQL.
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// Whether vector search is available on this coordinator.
    vector_enabled: bool,
    /// Set once the first vector-table-absent degradation has been logged,
    /// so the warning is emitted only once.
    vec_degradation_warned: AtomicBool,
    /// Shared query/error counters.
    telemetry: Arc<TelemetryCounters>,
    /// Optional embedder used to synthesize vector branches from raw query
    /// text.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
}
260
261impl fmt::Debug for ExecutionCoordinator {
262 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263 f.debug_struct("ExecutionCoordinator")
264 .field("database_path", &self.database_path)
265 .finish_non_exhaustive()
266 }
267}
268
269impl ExecutionCoordinator {
    /// Opens the database at `path`, bootstraps the schema, repairs missing
    /// FTS projections, optionally ensures a vector profile of
    /// `vector_dimension`, and builds a pool of `pool_size` read connections.
    ///
    /// Vector support is only actually enabled when the `sqlite-vec` feature
    /// is compiled in; without it the flag is forced to `false` even if the
    /// schema reports a vector profile.
    ///
    /// # Errors
    /// Propagates connection-open, schema-bootstrap, and SQLite failures.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap runs on a writable connection; read-only pool
        // connections are opened afterwards.
        #[cfg(feature = "sqlite-vec")]
        let conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        // Detect a property-FTS index that has schemas registered but no
        // projected rows (e.g. after a schema change) and needs a rebuild.
        let needs_property_fts_rebuild = {
            let schema_count: i64 =
                conn.query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
                    row.get(0)
                })?;
            if schema_count == 0 {
                false
            } else {
                let fts_count: i64 =
                    conn.query_row("SELECT COUNT(*) FROM fts_node_properties", [], |row| {
                        row.get(0)
                    })?;
                fts_count == 0
            }
        };
        // Same idea for recursive-mode schemas, which additionally require
        // position rows for their matches.
        let needs_position_backfill = {
            let recursive_schema_count: i64 = conn.query_row(
                "SELECT COUNT(*) FROM fts_property_schemas \
                 WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
                [],
                |row| row.get(0),
            )?;
            if recursive_schema_count == 0 {
                false
            } else {
                let position_count: i64 = conn.query_row(
                    "SELECT COUNT(*) FROM fts_node_property_positions",
                    [],
                    |row| row.get(0),
                )?;
                position_count == 0
            }
        };
        if needs_property_fts_rebuild || needs_position_backfill {
            // Rebuild both projections atomically from the live node rows.
            // NOTE(review): the `?1` kind placeholder in this SELECT is
            // presumably bound per registered schema inside
            // `insert_property_fts_rows` — confirm against that helper.
            let tx = conn.unchecked_transaction()?;
            tx.execute("DELETE FROM fts_node_properties", [])?;
            tx.execute("DELETE FROM fts_node_property_positions", [])?;
            crate::projection::insert_property_fts_rows(
                &tx,
                "SELECT logical_id, properties FROM nodes \
                 WHERE kind = ?1 AND superseded_at IS NULL",
            )?;
            tx.commit()?;
        }

        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            // Without the feature `report` is otherwise unused; keep the
            // binding referenced to avoid an unused-variable warning.
            let _ = &report;
            false
        };

        if let Some(dim) = vector_dimension {
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", dim)
                .map_err(EngineError::Schema)?;
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        // Release the bootstrap connection before opening the read pool.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
        })
    }
420
    /// Path of the underlying SQLite database file.
    pub fn database_path(&self) -> &Path {
        &self.database_path
    }

    /// Whether vector search is enabled for this coordinator (requires the
    /// `sqlite-vec` feature and a vector profile in the schema).
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }

    /// Optional query embedder used to build the vector branch of retrieval
    /// plans when text search returns nothing.
    #[must_use]
    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
        self.query_embedder.as_ref()
    }

    /// Borrows a pooled read connection (see `ReadPool::acquire`).
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
446
447 #[must_use]
453 pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
454 let mut total = SqliteCacheStatus::default();
455 for conn_mutex in &self.pool.connections {
456 if let Ok(conn) = conn_mutex.try_lock() {
457 total.add(&read_db_cache_status(&conn));
458 }
459 }
460 total
461 }
462
463 #[allow(clippy::expect_used)]
466 pub fn execute_compiled_read(
467 &self,
468 compiled: &CompiledQuery,
469 ) -> Result<QueryRows, EngineError> {
470 let row_sql = wrap_node_row_projection_sql(&compiled.sql);
471 {
477 let mut cache = self
478 .shape_sql_map
479 .lock()
480 .unwrap_or_else(PoisonError::into_inner);
481 if cache.len() >= MAX_SHAPE_CACHE_SIZE {
482 trace_debug!(evicted = cache.len(), "shape cache full, clearing");
483 cache.clear();
484 }
485 cache.insert(compiled.shape_hash, row_sql.clone());
486 }
487
488 let bind_values = compiled
489 .binds
490 .iter()
491 .map(bind_value_to_sql)
492 .collect::<Vec<_>>();
493
494 let conn_guard = match self.lock_connection() {
499 Ok(g) => g,
500 Err(e) => {
501 self.telemetry.increment_errors();
502 return Err(e);
503 }
504 };
505 let mut statement = match conn_guard.prepare_cached(&row_sql) {
506 Ok(stmt) => stmt,
507 Err(e) if is_vec_table_absent(&e) => {
508 if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
509 trace_warn!("vector table absent, degrading to non-vector query");
510 }
511 return Ok(QueryRows {
512 was_degraded: true,
513 ..Default::default()
514 });
515 }
516 Err(e) => {
517 self.telemetry.increment_errors();
518 return Err(EngineError::Sqlite(e));
519 }
520 };
521 let nodes = match statement
522 .query_map(params_from_iter(bind_values.iter()), |row| {
523 Ok(NodeRow {
524 row_id: row.get(0)?,
525 logical_id: row.get(1)?,
526 kind: row.get(2)?,
527 properties: row.get(3)?,
528 content_ref: row.get(4)?,
529 last_accessed_at: row.get(5)?,
530 })
531 })
532 .and_then(Iterator::collect)
533 {
534 Ok(rows) => rows,
535 Err(e) => {
536 self.telemetry.increment_errors();
537 return Err(EngineError::Sqlite(e));
538 }
539 };
540
541 self.telemetry.increment_queries();
542 Ok(QueryRows {
543 nodes,
544 runs: Vec::new(),
545 steps: Vec::new(),
546 actions: Vec::new(),
547 was_degraded: false,
548 })
549 }
550
551 pub fn execute_compiled_search(
566 &self,
567 compiled: &CompiledSearch,
568 ) -> Result<SearchRows, EngineError> {
569 let (relaxed_query, was_degraded_at_plan_time) =
576 fathomdb_query::derive_relaxed(&compiled.text_query);
577 let relaxed = relaxed_query.map(|q| CompiledSearch {
578 root_kind: compiled.root_kind.clone(),
579 text_query: q,
580 limit: compiled.limit,
581 fusable_filters: compiled.fusable_filters.clone(),
582 residual_filters: compiled.residual_filters.clone(),
583 attribution_requested: compiled.attribution_requested,
584 });
585 let plan = CompiledSearchPlan {
586 strict: compiled.clone(),
587 relaxed,
588 was_degraded_at_plan_time,
589 };
590 self.execute_compiled_search_plan(&plan)
591 }
592
593 pub fn execute_compiled_search_plan(
612 &self,
613 plan: &CompiledSearchPlan,
614 ) -> Result<SearchRows, EngineError> {
615 let strict = &plan.strict;
616 let limit = strict.limit;
617 let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;
618
619 let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
620 let strict_underfilled = strict_hits.len() < fallback_threshold;
621
622 let mut relaxed_hits: Vec<SearchHit> = Vec::new();
623 let mut fallback_used = false;
624 let mut was_degraded = false;
625 if let Some(relaxed) = plan.relaxed.as_ref()
626 && strict_underfilled
627 {
628 relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
629 fallback_used = true;
630 was_degraded = plan.was_degraded_at_plan_time;
631 }
632
633 let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
634 if strict.attribution_requested {
638 let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
639 self.populate_attribution_for_hits(
640 &mut merged,
641 &strict.text_query,
642 relaxed_text_query,
643 )?;
644 }
645 let strict_hit_count = merged
646 .iter()
647 .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
648 .count();
649 let relaxed_hit_count = merged
650 .iter()
651 .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
652 .count();
653 let vector_hit_count = 0;
657
658 Ok(SearchRows {
659 hits: merged,
660 strict_hit_count,
661 relaxed_hit_count,
662 vector_hit_count,
663 fallback_used,
664 was_degraded,
665 })
666 }
667
    /// Runs a pure vector (KNN) search against `vec_nodes_active`.
    ///
    /// `compiled.query_text` is the serialized query vector; it is matched
    /// against the vector index, candidate chunks are joined back to their
    /// live `nodes` rows, fusable predicates are applied inside the candidate
    /// CTE, and residual JSON-path predicates are applied on the joined
    /// result. Hits are ordered by ascending distance; `score` is the negated
    /// distance so that larger-is-better matches the text branches' scoring.
    ///
    /// If the vector virtual table is absent (extension not loaded), the
    /// search degrades to an empty result with `was_degraded = true` instead
    /// of erroring, warning only once per coordinator.
    ///
    /// # Errors
    /// Returns `EngineError::Sqlite` for other SQLite failures and propagates
    /// pool-acquisition errors.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // A zero limit can short-circuit without touching the database.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        // Bind layout: ?1 is always the query vector; ?2 is the root kind
        // when present. Each predicate below appends its own binds and
        // references them by positional index (`binds.len()` at push time),
        // so push order must match clause-emission order exactly.
        let filter_by_kind = !compiled.root_kind.is_empty();
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // Fusable predicates are evaluated inside the vector-candidate CTE
        // (alias `src`), narrowing candidates before the final limit.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n            AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n            AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n            AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n            AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses
                        .push_str("\n            AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n            AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n            AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
                    // Residual-only predicates; applied in the outer query.
                }
            }
        }

        // Residual predicates run on the joined result (alias `h`).
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n            AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n            AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. } => {
                    // Fusable predicates; already applied inside the CTE.
                }
            }
        }

        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        // The inner KNN scan interpolates the limit as a numeric literal
        // rather than a placeholder. NOTE(review): presumably because the
        // sqlite-vec MATCH subquery needs a literal LIMIT — confirm; the
        // value is a usize, so no injection risk.
        let base_limit = limit;
        let kind_clause = if filter_by_kind {
            "\n            AND src.kind = ?2"
        } else {
            ""
        };

        let sql = format!(
            "WITH vector_hits AS (
            SELECT
                src.row_id AS row_id,
                src.logical_id AS logical_id,
                src.kind AS kind,
                src.properties AS properties,
                src.source_ref AS source_ref,
                src.content_ref AS content_ref,
                src.created_at AS created_at,
                vc.distance AS distance,
                vc.chunk_id AS chunk_id
            FROM (
                SELECT chunk_id, distance
                FROM vec_nodes_active
                WHERE embedding MATCH ?1
                LIMIT {base_limit}
            ) vc
            JOIN chunks c ON c.id = vc.chunk_id
            JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
            WHERE 1 = 1{kind_clause}{fused_clauses}
        )
        SELECT
            h.row_id,
            h.logical_id,
            h.kind,
            h.properties,
            h.content_ref,
            am.last_accessed_at,
            h.created_at,
            h.distance,
            h.chunk_id
        FROM vector_hits h
        LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
        WHERE 1 = 1{filter_clauses}
        ORDER BY h.distance ASC
        LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            // Missing vector table at prepare time → degrade to empty.
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Negate distance so larger score = better match, matching
                // the text branches' -bm25 scores.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    // Vector hits carry no strict/relaxed mode and no FTS
                    // snippet.
                    match_mode: None,
                    snippet: None,
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // The table can also disappear between prepare and step;
                // degrade the same way.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
1003
    /// Executes a compiled retrieval plan: strict text search, relaxed-text
    /// fallback when strict is underfilled, and a vector branch that runs
    /// only when both text branches return nothing.
    ///
    /// `raw_query` is the original query text; it is embedded on demand (via
    /// the configured query embedder) to synthesize the vector branch when
    /// the plan does not already carry one.
    ///
    /// # Errors
    /// Propagates failures from any executed branch.
    pub fn execute_retrieval_plan(
        &self,
        plan: &CompiledRetrievalPlan,
        raw_query: &str,
    ) -> Result<SearchRows, EngineError> {
        // Clone so `fill_vector_branch` can attach a vector branch and set
        // degradation flags without mutating the caller's plan.
        let mut plan = plan.clone();
        let limit = plan.text.strict.limit;

        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;

        // Relaxed fallback only triggers when strict produced fewer than
        // min(FALLBACK_TRIGGER_K, limit) hits.
        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;
        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.text.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            was_degraded = plan.was_degraded_at_plan_time;
        }

        // Vector is a last resort: only embed the query when text found
        // nothing at all and an embedder is configured.
        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
        if text_branches_empty && self.query_embedder.is_some() {
            self.fill_vector_branch(&mut plan, raw_query);
        }

        let mut vector_hits: Vec<SearchHit> = Vec::new();
        if let Some(vector) = plan.vector.as_ref()
            && strict_hits.is_empty()
            && relaxed_hits.is_empty()
        {
            let vector_rows = self.execute_compiled_vector_search(vector)?;
            vector_hits = vector_rows.hits;
            if vector_rows.was_degraded {
                was_degraded = true;
            }
        }
        // If embedding failed (plan marked degraded, but no vector branch
        // was produced), surface the degradation to the caller.
        if text_branches_empty
            && plan.was_degraded_at_plan_time
            && plan.vector.is_none()
            && self.query_embedder.is_some()
        {
            was_degraded = true;
        }

        let strict = &plan.text.strict;
        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
        if strict.attribution_requested {
            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }

        // Per-modality counts are taken from the merged (post-limit) set.
        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        let vector_hit_count = merged
            .iter()
            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
            .count();

        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
1144
    /// Populates `plan.vector` by embedding `raw_query` with the configured
    /// query embedder, carrying over the strict branch's kind, limit, and
    /// filters. On embedder or serialization failure the vector branch is
    /// skipped and the plan is marked degraded instead of erroring.
    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
        let Some(embedder) = self.query_embedder.as_ref() else {
            return;
        };
        match embedder.embed_query(raw_query) {
            Ok(vec) => {
                // The embedding is serialized to a JSON string and later used
                // as the `embedding MATCH ?1` operand in
                // `execute_compiled_vector_search`.
                let literal = match serde_json::to_string(&vec) {
                    Ok(s) => s,
                    Err(err) => {
                        trace_warn!(
                            error = %err,
                            "query embedder vector serialization failed; skipping vector branch"
                        );
                        // NOTE(review): `let _ = err;` presumably keeps `err`
                        // referenced when the trace macro is compiled out,
                        // avoiding an unused-variable warning — confirm
                        // before removing.
                        let _ = err; plan.was_degraded_at_plan_time = true;
                        return;
                    }
                };
                let strict = &plan.text.strict;
                plan.vector = Some(CompiledVectorSearch {
                    root_kind: strict.root_kind.clone(),
                    query_text: literal,
                    limit: strict.limit,
                    fusable_filters: strict.fusable_filters.clone(),
                    residual_filters: strict.residual_filters.clone(),
                    attribution_requested: strict.attribution_requested,
                });
            }
            Err(err) => {
                trace_warn!(
                    error = %err,
                    "query embedder unavailable, skipping vector branch"
                );
                // NOTE(review): same unused-binding suppression as above.
                let _ = err; plan.was_degraded_at_plan_time = true;
            }
        }
    }
1203
    /// Runs one FTS5 text-search branch (strict or relaxed) and returns its
    /// hits ordered by descending score.
    ///
    /// Candidates come from a UNION of two FTS indexes: chunk text
    /// (`fts_nodes`, joined through `chunks` back to live nodes) and
    /// projected property text (`fts_node_properties`). Scores are negated
    /// bm25 ranks, so larger is better. Fusable predicates are applied inside
    /// the unioned candidate subquery (alias `u`); residual JSON-path
    /// predicates are applied after joining access metadata (alias `h`).
    ///
    /// Empty and pure-NOT queries are not executable FTS5 expressions and
    /// return no hits without touching the database.
    ///
    /// # Errors
    /// Returns `EngineError::Sqlite` on SQLite failures and propagates
    /// pool-acquisition errors.
    #[allow(clippy::too_many_lines)]
    fn run_search_branch(
        &self,
        compiled: &CompiledSearch,
        branch: SearchBranch,
    ) -> Result<Vec<SearchHit>, EngineError> {
        use std::fmt::Write as _;
        if matches!(
            compiled.text_query,
            fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
        ) {
            return Ok(Vec::new());
        }
        let rendered = render_text_query_fts5(&compiled.text_query);
        // Bind layout mirrors the SQL below: with a kind filter the first
        // four binds are (query, kind, query, kind) — one (MATCH, kind) pair
        // per UNION arm; without it, just the query twice. Predicate binds
        // follow and are referenced by positional index at push time, so push
        // order must match clause-emission order exactly.
        let filter_by_kind = !compiled.root_kind.is_empty();
        let mut binds: Vec<BindValue> = if filter_by_kind {
            vec![
                BindValue::Text(rendered.clone()),
                BindValue::Text(compiled.root_kind.clone()),
                BindValue::Text(rendered),
                BindValue::Text(compiled.root_kind.clone()),
            ]
        } else {
            vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
        };

        // Fusable predicates narrow the unioned candidates (alias `u`)
        // before scoring order and the inner LIMIT are applied.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(fused_clauses, "\n            AND u.kind = ?{idx}");
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n            AND u.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n            AND u.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n            AND u.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses.push_str("\n            AND u.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n            AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n            AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
                    // Residual-only predicates; applied in the outer query.
                }
            }
        }

        // Residual predicates run on the final result set (alias `h`).
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n            AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n            AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. } => {
                    // Fusable predicates; already applied inside the CTE.
                }
            }
        }

        let limit = compiled.limit;
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();
        // Placeholder numbers shift depending on whether the kind filter
        // occupies ?2 / ?4 in the two UNION arms.
        let (chunk_fts_bind, chunk_kind_clause, prop_fts_bind, prop_kind_clause) = if filter_by_kind
        {
            (
                "?1",
                "\n                AND src.kind = ?2",
                "?3",
                "\n                AND fp.kind = ?4",
            )
        } else {
            ("?1", "", "?2", "")
        };
        let sql = format!(
            "WITH search_hits AS (
            SELECT
                u.row_id AS row_id,
                u.logical_id AS logical_id,
                u.kind AS kind,
                u.properties AS properties,
                u.source_ref AS source_ref,
                u.content_ref AS content_ref,
                u.created_at AS created_at,
                u.score AS score,
                u.source AS source,
                u.snippet AS snippet,
                u.projection_row_id AS projection_row_id
            FROM (
                SELECT
                    src.row_id AS row_id,
                    c.node_logical_id AS logical_id,
                    src.kind AS kind,
                    src.properties AS properties,
                    src.source_ref AS source_ref,
                    src.content_ref AS content_ref,
                    src.created_at AS created_at,
                    -bm25(fts_nodes) AS score,
                    'chunk' AS source,
                    snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
                    f.chunk_id AS projection_row_id
                FROM fts_nodes f
                JOIN chunks c ON c.id = f.chunk_id
                JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
                WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}
                UNION ALL
                SELECT
                    src.row_id AS row_id,
                    fp.node_logical_id AS logical_id,
                    src.kind AS kind,
                    src.properties AS properties,
                    src.source_ref AS source_ref,
                    src.content_ref AS content_ref,
                    src.created_at AS created_at,
                    -bm25(fts_node_properties) AS score,
                    'property' AS source,
                    substr(fp.text_content, 1, 200) AS snippet,
                    CAST(fp.rowid AS TEXT) AS projection_row_id
                FROM fts_node_properties fp
                JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
                WHERE fts_node_properties MATCH {prop_fts_bind}{prop_kind_clause}
            ) u
            WHERE 1 = 1{fused_clauses}
            ORDER BY u.score DESC
            LIMIT ?{limit_idx}
        )
        SELECT
            h.row_id,
            h.logical_id,
            h.kind,
            h.properties,
            h.content_ref,
            am.last_accessed_at,
            h.created_at,
            h.score,
            h.source,
            h.snippet,
            h.projection_row_id
        FROM search_hits h
        LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
        WHERE 1 = 1{filter_clauses}
        ORDER BY h.score DESC"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                // Column 8 is the 'chunk' / 'property' source tag emitted by
                // the UNION arms above.
                let source_str: String = row.get(8)?;
                let source = if source_str == "property" {
                    SearchHitSource::Property
                } else {
                    SearchHitSource::Chunk
                };
                // All hits from this statement carry the branch's match mode.
                let match_mode = match branch {
                    SearchBranch::Strict => SearchMatchMode::Strict,
                    SearchBranch::Relaxed => SearchMatchMode::Relaxed,
                };
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score: row.get(7)?,
                    modality: RetrievalModality::Text,
                    source,
                    match_mode: Some(match_mode),
                    snippet: row.get(9)?,
                    projection_row_id: row.get(10)?,
                    vector_distance: None,
                    attribution: None,
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        Ok(hits)
    }
1543
1544 fn populate_attribution_for_hits(
1548 &self,
1549 hits: &mut [SearchHit],
1550 strict_text_query: &fathomdb_query::TextQuery,
1551 relaxed_text_query: Option<&fathomdb_query::TextQuery>,
1552 ) -> Result<(), EngineError> {
1553 let conn_guard = match self.lock_connection() {
1554 Ok(g) => g,
1555 Err(e) => {
1556 self.telemetry.increment_errors();
1557 return Err(e);
1558 }
1559 };
1560 let strict_expr = render_text_query_fts5(strict_text_query);
1561 let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
1562 for hit in hits.iter_mut() {
1563 let match_expr = match hit.match_mode {
1568 Some(SearchMatchMode::Strict) => strict_expr.as_str(),
1569 Some(SearchMatchMode::Relaxed) => {
1570 relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
1571 }
1572 None => continue,
1573 };
1574 match resolve_hit_attribution(&conn_guard, hit, match_expr) {
1575 Ok(att) => hit.attribution = Some(att),
1576 Err(e) => {
1577 self.telemetry.increment_errors();
1578 return Err(e);
1579 }
1580 }
1581 }
1582 Ok(())
1583 }
1584
1585 pub fn execute_compiled_grouped_read(
1589 &self,
1590 compiled: &CompiledGroupedQuery,
1591 ) -> Result<GroupedQueryRows, EngineError> {
1592 let root_rows = self.execute_compiled_read(&compiled.root)?;
1593 if root_rows.was_degraded {
1594 return Ok(GroupedQueryRows {
1595 roots: Vec::new(),
1596 expansions: Vec::new(),
1597 was_degraded: true,
1598 });
1599 }
1600
1601 let roots = root_rows.nodes;
1602 let mut expansions = Vec::with_capacity(compiled.expansions.len());
1603 for expansion in &compiled.expansions {
1604 let slot_rows = if roots.is_empty() {
1605 Vec::new()
1606 } else {
1607 self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
1608 };
1609 expansions.push(ExpansionSlotRows {
1610 slot: expansion.slot.clone(),
1611 roots: slot_rows,
1612 });
1613 }
1614
1615 Ok(GroupedQueryRows {
1616 roots,
1617 expansions,
1618 was_degraded: false,
1619 })
1620 }
1621
1622 fn read_expansion_nodes_chunked(
1628 &self,
1629 roots: &[NodeRow],
1630 expansion: &ExpansionSlot,
1631 hard_limit: usize,
1632 ) -> Result<Vec<ExpansionRootRows>, EngineError> {
1633 if roots.len() <= BATCH_CHUNK_SIZE {
1634 return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
1635 }
1636
1637 let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
1640 for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
1641 for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
1642 per_root
1643 .entry(group.root_logical_id)
1644 .or_default()
1645 .extend(group.nodes);
1646 }
1647 }
1648
1649 Ok(roots
1650 .iter()
1651 .map(|root| ExpansionRootRows {
1652 root_logical_id: root.logical_id.clone(),
1653 nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
1654 })
1655 .collect())
1656 }
1657
    /// Runs one recursive-CTE edge traversal for a batch of roots (at most
    /// `BATCH_CHUNK_SIZE` — enforced by the caller) and groups reachable
    /// nodes per root, preserving the caller's root order.
    ///
    /// Traversal mechanics:
    /// - `root_ids` seeds one CTE row per bound root id (`?1..?N`).
    /// - `traversed.visited` is a comma-delimited id path (",a,b,") checked
    ///   with `instr` to avoid revisiting a node along the same path.
    /// - `emitted` caps how many rows a single recursion path may produce;
    ///   `numbered` then re-caps results per root via ROW_NUMBER.
    /// - Only live rows are returned (`superseded_at IS NULL` on both
    ///   edges and nodes), with `last_accessed_at` joined from metadata.
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Direction picks which edge endpoint joins the current frontier
        // and which endpoint becomes the next frontier node.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // One positional placeholder per root id, unioned into the seed CTE.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted) AS (
                SELECT rid, rid, 0, printf(',%s,', rid), 0
                FROM root_ids
                UNION ALL
                SELECT
                    t.root_id,
                    {next_logical_id},
                    t.depth + 1,
                    t.visited || {next_logical_id} || ',',
                    t.emitted + 1
                FROM traversed t
                JOIN edges e ON {join_condition}
                    AND e.kind = ?{edge_kind_param}
                    AND e.superseded_at IS NULL
                WHERE t.depth < {max_depth}
                    AND t.emitted < {hard_limit}
                    AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
                    , n.content_ref, am.last_accessed_at
                    , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
                FROM traversed t
                JOIN nodes n ON n.logical_id = t.logical_id
                    AND n.superseded_at IS NULL
                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
                WHERE t.depth > 0
            )
            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, logical_id",
            edge_kind_param = root_ids.len() + 1,
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Bind order matches the SQL: root ids first (?1..?N), then the
        // edge kind label (?N+1).
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Bucket rows by root, then emit groups in the caller's root order.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
1773
1774 pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
1780 let conn = self.lock_connection()?;
1781 conn.query_row(
1782 "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
1783 rusqlite::params![id],
1784 |row| {
1785 Ok(RunRow {
1786 id: row.get(0)?,
1787 kind: row.get(1)?,
1788 status: row.get(2)?,
1789 properties: row.get(3)?,
1790 })
1791 },
1792 )
1793 .optional()
1794 .map_err(EngineError::Sqlite)
1795 }
1796
1797 pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
1803 let conn = self.lock_connection()?;
1804 conn.query_row(
1805 "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
1806 rusqlite::params![id],
1807 |row| {
1808 Ok(StepRow {
1809 id: row.get(0)?,
1810 run_id: row.get(1)?,
1811 kind: row.get(2)?,
1812 status: row.get(3)?,
1813 properties: row.get(4)?,
1814 })
1815 },
1816 )
1817 .optional()
1818 .map_err(EngineError::Sqlite)
1819 }
1820
1821 pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
1827 let conn = self.lock_connection()?;
1828 conn.query_row(
1829 "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
1830 rusqlite::params![id],
1831 |row| {
1832 Ok(ActionRow {
1833 id: row.get(0)?,
1834 step_id: row.get(1)?,
1835 kind: row.get(2)?,
1836 status: row.get(3)?,
1837 properties: row.get(4)?,
1838 })
1839 },
1840 )
1841 .optional()
1842 .map_err(EngineError::Sqlite)
1843 }
1844
1845 pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
1851 let conn = self.lock_connection()?;
1852 let mut stmt = conn
1853 .prepare_cached(
1854 "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
1855 )
1856 .map_err(EngineError::Sqlite)?;
1857 let rows = stmt
1858 .query_map([], |row| {
1859 Ok(RunRow {
1860 id: row.get(0)?,
1861 kind: row.get(1)?,
1862 status: row.get(2)?,
1863 properties: row.get(3)?,
1864 })
1865 })
1866 .map_err(EngineError::Sqlite)?
1867 .collect::<Result<Vec<_>, _>>()
1868 .map_err(EngineError::Sqlite)?;
1869 Ok(rows)
1870 }
1871
1872 #[must_use]
1882 #[allow(clippy::expect_used)]
1883 pub fn shape_sql_count(&self) -> usize {
1884 self.shape_sql_map
1885 .lock()
1886 .unwrap_or_else(PoisonError::into_inner)
1887 .len()
1888 }
1889
    /// Returns a new handle to the shared schema manager (cheap `Arc` clone).
    #[must_use]
    pub fn schema_manager(&self) -> Arc<SchemaManager> {
        Arc::clone(&self.schema_manager)
    }
1895
1896 #[must_use]
1905 pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
1906 let cache_hit = self
1907 .shape_sql_map
1908 .lock()
1909 .unwrap_or_else(PoisonError::into_inner)
1910 .contains_key(&compiled.shape_hash);
1911 QueryPlan {
1912 sql: wrap_node_row_projection_sql(&compiled.sql),
1913 bind_count: compiled.binds.len(),
1914 driving_table: compiled.driving_table,
1915 shape_hash: compiled.shape_hash,
1916 cache_hit,
1917 }
1918 }
1919
    /// Test-only escape hatch: runs `PRAGMA {name}` on a pooled reader
    /// connection and stringifies the first column of the result.
    ///
    /// NOTE(review): `name` is interpolated directly into the SQL text
    /// (PRAGMA names cannot be bound as parameters). Callers must pass
    /// trusted, hard-coded pragma names only — confirm no user-influenced
    /// value can reach this hidden API.
    #[doc(hidden)]
    pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
        let conn = self.lock_connection()?;
        let result = conn
            .query_row(&format!("PRAGMA {name}"), [], |row| {
                // Pragmas return heterogeneous types; fetch as a dynamic Value.
                row.get::<_, rusqlite::types::Value>(0)
            })
            .map_err(EngineError::Sqlite)?;
        // Normalize every scalar type to a string; BLOB has no sensible
        // textual form, so it is rejected rather than lossily encoded.
        let s = match result {
            rusqlite::types::Value::Text(t) => t,
            rusqlite::types::Value::Integer(i) => i.to_string(),
            rusqlite::types::Value::Real(f) => f.to_string(),
            rusqlite::types::Value::Blob(_) => {
                return Err(EngineError::InvalidWrite(format!(
                    "PRAGMA {name} returned an unexpected BLOB value"
                )));
            }
            rusqlite::types::Value::Null => String::new(),
        };
        Ok(s)
    }
1948
1949 pub fn query_provenance_events(
1958 &self,
1959 subject: &str,
1960 ) -> Result<Vec<ProvenanceEvent>, EngineError> {
1961 let conn = self.lock_connection()?;
1962 let mut stmt = conn
1963 .prepare_cached(
1964 "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
1965 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
1966 )
1967 .map_err(EngineError::Sqlite)?;
1968 let events = stmt
1969 .query_map(rusqlite::params![subject], |row| {
1970 Ok(ProvenanceEvent {
1971 id: row.get(0)?,
1972 event_type: row.get(1)?,
1973 subject: row.get(2)?,
1974 source_ref: row.get(3)?,
1975 metadata_json: row.get(4)?,
1976 created_at: row.get(5)?,
1977 })
1978 })
1979 .map_err(EngineError::Sqlite)?
1980 .collect::<Result<Vec<_>, _>>()
1981 .map_err(EngineError::Sqlite)?;
1982 Ok(events)
1983 }
1984}
1985
/// Wraps a compiled node query so every projected row is left-joined
/// against `node_access_metadata`, adding `last_accessed_at` to the
/// standard NodeRow column list.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut sql = String::with_capacity(base_sql.len() + 160);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, am.last_accessed_at FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
1993
1994pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
1997 match err {
1998 rusqlite::Error::SqliteFailure(_, Some(msg)) => {
1999 msg.contains("vec_nodes_active") || msg.contains("no such module: vec0")
2000 }
2001 _ => false,
2002 }
2003}
2004
2005fn scalar_to_bind(value: &ScalarValue) -> BindValue {
2006 match value {
2007 ScalarValue::Text(text) => BindValue::Text(text.clone()),
2008 ScalarValue::Integer(integer) => BindValue::Integer(*integer),
2009 ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
2010 }
2011}
2012
/// Merges strict and relaxed FTS branches into one ranked, deduplicated
/// hit list capped at `limit`.
///
/// Thin wrapper over [`merge_search_branches_three`] with an empty vector
/// branch; strict hits always precede relaxed hits regardless of score.
fn merge_search_branches(
    strict: Vec<SearchHit>,
    relaxed: Vec<SearchHit>,
    limit: usize,
) -> Vec<SearchHit> {
    merge_search_branches_three(strict, relaxed, Vec::new(), limit)
}
2037
2038fn merge_search_branches_three(
2050 strict: Vec<SearchHit>,
2051 relaxed: Vec<SearchHit>,
2052 vector: Vec<SearchHit>,
2053 limit: usize,
2054) -> Vec<SearchHit> {
2055 let strict_block = dedup_branch_hits(strict);
2056 let relaxed_block = dedup_branch_hits(relaxed);
2057 let vector_block = dedup_branch_hits(vector);
2058
2059 let mut seen: std::collections::HashSet<String> = strict_block
2060 .iter()
2061 .map(|h| h.node.logical_id.clone())
2062 .collect();
2063
2064 let mut merged = strict_block;
2065 for hit in relaxed_block {
2066 if seen.insert(hit.node.logical_id.clone()) {
2067 merged.push(hit);
2068 }
2069 }
2070 for hit in vector_block {
2071 if seen.insert(hit.node.logical_id.clone()) {
2072 merged.push(hit);
2073 }
2074 }
2075
2076 if merged.len() > limit {
2077 merged.truncate(limit);
2078 }
2079 merged
2080}
2081
2082fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
2086 hits.sort_by(|a, b| {
2087 b.score
2088 .partial_cmp(&a.score)
2089 .unwrap_or(std::cmp::Ordering::Equal)
2090 .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
2091 .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
2092 });
2093
2094 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2095 hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
2096 hits
2097}
2098
2099fn source_priority(source: SearchHitSource) -> u8 {
2100 match source {
2103 SearchHitSource::Chunk => 0,
2104 SearchHitSource::Property => 1,
2105 SearchHitSource::Vector => 2,
2106 }
2107}
2108
/// Sentinel bytes passed to FTS5 `highlight()` as open/close markers.
/// Control characters are used because they should not appear in stored
/// property text, keeping offset reconstruction unambiguous.
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
2128
2129fn load_position_map(
2133 conn: &Connection,
2134 logical_id: &str,
2135 kind: &str,
2136) -> Result<Vec<(usize, usize, String)>, EngineError> {
2137 let mut stmt = conn
2138 .prepare_cached(
2139 "SELECT start_offset, end_offset, leaf_path \
2140 FROM fts_node_property_positions \
2141 WHERE node_logical_id = ?1 AND kind = ?2 \
2142 ORDER BY start_offset ASC",
2143 )
2144 .map_err(EngineError::Sqlite)?;
2145 let rows = stmt
2146 .query_map(rusqlite::params![logical_id, kind], |row| {
2147 let start: i64 = row.get(0)?;
2148 let end: i64 = row.get(1)?;
2149 let path: String = row.get(2)?;
2150 let start = usize::try_from(start).unwrap_or(0);
2154 let end = usize::try_from(end).unwrap_or(0);
2155 Ok((start, end, path))
2156 })
2157 .map_err(EngineError::Sqlite)?;
2158 let mut out = Vec::new();
2159 for row in rows {
2160 out.push(row.map_err(EngineError::Sqlite)?);
2161 }
2162 Ok(out)
2163}
2164
/// Scans highlight-wrapped text and returns, for each open marker, its
/// byte offset in the ORIGINAL (marker-free) text.
///
/// The walk tracks how many marker bytes have been consumed so far and
/// subtracts them from the current position; close markers are skipped
/// but still counted.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    let mut offsets = Vec::new();
    let data = wrapped.as_bytes();
    let open_marker = open.as_bytes();
    let close_marker = close.as_bytes();
    let mut cursor = 0usize;
    let mut marker_bytes = 0usize;
    while cursor < data.len() {
        let rest = &data[cursor..];
        if rest.starts_with(open_marker) {
            offsets.push(cursor - marker_bytes);
            cursor += open_marker.len();
            marker_bytes += open_marker.len();
        } else if rest.starts_with(close_marker) {
            cursor += close_marker.len();
            marker_bytes += close_marker.len();
        } else {
            cursor += 1;
        }
    }
    offsets
}
2197
/// Maps a byte offset in the flattened property text to the JSON leaf
/// path whose [start, end) range contains it.
///
/// `positions` must be sorted by start offset (guaranteed by
/// `load_position_map`'s ORDER BY). Finds the last entry starting at or
/// before `offset`, then checks containment.
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    let idx = positions
        .partition_point(|entry| entry.0 <= offset)
        .checked_sub(1)?;
    let (start, end, path) = &positions[idx];
    (offset >= *start && offset < *end).then_some(path.as_str())
}
2214
/// Resolves which JSON leaf paths of a property-sourced hit actually
/// matched the FTS expression, by re-running the match through FTS5
/// `highlight()` and mapping marker offsets to the stored position map.
///
/// Returns empty `matched_paths` (never an error) for every non-fatal
/// miss: non-property hits, missing/unparseable projection rowid, no FTS
/// row for the expression, no highlight markers, or no position map.
fn resolve_hit_attribution(
    conn: &Connection,
    hit: &SearchHit,
    match_expr: &str,
) -> Result<HitAttribution, EngineError> {
    // Only property matches have leaf paths to attribute.
    if !matches!(hit.source, SearchHitSource::Property) {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }
    // projection_row_id carries the fts_node_properties rowid as a string.
    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };
    let rowid: i64 = match rowid_str.parse() {
        Ok(v) => v,
        Err(_) => {
            return Ok(HitAttribution {
                matched_paths: Vec::new(),
            });
        }
    };

    // Re-run the match on the single FTS row, wrapping matched terms in
    // the sentinel open/close markers (column index 2 is the text column).
    let mut stmt = conn
        .prepare_cached(
            "SELECT highlight(fts_node_properties, 2, ?1, ?2) \
             FROM fts_node_properties \
             WHERE rowid = ?3 AND fts_node_properties MATCH ?4",
        )
        .map_err(EngineError::Sqlite)?;
    let wrapped: Option<String> = stmt
        .query_row(
            rusqlite::params![
                ATTRIBUTION_HIGHLIGHT_OPEN,
                ATTRIBUTION_HIGHLIGHT_CLOSE,
                rowid,
                match_expr,
            ],
            |row| row.get(0),
        )
        .optional()
        .map_err(EngineError::Sqlite)?;
    let Some(wrapped) = wrapped else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };

    // Convert marker positions back to offsets in the unwrapped text.
    let offsets = parse_highlight_offsets(
        &wrapped,
        ATTRIBUTION_HIGHLIGHT_OPEN,
        ATTRIBUTION_HIGHLIGHT_CLOSE,
    );
    if offsets.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
    if positions.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    // Collect distinct leaf paths in first-match order.
    let mut matched_paths: Vec<String> = Vec::new();
    for offset in offsets {
        if let Some(path) = find_leaf_for_offset(&positions, offset)
            && !matched_paths.iter().any(|p| p == path)
        {
            matched_paths.push(path.to_owned());
        }
    }
    Ok(HitAttribution { matched_paths })
}
2305
2306fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
2307 match value {
2308 fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
2309 fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
2310 fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
2311 }
2312}
2313
2314#[cfg(test)]
2315#[allow(clippy::expect_used)]
2316mod tests {
2317 use std::panic::{AssertUnwindSafe, catch_unwind};
2318 use std::sync::Arc;
2319
2320 use fathomdb_query::{BindValue, QueryBuilder};
2321 use fathomdb_schema::SchemaManager;
2322 use rusqlite::types::Value;
2323 use tempfile::NamedTempFile;
2324
2325 use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
2326
2327 use fathomdb_query::{
2328 NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
2329 };
2330
2331 use super::{
2332 bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
2333 wrap_node_row_projection_sql,
2334 };
2335
    // Builds a minimal text-modality SearchHit fixture for the merge tests;
    // only logical_id, score, match_mode, and source vary per case.
    fn mk_hit(
        logical_id: &str,
        score: f64,
        match_mode: SearchMatchMode,
        source: SearchHitSource,
    ) -> SearchHit {
        SearchHit {
            node: NodeRowLite {
                row_id: format!("{logical_id}-row"),
                logical_id: logical_id.to_owned(),
                kind: "Goal".to_owned(),
                properties: "{}".to_owned(),
                content_ref: None,
                last_accessed_at: None,
            },
            score,
            modality: RetrievalModality::Text,
            source,
            match_mode: Some(match_mode),
            snippet: None,
            written_at: 0,
            projection_row_id: None,
            vector_distance: None,
            attribution: None,
        }
    }
2362
    // Block precedence: strict hits always come first, even when a relaxed
    // hit carries a higher score.
    #[test]
    fn merge_places_strict_block_before_relaxed_regardless_of_score() {
        let strict = vec![mk_hit(
            "a",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![mk_hit(
            "b",
            9.9,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "b");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }

    // Cross-branch dedup: a logical_id present in the strict block
    // suppresses its relaxed duplicate regardless of score.
    #[test]
    fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
        let strict = vec![mk_hit(
            "shared",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![
            mk_hit(
                "shared",
                9.9,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
            mk_hit(
                "other",
                2.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, relaxed, 10);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "shared");
        assert!(matches!(
            merged[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert_eq!(merged[1].node.logical_id, "other");
        assert!(matches!(
            merged[1].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }

    // Within one block: score descending, logical_id ascending on ties.
    #[test]
    fn merge_sorts_within_block_by_score_desc_then_logical_id() {
        let strict = vec![
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(
            merged
                .iter()
                .map(|h| &h.node.logical_id)
                .collect::<Vec<_>>(),
            vec!["a", "c", "b"]
        );
    }

    // Intra-branch dedup at equal score resolves by source priority:
    // chunk outranks property.
    #[test]
    fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
        let strict = vec![
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Property,
            ),
            mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Strict,
                SearchHitSource::Chunk,
            ),
        ];
        let merged = merge_search_branches(strict, vec![], 10);
        assert_eq!(merged.len(), 1);
        assert!(matches!(merged[0].source, SearchHitSource::Chunk));
    }

    // The limit is applied after block-ordered merge, so strict hits win
    // limit contention over higher-scored relaxed hits.
    #[test]
    fn merge_truncates_to_limit_after_block_merge() {
        let strict = vec![
            mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let relaxed = vec![mk_hit(
            "c",
            9.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let merged = merge_search_branches(strict, relaxed, 2);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert_eq!(merged[1].node.logical_id, "b");
    }
2483
    // End-to-end exercise of three-branch fusion: block ordering
    // (strict, relaxed, vector) and cross-branch dedup precedence.
    #[test]
    fn search_architecturally_supports_three_branch_fusion() {
        let strict = vec![mk_hit(
            "alpha",
            1.0,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed = vec![mk_hit(
            "bravo",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vector_hit = mk_hit(
            "charlie",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        // Vector hits carry no text match mode.
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;
        let vector = vec![vector_hit];

        let merged = merge_search_branches_three(strict, relaxed, vector, 10);
        assert_eq!(merged.len(), 3);
        assert_eq!(merged[0].node.logical_id, "alpha");
        assert_eq!(merged[1].node.logical_id, "bravo");
        assert_eq!(merged[2].node.logical_id, "charlie");
        assert!(matches!(merged[2].source, SearchHitSource::Vector));

        // Same id in all three branches: the strict occurrence wins.
        let strict2 = vec![mk_hit(
            "shared",
            0.5,
            SearchMatchMode::Strict,
            SearchHitSource::Chunk,
        )];
        let relaxed2 = vec![mk_hit(
            "shared",
            5.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vshared = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared.match_mode = None;
        vshared.modality = RetrievalModality::Vector;
        let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
        assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
        assert!(matches!(
            merged2[0].match_mode,
            Some(SearchMatchMode::Strict)
        ));
        assert!(matches!(merged2[0].source, SearchHitSource::Chunk));

        // With no strict branch, relaxed outranks vector for a shared id.
        let mut vshared2 = mk_hit(
            "shared",
            9.9,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vshared2.match_mode = None;
        vshared2.modality = RetrievalModality::Vector;
        let merged3 = merge_search_branches_three(
            vec![],
            vec![mk_hit(
                "shared",
                1.0,
                SearchMatchMode::Relaxed,
                SearchHitSource::Chunk,
            )],
            vec![vshared2],
            10,
        );
        assert_eq!(merged3.len(), 1);
        assert!(matches!(
            merged3[0].match_mode,
            Some(SearchMatchMode::Relaxed)
        ));
    }

    // A vector-only merge passes the hit through untouched, including its
    // None match_mode and Vector modality/source.
    #[test]
    fn merge_search_branches_three_vector_only_preserves_vector_block() {
        let mut vector_hit = mk_hit(
            "solo",
            0.75,
            SearchMatchMode::Strict,
            SearchHitSource::Vector,
        );
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;

        let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);

        assert_eq!(merged.len(), 1);
        assert_eq!(merged[0].node.logical_id, "solo");
        assert!(matches!(merged[0].source, SearchHitSource::Vector));
        assert!(matches!(merged[0].modality, RetrievalModality::Vector));
        assert!(
            merged[0].match_mode.is_none(),
            "vector hits carry match_mode=None per addendum 1"
        );
    }

    // Limit truncation must respect block precedence: strict hits survive
    // even against higher-scored relaxed and vector hits.
    #[test]
    fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
        let strict = vec![
            mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
            mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
        ];
        let relaxed = vec![mk_hit(
            "d",
            9.0,
            SearchMatchMode::Relaxed,
            SearchHitSource::Chunk,
        )];
        let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
        vector_hit.match_mode = None;
        vector_hit.modality = RetrievalModality::Vector;
        let vector = vec![vector_hit];

        let merged = merge_search_branches_three(strict, relaxed, vector, 2);

        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].node.logical_id, "a");
        assert_eq!(merged[1].node.logical_id, "b");
        assert!(
            merged
                .iter()
                .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
            "strict block must win limit contention against higher-scored relaxed/vector hits"
        );
        assert!(
            merged
                .iter()
                .all(|h| matches!(h.source, SearchHitSource::Chunk)),
            "no vector source hits should leak past the limit"
        );
    }
2670
    // Verifies the degrade-detection predicate against the exact SQLite
    // error messages produced when sqlite-vec is unavailable.
    #[test]
    fn is_vec_table_absent_matches_known_error_messages() {
        use rusqlite::ffi;
        fn make_err(msg: &str) -> rusqlite::Error {
            rusqlite::Error::SqliteFailure(
                ffi::Error {
                    code: ffi::ErrorCode::Unknown,
                    extended_code: 1,
                },
                Some(msg.to_owned()),
            )
        }
        assert!(is_vec_table_absent(&make_err(
            "no such table: vec_nodes_active"
        )));
        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
    }

    // BindValue -> rusqlite Value mapping, one variant per test.
    #[test]
    fn bind_value_text_maps_to_sql_text() {
        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
        assert_eq!(val, Value::Text("hello".to_owned()));
    }

    #[test]
    fn bind_value_integer_maps_to_sql_integer() {
        let val = bind_value_to_sql(&BindValue::Integer(42));
        assert_eq!(val, Value::Integer(42));
    }

    // Booleans are stored as 0/1 integers (SQLite has no boolean type).
    #[test]
    fn bind_value_bool_true_maps_to_integer_one() {
        let val = bind_value_to_sql(&BindValue::Bool(true));
        assert_eq!(val, Value::Integer(1));
    }

    #[test]
    fn bind_value_bool_false_maps_to_integer_zero() {
        let val = bind_value_to_sql(&BindValue::Bool(false));
        assert_eq!(val, Value::Integer(0));
    }
2715
    // Structurally identical queries with different bind values must hash
    // to the same shape and share a single cached SQL entry.
    #[test]
    fn same_shape_queries_share_one_cache_entry() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled_a = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(10)
            .compile()
            .expect("compiled a");
        let compiled_b = QueryBuilder::nodes("Meeting")
            .text_search("standup", 5)
            .limit(10)
            .compile()
            .expect("compiled b");

        coordinator
            .execute_compiled_read(&compiled_a)
            .expect("read a");
        coordinator
            .execute_compiled_read(&compiled_b)
            .expect("read b");

        assert_eq!(
            compiled_a.shape_hash, compiled_b.shape_hash,
            "different bind values, same structural shape → same hash"
        );
        assert_eq!(coordinator.shape_sql_count(), 1);
    }

    // A vector query against a DB without the vec table must degrade
    // (empty result, was_degraded=true), not error.
    #[test]
    fn vector_read_degrades_gracefully_when_vec_table_absent() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .vector_search("budget embeddings", 5)
            .compile()
            .expect("vector query compiles");

        let result = coordinator.execute_compiled_read(&compiled);
        let rows = result.expect("degraded read must succeed, not error");
        assert!(
            rows.was_degraded,
            "result must be flagged as degraded when vec_nodes_active is absent"
        );
        assert!(
            rows.nodes.is_empty(),
            "degraded result must return empty nodes"
        );
    }

    // Executing one compiled read populates exactly one cache entry.
    #[test]
    fn coordinator_caches_by_shape_hash() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute compiled read");
        assert_eq!(coordinator.shape_sql_count(), 1);
    }

    // explain must report the same wrapped projection SQL the executor uses.
    #[test]
    fn explain_returns_correct_sql() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
    }

    #[test]
    fn explain_returns_correct_driving_table() {
        use fathomdb_query::DrivingTable;

        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
    }

    // cache_hit flips from false to true across the first execution.
    #[test]
    fn explain_reports_cache_miss_then_hit() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan_before = coordinator.explain_compiled_read(&compiled);
        assert!(
            !plan_before.cache_hit,
            "cache miss expected before first execute"
        );

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        let plan_after = coordinator.explain_compiled_read(&compiled);
        assert!(
            plan_after.cache_hit,
            "cache hit expected after first execute"
        );
    }

    // explain is side-effect-free: it returns plan metadata without
    // running the query.
    #[test]
    fn explain_does_not_execute_query() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("anything", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
        assert_eq!(plan.bind_count, compiled.binds.len());
    }
2924
    #[test]
    fn coordinator_executes_compiled_read() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Seed through a second, directly-opened connection on the same file;
        // the coordinator's read pool uses its own read-only connections.
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // One Meeting node with one chunk, mirrored into fts_nodes so the
        // text search on "budget" can find it.
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // The seeded node is the only match and must come back by logical id.
        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
2964
    #[test]
    fn text_search_finds_structured_only_node_via_property_fts() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Seed via a separate writable connection on the same file.
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // Deliberately no chunks row: the node's only searchable text lives in
        // the property-level FTS table (fts_node_properties).
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
            VALUES ('goal-1', 'Goal', 'Ship v2');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Goal")
            .text_search("Ship", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // The property-backed match alone must surface the node.
        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "goal-1");
    }
3003
    #[test]
    fn text_search_returns_both_chunk_and_property_backed_hits() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        // meeting-1 is chunk-backed: its searchable text is indexed in fts_nodes.
        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
            ",
        )
        .expect("seed chunk-backed node");

        // meeting-2 is property-backed only: no chunks, text indexed in
        // fts_node_properties instead.
        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
            VALUES ('meeting-2', 'Meeting', 'quarterly sync');
            "#,
        )
        .expect("seed property-backed node");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("quarterly", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        // Both hit sources must contribute: sort so the assertion is
        // independent of result ordering.
        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
        ids.sort_unstable();
        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
    }
3056
    #[test]
    fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
            ",
        )
        .expect("seed chunk-backed node");

        // Regression guard: the query text contains the word "not", which an
        // unescaped FTS5 query could treat as the boolean NOT operator
        // (presumably neutralized by render_text_query_fts5 — confirm there).
        // It must match the literal words instead.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("not a ship", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
3096
3097 #[test]
3100 fn capability_gate_reports_false_without_feature() {
3101 let db = NamedTempFile::new().expect("temporary db");
3102 let coordinator = ExecutionCoordinator::open(
3105 db.path(),
3106 Arc::new(SchemaManager::new()),
3107 None,
3108 1,
3109 Arc::new(TelemetryCounters::default()),
3110 None,
3111 )
3112 .expect("coordinator");
3113 assert!(
3114 !coordinator.vector_enabled(),
3115 "vector_enabled must be false when no dimension is requested"
3116 );
3117 }
3118
3119 #[cfg(feature = "sqlite-vec")]
3120 #[test]
3121 fn capability_gate_reports_true_when_feature_enabled() {
3122 let db = NamedTempFile::new().expect("temporary db");
3123 let coordinator = ExecutionCoordinator::open(
3124 db.path(),
3125 Arc::new(SchemaManager::new()),
3126 Some(128),
3127 1,
3128 Arc::new(TelemetryCounters::default()),
3129 None,
3130 )
3131 .expect("coordinator");
3132 assert!(
3133 coordinator.vector_enabled(),
3134 "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
3135 );
3136 }
3137
3138 #[test]
3141 fn read_run_returns_inserted_run() {
3142 use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
3143
3144 let db = NamedTempFile::new().expect("temporary db");
3145 let writer = WriterActor::start(
3146 db.path(),
3147 Arc::new(SchemaManager::new()),
3148 ProvenanceMode::Warn,
3149 Arc::new(TelemetryCounters::default()),
3150 )
3151 .expect("writer");
3152 writer
3153 .submit(WriteRequest {
3154 label: "runtime".to_owned(),
3155 nodes: vec![],
3156 node_retires: vec![],
3157 edges: vec![],
3158 edge_retires: vec![],
3159 chunks: vec![],
3160 runs: vec![RunInsert {
3161 id: "run-r1".to_owned(),
3162 kind: "session".to_owned(),
3163 status: "active".to_owned(),
3164 properties: "{}".to_owned(),
3165 source_ref: Some("src-1".to_owned()),
3166 upsert: false,
3167 supersedes_id: None,
3168 }],
3169 steps: vec![],
3170 actions: vec![],
3171 optional_backfills: vec![],
3172 vec_inserts: vec![],
3173 operational_writes: vec![],
3174 })
3175 .expect("write run");
3176
3177 let coordinator = ExecutionCoordinator::open(
3178 db.path(),
3179 Arc::new(SchemaManager::new()),
3180 None,
3181 1,
3182 Arc::new(TelemetryCounters::default()),
3183 None,
3184 )
3185 .expect("coordinator");
3186 let row = coordinator
3187 .read_run("run-r1")
3188 .expect("read_run")
3189 .expect("row exists");
3190 assert_eq!(row.id, "run-r1");
3191 assert_eq!(row.kind, "session");
3192 assert_eq!(row.status, "active");
3193 }
3194
    #[test]
    fn read_step_returns_inserted_step() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // Single request carrying both the run and the step that references it
        // (the step's run_id points at run-s1 inserted in the same batch).
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Read the step back by id and verify the persisted fields round-trip.
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
3258
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // One request carries the whole run -> step -> action chain; the
        // action's step_id references step-a1 inserted in the same batch.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Read the action back by id and verify its fields round-trip.
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
3334
    #[test]
    fn read_active_runs_excludes_superseded() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First write: an ordinary run (run-v1) with no supersession.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Second write: run-v2 upserts and declares it supersedes run-v1.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        // run-v1 was superseded by run-v2, so only run-v2 may be reported.
        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
3414
3415 #[allow(clippy::panic)]
3416 fn poison_connection(coordinator: &ExecutionCoordinator) {
3417 let result = catch_unwind(AssertUnwindSafe(|| {
3418 let _guard = coordinator.pool.connections[0]
3419 .lock()
3420 .expect("poison test lock");
3421 panic!("poison coordinator connection mutex");
3422 }));
3423 assert!(
3424 result.is_err(),
3425 "poison test must unwind while holding the connection mutex"
3426 );
3427 }
3428
3429 #[allow(clippy::panic)]
3430 fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
3431 where
3432 F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
3433 {
3434 match op(coordinator) {
3435 Err(EngineError::Bridge(message)) => {
3436 assert_eq!(message, "connection mutex poisoned");
3437 }
3438 Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
3439 Err(error) => panic!("expected poisoned connection error, got {error:?}"),
3440 }
3441 }
3442
3443 #[test]
3444 fn poisoned_connection_returns_bridge_error_for_read_helpers() {
3445 let db = NamedTempFile::new().expect("temporary db");
3446 let coordinator = ExecutionCoordinator::open(
3447 db.path(),
3448 Arc::new(SchemaManager::new()),
3449 None,
3450 1,
3451 Arc::new(TelemetryCounters::default()),
3452 None,
3453 )
3454 .expect("coordinator");
3455
3456 poison_connection(&coordinator);
3457
3458 assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
3459 assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
3460 assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
3461 assert_poisoned_connection_error(
3462 &coordinator,
3463 super::ExecutionCoordinator::read_active_runs,
3464 );
3465 assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
3466 assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
3467 }
3468
3469 #[test]
3472 fn shape_cache_stays_bounded() {
3473 use fathomdb_query::ShapeHash;
3474
3475 let db = NamedTempFile::new().expect("temporary db");
3476 let coordinator = ExecutionCoordinator::open(
3477 db.path(),
3478 Arc::new(SchemaManager::new()),
3479 None,
3480 1,
3481 Arc::new(TelemetryCounters::default()),
3482 None,
3483 )
3484 .expect("coordinator");
3485
3486 {
3488 let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
3489 for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
3490 cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
3491 }
3492 }
3493 let compiled = QueryBuilder::nodes("Meeting")
3498 .text_search("budget", 5)
3499 .limit(10)
3500 .compile()
3501 .expect("compiled query");
3502
3503 coordinator
3504 .execute_compiled_read(&compiled)
3505 .expect("execute read");
3506
3507 assert!(
3508 coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
3509 "shape cache must stay bounded: got {} entries, max {}",
3510 coordinator.shape_sql_count(),
3511 super::MAX_SHAPE_CACHE_SIZE
3512 );
3513 }
3514
3515 #[test]
3518 fn read_pool_size_configurable() {
3519 let db = NamedTempFile::new().expect("temporary db");
3520 let coordinator = ExecutionCoordinator::open(
3521 db.path(),
3522 Arc::new(SchemaManager::new()),
3523 None,
3524 2,
3525 Arc::new(TelemetryCounters::default()),
3526 None,
3527 )
3528 .expect("coordinator with pool_size=2");
3529
3530 assert_eq!(coordinator.pool.size(), 2);
3531
3532 let compiled = QueryBuilder::nodes("Meeting")
3534 .text_search("budget", 5)
3535 .limit(10)
3536 .compile()
3537 .expect("compiled query");
3538
3539 let result = coordinator.execute_compiled_read(&compiled);
3540 assert!(result.is_ok(), "read through pool must succeed");
3541 }
3542
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        let db = NamedTempFile::new().expect("temporary db");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed 10 Meeting roots, each with one FTS-indexed chunk and two Task
        // children linked via outgoing HAS_TASK edges. Note the doubled braces
        // in the format! template: `{{...}}` emits literal `{...}` JSON.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        // Grouped query: text-search roots plus a one-hop "tasks" expansion
        // following outgoing HAS_TASK edges.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Every seeded meeting has exactly two tasks, so each root's
        // expansion entry must carry exactly two nodes.
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
3622}