1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{
8 BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
9 CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, ExpansionSlot,
10 FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue, SearchBranch,
11 SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash, render_text_query_fts5,
12};
13use fathomdb_schema::SchemaManager;
14use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
15
16use crate::embedder::QueryEmbedder;
17use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
18use crate::{EngineError, sqlite};
19
/// Maximum number of shape-hash → SQL entries kept in `shape_sql_map`
/// before the cache is cleared wholesale (see `execute_compiled_read`).
const MAX_SHAPE_CACHE_SIZE: usize = 4096;

/// Number of root rows handled per batched expansion query; bounds the
/// size of the `SELECT ?1 UNION ALL …` seed list and the bind count.
const BATCH_CHUNK_SIZE: usize = 200;
/// Fixed-size pool of read-only SQLite connections, each behind its own
/// mutex so concurrent readers do not serialize on a single connection.
struct ReadPool {
    // One mutex-guarded connection per pool slot.
    connections: Vec<Mutex<Connection>>,
}
38
39impl fmt::Debug for ReadPool {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 f.debug_struct("ReadPool")
42 .field("size", &self.connections.len())
43 .finish()
44 }
45}
46
47impl ReadPool {
48 fn new(
59 db_path: &Path,
60 pool_size: usize,
61 schema_manager: &SchemaManager,
62 vector_enabled: bool,
63 ) -> Result<Self, EngineError> {
64 let mut connections = Vec::with_capacity(pool_size);
65 for _ in 0..pool_size {
66 let conn = if vector_enabled {
67 #[cfg(feature = "sqlite-vec")]
68 {
69 sqlite::open_readonly_connection_with_vec(db_path)?
70 }
71 #[cfg(not(feature = "sqlite-vec"))]
72 {
73 sqlite::open_readonly_connection(db_path)?
74 }
75 } else {
76 sqlite::open_readonly_connection(db_path)?
77 };
78 schema_manager
79 .initialize_reader_connection(&conn)
80 .map_err(EngineError::Schema)?;
81 connections.push(Mutex::new(conn));
82 }
83 Ok(Self { connections })
84 }
85
86 fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
95 for conn in &self.connections {
97 if let Ok(guard) = conn.try_lock() {
98 return Ok(guard);
99 }
100 }
101 self.connections[0].lock().map_err(|_| {
103 trace_error!("read pool: connection mutex poisoned");
104 EngineError::Bridge("connection mutex poisoned".to_owned())
105 })
106 }
107
108 #[cfg(test)]
110 fn size(&self) -> usize {
111 self.connections.len()
112 }
113}
114
/// Descriptive summary of a compiled query, suitable for plan
/// introspection rather than execution.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// Final SQL text that would be executed.
    pub sql: String,
    /// Number of bind parameters the SQL expects.
    pub bind_count: usize,
    /// Table that drives the query.
    pub driving_table: DrivingTable,
    /// Hash of the query shape; key into the shape → SQL cache.
    pub shape_hash: ShapeHash,
    /// Whether this shape was already present in the cache.
    pub cache_hit: bool,
}
126
/// A node row as projected by compiled read queries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row identifier.
    pub row_id: String,
    /// Stable logical identifier shared across node versions.
    pub logical_id: String,
    /// Node kind discriminator.
    pub kind: String,
    /// JSON-encoded property bag.
    pub properties: String,
    /// Optional reference to external content.
    pub content_ref: Option<String>,
    /// Last-access timestamp from `node_access_metadata`, if recorded.
    pub last_accessed_at: Option<i64>,
}
143
/// A run row as projected by compiled read queries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Run identifier.
    pub id: String,
    /// Run kind discriminator.
    pub kind: String,
    /// Current status string.
    pub status: String,
    /// JSON-encoded property bag.
    pub properties: String,
}
156
/// A step row as projected by compiled read queries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Step identifier.
    pub id: String,
    /// Identifier of the parent run.
    pub run_id: String,
    /// Step kind discriminator.
    pub kind: String,
    /// Current status string.
    pub status: String,
    /// JSON-encoded property bag.
    pub properties: String,
}
171
/// An action row as projected by compiled read queries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Action identifier.
    pub id: String,
    /// Identifier of the parent step.
    pub step_id: String,
    /// Action kind discriminator.
    pub kind: String,
    /// Current status string.
    pub status: String,
    /// JSON-encoded property bag.
    pub properties: String,
}
186
/// A provenance event row.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Event identifier.
    pub id: String,
    /// Event type discriminator.
    pub event_type: String,
    /// Subject the event refers to.
    pub subject: String,
    /// Optional source reference.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation timestamp (integer epoch value; unit not shown here).
    pub created_at: i64,
}
197
/// Result set of a compiled read, one vector per addressable table.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Node rows (the only table `execute_compiled_read` populates).
    pub nodes: Vec<NodeRow>,
    /// Run rows.
    pub runs: Vec<RunRow>,
    /// Step rows.
    pub steps: Vec<StepRow>,
    /// Action rows.
    pub actions: Vec<ActionRow>,
    /// True when the query degraded (e.g. missing vector table) and
    /// returned no rows instead of failing.
    pub was_degraded: bool,
}
213
/// Expansion results for a single root node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical id of the root the expansion started from.
    pub root_logical_id: String,
    /// Nodes reached from that root for one expansion slot.
    pub nodes: Vec<NodeRow>,
}
222
/// Per-root expansion results for one named expansion slot.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Slot name this expansion belongs to.
    pub slot: String,
    /// One entry per root, in root order.
    pub roots: Vec<ExpansionRootRows>,
}
231
/// Result of a grouped read: root rows plus their expansion slots.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// Root node rows.
    pub roots: Vec<NodeRow>,
    /// Expansion results, one entry per compiled expansion slot.
    pub expansions: Vec<ExpansionSlotRows>,
    /// True when the root read degraded; roots/expansions are then empty.
    pub was_degraded: bool,
}
242
/// Executes compiled queries and searches against the SQLite database,
/// owning the read-connection pool, the shape → SQL cache, and
/// vector-search degradation state.
pub struct ExecutionCoordinator {
    // Location of the backing SQLite database file.
    database_path: PathBuf,
    // Schema bootstrap/migration manager, shared with writers.
    schema_manager: Arc<SchemaManager>,
    // Pool of read-only connections.
    pool: ReadPool,
    // Cache of compiled shape hash → row-projection SQL.
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    // Whether the vector profile/table is available.
    vector_enabled: bool,
    // Set once the first vector-degradation warning is logged, so the
    // warning is emitted at most once per process.
    vec_degradation_warned: AtomicBool,
    // Shared query/error counters.
    telemetry: Arc<TelemetryCounters>,
    // Optional embedder used to fill the vector branch of retrieval plans.
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
}
260
261impl fmt::Debug for ExecutionCoordinator {
262 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263 f.debug_struct("ExecutionCoordinator")
264 .field("database_path", &self.database_path)
265 .finish_non_exhaustive()
266 }
267}
268
269impl ExecutionCoordinator {
    /// Open the coordinator against the database at `path`.
    ///
    /// Bootstraps the schema on a temporary writable connection, performs
    /// one-off FTS repair work (property-FTS rebuild and recursive-position
    /// backfill) when projection tables are empty but schemas exist,
    /// optionally enables the vector profile at `vector_dimension`, then
    /// builds the read pool.
    ///
    /// # Errors
    /// Returns `EngineError` on any SQLite or schema failure during
    /// bootstrap or pool construction.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Writable bootstrap connection; the vector extension is loaded
        // only when a dimension is requested and the feature is compiled.
        #[cfg(feature = "sqlite-vec")]
        let conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        // Property FTS must be rebuilt when schemas are registered but the
        // projection table is empty (e.g. wiped by a migration).
        let needs_property_fts_rebuild = {
            let schema_count: i64 =
                conn.query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
                    row.get(0)
                })?;
            if schema_count == 0 {
                false
            } else {
                let fts_count: i64 =
                    conn.query_row("SELECT COUNT(*) FROM fts_node_properties", [], |row| {
                        row.get(0)
                    })?;
                fts_count == 0
            }
        };
        // Recursive property schemas additionally require position rows;
        // backfill when those are missing.
        let needs_position_backfill = {
            let recursive_schema_count: i64 = conn.query_row(
                "SELECT COUNT(*) FROM fts_property_schemas \
                 WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
                [],
                |row| row.get(0),
            )?;
            if recursive_schema_count == 0 {
                false
            } else {
                let position_count: i64 = conn.query_row(
                    "SELECT COUNT(*) FROM fts_node_property_positions",
                    [],
                    |row| row.get(0),
                )?;
                position_count == 0
            }
        };
        if needs_property_fts_rebuild || needs_position_backfill {
            // Rebuild both projection tables inside a single transaction so
            // readers never observe a half-rebuilt index.
            let tx = conn.unchecked_transaction()?;
            tx.execute("DELETE FROM fts_node_properties", [])?;
            tx.execute("DELETE FROM fts_node_property_positions", [])?;
            // NOTE(review): the ?1 kind placeholder is presumably bound per
            // registered kind inside insert_property_fts_rows — confirm.
            crate::projection::insert_property_fts_rows(
                &tx,
                "SELECT logical_id, properties FROM nodes \
                 WHERE kind = ?1 AND superseded_at IS NULL",
            )?;
            tx.commit()?;
        }

        // Vector availability comes from the bootstrap report when built
        // with sqlite-vec; without the feature it is always false.
        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            let _ = &report; // keep `report` used in this cfg
            false
        };

        if let Some(dim) = vector_dimension {
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", dim)
                .map_err(EngineError::Schema)?;
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        // Close the bootstrap connection before opening the read pool.
        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
        })
    }
420
    /// Path of the backing SQLite database file.
    pub fn database_path(&self) -> &Path {
        &self.database_path
    }
425
    /// Whether vector search is available for this database.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
431
    /// Acquire a pooled read connection (see `ReadPool::acquire`).
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
435
436 #[must_use]
442 pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
443 let mut total = SqliteCacheStatus::default();
444 for conn_mutex in &self.pool.connections {
445 if let Ok(conn) = conn_mutex.try_lock() {
446 total.add(&read_db_cache_status(&conn));
447 }
448 }
449 total
450 }
451
    /// Execute a compiled node read query.
    ///
    /// Caches the row-projection SQL under the query's shape hash
    /// (clearing the whole cache once it reaches `MAX_SHAPE_CACHE_SIZE`),
    /// then runs the statement on a pooled read connection. If the vector
    /// table is absent the read degrades to an empty `was_degraded` result
    /// instead of failing.
    ///
    /// # Errors
    /// Returns `EngineError` on pool-acquisition or SQLite failures; the
    /// error counter is incremented on every error path.
    #[allow(clippy::expect_used)]
    pub fn execute_compiled_read(
        &self,
        compiled: &CompiledQuery,
    ) -> Result<QueryRows, EngineError> {
        let row_sql = wrap_node_row_projection_sql(&compiled.sql);
        {
            // The cache holds plain strings, so a poisoned guard is still
            // safe to reuse — recover instead of propagating the poison.
            let mut cache = self
                .shape_sql_map
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
                cache.clear();
            }
            cache.insert(compiled.shape_hash, row_sql.clone());
        }

        let bind_values = compiled
            .binds
            .iter()
            .map(bind_value_to_sql)
            .collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&row_sql) {
            Ok(stmt) => stmt,
            // Missing vector table: degrade (warning once) instead of
            // surfacing a hard error.
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading to non-vector query");
                }
                return Ok(QueryRows {
                    was_degraded: true,
                    ..Default::default()
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        let nodes = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .and_then(Iterator::collect)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        self.telemetry.increment_queries();
        Ok(QueryRows {
            nodes,
            runs: Vec::new(),
            steps: Vec::new(),
            actions: Vec::new(),
            was_degraded: false,
        })
    }
539
540 pub fn execute_compiled_search(
555 &self,
556 compiled: &CompiledSearch,
557 ) -> Result<SearchRows, EngineError> {
558 let (relaxed_query, was_degraded_at_plan_time) =
565 fathomdb_query::derive_relaxed(&compiled.text_query);
566 let relaxed = relaxed_query.map(|q| CompiledSearch {
567 root_kind: compiled.root_kind.clone(),
568 text_query: q,
569 limit: compiled.limit,
570 fusable_filters: compiled.fusable_filters.clone(),
571 residual_filters: compiled.residual_filters.clone(),
572 attribution_requested: compiled.attribution_requested,
573 });
574 let plan = CompiledSearchPlan {
575 strict: compiled.clone(),
576 relaxed,
577 was_degraded_at_plan_time,
578 };
579 self.execute_compiled_search_plan(&plan)
580 }
581
    /// Execute a prepared strict/relaxed search plan.
    ///
    /// Runs the strict branch first; if it yields fewer than
    /// `min(FALLBACK_TRIGGER_K, limit)` hits and a relaxed branch exists,
    /// runs that as well, merges both branches up to `limit`, and
    /// optionally resolves per-hit attribution.
    pub fn execute_compiled_search_plan(
        &self,
        plan: &CompiledSearchPlan,
    ) -> Result<SearchRows, EngineError> {
        let strict = &plan.strict;
        let limit = strict.limit;
        let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;

        // Relaxed fallback triggers only when strict underfills.
        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;

        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            was_degraded = plan.was_degraded_at_plan_time;
        }

        let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
        // Attribution is resolved against the text query of whichever
        // branch matched each hit.
        if strict.attribution_requested {
            let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }
        // Counts are taken over the merged list, so they reflect what the
        // caller actually receives.
        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        // This path never runs a vector branch.
        let vector_hit_count = 0;

        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
656
    /// Execute a compiled vector (nearest-neighbour) search.
    ///
    /// Builds a two-stage statement: an inner `vec_nodes_active … MATCH`
    /// limited to `compiled.limit`, joined back to live nodes with fusable
    /// predicates applied inside the CTE, then residual JSON-path filters
    /// applied over the resulting hits. Distance is negated into a score
    /// so that larger is better. When the vector table is absent the
    /// search degrades to an empty `was_degraded` result.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;

        // A zero limit can never return rows; skip the query entirely.
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }

        // Bind order matters: ?1 is the query vector, ?2 the optional kind;
        // later predicates use their push position as the placeholder index.
        let filter_by_kind = !compiled.root_kind.is_empty();
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }

        // Fusable predicates prune rows inside the vector-hit CTE, before
        // the residual pass.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses
                        .push_str("\n AND src.content_ref IS NOT NULL");
                }
                // JSON-path predicates are residual-only; handled below.
                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
                }
            }
        }

        // Residual JSON-path filters run over the limited hit set.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                // Fusable predicates were already applied inside the CTE.
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull => {
                }
            }
        }

        let limit = compiled.limit;
        // Saturate absurd limits rather than failing the conversion.
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();

        // The KNN inner limit is formatted inline (it is a trusted usize,
        // not user text); the outer LIMIT is bound.
        let base_limit = limit;
        let kind_clause = if filter_by_kind {
            "\n AND src.kind = ?2"
        } else {
            ""
        };

        let sql = format!(
            "WITH vector_hits AS (
            SELECT
                src.row_id AS row_id,
                src.logical_id AS logical_id,
                src.kind AS kind,
                src.properties AS properties,
                src.source_ref AS source_ref,
                src.content_ref AS content_ref,
                src.created_at AS created_at,
                vc.distance AS distance,
                vc.chunk_id AS chunk_id
            FROM (
                SELECT chunk_id, distance
                FROM vec_nodes_active
                WHERE embedding MATCH ?1
                LIMIT {base_limit}
            ) vc
            JOIN chunks c ON c.id = vc.chunk_id
            JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
            WHERE 1 = 1{kind_clause}{fused_clauses}
        )
        SELECT
            h.row_id,
            h.logical_id,
            h.kind,
            h.properties,
            h.content_ref,
            am.last_accessed_at,
            h.created_at,
            h.distance,
            h.chunk_id
        FROM vector_hits h
        LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
        WHERE 1 = 1{filter_clauses}
        ORDER BY h.distance ASC
        LIMIT ?{limit_idx}"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            // Missing vector table at prepare time: degrade to empty.
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Negate distance so the merged ordering (larger = better)
                // matches the text branches.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    // Vector hits carry no strict/relaxed match mode.
                    match_mode: None,
                    snippet: None,
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // The table can also vanish between prepare and step;
                // degrade here as well.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
964
    /// Execute a full retrieval plan: strict text branch, relaxed fallback
    /// when strict underfills, and a vector branch only when both text
    /// branches come back empty (embedding `raw_query` on demand when an
    /// embedder is configured).
    pub fn execute_retrieval_plan(
        &self,
        plan: &CompiledRetrievalPlan,
        raw_query: &str,
    ) -> Result<SearchRows, EngineError> {
        // Cloned because fill_vector_branch may mutate the plan in place.
        let mut plan = plan.clone();
        let limit = plan.text.strict.limit;

        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;

        // Relaxed fallback triggers only when strict found fewer than
        // min(FALLBACK_TRIGGER_K, limit) hits.
        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;
        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.text.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            was_degraded = plan.was_degraded_at_plan_time;
        }

        // The vector branch is last-resort: only when text found nothing
        // and an embedder is available to produce the query vector.
        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
        if text_branches_empty && self.query_embedder.is_some() {
            self.fill_vector_branch(&mut plan, raw_query);
        }

        let mut vector_hits: Vec<SearchHit> = Vec::new();
        if let Some(vector) = plan.vector.as_ref()
            && strict_hits.is_empty()
            && relaxed_hits.is_empty()
        {
            let vector_rows = self.execute_compiled_vector_search(vector)?;
            vector_hits = vector_rows.hits;
            if vector_rows.was_degraded {
                was_degraded = true;
            }
        }
        // An embedding failure leaves plan.vector unset but marks the plan
        // degraded; surface that to the caller.
        if text_branches_empty
            && plan.was_degraded_at_plan_time
            && plan.vector.is_none()
            && self.query_embedder.is_some()
        {
            was_degraded = true;
        }

        let strict = &plan.text.strict;
        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
        if strict.attribution_requested {
            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }

        // Per-branch counts are computed over the merged hit list, not the
        // raw branch outputs.
        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        let vector_hit_count = merged
            .iter()
            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
            .count();

        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
1105
    /// Populate `plan.vector` by embedding `raw_query` with the configured
    /// query embedder. On embedding or serialization failure the vector
    /// branch is skipped and the plan is marked degraded instead.
    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
        let Some(embedder) = self.query_embedder.as_ref() else {
            return;
        };
        match embedder.embed_query(raw_query) {
            Ok(vec) => {
                // The vector is serialized to a JSON array and used as the
                // MATCH operand of the vector query.
                let literal = match serde_json::to_string(&vec) {
                    Ok(s) => s,
                    Err(err) => {
                        trace_warn!(
                            error = %err,
                            "query embedder vector serialization failed; skipping vector branch"
                        );
                        // `let _ = err;` keeps `err` "used" when the trace
                        // macro compiles to nothing — do not remove.
                        let _ = err; plan.was_degraded_at_plan_time = true;
                        return;
                    }
                };
                // The vector branch inherits every filter and the limit
                // from the strict text branch.
                let strict = &plan.text.strict;
                plan.vector = Some(CompiledVectorSearch {
                    root_kind: strict.root_kind.clone(),
                    query_text: literal,
                    limit: strict.limit,
                    fusable_filters: strict.fusable_filters.clone(),
                    residual_filters: strict.residual_filters.clone(),
                    attribution_requested: strict.attribution_requested,
                });
            }
            Err(err) => {
                trace_warn!(
                    error = %err,
                    "query embedder unavailable, skipping vector branch"
                );
                // See note above regarding `let _ = err;`.
                let _ = err; plan.was_degraded_at_plan_time = true;
            }
        }
    }
1164
    /// Run one FTS branch (strict or relaxed) of a text search.
    ///
    /// Renders the text query to FTS5 syntax and executes a UNION of the
    /// chunk FTS index and the property FTS index, scoring with `-bm25`
    /// (larger = better), applying fusable predicates inside the union and
    /// residual JSON-path filters over the limited hit set. Empty and
    /// pure-NOT queries short-circuit to no hits.
    #[allow(clippy::too_many_lines)]
    fn run_search_branch(
        &self,
        compiled: &CompiledSearch,
        branch: SearchBranch,
    ) -> Result<Vec<SearchHit>, EngineError> {
        use std::fmt::Write as _;
        // FTS5 cannot execute an empty or purely negative MATCH expression.
        if matches!(
            compiled.text_query,
            fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
        ) {
            return Ok(Vec::new());
        }
        let rendered = render_text_query_fts5(&compiled.text_query);
        let filter_by_kind = !compiled.root_kind.is_empty();
        // The MATCH expression (and optional kind) is bound twice: once for
        // the chunk FTS arm and once for the property FTS arm of the union.
        let mut binds: Vec<BindValue> = if filter_by_kind {
            vec![
                BindValue::Text(rendered.clone()),
                BindValue::Text(compiled.root_kind.clone()),
                BindValue::Text(rendered),
                BindValue::Text(compiled.root_kind.clone()),
            ]
        } else {
            vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
        };

        // Fusable predicates prune rows inside the union subquery, before
        // the LIMIT is applied.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(fused_clauses, "\n AND u.kind = ?{idx}");
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND u.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND u.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND u.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses.push_str("\n AND u.content_ref IS NOT NULL");
                }
                // JSON-path predicates are residual-only; handled below.
                Predicate::JsonPathEq { .. } | Predicate::JsonPathCompare { .. } => {
                }
            }
        }

        // Residual JSON-path filters run over the limited hit set.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                // Fusable predicates were already applied inside the union.
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull => {
                }
            }
        }

        let limit = compiled.limit;
        // Saturate absurd limits rather than failing the conversion.
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();
        // Placeholder numbers differ depending on whether the optional
        // kind binds (?2/?4) are present.
        let (chunk_fts_bind, chunk_kind_clause, prop_fts_bind, prop_kind_clause) = if filter_by_kind
        {
            (
                "?1",
                "\n AND src.kind = ?2",
                "?3",
                "\n AND fp.kind = ?4",
            )
        } else {
            ("?1", "", "?2", "")
        };
        let sql = format!(
            "WITH search_hits AS (
            SELECT
                u.row_id AS row_id,
                u.logical_id AS logical_id,
                u.kind AS kind,
                u.properties AS properties,
                u.source_ref AS source_ref,
                u.content_ref AS content_ref,
                u.created_at AS created_at,
                u.score AS score,
                u.source AS source,
                u.snippet AS snippet,
                u.projection_row_id AS projection_row_id
            FROM (
                SELECT
                    src.row_id AS row_id,
                    c.node_logical_id AS logical_id,
                    src.kind AS kind,
                    src.properties AS properties,
                    src.source_ref AS source_ref,
                    src.content_ref AS content_ref,
                    src.created_at AS created_at,
                    -bm25(fts_nodes) AS score,
                    'chunk' AS source,
                    snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
                    f.chunk_id AS projection_row_id
                FROM fts_nodes f
                JOIN chunks c ON c.id = f.chunk_id
                JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
                WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}
                UNION ALL
                SELECT
                    src.row_id AS row_id,
                    fp.node_logical_id AS logical_id,
                    src.kind AS kind,
                    src.properties AS properties,
                    src.source_ref AS source_ref,
                    src.content_ref AS content_ref,
                    src.created_at AS created_at,
                    -bm25(fts_node_properties) AS score,
                    'property' AS source,
                    substr(fp.text_content, 1, 200) AS snippet,
                    CAST(fp.rowid AS TEXT) AS projection_row_id
                FROM fts_node_properties fp
                JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
                WHERE fts_node_properties MATCH {prop_fts_bind}{prop_kind_clause}
            ) u
            WHERE 1 = 1{fused_clauses}
            ORDER BY u.score DESC
            LIMIT ?{limit_idx}
        )
        SELECT
            h.row_id,
            h.logical_id,
            h.kind,
            h.properties,
            h.content_ref,
            am.last_accessed_at,
            h.created_at,
            h.score,
            h.source,
            h.snippet,
            h.projection_row_id
        FROM search_hits h
        LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
        WHERE 1 = 1{filter_clauses}
        ORDER BY h.score DESC"
        );

        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();

        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                // The 'source' column tags which FTS arm produced the hit.
                let source_str: String = row.get(8)?;
                let source = if source_str == "property" {
                    SearchHitSource::Property
                } else {
                    SearchHitSource::Chunk
                };
                let match_mode = match branch {
                    SearchBranch::Strict => SearchMatchMode::Strict,
                    SearchBranch::Relaxed => SearchMatchMode::Relaxed,
                };
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score: row.get(7)?,
                    modality: RetrievalModality::Text,
                    source,
                    match_mode: Some(match_mode),
                    snippet: row.get(9)?,
                    projection_row_id: row.get(10)?,
                    vector_distance: None,
                    attribution: None,
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };

        // Release the statement before the guard so the borrow ends first.
        drop(statement);
        drop(conn_guard);

        self.telemetry.increment_queries();
        Ok(hits)
    }
1476
1477 fn populate_attribution_for_hits(
1481 &self,
1482 hits: &mut [SearchHit],
1483 strict_text_query: &fathomdb_query::TextQuery,
1484 relaxed_text_query: Option<&fathomdb_query::TextQuery>,
1485 ) -> Result<(), EngineError> {
1486 let conn_guard = match self.lock_connection() {
1487 Ok(g) => g,
1488 Err(e) => {
1489 self.telemetry.increment_errors();
1490 return Err(e);
1491 }
1492 };
1493 let strict_expr = render_text_query_fts5(strict_text_query);
1494 let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
1495 for hit in hits.iter_mut() {
1496 let match_expr = match hit.match_mode {
1501 Some(SearchMatchMode::Strict) => strict_expr.as_str(),
1502 Some(SearchMatchMode::Relaxed) => {
1503 relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
1504 }
1505 None => continue,
1506 };
1507 match resolve_hit_attribution(&conn_guard, hit, match_expr) {
1508 Ok(att) => hit.attribution = Some(att),
1509 Err(e) => {
1510 self.telemetry.increment_errors();
1511 return Err(e);
1512 }
1513 }
1514 }
1515 Ok(())
1516 }
1517
1518 pub fn execute_compiled_grouped_read(
1522 &self,
1523 compiled: &CompiledGroupedQuery,
1524 ) -> Result<GroupedQueryRows, EngineError> {
1525 let root_rows = self.execute_compiled_read(&compiled.root)?;
1526 if root_rows.was_degraded {
1527 return Ok(GroupedQueryRows {
1528 roots: Vec::new(),
1529 expansions: Vec::new(),
1530 was_degraded: true,
1531 });
1532 }
1533
1534 let roots = root_rows.nodes;
1535 let mut expansions = Vec::with_capacity(compiled.expansions.len());
1536 for expansion in &compiled.expansions {
1537 let slot_rows = if roots.is_empty() {
1538 Vec::new()
1539 } else {
1540 self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
1541 };
1542 expansions.push(ExpansionSlotRows {
1543 slot: expansion.slot.clone(),
1544 roots: slot_rows,
1545 });
1546 }
1547
1548 Ok(GroupedQueryRows {
1549 roots,
1550 expansions,
1551 was_degraded: false,
1552 })
1553 }
1554
1555 fn read_expansion_nodes_chunked(
1561 &self,
1562 roots: &[NodeRow],
1563 expansion: &ExpansionSlot,
1564 hard_limit: usize,
1565 ) -> Result<Vec<ExpansionRootRows>, EngineError> {
1566 if roots.len() <= BATCH_CHUNK_SIZE {
1567 return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
1568 }
1569
1570 let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
1573 for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
1574 for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
1575 per_root
1576 .entry(group.root_logical_id)
1577 .or_default()
1578 .extend(group.nodes);
1579 }
1580 }
1581
1582 Ok(roots
1583 .iter()
1584 .map(|root| ExpansionRootRows {
1585 root_logical_id: root.logical_id.clone(),
1586 nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
1587 })
1588 .collect())
1589 }
1590
    /// Runs a single recursive-CTE traversal that expands every root in one
    /// statement, then groups the returned nodes per root.
    ///
    /// Parameter layout: positions `?1..?N` are the N root logical ids
    /// (seeded via a `UNION ALL` of single-value SELECTs), and `?N+1` is the
    /// edge kind — `expansion.label` is the edge-kind filter.
    ///
    /// # Errors
    ///
    /// Returns [`EngineError::Sqlite`] on statement or row failures.
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Traversal direction decides which edge endpoint joins to the
        // frontier and which endpoint becomes the next frontier node.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };

        // One `SELECT ?i` per root id, UNION ALL'd to seed the CTE.
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");

        // `visited` is a comma-delimited CSV of ids used as a cycle guard
        // (checked with instr below); `emitted` bounds recursion depth-first
        // fan-out so a dense graph cannot explode the traversal. The final
        // ROW_NUMBER window caps rows returned *per root* at `hard_limit`.
        // NOTE: the SQL text varies with root count / depth / limit, so
        // prepare_cached only hits for identically-shaped batches.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
            traversed(root_id, logical_id, depth, visited, emitted) AS (
                SELECT rid, rid, 0, printf(',%s,', rid), 0
                FROM root_ids
                UNION ALL
                SELECT
                    t.root_id,
                    {next_logical_id},
                    t.depth + 1,
                    t.visited || {next_logical_id} || ',',
                    t.emitted + 1
                FROM traversed t
                JOIN edges e ON {join_condition}
                    AND e.kind = ?{edge_kind_param}
                    AND e.superseded_at IS NULL
                WHERE t.depth < {max_depth}
                    AND t.emitted < {hard_limit}
                    AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
            ),
            numbered AS (
                SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
                    , n.content_ref, am.last_accessed_at
                    , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
                FROM traversed t
                JOIN nodes n ON n.logical_id = t.logical_id
                    AND n.superseded_at IS NULL
                LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
                WHERE t.depth > 0
            )
            SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
            FROM numbered
            WHERE rn <= {hard_limit}
            ORDER BY root_id, logical_id",
            edge_kind_param = root_ids.len() + 1,
            max_depth = expansion.max_depth,
        );

        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;

        // Binds: root ids first (?1..?N), then the edge kind (?N+1).
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));

        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;

        // Group fetched nodes by their originating root.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }

        // Emit groups in the caller's root order; roots with no expansion
        // rows still get an (empty) entry.
        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();

        Ok(root_groups)
    }
1706
1707 pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
1713 let conn = self.lock_connection()?;
1714 conn.query_row(
1715 "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
1716 rusqlite::params![id],
1717 |row| {
1718 Ok(RunRow {
1719 id: row.get(0)?,
1720 kind: row.get(1)?,
1721 status: row.get(2)?,
1722 properties: row.get(3)?,
1723 })
1724 },
1725 )
1726 .optional()
1727 .map_err(EngineError::Sqlite)
1728 }
1729
1730 pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
1736 let conn = self.lock_connection()?;
1737 conn.query_row(
1738 "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
1739 rusqlite::params![id],
1740 |row| {
1741 Ok(StepRow {
1742 id: row.get(0)?,
1743 run_id: row.get(1)?,
1744 kind: row.get(2)?,
1745 status: row.get(3)?,
1746 properties: row.get(4)?,
1747 })
1748 },
1749 )
1750 .optional()
1751 .map_err(EngineError::Sqlite)
1752 }
1753
1754 pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
1760 let conn = self.lock_connection()?;
1761 conn.query_row(
1762 "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
1763 rusqlite::params![id],
1764 |row| {
1765 Ok(ActionRow {
1766 id: row.get(0)?,
1767 step_id: row.get(1)?,
1768 kind: row.get(2)?,
1769 status: row.get(3)?,
1770 properties: row.get(4)?,
1771 })
1772 },
1773 )
1774 .optional()
1775 .map_err(EngineError::Sqlite)
1776 }
1777
1778 pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
1784 let conn = self.lock_connection()?;
1785 let mut stmt = conn
1786 .prepare_cached(
1787 "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
1788 )
1789 .map_err(EngineError::Sqlite)?;
1790 let rows = stmt
1791 .query_map([], |row| {
1792 Ok(RunRow {
1793 id: row.get(0)?,
1794 kind: row.get(1)?,
1795 status: row.get(2)?,
1796 properties: row.get(3)?,
1797 })
1798 })
1799 .map_err(EngineError::Sqlite)?
1800 .collect::<Result<Vec<_>, _>>()
1801 .map_err(EngineError::Sqlite)?;
1802 Ok(rows)
1803 }
1804
1805 #[must_use]
1815 #[allow(clippy::expect_used)]
1816 pub fn shape_sql_count(&self) -> usize {
1817 self.shape_sql_map
1818 .lock()
1819 .unwrap_or_else(PoisonError::into_inner)
1820 .len()
1821 }
1822
1823 #[must_use]
1825 pub fn schema_manager(&self) -> Arc<SchemaManager> {
1826 Arc::clone(&self.schema_manager)
1827 }
1828
1829 #[must_use]
1838 pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
1839 let cache_hit = self
1840 .shape_sql_map
1841 .lock()
1842 .unwrap_or_else(PoisonError::into_inner)
1843 .contains_key(&compiled.shape_hash);
1844 QueryPlan {
1845 sql: wrap_node_row_projection_sql(&compiled.sql),
1846 bind_count: compiled.binds.len(),
1847 driving_table: compiled.driving_table,
1848 shape_hash: compiled.shape_hash,
1849 cache_hit,
1850 }
1851 }
1852
1853 #[doc(hidden)]
1860 pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
1861 let conn = self.lock_connection()?;
1862 let result = conn
1863 .query_row(&format!("PRAGMA {name}"), [], |row| {
1864 row.get::<_, rusqlite::types::Value>(0)
1866 })
1867 .map_err(EngineError::Sqlite)?;
1868 let s = match result {
1869 rusqlite::types::Value::Text(t) => t,
1870 rusqlite::types::Value::Integer(i) => i.to_string(),
1871 rusqlite::types::Value::Real(f) => f.to_string(),
1872 rusqlite::types::Value::Blob(_) => {
1873 return Err(EngineError::InvalidWrite(format!(
1874 "PRAGMA {name} returned an unexpected BLOB value"
1875 )));
1876 }
1877 rusqlite::types::Value::Null => String::new(),
1878 };
1879 Ok(s)
1880 }
1881
1882 pub fn query_provenance_events(
1891 &self,
1892 subject: &str,
1893 ) -> Result<Vec<ProvenanceEvent>, EngineError> {
1894 let conn = self.lock_connection()?;
1895 let mut stmt = conn
1896 .prepare_cached(
1897 "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
1898 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
1899 )
1900 .map_err(EngineError::Sqlite)?;
1901 let events = stmt
1902 .query_map(rusqlite::params![subject], |row| {
1903 Ok(ProvenanceEvent {
1904 id: row.get(0)?,
1905 event_type: row.get(1)?,
1906 subject: row.get(2)?,
1907 source_ref: row.get(3)?,
1908 metadata_json: row.get(4)?,
1909 created_at: row.get(5)?,
1910 })
1911 })
1912 .map_err(EngineError::Sqlite)?
1913 .collect::<Result<Vec<_>, _>>()
1914 .map_err(EngineError::Sqlite)?;
1915 Ok(events)
1916 }
1917}
1918
/// Wraps an arbitrary node-producing query so its rows are projected into the
/// canonical NodeRow column set, left-joined against per-node access
/// metadata (so nodes without metadata still appear, with NULL access time).
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut sql = String::with_capacity(base_sql.len() + 160);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, am.last_accessed_at FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
1926
1927pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
1930 match err {
1931 rusqlite::Error::SqliteFailure(_, Some(msg)) => {
1932 msg.contains("vec_nodes_active") || msg.contains("no such module: vec0")
1933 }
1934 _ => false,
1935 }
1936}
1937
1938fn scalar_to_bind(value: &ScalarValue) -> BindValue {
1939 match value {
1940 ScalarValue::Text(text) => BindValue::Text(text.clone()),
1941 ScalarValue::Integer(integer) => BindValue::Integer(*integer),
1942 ScalarValue::Bool(boolean) => BindValue::Bool(*boolean),
1943 }
1944}
1945
1946fn merge_search_branches(
1964 strict: Vec<SearchHit>,
1965 relaxed: Vec<SearchHit>,
1966 limit: usize,
1967) -> Vec<SearchHit> {
1968 merge_search_branches_three(strict, relaxed, Vec::new(), limit)
1969}
1970
1971fn merge_search_branches_three(
1983 strict: Vec<SearchHit>,
1984 relaxed: Vec<SearchHit>,
1985 vector: Vec<SearchHit>,
1986 limit: usize,
1987) -> Vec<SearchHit> {
1988 let strict_block = dedup_branch_hits(strict);
1989 let relaxed_block = dedup_branch_hits(relaxed);
1990 let vector_block = dedup_branch_hits(vector);
1991
1992 let mut seen: std::collections::HashSet<String> = strict_block
1993 .iter()
1994 .map(|h| h.node.logical_id.clone())
1995 .collect();
1996
1997 let mut merged = strict_block;
1998 for hit in relaxed_block {
1999 if seen.insert(hit.node.logical_id.clone()) {
2000 merged.push(hit);
2001 }
2002 }
2003 for hit in vector_block {
2004 if seen.insert(hit.node.logical_id.clone()) {
2005 merged.push(hit);
2006 }
2007 }
2008
2009 if merged.len() > limit {
2010 merged.truncate(limit);
2011 }
2012 merged
2013}
2014
2015fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
2019 hits.sort_by(|a, b| {
2020 b.score
2021 .partial_cmp(&a.score)
2022 .unwrap_or(std::cmp::Ordering::Equal)
2023 .then_with(|| a.node.logical_id.cmp(&b.node.logical_id))
2024 .then_with(|| source_priority(a.source).cmp(&source_priority(b.source)))
2025 });
2026
2027 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2028 hits.retain(|hit| seen.insert(hit.node.logical_id.clone()));
2029 hits
2030}
2031
// Tie-break priority between hit sources at equal score and logical id:
// lower wins, so chunk-text matches outrank property matches, which outrank
// vector matches (see the sort in `dedup_branch_hits`).
fn source_priority(source: SearchHitSource) -> u8 {
    match source {
        SearchHitSource::Chunk => 0,
        SearchHitSource::Property => 1,
        SearchHitSource::Vector => 2,
    }
}
2041
// Open/close delimiters passed to FTS5 `highlight()`; `parse_highlight_offsets`
// later strips them back out to recover byte offsets of matches. Control
// bytes are used on the assumption that indexed property text never contains
// \x01/\x02 — NOTE(review): confirm the indexer strips control characters.
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
2061
2062fn load_position_map(
2066 conn: &Connection,
2067 logical_id: &str,
2068 kind: &str,
2069) -> Result<Vec<(usize, usize, String)>, EngineError> {
2070 let mut stmt = conn
2071 .prepare_cached(
2072 "SELECT start_offset, end_offset, leaf_path \
2073 FROM fts_node_property_positions \
2074 WHERE node_logical_id = ?1 AND kind = ?2 \
2075 ORDER BY start_offset ASC",
2076 )
2077 .map_err(EngineError::Sqlite)?;
2078 let rows = stmt
2079 .query_map(rusqlite::params![logical_id, kind], |row| {
2080 let start: i64 = row.get(0)?;
2081 let end: i64 = row.get(1)?;
2082 let path: String = row.get(2)?;
2083 let start = usize::try_from(start).unwrap_or(0);
2087 let end = usize::try_from(end).unwrap_or(0);
2088 Ok((start, end, path))
2089 })
2090 .map_err(EngineError::Sqlite)?;
2091 let mut out = Vec::new();
2092 for row in rows {
2093 out.push(row.map_err(EngineError::Sqlite)?);
2094 }
2095 Ok(out)
2096}
2097
/// Recovers the byte offsets, in the *unmarked* original text, at which
/// highlight open-markers appear in `wrapped` (the output of FTS5
/// `highlight()`).
///
/// The scan subtracts all marker bytes seen so far from the current
/// position, so each reported offset refers to the text with every marker
/// removed.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    let mut offsets = Vec::new();
    let haystack = wrapped.as_bytes();
    let open_marker = open.as_bytes();
    let close_marker = close.as_bytes();
    let mut pos = 0usize;
    let mut marker_bytes = 0usize;
    while pos < haystack.len() {
        let rest = &haystack[pos..];
        if rest.starts_with(open_marker) {
            // A match begins here: record its position in marker-free text.
            offsets.push(pos - marker_bytes);
            pos += open_marker.len();
            marker_bytes += open_marker.len();
        } else if rest.starts_with(close_marker) {
            pos += close_marker.len();
            marker_bytes += close_marker.len();
        } else {
            pos += 1;
        }
    }
    offsets
}
2130
/// Maps a byte `offset` to the leaf property path whose `[start, end)` span
/// contains it. `positions` must be sorted ascending by start offset (as
/// produced by `load_position_map`); lookup is a binary search.
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    // Locate the last entry whose start is <= offset.
    let candidate = match positions.binary_search_by(|&(start, _, _)| start.cmp(&offset)) {
        Ok(exact) => exact,
        // Err(0) means every span starts after `offset` — no candidate.
        Err(insertion) => insertion.checked_sub(1)?,
    };
    let (start, end, path) = &positions[candidate];
    (offset >= *start && offset < *end).then(|| path.as_str())
}
2147
/// Computes match attribution (which property leaf paths matched) for one
/// search hit by re-running FTS5 `highlight()` on the hit's FTS row and
/// mapping the highlighted offsets back onto the stored position map.
///
/// Only property-sourced hits carry attribution; every early-return path
/// yields an empty `matched_paths` rather than an error, so attribution is
/// best-effort and never fails a search on missing data.
///
/// # Errors
///
/// Returns [`EngineError::Sqlite`] only for genuine SQLite failures.
fn resolve_hit_attribution(
    conn: &Connection,
    hit: &SearchHit,
    match_expr: &str,
) -> Result<HitAttribution, EngineError> {
    // Chunk/vector hits have no per-property positions to attribute.
    if !matches!(hit.source, SearchHitSource::Property) {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }
    // The FTS rowid is carried as a string on the hit; without it we cannot
    // address the fts_node_properties row.
    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };
    let rowid: i64 = match rowid_str.parse() {
        Ok(v) => v,
        Err(_) => {
            return Ok(HitAttribution {
                matched_paths: Vec::new(),
            });
        }
    };

    // Re-run the match against just this row, wrapping matched spans in the
    // sentinel open/close bytes (column index 2 is the indexed text column —
    // NOTE(review): confirm against the fts_node_properties schema).
    let mut stmt = conn
        .prepare_cached(
            "SELECT highlight(fts_node_properties, 2, ?1, ?2) \
             FROM fts_node_properties \
             WHERE rowid = ?3 AND fts_node_properties MATCH ?4",
        )
        .map_err(EngineError::Sqlite)?;
    let wrapped: Option<String> = stmt
        .query_row(
            rusqlite::params![
                ATTRIBUTION_HIGHLIGHT_OPEN,
                ATTRIBUTION_HIGHLIGHT_CLOSE,
                rowid,
                match_expr,
            ],
            |row| row.get(0),
        )
        .optional()
        .map_err(EngineError::Sqlite)?;
    // No row: the expression no longer matches this rowid (e.g. stale hit).
    let Some(wrapped) = wrapped else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };

    // Convert marker positions back into offsets in the unmarked text.
    let offsets = parse_highlight_offsets(
        &wrapped,
        ATTRIBUTION_HIGHLIGHT_OPEN,
        ATTRIBUTION_HIGHLIGHT_CLOSE,
    );
    if offsets.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
    if positions.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }

    // Collect the leaf path for each offset, de-duplicated but preserving
    // first-seen order.
    let mut matched_paths: Vec<String> = Vec::new();
    for offset in offsets {
        if let Some(path) = find_leaf_for_offset(&positions, offset)
            && !matched_paths.iter().any(|p| p == path)
        {
            matched_paths.push(path.to_owned());
        }
    }
    Ok(HitAttribution { matched_paths })
}
2238
2239fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
2240 match value {
2241 fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
2242 fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
2243 fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
2244 }
2245}
2246
2247#[cfg(test)]
2248#[allow(clippy::expect_used)]
2249mod tests {
2250 use std::panic::{AssertUnwindSafe, catch_unwind};
2251 use std::sync::Arc;
2252
2253 use fathomdb_query::{BindValue, QueryBuilder};
2254 use fathomdb_schema::SchemaManager;
2255 use rusqlite::types::Value;
2256 use tempfile::NamedTempFile;
2257
2258 use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
2259
2260 use fathomdb_query::{
2261 NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
2262 };
2263
2264 use super::{
2265 bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
2266 wrap_node_row_projection_sql,
2267 };
2268
2269 fn mk_hit(
2270 logical_id: &str,
2271 score: f64,
2272 match_mode: SearchMatchMode,
2273 source: SearchHitSource,
2274 ) -> SearchHit {
2275 SearchHit {
2276 node: NodeRowLite {
2277 row_id: format!("{logical_id}-row"),
2278 logical_id: logical_id.to_owned(),
2279 kind: "Goal".to_owned(),
2280 properties: "{}".to_owned(),
2281 content_ref: None,
2282 last_accessed_at: None,
2283 },
2284 score,
2285 modality: RetrievalModality::Text,
2286 source,
2287 match_mode: Some(match_mode),
2288 snippet: None,
2289 written_at: 0,
2290 projection_row_id: None,
2291 vector_distance: None,
2292 attribution: None,
2293 }
2294 }
2295
2296 #[test]
2297 fn merge_places_strict_block_before_relaxed_regardless_of_score() {
2298 let strict = vec![mk_hit(
2299 "a",
2300 1.0,
2301 SearchMatchMode::Strict,
2302 SearchHitSource::Chunk,
2303 )];
2304 let relaxed = vec![mk_hit(
2306 "b",
2307 9.9,
2308 SearchMatchMode::Relaxed,
2309 SearchHitSource::Chunk,
2310 )];
2311 let merged = merge_search_branches(strict, relaxed, 10);
2312 assert_eq!(merged.len(), 2);
2313 assert_eq!(merged[0].node.logical_id, "a");
2314 assert!(matches!(
2315 merged[0].match_mode,
2316 Some(SearchMatchMode::Strict)
2317 ));
2318 assert_eq!(merged[1].node.logical_id, "b");
2319 assert!(matches!(
2320 merged[1].match_mode,
2321 Some(SearchMatchMode::Relaxed)
2322 ));
2323 }
2324
2325 #[test]
2326 fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
2327 let strict = vec![mk_hit(
2328 "shared",
2329 1.0,
2330 SearchMatchMode::Strict,
2331 SearchHitSource::Chunk,
2332 )];
2333 let relaxed = vec![
2334 mk_hit(
2335 "shared",
2336 9.9,
2337 SearchMatchMode::Relaxed,
2338 SearchHitSource::Chunk,
2339 ),
2340 mk_hit(
2341 "other",
2342 2.0,
2343 SearchMatchMode::Relaxed,
2344 SearchHitSource::Chunk,
2345 ),
2346 ];
2347 let merged = merge_search_branches(strict, relaxed, 10);
2348 assert_eq!(merged.len(), 2);
2349 assert_eq!(merged[0].node.logical_id, "shared");
2350 assert!(matches!(
2351 merged[0].match_mode,
2352 Some(SearchMatchMode::Strict)
2353 ));
2354 assert_eq!(merged[1].node.logical_id, "other");
2355 assert!(matches!(
2356 merged[1].match_mode,
2357 Some(SearchMatchMode::Relaxed)
2358 ));
2359 }
2360
2361 #[test]
2362 fn merge_sorts_within_block_by_score_desc_then_logical_id() {
2363 let strict = vec![
2364 mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2365 mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2366 mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2367 ];
2368 let merged = merge_search_branches(strict, vec![], 10);
2369 assert_eq!(
2370 merged
2371 .iter()
2372 .map(|h| &h.node.logical_id)
2373 .collect::<Vec<_>>(),
2374 vec!["a", "c", "b"]
2375 );
2376 }
2377
2378 #[test]
2379 fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
2380 let strict = vec![
2381 mk_hit(
2382 "shared",
2383 1.0,
2384 SearchMatchMode::Strict,
2385 SearchHitSource::Property,
2386 ),
2387 mk_hit(
2388 "shared",
2389 1.0,
2390 SearchMatchMode::Strict,
2391 SearchHitSource::Chunk,
2392 ),
2393 ];
2394 let merged = merge_search_branches(strict, vec![], 10);
2395 assert_eq!(merged.len(), 1);
2396 assert!(matches!(merged[0].source, SearchHitSource::Chunk));
2397 }
2398
2399 #[test]
2400 fn merge_truncates_to_limit_after_block_merge() {
2401 let strict = vec![
2402 mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2403 mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2404 ];
2405 let relaxed = vec![mk_hit(
2406 "c",
2407 9.0,
2408 SearchMatchMode::Relaxed,
2409 SearchHitSource::Chunk,
2410 )];
2411 let merged = merge_search_branches(strict, relaxed, 2);
2412 assert_eq!(merged.len(), 2);
2413 assert_eq!(merged[0].node.logical_id, "a");
2414 assert_eq!(merged[1].node.logical_id, "b");
2415 }
2416
2417 #[test]
2426 fn search_architecturally_supports_three_branch_fusion() {
2427 let strict = vec![mk_hit(
2428 "alpha",
2429 1.0,
2430 SearchMatchMode::Strict,
2431 SearchHitSource::Chunk,
2432 )];
2433 let relaxed = vec![mk_hit(
2434 "bravo",
2435 5.0,
2436 SearchMatchMode::Relaxed,
2437 SearchHitSource::Chunk,
2438 )];
2439 let mut vector_hit = mk_hit(
2442 "charlie",
2443 9.9,
2444 SearchMatchMode::Strict,
2445 SearchHitSource::Vector,
2446 );
2447 vector_hit.match_mode = None;
2451 vector_hit.modality = RetrievalModality::Vector;
2452 let vector = vec![vector_hit];
2453
2454 let merged = merge_search_branches_three(strict, relaxed, vector, 10);
2455 assert_eq!(merged.len(), 3);
2456 assert_eq!(merged[0].node.logical_id, "alpha");
2457 assert_eq!(merged[1].node.logical_id, "bravo");
2458 assert_eq!(merged[2].node.logical_id, "charlie");
2459 assert!(matches!(merged[2].source, SearchHitSource::Vector));
2461
2462 let strict2 = vec![mk_hit(
2465 "shared",
2466 0.5,
2467 SearchMatchMode::Strict,
2468 SearchHitSource::Chunk,
2469 )];
2470 let relaxed2 = vec![mk_hit(
2471 "shared",
2472 5.0,
2473 SearchMatchMode::Relaxed,
2474 SearchHitSource::Chunk,
2475 )];
2476 let mut vshared = mk_hit(
2477 "shared",
2478 9.9,
2479 SearchMatchMode::Strict,
2480 SearchHitSource::Vector,
2481 );
2482 vshared.match_mode = None;
2483 vshared.modality = RetrievalModality::Vector;
2484 let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
2485 assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
2486 assert!(matches!(
2487 merged2[0].match_mode,
2488 Some(SearchMatchMode::Strict)
2489 ));
2490 assert!(matches!(merged2[0].source, SearchHitSource::Chunk));
2491
2492 let mut vshared2 = mk_hit(
2494 "shared",
2495 9.9,
2496 SearchMatchMode::Strict,
2497 SearchHitSource::Vector,
2498 );
2499 vshared2.match_mode = None;
2500 vshared2.modality = RetrievalModality::Vector;
2501 let merged3 = merge_search_branches_three(
2502 vec![],
2503 vec![mk_hit(
2504 "shared",
2505 1.0,
2506 SearchMatchMode::Relaxed,
2507 SearchHitSource::Chunk,
2508 )],
2509 vec![vshared2],
2510 10,
2511 );
2512 assert_eq!(merged3.len(), 1);
2513 assert!(matches!(
2514 merged3[0].match_mode,
2515 Some(SearchMatchMode::Relaxed)
2516 ));
2517 }
2518
2519 #[test]
2533 fn merge_search_branches_three_vector_only_preserves_vector_block() {
2534 let mut vector_hit = mk_hit(
2535 "solo",
2536 0.75,
2537 SearchMatchMode::Strict,
2538 SearchHitSource::Vector,
2539 );
2540 vector_hit.match_mode = None;
2541 vector_hit.modality = RetrievalModality::Vector;
2542
2543 let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);
2544
2545 assert_eq!(merged.len(), 1);
2546 assert_eq!(merged[0].node.logical_id, "solo");
2547 assert!(matches!(merged[0].source, SearchHitSource::Vector));
2548 assert!(matches!(merged[0].modality, RetrievalModality::Vector));
2549 assert!(
2550 merged[0].match_mode.is_none(),
2551 "vector hits carry match_mode=None per addendum 1"
2552 );
2553 }
2554
2555 #[test]
2567 fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
2568 let strict = vec![
2569 mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2570 mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2571 mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
2572 ];
2573 let relaxed = vec![mk_hit(
2574 "d",
2575 9.0,
2576 SearchMatchMode::Relaxed,
2577 SearchHitSource::Chunk,
2578 )];
2579 let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
2580 vector_hit.match_mode = None;
2581 vector_hit.modality = RetrievalModality::Vector;
2582 let vector = vec![vector_hit];
2583
2584 let merged = merge_search_branches_three(strict, relaxed, vector, 2);
2585
2586 assert_eq!(merged.len(), 2);
2587 assert_eq!(merged[0].node.logical_id, "a");
2588 assert_eq!(merged[1].node.logical_id, "b");
2589 assert!(
2591 merged
2592 .iter()
2593 .all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
2594 "strict block must win limit contention against higher-scored relaxed/vector hits"
2595 );
2596 assert!(
2597 merged
2598 .iter()
2599 .all(|h| matches!(h.source, SearchHitSource::Chunk)),
2600 "no vector source hits should leak past the limit"
2601 );
2602 }
2603
2604 #[test]
2605 fn is_vec_table_absent_matches_known_error_messages() {
2606 use rusqlite::ffi;
2607 fn make_err(msg: &str) -> rusqlite::Error {
2608 rusqlite::Error::SqliteFailure(
2609 ffi::Error {
2610 code: ffi::ErrorCode::Unknown,
2611 extended_code: 1,
2612 },
2613 Some(msg.to_owned()),
2614 )
2615 }
2616 assert!(is_vec_table_absent(&make_err(
2617 "no such table: vec_nodes_active"
2618 )));
2619 assert!(is_vec_table_absent(&make_err("no such module: vec0")));
2620 assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
2621 assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
2622 assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
2623 }
2624
2625 #[test]
2626 fn bind_value_text_maps_to_sql_text() {
2627 let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
2628 assert_eq!(val, Value::Text("hello".to_owned()));
2629 }
2630
2631 #[test]
2632 fn bind_value_integer_maps_to_sql_integer() {
2633 let val = bind_value_to_sql(&BindValue::Integer(42));
2634 assert_eq!(val, Value::Integer(42));
2635 }
2636
2637 #[test]
2638 fn bind_value_bool_true_maps_to_integer_one() {
2639 let val = bind_value_to_sql(&BindValue::Bool(true));
2640 assert_eq!(val, Value::Integer(1));
2641 }
2642
2643 #[test]
2644 fn bind_value_bool_false_maps_to_integer_zero() {
2645 let val = bind_value_to_sql(&BindValue::Bool(false));
2646 assert_eq!(val, Value::Integer(0));
2647 }
2648
2649 #[test]
2650 fn same_shape_queries_share_one_cache_entry() {
2651 let db = NamedTempFile::new().expect("temporary db");
2652 let coordinator = ExecutionCoordinator::open(
2653 db.path(),
2654 Arc::new(SchemaManager::new()),
2655 None,
2656 1,
2657 Arc::new(TelemetryCounters::default()),
2658 None,
2659 )
2660 .expect("coordinator");
2661
2662 let compiled_a = QueryBuilder::nodes("Meeting")
2663 .text_search("budget", 5)
2664 .limit(10)
2665 .compile()
2666 .expect("compiled a");
2667 let compiled_b = QueryBuilder::nodes("Meeting")
2668 .text_search("standup", 5)
2669 .limit(10)
2670 .compile()
2671 .expect("compiled b");
2672
2673 coordinator
2674 .execute_compiled_read(&compiled_a)
2675 .expect("read a");
2676 coordinator
2677 .execute_compiled_read(&compiled_b)
2678 .expect("read b");
2679
2680 assert_eq!(
2681 compiled_a.shape_hash, compiled_b.shape_hash,
2682 "different bind values, same structural shape → same hash"
2683 );
2684 assert_eq!(coordinator.shape_sql_count(), 1);
2685 }
2686
2687 #[test]
2688 fn vector_read_degrades_gracefully_when_vec_table_absent() {
2689 let db = NamedTempFile::new().expect("temporary db");
2690 let coordinator = ExecutionCoordinator::open(
2691 db.path(),
2692 Arc::new(SchemaManager::new()),
2693 None,
2694 1,
2695 Arc::new(TelemetryCounters::default()),
2696 None,
2697 )
2698 .expect("coordinator");
2699
2700 let compiled = QueryBuilder::nodes("Meeting")
2701 .vector_search("budget embeddings", 5)
2702 .compile()
2703 .expect("vector query compiles");
2704
2705 let result = coordinator.execute_compiled_read(&compiled);
2706 let rows = result.expect("degraded read must succeed, not error");
2707 assert!(
2708 rows.was_degraded,
2709 "result must be flagged as degraded when vec_nodes_active is absent"
2710 );
2711 assert!(
2712 rows.nodes.is_empty(),
2713 "degraded result must return empty nodes"
2714 );
2715 }
2716
2717 #[test]
2718 fn coordinator_caches_by_shape_hash() {
2719 let db = NamedTempFile::new().expect("temporary db");
2720 let coordinator = ExecutionCoordinator::open(
2721 db.path(),
2722 Arc::new(SchemaManager::new()),
2723 None,
2724 1,
2725 Arc::new(TelemetryCounters::default()),
2726 None,
2727 )
2728 .expect("coordinator");
2729
2730 let compiled = QueryBuilder::nodes("Meeting")
2731 .text_search("budget", 5)
2732 .compile()
2733 .expect("compiled query");
2734
2735 coordinator
2736 .execute_compiled_read(&compiled)
2737 .expect("execute compiled read");
2738 assert_eq!(coordinator.shape_sql_count(), 1);
2739 }
2740
2741 #[test]
2744 fn explain_returns_correct_sql() {
2745 let db = NamedTempFile::new().expect("temporary db");
2746 let coordinator = ExecutionCoordinator::open(
2747 db.path(),
2748 Arc::new(SchemaManager::new()),
2749 None,
2750 1,
2751 Arc::new(TelemetryCounters::default()),
2752 None,
2753 )
2754 .expect("coordinator");
2755
2756 let compiled = QueryBuilder::nodes("Meeting")
2757 .text_search("budget", 5)
2758 .compile()
2759 .expect("compiled query");
2760
2761 let plan = coordinator.explain_compiled_read(&compiled);
2762
2763 assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
2764 }
2765
2766 #[test]
2767 fn explain_returns_correct_driving_table() {
2768 use fathomdb_query::DrivingTable;
2769
2770 let db = NamedTempFile::new().expect("temporary db");
2771 let coordinator = ExecutionCoordinator::open(
2772 db.path(),
2773 Arc::new(SchemaManager::new()),
2774 None,
2775 1,
2776 Arc::new(TelemetryCounters::default()),
2777 None,
2778 )
2779 .expect("coordinator");
2780
2781 let compiled = QueryBuilder::nodes("Meeting")
2782 .text_search("budget", 5)
2783 .compile()
2784 .expect("compiled query");
2785
2786 let plan = coordinator.explain_compiled_read(&compiled);
2787
2788 assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
2789 }
2790
2791 #[test]
2792 fn explain_reports_cache_miss_then_hit() {
2793 let db = NamedTempFile::new().expect("temporary db");
2794 let coordinator = ExecutionCoordinator::open(
2795 db.path(),
2796 Arc::new(SchemaManager::new()),
2797 None,
2798 1,
2799 Arc::new(TelemetryCounters::default()),
2800 None,
2801 )
2802 .expect("coordinator");
2803
2804 let compiled = QueryBuilder::nodes("Meeting")
2805 .text_search("budget", 5)
2806 .compile()
2807 .expect("compiled query");
2808
2809 let plan_before = coordinator.explain_compiled_read(&compiled);
2811 assert!(
2812 !plan_before.cache_hit,
2813 "cache miss expected before first execute"
2814 );
2815
2816 coordinator
2818 .execute_compiled_read(&compiled)
2819 .expect("execute read");
2820
2821 let plan_after = coordinator.explain_compiled_read(&compiled);
2823 assert!(
2824 plan_after.cache_hit,
2825 "cache hit expected after first execute"
2826 );
2827 }
2828
2829 #[test]
2830 fn explain_does_not_execute_query() {
2831 let db = NamedTempFile::new().expect("temporary db");
2836 let coordinator = ExecutionCoordinator::open(
2837 db.path(),
2838 Arc::new(SchemaManager::new()),
2839 None,
2840 1,
2841 Arc::new(TelemetryCounters::default()),
2842 None,
2843 )
2844 .expect("coordinator");
2845
2846 let compiled = QueryBuilder::nodes("Meeting")
2847 .text_search("anything", 5)
2848 .compile()
2849 .expect("compiled query");
2850
2851 let plan = coordinator.explain_compiled_read(&compiled);
2853
2854 assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
2855 assert_eq!(plan.bind_count, compiled.binds.len());
2856 }
2857
2858 #[test]
2859 fn coordinator_executes_compiled_read() {
2860 let db = NamedTempFile::new().expect("temporary db");
2861 let coordinator = ExecutionCoordinator::open(
2862 db.path(),
2863 Arc::new(SchemaManager::new()),
2864 None,
2865 1,
2866 Arc::new(TelemetryCounters::default()),
2867 None,
2868 )
2869 .expect("coordinator");
2870 let conn = rusqlite::Connection::open(db.path()).expect("open db");
2871
2872 conn.execute_batch(
2873 r#"
2874 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
2875 VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
2876 INSERT INTO chunks (id, node_logical_id, text_content, created_at)
2877 VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
2878 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
2879 VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
2880 "#,
2881 )
2882 .expect("seed data");
2883
2884 let compiled = QueryBuilder::nodes("Meeting")
2885 .text_search("budget", 5)
2886 .limit(5)
2887 .compile()
2888 .expect("compiled query");
2889
2890 let rows = coordinator
2891 .execute_compiled_read(&compiled)
2892 .expect("execute read");
2893
2894 assert_eq!(rows.nodes.len(), 1);
2895 assert_eq!(rows.nodes[0].logical_id, "meeting-1");
2896 }
2897
2898 #[test]
2899 fn text_search_finds_structured_only_node_via_property_fts() {
2900 let db = NamedTempFile::new().expect("temporary db");
2901 let coordinator = ExecutionCoordinator::open(
2902 db.path(),
2903 Arc::new(SchemaManager::new()),
2904 None,
2905 1,
2906 Arc::new(TelemetryCounters::default()),
2907 None,
2908 )
2909 .expect("coordinator");
2910 let conn = rusqlite::Connection::open(db.path()).expect("open db");
2911
2912 conn.execute_batch(
2914 r#"
2915 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
2916 VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
2917 INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
2918 VALUES ('goal-1', 'Goal', 'Ship v2');
2919 "#,
2920 )
2921 .expect("seed data");
2922
2923 let compiled = QueryBuilder::nodes("Goal")
2924 .text_search("Ship", 5)
2925 .limit(5)
2926 .compile()
2927 .expect("compiled query");
2928
2929 let rows = coordinator
2930 .execute_compiled_read(&compiled)
2931 .expect("execute read");
2932
2933 assert_eq!(rows.nodes.len(), 1);
2934 assert_eq!(rows.nodes[0].logical_id, "goal-1");
2935 }
2936
2937 #[test]
2938 fn text_search_returns_both_chunk_and_property_backed_hits() {
2939 let db = NamedTempFile::new().expect("temporary db");
2940 let coordinator = ExecutionCoordinator::open(
2941 db.path(),
2942 Arc::new(SchemaManager::new()),
2943 None,
2944 1,
2945 Arc::new(TelemetryCounters::default()),
2946 None,
2947 )
2948 .expect("coordinator");
2949 let conn = rusqlite::Connection::open(db.path()).expect("open db");
2950
2951 conn.execute_batch(
2953 r"
2954 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
2955 VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
2956 INSERT INTO chunks (id, node_logical_id, text_content, created_at)
2957 VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
2958 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
2959 VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
2960 ",
2961 )
2962 .expect("seed chunk-backed node");
2963
2964 conn.execute_batch(
2966 r#"
2967 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
2968 VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
2969 INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
2970 VALUES ('meeting-2', 'Meeting', 'quarterly sync');
2971 "#,
2972 )
2973 .expect("seed property-backed node");
2974
2975 let compiled = QueryBuilder::nodes("Meeting")
2976 .text_search("quarterly", 10)
2977 .limit(10)
2978 .compile()
2979 .expect("compiled query");
2980
2981 let rows = coordinator
2982 .execute_compiled_read(&compiled)
2983 .expect("execute read");
2984
2985 let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
2986 ids.sort_unstable();
2987 assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
2988 }
2989
2990 #[test]
2991 fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
2992 let db = NamedTempFile::new().expect("temporary db");
2993 let coordinator = ExecutionCoordinator::open(
2994 db.path(),
2995 Arc::new(SchemaManager::new()),
2996 None,
2997 1,
2998 Arc::new(TelemetryCounters::default()),
2999 None,
3000 )
3001 .expect("coordinator");
3002 let conn = rusqlite::Connection::open(db.path()).expect("open db");
3003
3004 conn.execute_batch(
3005 r"
3006 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
3007 VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
3008 INSERT INTO chunks (id, node_logical_id, text_content, created_at)
3009 VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
3010 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
3011 VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
3012 ",
3013 )
3014 .expect("seed chunk-backed node");
3015
3016 let compiled = QueryBuilder::nodes("Meeting")
3017 .text_search("not a ship", 10)
3018 .limit(10)
3019 .compile()
3020 .expect("compiled query");
3021
3022 let rows = coordinator
3023 .execute_compiled_read(&compiled)
3024 .expect("execute read");
3025
3026 assert_eq!(rows.nodes.len(), 1);
3027 assert_eq!(rows.nodes[0].logical_id, "meeting-1");
3028 }
3029
3030 #[test]
3033 fn capability_gate_reports_false_without_feature() {
3034 let db = NamedTempFile::new().expect("temporary db");
3035 let coordinator = ExecutionCoordinator::open(
3038 db.path(),
3039 Arc::new(SchemaManager::new()),
3040 None,
3041 1,
3042 Arc::new(TelemetryCounters::default()),
3043 None,
3044 )
3045 .expect("coordinator");
3046 assert!(
3047 !coordinator.vector_enabled(),
3048 "vector_enabled must be false when no dimension is requested"
3049 );
3050 }
3051
3052 #[cfg(feature = "sqlite-vec")]
3053 #[test]
3054 fn capability_gate_reports_true_when_feature_enabled() {
3055 let db = NamedTempFile::new().expect("temporary db");
3056 let coordinator = ExecutionCoordinator::open(
3057 db.path(),
3058 Arc::new(SchemaManager::new()),
3059 Some(128),
3060 1,
3061 Arc::new(TelemetryCounters::default()),
3062 None,
3063 )
3064 .expect("coordinator");
3065 assert!(
3066 coordinator.vector_enabled(),
3067 "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
3068 );
3069 }
3070
    #[test]
    fn read_run_returns_inserted_run() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        // End-to-end: insert one run through the real writer pipeline, then
        // read it back through a freshly opened coordinator.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        // Submit a request that carries only the single run; every other
        // write channel is left empty.
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        // Coordinator is opened after the write has been accepted.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // read_run returns Result<Option<_>, _>: the outer expect covers the
        // query, the inner covers row existence.
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
3127
    #[test]
    fn read_step_returns_inserted_step() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};

        // End-to-end: insert a run plus one step belonging to it in a single
        // write request, then read the step back through a coordinator.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // Parent run; the step below references it via run_id.
                runs: vec![RunInsert {
                    id: "run-s1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-s1".to_owned(),
                    run_id: "run-s1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write step");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Outer expect covers the query; inner expect covers row existence.
        let row = coordinator
            .read_step("step-s1")
            .expect("read_step")
            .expect("row exists");
        assert_eq!(row.id, "step-s1");
        assert_eq!(row.run_id, "run-s1");
        assert_eq!(row.kind, "llm");
    }
3191
    #[test]
    fn read_action_returns_inserted_action() {
        use crate::{
            ProvenanceMode, RunInsert, WriteRequest, WriterActor,
            writer::{ActionInsert, StepInsert},
        };

        // End-to-end over the full run -> step -> action hierarchy: all three
        // are inserted in one write request, then the action is read back.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // Root of the hierarchy.
                runs: vec![RunInsert {
                    id: "run-a1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // Step references the run above via run_id.
                steps: vec![StepInsert {
                    id: "step-a1".to_owned(),
                    run_id: "run-a1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // Action references the step above via step_id.
                actions: vec![ActionInsert {
                    id: "action-a1".to_owned(),
                    step_id: "step-a1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write action");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        // Outer expect covers the query; inner expect covers row existence.
        let row = coordinator
            .read_action("action-a1")
            .expect("read_action")
            .expect("row exists");
        assert_eq!(row.id, "action-a1");
        assert_eq!(row.step_id, "step-a1");
        assert_eq!(row.kind, "emit");
    }
3267
    #[test]
    fn read_active_runs_excludes_superseded() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        // Two sequential writes: run-v1 first, then run-v2 which declares it
        // supersedes run-v1. read_active_runs must then return only run-v2.
        // The submit order matters: v1 must exist before v2 can supersede it.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First write: the original run.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Second write: upsert a replacement that supersedes run-v1.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let active = coordinator.read_active_runs().expect("read_active_runs");

        assert_eq!(active.len(), 1, "only the non-superseded run should appear");
        assert_eq!(active[0].id, "run-v2");
    }
3347
3348 #[allow(clippy::panic)]
3349 fn poison_connection(coordinator: &ExecutionCoordinator) {
3350 let result = catch_unwind(AssertUnwindSafe(|| {
3351 let _guard = coordinator.pool.connections[0]
3352 .lock()
3353 .expect("poison test lock");
3354 panic!("poison coordinator connection mutex");
3355 }));
3356 assert!(
3357 result.is_err(),
3358 "poison test must unwind while holding the connection mutex"
3359 );
3360 }
3361
3362 #[allow(clippy::panic)]
3363 fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
3364 where
3365 F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
3366 {
3367 match op(coordinator) {
3368 Err(EngineError::Bridge(message)) => {
3369 assert_eq!(message, "connection mutex poisoned");
3370 }
3371 Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
3372 Err(error) => panic!("expected poisoned connection error, got {error:?}"),
3373 }
3374 }
3375
3376 #[test]
3377 fn poisoned_connection_returns_bridge_error_for_read_helpers() {
3378 let db = NamedTempFile::new().expect("temporary db");
3379 let coordinator = ExecutionCoordinator::open(
3380 db.path(),
3381 Arc::new(SchemaManager::new()),
3382 None,
3383 1,
3384 Arc::new(TelemetryCounters::default()),
3385 None,
3386 )
3387 .expect("coordinator");
3388
3389 poison_connection(&coordinator);
3390
3391 assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
3392 assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
3393 assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
3394 assert_poisoned_connection_error(
3395 &coordinator,
3396 super::ExecutionCoordinator::read_active_runs,
3397 );
3398 assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
3399 assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
3400 }
3401
3402 #[test]
3405 fn shape_cache_stays_bounded() {
3406 use fathomdb_query::ShapeHash;
3407
3408 let db = NamedTempFile::new().expect("temporary db");
3409 let coordinator = ExecutionCoordinator::open(
3410 db.path(),
3411 Arc::new(SchemaManager::new()),
3412 None,
3413 1,
3414 Arc::new(TelemetryCounters::default()),
3415 None,
3416 )
3417 .expect("coordinator");
3418
3419 {
3421 let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
3422 for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
3423 cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
3424 }
3425 }
3426 let compiled = QueryBuilder::nodes("Meeting")
3431 .text_search("budget", 5)
3432 .limit(10)
3433 .compile()
3434 .expect("compiled query");
3435
3436 coordinator
3437 .execute_compiled_read(&compiled)
3438 .expect("execute read");
3439
3440 assert!(
3441 coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
3442 "shape cache must stay bounded: got {} entries, max {}",
3443 coordinator.shape_sql_count(),
3444 super::MAX_SHAPE_CACHE_SIZE
3445 );
3446 }
3447
3448 #[test]
3451 fn read_pool_size_configurable() {
3452 let db = NamedTempFile::new().expect("temporary db");
3453 let coordinator = ExecutionCoordinator::open(
3454 db.path(),
3455 Arc::new(SchemaManager::new()),
3456 None,
3457 2,
3458 Arc::new(TelemetryCounters::default()),
3459 None,
3460 )
3461 .expect("coordinator with pool_size=2");
3462
3463 assert_eq!(coordinator.pool.size(), 2);
3464
3465 let compiled = QueryBuilder::nodes("Meeting")
3467 .text_search("budget", 5)
3468 .limit(10)
3469 .compile()
3470 .expect("compiled query");
3471
3472 let result = coordinator.execute_compiled_read(&compiled);
3473 assert!(result.is_ok(), "read through pool must succeed");
3474 }
3475
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        // Seeds 10 meetings (each with a searchable chunk and two HAS_TASK
        // edges to Task nodes) and checks the grouped read's root count,
        // expansion slot shape, and per-root fan-out.
        let db = NamedTempFile::new().expect("temporary db");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");

        // Seed in a scoped block so the seeding connection drops before the
        // grouped read runs.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                // Doubled braces ({{ }}) escape literal JSON braces inside format!.
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        // One expansion slot ("tasks") traversing outgoing HAS_TASK edges to
        // depth 1 from each matched Meeting root.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Every meeting was seeded with exactly two tasks.
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
3555}