1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{CompiledGroupedQuery, CompiledQuery, DrivingTable, ExpansionSlot, ShapeHash};
8use fathomdb_schema::SchemaManager;
9use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
10
11use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
12use crate::{EngineError, sqlite};
13
14const MAX_SHAPE_CACHE_SIZE: usize = 4096;
18
19const BATCH_CHUNK_SIZE: usize = 200;
24
/// A fixed-size pool of read-only SQLite connections, each guarded by its
/// own mutex so independent readers can proceed in parallel.
struct ReadPool {
    /// One mutex-guarded connection per pool slot; `acquire` probes these
    /// with `try_lock` before blocking on the first slot.
    connections: Vec<Mutex<Connection>>,
}
32
33impl fmt::Debug for ReadPool {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 f.debug_struct("ReadPool")
36 .field("size", &self.connections.len())
37 .finish()
38 }
39}
40
41impl ReadPool {
42 fn new(
53 db_path: &Path,
54 pool_size: usize,
55 schema_manager: &SchemaManager,
56 vector_enabled: bool,
57 ) -> Result<Self, EngineError> {
58 let mut connections = Vec::with_capacity(pool_size);
59 for _ in 0..pool_size {
60 let conn = if vector_enabled {
61 #[cfg(feature = "sqlite-vec")]
62 {
63 sqlite::open_readonly_connection_with_vec(db_path)?
64 }
65 #[cfg(not(feature = "sqlite-vec"))]
66 {
67 sqlite::open_readonly_connection(db_path)?
68 }
69 } else {
70 sqlite::open_readonly_connection(db_path)?
71 };
72 schema_manager
73 .initialize_reader_connection(&conn)
74 .map_err(EngineError::Schema)?;
75 connections.push(Mutex::new(conn));
76 }
77 Ok(Self { connections })
78 }
79
80 fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
89 for conn in &self.connections {
91 if let Ok(guard) = conn.try_lock() {
92 return Ok(guard);
93 }
94 }
95 self.connections[0].lock().map_err(|_| {
97 trace_error!("read pool: connection mutex poisoned");
98 EngineError::Bridge("connection mutex poisoned".to_owned())
99 })
100 }
101
102 #[cfg(test)]
104 fn size(&self) -> usize {
105 self.connections.len()
106 }
107}
108
/// Diagnostic description of how a compiled query would execute, produced by
/// `explain_compiled_read` without running the query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    /// Final SQL text, including the node-row projection wrapper.
    pub sql: String,
    /// Number of bind parameters the query carries.
    pub bind_count: usize,
    /// Table that drives the compiled query.
    pub driving_table: DrivingTable,
    /// Structural hash of the query shape.
    pub shape_hash: ShapeHash,
    /// Whether this shape was already present in the shape-SQL cache.
    pub cache_hit: bool,
}
120
/// A single materialized node row returned by a read query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    /// Physical row identifier for this node version.
    pub row_id: String,
    /// Stable logical identifier shared across versions of the node.
    pub logical_id: String,
    /// Node kind label (e.g. `"Meeting"`).
    pub kind: String,
    /// JSON-encoded properties payload.
    pub properties: String,
    /// Optional reference to externally stored content.
    pub content_ref: Option<String>,
    /// Last access time joined in from `node_access_metadata`, when tracked.
    pub last_accessed_at: Option<i64>,
}
137
/// A row from the `runs` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    /// Run identifier.
    pub id: String,
    /// Run kind label (e.g. `"session"`).
    pub kind: String,
    /// Current status string (e.g. `"active"`).
    pub status: String,
    /// JSON-encoded properties payload.
    pub properties: String,
}
150
/// A row from the `steps` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    /// Step identifier.
    pub id: String,
    /// Identifier of the run this step belongs to.
    pub run_id: String,
    /// Step kind label (e.g. `"llm"`).
    pub kind: String,
    /// Current status string (e.g. `"completed"`).
    pub status: String,
    /// JSON-encoded properties payload.
    pub properties: String,
}
165
/// A row from the `actions` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    /// Action identifier.
    pub id: String,
    /// Identifier of the step this action belongs to.
    pub step_id: String,
    /// Action kind label.
    pub kind: String,
    /// Current status string.
    pub status: String,
    /// JSON-encoded properties payload.
    pub properties: String,
}
180
/// One provenance record describing an event that happened to a subject,
/// as read from `provenance_events`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    /// Event identifier.
    pub id: String,
    /// Event category label.
    pub event_type: String,
    /// Identifier of the entity the event is about.
    pub subject: String,
    /// Optional reference to the originating source.
    pub source_ref: Option<String>,
    /// JSON-encoded event metadata.
    pub metadata_json: String,
    /// Creation timestamp used for ordering (presumably unix seconds —
    /// confirm against the writer side).
    pub created_at: i64,
}
191
/// Flat result set of a compiled read. `execute_compiled_read` populates only
/// `nodes`; the other vectors are reserved for other driving tables.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    /// Matching node rows.
    pub nodes: Vec<NodeRow>,
    /// Matching run rows (currently always empty from node reads).
    pub runs: Vec<RunRow>,
    /// Matching step rows (currently always empty from node reads).
    pub steps: Vec<StepRow>,
    /// Matching action rows (currently always empty from node reads).
    pub actions: Vec<ActionRow>,
    /// True when a vector query was degraded to an empty result because the
    /// vec table was absent.
    pub was_degraded: bool,
}
207
/// Nodes reached by one expansion, grouped under a single root node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    /// Logical id of the root the nodes were reached from.
    pub root_logical_id: String,
    /// Nodes reached by traversing edges from that root.
    pub nodes: Vec<NodeRow>,
}
216
/// Result of one expansion slot: one `ExpansionRootRows` entry per root.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    /// Slot name carried through from the compiled expansion.
    pub slot: String,
    /// Per-root expansion results, in root order.
    pub roots: Vec<ExpansionRootRows>,
}
225
/// Result of a grouped read: the root nodes plus one expansion result per
/// compiled expansion slot.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GroupedQueryRows {
    /// Root nodes matched by the driving query.
    pub roots: Vec<NodeRow>,
    /// Expansion results, one entry per compiled slot.
    pub expansions: Vec<ExpansionSlotRows>,
    /// True when the root read was degraded (vec table absent).
    pub was_degraded: bool,
}
236
/// Read-side query coordinator: owns the read-only connection pool, the
/// per-shape SQL cache, and vector-degradation state.
pub struct ExecutionCoordinator {
    /// Path the database (and pool connections) were opened against.
    database_path: PathBuf,
    /// Shared schema manager used for bootstrap and reader initialization.
    schema_manager: Arc<SchemaManager>,
    /// Pool of read-only SQLite connections.
    pool: ReadPool,
    /// Memoized wrapped row SQL, keyed by structural query shape.
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    /// Whether vector search is available for this database.
    vector_enabled: bool,
    /// Ensures the vector-degradation warning is logged only once.
    vec_degradation_warned: AtomicBool,
    /// Shared query/error counters.
    telemetry: Arc<TelemetryCounters>,
}
247
248impl fmt::Debug for ExecutionCoordinator {
249 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250 f.debug_struct("ExecutionCoordinator")
251 .field("database_path", &self.database_path)
252 .finish_non_exhaustive()
253 }
254}
255
256impl ExecutionCoordinator {
    /// Opens the database at `path`, bootstraps the schema, ensures the
    /// vector profile when `vector_dimension` is set, and builds the
    /// read-only connection pool.
    ///
    /// Vector support additionally requires the `sqlite-vec` feature; without
    /// it `vector_enabled` is always false.
    ///
    /// # Errors
    /// Returns `EngineError` when the database cannot be opened, schema
    /// bootstrap fails, the vector profile cannot be ensured, or pool
    /// connections fail to initialize.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        // Bootstrap runs on a writable connection that is dropped before the
        // read-only pool is opened.
        #[cfg(feature = "sqlite-vec")]
        let conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let conn = sqlite::open_connection(&path)?;

        let report = schema_manager.bootstrap(&conn)?;

        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            // Without the feature, vectors are always off; touch `report` to
            // avoid an unused-variable warning.
            let _ = &report;
            false
        };

        if let Some(dim) = vector_dimension {
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", dim)
                .map_err(EngineError::Schema)?;
            // An explicit dimension request forces vectors on (feature only).
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }

        drop(conn);

        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;

        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
        })
    }
312
313 pub fn database_path(&self) -> &Path {
315 &self.database_path
316 }
317
    /// Whether vector search is available for this coordinator.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
323
    /// Acquires a read-only connection from the pool.
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
327
328 #[must_use]
334 pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
335 let mut total = SqliteCacheStatus::default();
336 for conn_mutex in &self.pool.connections {
337 if let Ok(conn) = conn_mutex.try_lock() {
338 total.add(&read_db_cache_status(&conn));
339 }
340 }
341 total
342 }
343
344 #[allow(clippy::expect_used)]
347 pub fn execute_compiled_read(
348 &self,
349 compiled: &CompiledQuery,
350 ) -> Result<QueryRows, EngineError> {
351 let row_sql = wrap_node_row_projection_sql(&compiled.sql);
352 {
358 let mut cache = self
359 .shape_sql_map
360 .lock()
361 .unwrap_or_else(PoisonError::into_inner);
362 if cache.len() >= MAX_SHAPE_CACHE_SIZE {
363 trace_debug!(evicted = cache.len(), "shape cache full, clearing");
364 cache.clear();
365 }
366 cache.insert(compiled.shape_hash, row_sql.clone());
367 }
368
369 let bind_values = compiled
370 .binds
371 .iter()
372 .map(bind_value_to_sql)
373 .collect::<Vec<_>>();
374
375 let conn_guard = match self.lock_connection() {
380 Ok(g) => g,
381 Err(e) => {
382 self.telemetry.increment_errors();
383 return Err(e);
384 }
385 };
386 let mut statement = match conn_guard.prepare_cached(&row_sql) {
387 Ok(stmt) => stmt,
388 Err(e) if is_vec_table_absent(&e) => {
389 if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
390 trace_warn!("vector table absent, degrading to non-vector query");
391 }
392 return Ok(QueryRows {
393 was_degraded: true,
394 ..Default::default()
395 });
396 }
397 Err(e) => {
398 self.telemetry.increment_errors();
399 return Err(EngineError::Sqlite(e));
400 }
401 };
402 let nodes = match statement
403 .query_map(params_from_iter(bind_values.iter()), |row| {
404 Ok(NodeRow {
405 row_id: row.get(0)?,
406 logical_id: row.get(1)?,
407 kind: row.get(2)?,
408 properties: row.get(3)?,
409 content_ref: row.get(4)?,
410 last_accessed_at: row.get(5)?,
411 })
412 })
413 .and_then(Iterator::collect)
414 {
415 Ok(rows) => rows,
416 Err(e) => {
417 self.telemetry.increment_errors();
418 return Err(EngineError::Sqlite(e));
419 }
420 };
421
422 self.telemetry.increment_queries();
423 Ok(QueryRows {
424 nodes,
425 runs: Vec::new(),
426 steps: Vec::new(),
427 actions: Vec::new(),
428 was_degraded: false,
429 })
430 }
431
432 pub fn execute_compiled_grouped_read(
436 &self,
437 compiled: &CompiledGroupedQuery,
438 ) -> Result<GroupedQueryRows, EngineError> {
439 let root_rows = self.execute_compiled_read(&compiled.root)?;
440 if root_rows.was_degraded {
441 return Ok(GroupedQueryRows {
442 roots: Vec::new(),
443 expansions: Vec::new(),
444 was_degraded: true,
445 });
446 }
447
448 let roots = root_rows.nodes;
449 let mut expansions = Vec::with_capacity(compiled.expansions.len());
450 for expansion in &compiled.expansions {
451 let slot_rows = if roots.is_empty() {
452 Vec::new()
453 } else {
454 self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
455 };
456 expansions.push(ExpansionSlotRows {
457 slot: expansion.slot.clone(),
458 roots: slot_rows,
459 });
460 }
461
462 Ok(GroupedQueryRows {
463 roots,
464 expansions,
465 was_degraded: false,
466 })
467 }
468
469 fn read_expansion_nodes_chunked(
475 &self,
476 roots: &[NodeRow],
477 expansion: &ExpansionSlot,
478 hard_limit: usize,
479 ) -> Result<Vec<ExpansionRootRows>, EngineError> {
480 if roots.len() <= BATCH_CHUNK_SIZE {
481 return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
482 }
483
484 let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
487 for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
488 for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
489 per_root
490 .entry(group.root_logical_id)
491 .or_default()
492 .extend(group.nodes);
493 }
494 }
495
496 Ok(roots
497 .iter()
498 .map(|root| ExpansionRootRows {
499 root_logical_id: root.logical_id.clone(),
500 nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
501 })
502 .collect())
503 }
504
505 fn read_expansion_nodes_batched(
510 &self,
511 roots: &[NodeRow],
512 expansion: &ExpansionSlot,
513 hard_limit: usize,
514 ) -> Result<Vec<ExpansionRootRows>, EngineError> {
515 let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
516 let (join_condition, next_logical_id) = match expansion.direction {
517 fathomdb_query::TraverseDirection::Out => {
518 ("e.source_logical_id = t.logical_id", "e.target_logical_id")
519 }
520 fathomdb_query::TraverseDirection::In => {
521 ("e.target_logical_id = t.logical_id", "e.source_logical_id")
522 }
523 };
524
525 let root_seed_union: String = (1..=root_ids.len())
529 .map(|i| format!("SELECT ?{i}"))
530 .collect::<Vec<_>>()
531 .join(" UNION ALL ");
532
533 let sql = format!(
537 "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
538 traversed(root_id, logical_id, depth, visited, emitted) AS (
539 SELECT rid, rid, 0, printf(',%s,', rid), 0
540 FROM root_ids
541 UNION ALL
542 SELECT
543 t.root_id,
544 {next_logical_id},
545 t.depth + 1,
546 t.visited || {next_logical_id} || ',',
547 t.emitted + 1
548 FROM traversed t
549 JOIN edges e ON {join_condition}
550 AND e.kind = ?{edge_kind_param}
551 AND e.superseded_at IS NULL
552 WHERE t.depth < {max_depth}
553 AND t.emitted < {hard_limit}
554 AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
555 ),
556 numbered AS (
557 SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
558 , n.content_ref, am.last_accessed_at
559 , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
560 FROM traversed t
561 JOIN nodes n ON n.logical_id = t.logical_id
562 AND n.superseded_at IS NULL
563 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
564 WHERE t.depth > 0
565 )
566 SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
567 FROM numbered
568 WHERE rn <= {hard_limit}
569 ORDER BY root_id, logical_id",
570 edge_kind_param = root_ids.len() + 1,
571 max_depth = expansion.max_depth,
572 );
573
574 let conn_guard = self.lock_connection()?;
575 let mut statement = conn_guard
576 .prepare_cached(&sql)
577 .map_err(EngineError::Sqlite)?;
578
579 let mut bind_values: Vec<Value> = root_ids
581 .iter()
582 .map(|id| Value::Text((*id).to_owned()))
583 .collect();
584 bind_values.push(Value::Text(expansion.label.clone()));
585
586 let rows = statement
587 .query_map(params_from_iter(bind_values.iter()), |row| {
588 Ok((
589 row.get::<_, String>(0)?, NodeRow {
591 row_id: row.get(1)?,
592 logical_id: row.get(2)?,
593 kind: row.get(3)?,
594 properties: row.get(4)?,
595 content_ref: row.get(5)?,
596 last_accessed_at: row.get(6)?,
597 },
598 ))
599 })
600 .map_err(EngineError::Sqlite)?
601 .collect::<Result<Vec<_>, _>>()
602 .map_err(EngineError::Sqlite)?;
603
604 let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
606 for (root_id, node) in rows {
607 per_root.entry(root_id).or_default().push(node);
608 }
609
610 let root_groups = roots
611 .iter()
612 .map(|root| ExpansionRootRows {
613 root_logical_id: root.logical_id.clone(),
614 nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
615 })
616 .collect();
617
618 Ok(root_groups)
619 }
620
621 pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
627 let conn = self.lock_connection()?;
628 conn.query_row(
629 "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
630 rusqlite::params![id],
631 |row| {
632 Ok(RunRow {
633 id: row.get(0)?,
634 kind: row.get(1)?,
635 status: row.get(2)?,
636 properties: row.get(3)?,
637 })
638 },
639 )
640 .optional()
641 .map_err(EngineError::Sqlite)
642 }
643
644 pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
650 let conn = self.lock_connection()?;
651 conn.query_row(
652 "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
653 rusqlite::params![id],
654 |row| {
655 Ok(StepRow {
656 id: row.get(0)?,
657 run_id: row.get(1)?,
658 kind: row.get(2)?,
659 status: row.get(3)?,
660 properties: row.get(4)?,
661 })
662 },
663 )
664 .optional()
665 .map_err(EngineError::Sqlite)
666 }
667
668 pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
674 let conn = self.lock_connection()?;
675 conn.query_row(
676 "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
677 rusqlite::params![id],
678 |row| {
679 Ok(ActionRow {
680 id: row.get(0)?,
681 step_id: row.get(1)?,
682 kind: row.get(2)?,
683 status: row.get(3)?,
684 properties: row.get(4)?,
685 })
686 },
687 )
688 .optional()
689 .map_err(EngineError::Sqlite)
690 }
691
692 pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
698 let conn = self.lock_connection()?;
699 let mut stmt = conn
700 .prepare_cached(
701 "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
702 )
703 .map_err(EngineError::Sqlite)?;
704 let rows = stmt
705 .query_map([], |row| {
706 Ok(RunRow {
707 id: row.get(0)?,
708 kind: row.get(1)?,
709 status: row.get(2)?,
710 properties: row.get(3)?,
711 })
712 })
713 .map_err(EngineError::Sqlite)?
714 .collect::<Result<Vec<_>, _>>()
715 .map_err(EngineError::Sqlite)?;
716 Ok(rows)
717 }
718
719 #[must_use]
729 #[allow(clippy::expect_used)]
730 pub fn shape_sql_count(&self) -> usize {
731 self.shape_sql_map
732 .lock()
733 .unwrap_or_else(PoisonError::into_inner)
734 .len()
735 }
736
    /// Shared handle to the schema manager this coordinator was opened with.
    #[must_use]
    pub fn schema_manager(&self) -> Arc<SchemaManager> {
        Arc::clone(&self.schema_manager)
    }
742
743 #[must_use]
752 pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
753 let cache_hit = self
754 .shape_sql_map
755 .lock()
756 .unwrap_or_else(PoisonError::into_inner)
757 .contains_key(&compiled.shape_hash);
758 QueryPlan {
759 sql: wrap_node_row_projection_sql(&compiled.sql),
760 bind_count: compiled.binds.len(),
761 driving_table: compiled.driving_table,
762 shape_hash: compiled.shape_hash,
763 cache_hit,
764 }
765 }
766
767 #[doc(hidden)]
774 pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
775 let conn = self.lock_connection()?;
776 let result = conn
777 .query_row(&format!("PRAGMA {name}"), [], |row| {
778 row.get::<_, rusqlite::types::Value>(0)
780 })
781 .map_err(EngineError::Sqlite)?;
782 let s = match result {
783 rusqlite::types::Value::Text(t) => t,
784 rusqlite::types::Value::Integer(i) => i.to_string(),
785 rusqlite::types::Value::Real(f) => f.to_string(),
786 rusqlite::types::Value::Blob(_) => {
787 return Err(EngineError::InvalidWrite(format!(
788 "PRAGMA {name} returned an unexpected BLOB value"
789 )));
790 }
791 rusqlite::types::Value::Null => String::new(),
792 };
793 Ok(s)
794 }
795
796 pub fn query_provenance_events(
805 &self,
806 subject: &str,
807 ) -> Result<Vec<ProvenanceEvent>, EngineError> {
808 let conn = self.lock_connection()?;
809 let mut stmt = conn
810 .prepare_cached(
811 "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
812 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
813 )
814 .map_err(EngineError::Sqlite)?;
815 let events = stmt
816 .query_map(rusqlite::params![subject], |row| {
817 Ok(ProvenanceEvent {
818 id: row.get(0)?,
819 event_type: row.get(1)?,
820 subject: row.get(2)?,
821 source_ref: row.get(3)?,
822 metadata_json: row.get(4)?,
823 created_at: row.get(5)?,
824 })
825 })
826 .map_err(EngineError::Sqlite)?
827 .collect::<Result<Vec<_>, _>>()
828 .map_err(EngineError::Sqlite)?;
829 Ok(events)
830 }
831}
832
/// Wraps `base_sql` in the canonical node-row projection: the six node
/// columns plus `last_accessed_at` joined in from `node_access_metadata`.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut sql = String::with_capacity(base_sql.len() + 160);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, am.last_accessed_at FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
840
841pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
844 match err {
845 rusqlite::Error::SqliteFailure(_, Some(msg)) => {
846 msg.contains("vec_nodes_active") || msg.contains("no such module: vec0")
847 }
848 _ => false,
849 }
850}
851
852fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
853 match value {
854 fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
855 fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
856 fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
857 }
858}
859
860#[cfg(test)]
861#[allow(clippy::expect_used)]
862mod tests {
863 use std::panic::{AssertUnwindSafe, catch_unwind};
864 use std::sync::Arc;
865
866 use fathomdb_query::{BindValue, QueryBuilder};
867 use fathomdb_schema::SchemaManager;
868 use rusqlite::types::Value;
869 use tempfile::NamedTempFile;
870
871 use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
872
873 use super::{bind_value_to_sql, is_vec_table_absent, wrap_node_row_projection_sql};
874
    // `is_vec_table_absent` must match only the two known degradation
    // messages and reject unrelated SQLite errors and error variants.
    #[test]
    fn is_vec_table_absent_matches_known_error_messages() {
        use rusqlite::ffi;
        // Helper: build a SqliteFailure carrying the given message text.
        fn make_err(msg: &str) -> rusqlite::Error {
            rusqlite::Error::SqliteFailure(
                ffi::Error {
                    code: ffi::ErrorCode::Unknown,
                    extended_code: 1,
                },
                Some(msg.to_owned()),
            )
        }
        assert!(is_vec_table_absent(&make_err(
            "no such table: vec_nodes_active"
        )));
        assert!(is_vec_table_absent(&make_err("no such module: vec0")));
        assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
        assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
        assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
    }
895
    // Text binds pass through as SQL TEXT values.
    #[test]
    fn bind_value_text_maps_to_sql_text() {
        let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
        assert_eq!(val, Value::Text("hello".to_owned()));
    }
901
    // Integer binds pass through as SQL INTEGER values.
    #[test]
    fn bind_value_integer_maps_to_sql_integer() {
        let val = bind_value_to_sql(&BindValue::Integer(42));
        assert_eq!(val, Value::Integer(42));
    }
907
    // `true` binds as SQL integer 1 (SQLite has no boolean type).
    #[test]
    fn bind_value_bool_true_maps_to_integer_one() {
        let val = bind_value_to_sql(&BindValue::Bool(true));
        assert_eq!(val, Value::Integer(1));
    }
913
    // `false` binds as SQL integer 0.
    #[test]
    fn bind_value_bool_false_maps_to_integer_zero() {
        let val = bind_value_to_sql(&BindValue::Bool(false));
        assert_eq!(val, Value::Integer(0));
    }
919
    // Two queries with identical structure but different bind values must
    // hash to the same shape and share a single shape-SQL cache entry.
    #[test]
    fn same_shape_queries_share_one_cache_entry() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        let compiled_a = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(10)
            .compile()
            .expect("compiled a");
        let compiled_b = QueryBuilder::nodes("Meeting")
            .text_search("standup", 5)
            .limit(10)
            .compile()
            .expect("compiled b");

        coordinator
            .execute_compiled_read(&compiled_a)
            .expect("read a");
        coordinator
            .execute_compiled_read(&compiled_b)
            .expect("read b");

        assert_eq!(
            compiled_a.shape_hash, compiled_b.shape_hash,
            "different bind values, same structural shape → same hash"
        );
        assert_eq!(coordinator.shape_sql_count(), 1);
    }
956
    // A compiled vector query against a database with no vec table must
    // succeed with an empty, degraded result — never an error.
    #[test]
    fn vector_read_degrades_gracefully_when_vec_table_absent() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .vector_search("budget embeddings", 5)
            .compile()
            .expect("vector query compiles");

        let result = coordinator.execute_compiled_read(&compiled);
        let rows = result.expect("degraded read must succeed, not error");
        assert!(
            rows.was_degraded,
            "result must be flagged as degraded when vec_nodes_active is absent"
        );
        assert!(
            rows.nodes.is_empty(),
            "degraded result must return empty nodes"
        );
    }
985
    // Executing a compiled read registers exactly one shape-SQL cache entry.
    #[test]
    fn coordinator_caches_by_shape_hash() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute compiled read");
        assert_eq!(coordinator.shape_sql_count(), 1);
    }
1008
    // The plan's SQL must equal the wrapped projection of the compiled SQL.
    #[test]
    fn explain_returns_correct_sql() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
    }
1032
    // A text-search query must report the FTS nodes table as its driver.
    #[test]
    fn explain_returns_correct_driving_table() {
        use fathomdb_query::DrivingTable;

        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
    }
1056
    // Explain reports a cache miss before the shape is executed and a hit
    // after `execute_compiled_read` has populated the cache.
    #[test]
    fn explain_reports_cache_miss_then_hit() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .compile()
            .expect("compiled query");

        let plan_before = coordinator.explain_compiled_read(&compiled);
        assert!(
            !plan_before.cache_hit,
            "cache miss expected before first execute"
        );

        coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        let plan_after = coordinator.explain_compiled_read(&compiled);
        assert!(
            plan_after.cache_hit,
            "cache hit expected after first execute"
        );
    }
1093
    // Explain must only describe the plan (SQL text and bind count) without
    // running the underlying query.
    #[test]
    fn explain_does_not_execute_query() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("anything", 5)
            .compile()
            .expect("compiled query");

        let plan = coordinator.explain_compiled_read(&compiled);

        assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
        assert_eq!(plan.bind_count, compiled.binds.len());
    }
1121
    // End-to-end: seed a node with an FTS chunk and confirm a compiled text
    // search returns exactly that node.
    #[test]
    fn coordinator_executes_compiled_read() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("budget", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "meeting-1");
    }
1160
    // A node with no chunks must still be findable through the property FTS
    // index (fts_node_properties).
    #[test]
    fn text_search_finds_structured_only_node_via_property_fts() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
            VALUES ('goal-1', 'Goal', 'Ship v2');
            "#,
        )
        .expect("seed data");

        let compiled = QueryBuilder::nodes("Goal")
            .text_search("Ship", 5)
            .limit(5)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        assert_eq!(rows.nodes.len(), 1);
        assert_eq!(rows.nodes[0].logical_id, "goal-1");
    }
1198
    // One term matching a chunk-indexed node and a property-indexed node
    // must return both hits in a single read.
    #[test]
    fn text_search_returns_both_chunk_and_property_backed_hits() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let conn = rusqlite::Connection::open(db.path()).expect("open db");

        conn.execute_batch(
            r"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
            VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
            ",
        )
        .expect("seed chunk-backed node");

        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
            VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
            INSERT INTO fts_node_properties (node_logical_id, kind, text_content)
            VALUES ('meeting-2', 'Meeting', 'quarterly sync');
            "#,
        )
        .expect("seed property-backed node");

        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("quarterly", 10)
            .limit(10)
            .compile()
            .expect("compiled query");

        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("execute read");

        let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
        ids.sort_unstable();
        assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
    }
1250
    // With no vector dimension requested, vector support must be reported
    // as disabled.
    #[test]
    fn capability_gate_reports_false_without_feature() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        assert!(
            !coordinator.vector_enabled(),
            "vector_enabled must be false when no dimension is requested"
        );
    }
1271
    // With the sqlite-vec feature compiled in and a dimension requested,
    // vector support must be reported as enabled.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn capability_gate_reports_true_when_feature_enabled() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            Some(128),
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        assert!(
            coordinator.vector_enabled(),
            "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
        );
    }
1289
    // A run written through the writer actor must be readable back by id
    // with its kind and status intact.
    #[test]
    fn read_run_returns_inserted_run() {
        use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};

        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-r1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write run");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");
        let row = coordinator
            .read_run("run-r1")
            .expect("read_run")
            .expect("row exists");
        assert_eq!(row.id, "run-r1");
        assert_eq!(row.kind, "session");
        assert_eq!(row.status, "active");
    }
1345
1346 #[test]
1347 fn read_step_returns_inserted_step() {
1348 use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};
1349
1350 let db = NamedTempFile::new().expect("temporary db");
1351 let writer = WriterActor::start(
1352 db.path(),
1353 Arc::new(SchemaManager::new()),
1354 ProvenanceMode::Warn,
1355 Arc::new(TelemetryCounters::default()),
1356 )
1357 .expect("writer");
1358 writer
1359 .submit(WriteRequest {
1360 label: "runtime".to_owned(),
1361 nodes: vec![],
1362 node_retires: vec![],
1363 edges: vec![],
1364 edge_retires: vec![],
1365 chunks: vec![],
1366 runs: vec![RunInsert {
1367 id: "run-s1".to_owned(),
1368 kind: "session".to_owned(),
1369 status: "active".to_owned(),
1370 properties: "{}".to_owned(),
1371 source_ref: Some("src-1".to_owned()),
1372 upsert: false,
1373 supersedes_id: None,
1374 }],
1375 steps: vec![StepInsert {
1376 id: "step-s1".to_owned(),
1377 run_id: "run-s1".to_owned(),
1378 kind: "llm".to_owned(),
1379 status: "completed".to_owned(),
1380 properties: "{}".to_owned(),
1381 source_ref: Some("src-1".to_owned()),
1382 upsert: false,
1383 supersedes_id: None,
1384 }],
1385 actions: vec![],
1386 optional_backfills: vec![],
1387 vec_inserts: vec![],
1388 operational_writes: vec![],
1389 })
1390 .expect("write step");
1391
1392 let coordinator = ExecutionCoordinator::open(
1393 db.path(),
1394 Arc::new(SchemaManager::new()),
1395 None,
1396 1,
1397 Arc::new(TelemetryCounters::default()),
1398 )
1399 .expect("coordinator");
1400 let row = coordinator
1401 .read_step("step-s1")
1402 .expect("read_step")
1403 .expect("row exists");
1404 assert_eq!(row.id, "step-s1");
1405 assert_eq!(row.run_id, "run-s1");
1406 assert_eq!(row.kind, "llm");
1407 }
1408
1409 #[test]
1410 fn read_action_returns_inserted_action() {
1411 use crate::{
1412 ProvenanceMode, RunInsert, WriteRequest, WriterActor,
1413 writer::{ActionInsert, StepInsert},
1414 };
1415
1416 let db = NamedTempFile::new().expect("temporary db");
1417 let writer = WriterActor::start(
1418 db.path(),
1419 Arc::new(SchemaManager::new()),
1420 ProvenanceMode::Warn,
1421 Arc::new(TelemetryCounters::default()),
1422 )
1423 .expect("writer");
1424 writer
1425 .submit(WriteRequest {
1426 label: "runtime".to_owned(),
1427 nodes: vec![],
1428 node_retires: vec![],
1429 edges: vec![],
1430 edge_retires: vec![],
1431 chunks: vec![],
1432 runs: vec![RunInsert {
1433 id: "run-a1".to_owned(),
1434 kind: "session".to_owned(),
1435 status: "active".to_owned(),
1436 properties: "{}".to_owned(),
1437 source_ref: Some("src-1".to_owned()),
1438 upsert: false,
1439 supersedes_id: None,
1440 }],
1441 steps: vec![StepInsert {
1442 id: "step-a1".to_owned(),
1443 run_id: "run-a1".to_owned(),
1444 kind: "llm".to_owned(),
1445 status: "completed".to_owned(),
1446 properties: "{}".to_owned(),
1447 source_ref: Some("src-1".to_owned()),
1448 upsert: false,
1449 supersedes_id: None,
1450 }],
1451 actions: vec![ActionInsert {
1452 id: "action-a1".to_owned(),
1453 step_id: "step-a1".to_owned(),
1454 kind: "emit".to_owned(),
1455 status: "completed".to_owned(),
1456 properties: "{}".to_owned(),
1457 source_ref: Some("src-1".to_owned()),
1458 upsert: false,
1459 supersedes_id: None,
1460 }],
1461 optional_backfills: vec![],
1462 vec_inserts: vec![],
1463 operational_writes: vec![],
1464 })
1465 .expect("write action");
1466
1467 let coordinator = ExecutionCoordinator::open(
1468 db.path(),
1469 Arc::new(SchemaManager::new()),
1470 None,
1471 1,
1472 Arc::new(TelemetryCounters::default()),
1473 )
1474 .expect("coordinator");
1475 let row = coordinator
1476 .read_action("action-a1")
1477 .expect("read_action")
1478 .expect("row exists");
1479 assert_eq!(row.id, "action-a1");
1480 assert_eq!(row.step_id, "step-a1");
1481 assert_eq!(row.kind, "emit");
1482 }
1483
1484 #[test]
1485 fn read_active_runs_excludes_superseded() {
1486 use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
1487
1488 let db = NamedTempFile::new().expect("temporary db");
1489 let writer = WriterActor::start(
1490 db.path(),
1491 Arc::new(SchemaManager::new()),
1492 ProvenanceMode::Warn,
1493 Arc::new(TelemetryCounters::default()),
1494 )
1495 .expect("writer");
1496
1497 writer
1499 .submit(WriteRequest {
1500 label: "v1".to_owned(),
1501 nodes: vec![],
1502 node_retires: vec![],
1503 edges: vec![],
1504 edge_retires: vec![],
1505 chunks: vec![],
1506 runs: vec![RunInsert {
1507 id: "run-v1".to_owned(),
1508 kind: "session".to_owned(),
1509 status: "active".to_owned(),
1510 properties: "{}".to_owned(),
1511 source_ref: Some("src-1".to_owned()),
1512 upsert: false,
1513 supersedes_id: None,
1514 }],
1515 steps: vec![],
1516 actions: vec![],
1517 optional_backfills: vec![],
1518 vec_inserts: vec![],
1519 operational_writes: vec![],
1520 })
1521 .expect("v1 write");
1522
1523 writer
1525 .submit(WriteRequest {
1526 label: "v2".to_owned(),
1527 nodes: vec![],
1528 node_retires: vec![],
1529 edges: vec![],
1530 edge_retires: vec![],
1531 chunks: vec![],
1532 runs: vec![RunInsert {
1533 id: "run-v2".to_owned(),
1534 kind: "session".to_owned(),
1535 status: "completed".to_owned(),
1536 properties: "{}".to_owned(),
1537 source_ref: Some("src-2".to_owned()),
1538 upsert: true,
1539 supersedes_id: Some("run-v1".to_owned()),
1540 }],
1541 steps: vec![],
1542 actions: vec![],
1543 optional_backfills: vec![],
1544 vec_inserts: vec![],
1545 operational_writes: vec![],
1546 })
1547 .expect("v2 write");
1548
1549 let coordinator = ExecutionCoordinator::open(
1550 db.path(),
1551 Arc::new(SchemaManager::new()),
1552 None,
1553 1,
1554 Arc::new(TelemetryCounters::default()),
1555 )
1556 .expect("coordinator");
1557 let active = coordinator.read_active_runs().expect("read_active_runs");
1558
1559 assert_eq!(active.len(), 1, "only the non-superseded run should appear");
1560 assert_eq!(active[0].id, "run-v2");
1561 }
1562
1563 #[allow(clippy::panic)]
1564 fn poison_connection(coordinator: &ExecutionCoordinator) {
1565 let result = catch_unwind(AssertUnwindSafe(|| {
1566 let _guard = coordinator.pool.connections[0]
1567 .lock()
1568 .expect("poison test lock");
1569 panic!("poison coordinator connection mutex");
1570 }));
1571 assert!(
1572 result.is_err(),
1573 "poison test must unwind while holding the connection mutex"
1574 );
1575 }
1576
1577 #[allow(clippy::panic)]
1578 fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
1579 where
1580 F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
1581 {
1582 match op(coordinator) {
1583 Err(EngineError::Bridge(message)) => {
1584 assert_eq!(message, "connection mutex poisoned");
1585 }
1586 Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
1587 Err(error) => panic!("expected poisoned connection error, got {error:?}"),
1588 }
1589 }
1590
1591 #[test]
1592 fn poisoned_connection_returns_bridge_error_for_read_helpers() {
1593 let db = NamedTempFile::new().expect("temporary db");
1594 let coordinator = ExecutionCoordinator::open(
1595 db.path(),
1596 Arc::new(SchemaManager::new()),
1597 None,
1598 1,
1599 Arc::new(TelemetryCounters::default()),
1600 )
1601 .expect("coordinator");
1602
1603 poison_connection(&coordinator);
1604
1605 assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
1606 assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
1607 assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
1608 assert_poisoned_connection_error(
1609 &coordinator,
1610 super::ExecutionCoordinator::read_active_runs,
1611 );
1612 assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
1613 assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
1614 }
1615
1616 #[test]
1619 fn shape_cache_stays_bounded() {
1620 use fathomdb_query::ShapeHash;
1621
1622 let db = NamedTempFile::new().expect("temporary db");
1623 let coordinator = ExecutionCoordinator::open(
1624 db.path(),
1625 Arc::new(SchemaManager::new()),
1626 None,
1627 1,
1628 Arc::new(TelemetryCounters::default()),
1629 )
1630 .expect("coordinator");
1631
1632 {
1634 let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
1635 for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
1636 cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
1637 }
1638 }
1639 let compiled = QueryBuilder::nodes("Meeting")
1644 .text_search("budget", 5)
1645 .limit(10)
1646 .compile()
1647 .expect("compiled query");
1648
1649 coordinator
1650 .execute_compiled_read(&compiled)
1651 .expect("execute read");
1652
1653 assert!(
1654 coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
1655 "shape cache must stay bounded: got {} entries, max {}",
1656 coordinator.shape_sql_count(),
1657 super::MAX_SHAPE_CACHE_SIZE
1658 );
1659 }
1660
1661 #[test]
1664 fn read_pool_size_configurable() {
1665 let db = NamedTempFile::new().expect("temporary db");
1666 let coordinator = ExecutionCoordinator::open(
1667 db.path(),
1668 Arc::new(SchemaManager::new()),
1669 None,
1670 2,
1671 Arc::new(TelemetryCounters::default()),
1672 )
1673 .expect("coordinator with pool_size=2");
1674
1675 assert_eq!(coordinator.pool.size(), 2);
1676
1677 let compiled = QueryBuilder::nodes("Meeting")
1679 .text_search("budget", 5)
1680 .limit(10)
1681 .compile()
1682 .expect("compiled query");
1683
1684 let result = coordinator.execute_compiled_read(&compiled);
1685 assert!(result.is_ok(), "read through pool must succeed");
1686 }
1687
    #[test]
    fn grouped_read_results_match_baseline() {
        use fathomdb_query::TraverseDirection;

        // End-to-end grouped read: seed 10 Meeting roots, each linked to two
        // Task children via HAS_TASK edges, then run a grouped query with one
        // expansion slot and check the result shape against that known layout.
        let db = NamedTempFile::new().expect("temporary db");

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("coordinator");

        {
            // Seed over a raw rusqlite connection (bypassing the writer) so the
            // test controls row ids, node/edge layout, and FTS content exactly.
            // Scoped so the seeding connection is dropped before querying.
            let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
            for i in 0..10 {
                conn.execute_batch(&format!(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
                    INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
                    VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');

                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
                    VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());

                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
                    INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
                    VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
                    "#,
                )).expect("seed data");
            }
        }

        // One expansion slot named "tasks", traversing outgoing HAS_TASK edges
        // to depth 1 from each matched Meeting root.
        let compiled = QueryBuilder::nodes("Meeting")
            .text_search("meeting", 10)
            .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1)
            .limit(10)
            .compile_grouped()
            .expect("compiled grouped query");

        let result = coordinator
            .execute_compiled_grouped_read(&compiled)
            .expect("grouped read");

        assert!(!result.was_degraded, "grouped read should not be degraded");
        assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
        assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
        assert_eq!(result.expansions[0].slot, "tasks");
        assert_eq!(
            result.expansions[0].roots.len(),
            10,
            "each expansion slot should have entries for all 10 roots"
        );

        // Every root was seeded with exactly two Task children, so each
        // per-root expansion must carry exactly two nodes.
        for root_expansion in &result.expansions[0].roots {
            assert_eq!(
                root_expansion.nodes.len(),
                2,
                "root {} should have 2 expansion nodes, got {}",
                root_expansion.root_logical_id,
                root_expansion.nodes.len()
            );
        }
    }
1766}