1use std::collections::HashMap;
2use std::fmt;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
6
7use fathomdb_query::{CompiledGroupedQuery, CompiledQuery, DrivingTable, ExpansionSlot, ShapeHash};
8use fathomdb_schema::SchemaManager;
9use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
10
11use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
12use crate::{EngineError, sqlite};
13
14const MAX_SHAPE_CACHE_SIZE: usize = 4096;
18
19const BATCH_CHUNK_SIZE: usize = 200;
24
25struct ReadPool {
30 connections: Vec<Mutex<Connection>>,
31}
32
33impl fmt::Debug for ReadPool {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 f.debug_struct("ReadPool")
36 .field("size", &self.connections.len())
37 .finish()
38 }
39}
40
41impl ReadPool {
42 fn new(
53 db_path: &Path,
54 pool_size: usize,
55 schema_manager: &SchemaManager,
56 vector_enabled: bool,
57 ) -> Result<Self, EngineError> {
58 let mut connections = Vec::with_capacity(pool_size);
59 for _ in 0..pool_size {
60 let conn = if vector_enabled {
61 #[cfg(feature = "sqlite-vec")]
62 {
63 sqlite::open_readonly_connection_with_vec(db_path)?
64 }
65 #[cfg(not(feature = "sqlite-vec"))]
66 {
67 sqlite::open_readonly_connection(db_path)?
68 }
69 } else {
70 sqlite::open_readonly_connection(db_path)?
71 };
72 schema_manager
73 .initialize_reader_connection(&conn)
74 .map_err(EngineError::Schema)?;
75 connections.push(Mutex::new(conn));
76 }
77 Ok(Self { connections })
78 }
79
80 fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
89 for conn in &self.connections {
91 if let Ok(guard) = conn.try_lock() {
92 return Ok(guard);
93 }
94 }
95 self.connections[0].lock().map_err(|_| {
97 trace_error!("read pool: connection mutex poisoned");
98 EngineError::Bridge("connection mutex poisoned".to_owned())
99 })
100 }
101
102 #[cfg(test)]
104 fn size(&self) -> usize {
105 self.connections.len()
106 }
107}
108
109#[derive(Clone, Debug, PartialEq, Eq)]
113pub struct QueryPlan {
114 pub sql: String,
115 pub bind_count: usize,
116 pub driving_table: DrivingTable,
117 pub shape_hash: ShapeHash,
118 pub cache_hit: bool,
119}
120
121#[derive(Clone, Debug, PartialEq, Eq)]
123pub struct NodeRow {
124 pub row_id: String,
126 pub logical_id: String,
128 pub kind: String,
130 pub properties: String,
132 pub last_accessed_at: Option<i64>,
134}
135
136#[derive(Clone, Debug, PartialEq, Eq)]
138pub struct RunRow {
139 pub id: String,
141 pub kind: String,
143 pub status: String,
145 pub properties: String,
147}
148
149#[derive(Clone, Debug, PartialEq, Eq)]
151pub struct StepRow {
152 pub id: String,
154 pub run_id: String,
156 pub kind: String,
158 pub status: String,
160 pub properties: String,
162}
163
164#[derive(Clone, Debug, PartialEq, Eq)]
166pub struct ActionRow {
167 pub id: String,
169 pub step_id: String,
171 pub kind: String,
173 pub status: String,
175 pub properties: String,
177}
178
179#[derive(Clone, Debug, PartialEq, Eq)]
181pub struct ProvenanceEvent {
182 pub id: String,
183 pub event_type: String,
184 pub subject: String,
185 pub source_ref: Option<String>,
186 pub metadata_json: String,
187 pub created_at: i64,
188}
189
190#[derive(Clone, Debug, Default, PartialEq, Eq)]
192pub struct QueryRows {
193 pub nodes: Vec<NodeRow>,
195 pub runs: Vec<RunRow>,
197 pub steps: Vec<StepRow>,
199 pub actions: Vec<ActionRow>,
201 pub was_degraded: bool,
204}
205
206#[derive(Clone, Debug, PartialEq, Eq)]
208pub struct ExpansionRootRows {
209 pub root_logical_id: String,
211 pub nodes: Vec<NodeRow>,
213}
214
215#[derive(Clone, Debug, PartialEq, Eq)]
217pub struct ExpansionSlotRows {
218 pub slot: String,
220 pub roots: Vec<ExpansionRootRows>,
222}
223
224#[derive(Clone, Debug, Default, PartialEq, Eq)]
226pub struct GroupedQueryRows {
227 pub roots: Vec<NodeRow>,
229 pub expansions: Vec<ExpansionSlotRows>,
231 pub was_degraded: bool,
233}
234
235pub struct ExecutionCoordinator {
237 database_path: PathBuf,
238 schema_manager: Arc<SchemaManager>,
239 pool: ReadPool,
240 shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
241 vector_enabled: bool,
242 vec_degradation_warned: AtomicBool,
243 telemetry: Arc<TelemetryCounters>,
244}
245
246impl fmt::Debug for ExecutionCoordinator {
247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248 f.debug_struct("ExecutionCoordinator")
249 .field("database_path", &self.database_path)
250 .finish_non_exhaustive()
251 }
252}
253
254impl ExecutionCoordinator {
255 pub fn open(
258 path: impl AsRef<Path>,
259 schema_manager: Arc<SchemaManager>,
260 vector_dimension: Option<usize>,
261 pool_size: usize,
262 telemetry: Arc<TelemetryCounters>,
263 ) -> Result<Self, EngineError> {
264 let path = path.as_ref().to_path_buf();
265 #[cfg(feature = "sqlite-vec")]
266 let conn = if vector_dimension.is_some() {
267 sqlite::open_connection_with_vec(&path)?
268 } else {
269 sqlite::open_connection(&path)?
270 };
271 #[cfg(not(feature = "sqlite-vec"))]
272 let conn = sqlite::open_connection(&path)?;
273
274 let report = schema_manager.bootstrap(&conn)?;
275
276 #[cfg(feature = "sqlite-vec")]
277 let mut vector_enabled = report.vector_profile_enabled;
278 #[cfg(not(feature = "sqlite-vec"))]
279 let vector_enabled = {
280 let _ = &report;
281 false
282 };
283
284 if let Some(dim) = vector_dimension {
285 schema_manager
286 .ensure_vector_profile(&conn, "default", "vec_nodes_active", dim)
287 .map_err(EngineError::Schema)?;
288 #[cfg(feature = "sqlite-vec")]
290 {
291 vector_enabled = true;
292 }
293 }
294
295 drop(conn);
297
298 let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;
299
300 Ok(Self {
301 database_path: path,
302 schema_manager,
303 pool,
304 shape_sql_map: Mutex::new(HashMap::new()),
305 vector_enabled,
306 vec_degradation_warned: AtomicBool::new(false),
307 telemetry,
308 })
309 }
310
311 pub fn database_path(&self) -> &Path {
313 &self.database_path
314 }
315
316 #[must_use]
318 pub fn vector_enabled(&self) -> bool {
319 self.vector_enabled
320 }
321
322 fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
323 self.pool.acquire()
324 }
325
326 #[must_use]
332 pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
333 let mut total = SqliteCacheStatus::default();
334 for conn_mutex in &self.pool.connections {
335 if let Ok(conn) = conn_mutex.try_lock() {
336 total.add(&read_db_cache_status(&conn));
337 }
338 }
339 total
340 }
341
342 #[allow(clippy::expect_used)]
345 pub fn execute_compiled_read(
346 &self,
347 compiled: &CompiledQuery,
348 ) -> Result<QueryRows, EngineError> {
349 let row_sql = wrap_node_row_projection_sql(&compiled.sql);
350 {
356 let mut cache = self
357 .shape_sql_map
358 .lock()
359 .unwrap_or_else(PoisonError::into_inner);
360 if cache.len() >= MAX_SHAPE_CACHE_SIZE {
361 trace_debug!(evicted = cache.len(), "shape cache full, clearing");
362 cache.clear();
363 }
364 cache.insert(compiled.shape_hash, row_sql.clone());
365 }
366
367 let bind_values = compiled
368 .binds
369 .iter()
370 .map(bind_value_to_sql)
371 .collect::<Vec<_>>();
372
373 let conn_guard = match self.lock_connection() {
378 Ok(g) => g,
379 Err(e) => {
380 self.telemetry.increment_errors();
381 return Err(e);
382 }
383 };
384 let mut statement = match conn_guard.prepare_cached(&row_sql) {
385 Ok(stmt) => stmt,
386 Err(e) if is_vec_table_absent(&e) => {
387 if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
388 trace_warn!("vector table absent, degrading to non-vector query");
389 }
390 return Ok(QueryRows {
391 was_degraded: true,
392 ..Default::default()
393 });
394 }
395 Err(e) => {
396 self.telemetry.increment_errors();
397 return Err(EngineError::Sqlite(e));
398 }
399 };
400 let nodes = match statement
401 .query_map(params_from_iter(bind_values.iter()), |row| {
402 Ok(NodeRow {
403 row_id: row.get(0)?,
404 logical_id: row.get(1)?,
405 kind: row.get(2)?,
406 properties: row.get(3)?,
407 last_accessed_at: row.get(4)?,
408 })
409 })
410 .and_then(Iterator::collect)
411 {
412 Ok(rows) => rows,
413 Err(e) => {
414 self.telemetry.increment_errors();
415 return Err(EngineError::Sqlite(e));
416 }
417 };
418
419 self.telemetry.increment_queries();
420 Ok(QueryRows {
421 nodes,
422 runs: Vec::new(),
423 steps: Vec::new(),
424 actions: Vec::new(),
425 was_degraded: false,
426 })
427 }
428
429 pub fn execute_compiled_grouped_read(
433 &self,
434 compiled: &CompiledGroupedQuery,
435 ) -> Result<GroupedQueryRows, EngineError> {
436 let root_rows = self.execute_compiled_read(&compiled.root)?;
437 if root_rows.was_degraded {
438 return Ok(GroupedQueryRows {
439 roots: Vec::new(),
440 expansions: Vec::new(),
441 was_degraded: true,
442 });
443 }
444
445 let roots = root_rows.nodes;
446 let mut expansions = Vec::with_capacity(compiled.expansions.len());
447 for expansion in &compiled.expansions {
448 let slot_rows = if roots.is_empty() {
449 Vec::new()
450 } else {
451 self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
452 };
453 expansions.push(ExpansionSlotRows {
454 slot: expansion.slot.clone(),
455 roots: slot_rows,
456 });
457 }
458
459 Ok(GroupedQueryRows {
460 roots,
461 expansions,
462 was_degraded: false,
463 })
464 }
465
466 fn read_expansion_nodes_chunked(
472 &self,
473 roots: &[NodeRow],
474 expansion: &ExpansionSlot,
475 hard_limit: usize,
476 ) -> Result<Vec<ExpansionRootRows>, EngineError> {
477 if roots.len() <= BATCH_CHUNK_SIZE {
478 return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
479 }
480
481 let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
484 for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
485 for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
486 per_root
487 .entry(group.root_logical_id)
488 .or_default()
489 .extend(group.nodes);
490 }
491 }
492
493 Ok(roots
494 .iter()
495 .map(|root| ExpansionRootRows {
496 root_logical_id: root.logical_id.clone(),
497 nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
498 })
499 .collect())
500 }
501
502 fn read_expansion_nodes_batched(
507 &self,
508 roots: &[NodeRow],
509 expansion: &ExpansionSlot,
510 hard_limit: usize,
511 ) -> Result<Vec<ExpansionRootRows>, EngineError> {
512 let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
513 let (join_condition, next_logical_id) = match expansion.direction {
514 fathomdb_query::TraverseDirection::Out => {
515 ("e.source_logical_id = t.logical_id", "e.target_logical_id")
516 }
517 fathomdb_query::TraverseDirection::In => {
518 ("e.target_logical_id = t.logical_id", "e.source_logical_id")
519 }
520 };
521
522 let root_seed_union: String = (1..=root_ids.len())
526 .map(|i| format!("SELECT ?{i}"))
527 .collect::<Vec<_>>()
528 .join(" UNION ALL ");
529
530 let sql = format!(
534 "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
535 traversed(root_id, logical_id, depth, visited, emitted) AS (
536 SELECT rid, rid, 0, printf(',%s,', rid), 0
537 FROM root_ids
538 UNION ALL
539 SELECT
540 t.root_id,
541 {next_logical_id},
542 t.depth + 1,
543 t.visited || {next_logical_id} || ',',
544 t.emitted + 1
545 FROM traversed t
546 JOIN edges e ON {join_condition}
547 AND e.kind = ?{edge_kind_param}
548 AND e.superseded_at IS NULL
549 WHERE t.depth < {max_depth}
550 AND t.emitted < {hard_limit}
551 AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
552 ),
553 numbered AS (
554 SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
555 , am.last_accessed_at
556 , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
557 FROM traversed t
558 JOIN nodes n ON n.logical_id = t.logical_id
559 AND n.superseded_at IS NULL
560 LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
561 WHERE t.depth > 0
562 )
563 SELECT root_id, row_id, logical_id, kind, properties, last_accessed_at
564 FROM numbered
565 WHERE rn <= {hard_limit}
566 ORDER BY root_id, logical_id",
567 edge_kind_param = root_ids.len() + 1,
568 max_depth = expansion.max_depth,
569 );
570
571 let conn_guard = self.lock_connection()?;
572 let mut statement = conn_guard
573 .prepare_cached(&sql)
574 .map_err(EngineError::Sqlite)?;
575
576 let mut bind_values: Vec<Value> = root_ids
578 .iter()
579 .map(|id| Value::Text((*id).to_owned()))
580 .collect();
581 bind_values.push(Value::Text(expansion.label.clone()));
582
583 let rows = statement
584 .query_map(params_from_iter(bind_values.iter()), |row| {
585 Ok((
586 row.get::<_, String>(0)?, NodeRow {
588 row_id: row.get(1)?,
589 logical_id: row.get(2)?,
590 kind: row.get(3)?,
591 properties: row.get(4)?,
592 last_accessed_at: row.get(5)?,
593 },
594 ))
595 })
596 .map_err(EngineError::Sqlite)?
597 .collect::<Result<Vec<_>, _>>()
598 .map_err(EngineError::Sqlite)?;
599
600 let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
602 for (root_id, node) in rows {
603 per_root.entry(root_id).or_default().push(node);
604 }
605
606 let root_groups = roots
607 .iter()
608 .map(|root| ExpansionRootRows {
609 root_logical_id: root.logical_id.clone(),
610 nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
611 })
612 .collect();
613
614 Ok(root_groups)
615 }
616
617 pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
623 let conn = self.lock_connection()?;
624 conn.query_row(
625 "SELECT id, kind, status, properties FROM runs WHERE id = ?1",
626 rusqlite::params![id],
627 |row| {
628 Ok(RunRow {
629 id: row.get(0)?,
630 kind: row.get(1)?,
631 status: row.get(2)?,
632 properties: row.get(3)?,
633 })
634 },
635 )
636 .optional()
637 .map_err(EngineError::Sqlite)
638 }
639
640 pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
646 let conn = self.lock_connection()?;
647 conn.query_row(
648 "SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
649 rusqlite::params![id],
650 |row| {
651 Ok(StepRow {
652 id: row.get(0)?,
653 run_id: row.get(1)?,
654 kind: row.get(2)?,
655 status: row.get(3)?,
656 properties: row.get(4)?,
657 })
658 },
659 )
660 .optional()
661 .map_err(EngineError::Sqlite)
662 }
663
664 pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
670 let conn = self.lock_connection()?;
671 conn.query_row(
672 "SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
673 rusqlite::params![id],
674 |row| {
675 Ok(ActionRow {
676 id: row.get(0)?,
677 step_id: row.get(1)?,
678 kind: row.get(2)?,
679 status: row.get(3)?,
680 properties: row.get(4)?,
681 })
682 },
683 )
684 .optional()
685 .map_err(EngineError::Sqlite)
686 }
687
688 pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
694 let conn = self.lock_connection()?;
695 let mut stmt = conn
696 .prepare_cached(
697 "SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
698 )
699 .map_err(EngineError::Sqlite)?;
700 let rows = stmt
701 .query_map([], |row| {
702 Ok(RunRow {
703 id: row.get(0)?,
704 kind: row.get(1)?,
705 status: row.get(2)?,
706 properties: row.get(3)?,
707 })
708 })
709 .map_err(EngineError::Sqlite)?
710 .collect::<Result<Vec<_>, _>>()
711 .map_err(EngineError::Sqlite)?;
712 Ok(rows)
713 }
714
715 #[must_use]
725 #[allow(clippy::expect_used)]
726 pub fn shape_sql_count(&self) -> usize {
727 self.shape_sql_map
728 .lock()
729 .unwrap_or_else(PoisonError::into_inner)
730 .len()
731 }
732
733 #[must_use]
735 pub fn schema_manager(&self) -> Arc<SchemaManager> {
736 Arc::clone(&self.schema_manager)
737 }
738
739 #[must_use]
748 pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
749 let cache_hit = self
750 .shape_sql_map
751 .lock()
752 .unwrap_or_else(PoisonError::into_inner)
753 .contains_key(&compiled.shape_hash);
754 QueryPlan {
755 sql: wrap_node_row_projection_sql(&compiled.sql),
756 bind_count: compiled.binds.len(),
757 driving_table: compiled.driving_table,
758 shape_hash: compiled.shape_hash,
759 cache_hit,
760 }
761 }
762
763 #[doc(hidden)]
770 pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
771 let conn = self.lock_connection()?;
772 let result = conn
773 .query_row(&format!("PRAGMA {name}"), [], |row| {
774 row.get::<_, rusqlite::types::Value>(0)
776 })
777 .map_err(EngineError::Sqlite)?;
778 let s = match result {
779 rusqlite::types::Value::Text(t) => t,
780 rusqlite::types::Value::Integer(i) => i.to_string(),
781 rusqlite::types::Value::Real(f) => f.to_string(),
782 rusqlite::types::Value::Blob(_) => {
783 return Err(EngineError::InvalidWrite(format!(
784 "PRAGMA {name} returned an unexpected BLOB value"
785 )));
786 }
787 rusqlite::types::Value::Null => String::new(),
788 };
789 Ok(s)
790 }
791
792 pub fn query_provenance_events(
801 &self,
802 subject: &str,
803 ) -> Result<Vec<ProvenanceEvent>, EngineError> {
804 let conn = self.lock_connection()?;
805 let mut stmt = conn
806 .prepare_cached(
807 "SELECT id, event_type, subject, source_ref, metadata_json, created_at \
808 FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
809 )
810 .map_err(EngineError::Sqlite)?;
811 let events = stmt
812 .query_map(rusqlite::params![subject], |row| {
813 Ok(ProvenanceEvent {
814 id: row.get(0)?,
815 event_type: row.get(1)?,
816 subject: row.get(2)?,
817 source_ref: row.get(3)?,
818 metadata_json: row.get(4)?,
819 created_at: row.get(5)?,
820 })
821 })
822 .map_err(EngineError::Sqlite)?
823 .collect::<Result<Vec<_>, _>>()
824 .map_err(EngineError::Sqlite)?;
825 Ok(events)
826 }
827}
828
829fn wrap_node_row_projection_sql(base_sql: &str) -> String {
830 format!(
831 "SELECT q.row_id, q.logical_id, q.kind, q.properties, am.last_accessed_at \
832 FROM ({base_sql}) q \
833 LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id"
834 )
835}
836
837pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
840 match err {
841 rusqlite::Error::SqliteFailure(_, Some(msg)) => {
842 msg.contains("vec_nodes_active") || msg.contains("no such module: vec0")
843 }
844 _ => false,
845 }
846}
847
848fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
849 match value {
850 fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
851 fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
852 fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
853 }
854}
855
856#[cfg(test)]
857#[allow(clippy::expect_used)]
858mod tests {
859 use std::panic::{AssertUnwindSafe, catch_unwind};
860 use std::sync::Arc;
861
862 use fathomdb_query::{BindValue, QueryBuilder};
863 use fathomdb_schema::SchemaManager;
864 use rusqlite::types::Value;
865 use tempfile::NamedTempFile;
866
867 use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
868
869 use super::{bind_value_to_sql, is_vec_table_absent, wrap_node_row_projection_sql};
870
871 #[test]
872 fn is_vec_table_absent_matches_known_error_messages() {
873 use rusqlite::ffi;
874 fn make_err(msg: &str) -> rusqlite::Error {
875 rusqlite::Error::SqliteFailure(
876 ffi::Error {
877 code: ffi::ErrorCode::Unknown,
878 extended_code: 1,
879 },
880 Some(msg.to_owned()),
881 )
882 }
883 assert!(is_vec_table_absent(&make_err(
884 "no such table: vec_nodes_active"
885 )));
886 assert!(is_vec_table_absent(&make_err("no such module: vec0")));
887 assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
888 assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
889 assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
890 }
891
892 #[test]
893 fn bind_value_text_maps_to_sql_text() {
894 let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
895 assert_eq!(val, Value::Text("hello".to_owned()));
896 }
897
898 #[test]
899 fn bind_value_integer_maps_to_sql_integer() {
900 let val = bind_value_to_sql(&BindValue::Integer(42));
901 assert_eq!(val, Value::Integer(42));
902 }
903
904 #[test]
905 fn bind_value_bool_true_maps_to_integer_one() {
906 let val = bind_value_to_sql(&BindValue::Bool(true));
907 assert_eq!(val, Value::Integer(1));
908 }
909
910 #[test]
911 fn bind_value_bool_false_maps_to_integer_zero() {
912 let val = bind_value_to_sql(&BindValue::Bool(false));
913 assert_eq!(val, Value::Integer(0));
914 }
915
916 #[test]
917 fn same_shape_queries_share_one_cache_entry() {
918 let db = NamedTempFile::new().expect("temporary db");
919 let coordinator = ExecutionCoordinator::open(
920 db.path(),
921 Arc::new(SchemaManager::new()),
922 None,
923 1,
924 Arc::new(TelemetryCounters::default()),
925 )
926 .expect("coordinator");
927
928 let compiled_a = QueryBuilder::nodes("Meeting")
929 .text_search("budget", 5)
930 .limit(10)
931 .compile()
932 .expect("compiled a");
933 let compiled_b = QueryBuilder::nodes("Meeting")
934 .text_search("standup", 5)
935 .limit(10)
936 .compile()
937 .expect("compiled b");
938
939 coordinator
940 .execute_compiled_read(&compiled_a)
941 .expect("read a");
942 coordinator
943 .execute_compiled_read(&compiled_b)
944 .expect("read b");
945
946 assert_eq!(
947 compiled_a.shape_hash, compiled_b.shape_hash,
948 "different bind values, same structural shape → same hash"
949 );
950 assert_eq!(coordinator.shape_sql_count(), 1);
951 }
952
953 #[test]
954 fn vector_read_degrades_gracefully_when_vec_table_absent() {
955 let db = NamedTempFile::new().expect("temporary db");
956 let coordinator = ExecutionCoordinator::open(
957 db.path(),
958 Arc::new(SchemaManager::new()),
959 None,
960 1,
961 Arc::new(TelemetryCounters::default()),
962 )
963 .expect("coordinator");
964
965 let compiled = QueryBuilder::nodes("Meeting")
966 .vector_search("budget embeddings", 5)
967 .compile()
968 .expect("vector query compiles");
969
970 let result = coordinator.execute_compiled_read(&compiled);
971 let rows = result.expect("degraded read must succeed, not error");
972 assert!(
973 rows.was_degraded,
974 "result must be flagged as degraded when vec_nodes_active is absent"
975 );
976 assert!(
977 rows.nodes.is_empty(),
978 "degraded result must return empty nodes"
979 );
980 }
981
982 #[test]
983 fn coordinator_caches_by_shape_hash() {
984 let db = NamedTempFile::new().expect("temporary db");
985 let coordinator = ExecutionCoordinator::open(
986 db.path(),
987 Arc::new(SchemaManager::new()),
988 None,
989 1,
990 Arc::new(TelemetryCounters::default()),
991 )
992 .expect("coordinator");
993
994 let compiled = QueryBuilder::nodes("Meeting")
995 .text_search("budget", 5)
996 .compile()
997 .expect("compiled query");
998
999 coordinator
1000 .execute_compiled_read(&compiled)
1001 .expect("execute compiled read");
1002 assert_eq!(coordinator.shape_sql_count(), 1);
1003 }
1004
1005 #[test]
1008 fn explain_returns_correct_sql() {
1009 let db = NamedTempFile::new().expect("temporary db");
1010 let coordinator = ExecutionCoordinator::open(
1011 db.path(),
1012 Arc::new(SchemaManager::new()),
1013 None,
1014 1,
1015 Arc::new(TelemetryCounters::default()),
1016 )
1017 .expect("coordinator");
1018
1019 let compiled = QueryBuilder::nodes("Meeting")
1020 .text_search("budget", 5)
1021 .compile()
1022 .expect("compiled query");
1023
1024 let plan = coordinator.explain_compiled_read(&compiled);
1025
1026 assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
1027 }
1028
1029 #[test]
1030 fn explain_returns_correct_driving_table() {
1031 use fathomdb_query::DrivingTable;
1032
1033 let db = NamedTempFile::new().expect("temporary db");
1034 let coordinator = ExecutionCoordinator::open(
1035 db.path(),
1036 Arc::new(SchemaManager::new()),
1037 None,
1038 1,
1039 Arc::new(TelemetryCounters::default()),
1040 )
1041 .expect("coordinator");
1042
1043 let compiled = QueryBuilder::nodes("Meeting")
1044 .text_search("budget", 5)
1045 .compile()
1046 .expect("compiled query");
1047
1048 let plan = coordinator.explain_compiled_read(&compiled);
1049
1050 assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
1051 }
1052
1053 #[test]
1054 fn explain_reports_cache_miss_then_hit() {
1055 let db = NamedTempFile::new().expect("temporary db");
1056 let coordinator = ExecutionCoordinator::open(
1057 db.path(),
1058 Arc::new(SchemaManager::new()),
1059 None,
1060 1,
1061 Arc::new(TelemetryCounters::default()),
1062 )
1063 .expect("coordinator");
1064
1065 let compiled = QueryBuilder::nodes("Meeting")
1066 .text_search("budget", 5)
1067 .compile()
1068 .expect("compiled query");
1069
1070 let plan_before = coordinator.explain_compiled_read(&compiled);
1072 assert!(
1073 !plan_before.cache_hit,
1074 "cache miss expected before first execute"
1075 );
1076
1077 coordinator
1079 .execute_compiled_read(&compiled)
1080 .expect("execute read");
1081
1082 let plan_after = coordinator.explain_compiled_read(&compiled);
1084 assert!(
1085 plan_after.cache_hit,
1086 "cache hit expected after first execute"
1087 );
1088 }
1089
1090 #[test]
1091 fn explain_does_not_execute_query() {
1092 let db = NamedTempFile::new().expect("temporary db");
1097 let coordinator = ExecutionCoordinator::open(
1098 db.path(),
1099 Arc::new(SchemaManager::new()),
1100 None,
1101 1,
1102 Arc::new(TelemetryCounters::default()),
1103 )
1104 .expect("coordinator");
1105
1106 let compiled = QueryBuilder::nodes("Meeting")
1107 .text_search("anything", 5)
1108 .compile()
1109 .expect("compiled query");
1110
1111 let plan = coordinator.explain_compiled_read(&compiled);
1113
1114 assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
1115 assert_eq!(plan.bind_count, compiled.binds.len());
1116 }
1117
1118 #[test]
1119 fn coordinator_executes_compiled_read() {
1120 let db = NamedTempFile::new().expect("temporary db");
1121 let coordinator = ExecutionCoordinator::open(
1122 db.path(),
1123 Arc::new(SchemaManager::new()),
1124 None,
1125 1,
1126 Arc::new(TelemetryCounters::default()),
1127 )
1128 .expect("coordinator");
1129 let conn = rusqlite::Connection::open(db.path()).expect("open db");
1130
1131 conn.execute_batch(
1132 r#"
1133 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
1134 VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
1135 INSERT INTO chunks (id, node_logical_id, text_content, created_at)
1136 VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
1137 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
1138 VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
1139 "#,
1140 )
1141 .expect("seed data");
1142
1143 let compiled = QueryBuilder::nodes("Meeting")
1144 .text_search("budget", 5)
1145 .limit(5)
1146 .compile()
1147 .expect("compiled query");
1148
1149 let rows = coordinator
1150 .execute_compiled_read(&compiled)
1151 .expect("execute read");
1152
1153 assert_eq!(rows.nodes.len(), 1);
1154 assert_eq!(rows.nodes[0].logical_id, "meeting-1");
1155 }
1156
1157 #[test]
1160 fn capability_gate_reports_false_without_feature() {
1161 let db = NamedTempFile::new().expect("temporary db");
1162 let coordinator = ExecutionCoordinator::open(
1165 db.path(),
1166 Arc::new(SchemaManager::new()),
1167 None,
1168 1,
1169 Arc::new(TelemetryCounters::default()),
1170 )
1171 .expect("coordinator");
1172 assert!(
1173 !coordinator.vector_enabled(),
1174 "vector_enabled must be false when no dimension is requested"
1175 );
1176 }
1177
1178 #[cfg(feature = "sqlite-vec")]
1179 #[test]
1180 fn capability_gate_reports_true_when_feature_enabled() {
1181 let db = NamedTempFile::new().expect("temporary db");
1182 let coordinator = ExecutionCoordinator::open(
1183 db.path(),
1184 Arc::new(SchemaManager::new()),
1185 Some(128),
1186 1,
1187 Arc::new(TelemetryCounters::default()),
1188 )
1189 .expect("coordinator");
1190 assert!(
1191 coordinator.vector_enabled(),
1192 "vector_enabled must be true when sqlite-vec feature is active and dimension is set"
1193 );
1194 }
1195
1196 #[test]
1199 fn read_run_returns_inserted_run() {
1200 use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
1201
1202 let db = NamedTempFile::new().expect("temporary db");
1203 let writer = WriterActor::start(
1204 db.path(),
1205 Arc::new(SchemaManager::new()),
1206 ProvenanceMode::Warn,
1207 Arc::new(TelemetryCounters::default()),
1208 )
1209 .expect("writer");
1210 writer
1211 .submit(WriteRequest {
1212 label: "runtime".to_owned(),
1213 nodes: vec![],
1214 node_retires: vec![],
1215 edges: vec![],
1216 edge_retires: vec![],
1217 chunks: vec![],
1218 runs: vec![RunInsert {
1219 id: "run-r1".to_owned(),
1220 kind: "session".to_owned(),
1221 status: "active".to_owned(),
1222 properties: "{}".to_owned(),
1223 source_ref: Some("src-1".to_owned()),
1224 upsert: false,
1225 supersedes_id: None,
1226 }],
1227 steps: vec![],
1228 actions: vec![],
1229 optional_backfills: vec![],
1230 vec_inserts: vec![],
1231 operational_writes: vec![],
1232 })
1233 .expect("write run");
1234
1235 let coordinator = ExecutionCoordinator::open(
1236 db.path(),
1237 Arc::new(SchemaManager::new()),
1238 None,
1239 1,
1240 Arc::new(TelemetryCounters::default()),
1241 )
1242 .expect("coordinator");
1243 let row = coordinator
1244 .read_run("run-r1")
1245 .expect("read_run")
1246 .expect("row exists");
1247 assert_eq!(row.id, "run-r1");
1248 assert_eq!(row.kind, "session");
1249 assert_eq!(row.status, "active");
1250 }
1251
1252 #[test]
1253 fn read_step_returns_inserted_step() {
1254 use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};
1255
1256 let db = NamedTempFile::new().expect("temporary db");
1257 let writer = WriterActor::start(
1258 db.path(),
1259 Arc::new(SchemaManager::new()),
1260 ProvenanceMode::Warn,
1261 Arc::new(TelemetryCounters::default()),
1262 )
1263 .expect("writer");
1264 writer
1265 .submit(WriteRequest {
1266 label: "runtime".to_owned(),
1267 nodes: vec![],
1268 node_retires: vec![],
1269 edges: vec![],
1270 edge_retires: vec![],
1271 chunks: vec![],
1272 runs: vec![RunInsert {
1273 id: "run-s1".to_owned(),
1274 kind: "session".to_owned(),
1275 status: "active".to_owned(),
1276 properties: "{}".to_owned(),
1277 source_ref: Some("src-1".to_owned()),
1278 upsert: false,
1279 supersedes_id: None,
1280 }],
1281 steps: vec![StepInsert {
1282 id: "step-s1".to_owned(),
1283 run_id: "run-s1".to_owned(),
1284 kind: "llm".to_owned(),
1285 status: "completed".to_owned(),
1286 properties: "{}".to_owned(),
1287 source_ref: Some("src-1".to_owned()),
1288 upsert: false,
1289 supersedes_id: None,
1290 }],
1291 actions: vec![],
1292 optional_backfills: vec![],
1293 vec_inserts: vec![],
1294 operational_writes: vec![],
1295 })
1296 .expect("write step");
1297
1298 let coordinator = ExecutionCoordinator::open(
1299 db.path(),
1300 Arc::new(SchemaManager::new()),
1301 None,
1302 1,
1303 Arc::new(TelemetryCounters::default()),
1304 )
1305 .expect("coordinator");
1306 let row = coordinator
1307 .read_step("step-s1")
1308 .expect("read_step")
1309 .expect("row exists");
1310 assert_eq!(row.id, "step-s1");
1311 assert_eq!(row.run_id, "run-s1");
1312 assert_eq!(row.kind, "llm");
1313 }
1314
1315 #[test]
1316 fn read_action_returns_inserted_action() {
1317 use crate::{
1318 ProvenanceMode, RunInsert, WriteRequest, WriterActor,
1319 writer::{ActionInsert, StepInsert},
1320 };
1321
1322 let db = NamedTempFile::new().expect("temporary db");
1323 let writer = WriterActor::start(
1324 db.path(),
1325 Arc::new(SchemaManager::new()),
1326 ProvenanceMode::Warn,
1327 Arc::new(TelemetryCounters::default()),
1328 )
1329 .expect("writer");
1330 writer
1331 .submit(WriteRequest {
1332 label: "runtime".to_owned(),
1333 nodes: vec![],
1334 node_retires: vec![],
1335 edges: vec![],
1336 edge_retires: vec![],
1337 chunks: vec![],
1338 runs: vec![RunInsert {
1339 id: "run-a1".to_owned(),
1340 kind: "session".to_owned(),
1341 status: "active".to_owned(),
1342 properties: "{}".to_owned(),
1343 source_ref: Some("src-1".to_owned()),
1344 upsert: false,
1345 supersedes_id: None,
1346 }],
1347 steps: vec![StepInsert {
1348 id: "step-a1".to_owned(),
1349 run_id: "run-a1".to_owned(),
1350 kind: "llm".to_owned(),
1351 status: "completed".to_owned(),
1352 properties: "{}".to_owned(),
1353 source_ref: Some("src-1".to_owned()),
1354 upsert: false,
1355 supersedes_id: None,
1356 }],
1357 actions: vec![ActionInsert {
1358 id: "action-a1".to_owned(),
1359 step_id: "step-a1".to_owned(),
1360 kind: "emit".to_owned(),
1361 status: "completed".to_owned(),
1362 properties: "{}".to_owned(),
1363 source_ref: Some("src-1".to_owned()),
1364 upsert: false,
1365 supersedes_id: None,
1366 }],
1367 optional_backfills: vec![],
1368 vec_inserts: vec![],
1369 operational_writes: vec![],
1370 })
1371 .expect("write action");
1372
1373 let coordinator = ExecutionCoordinator::open(
1374 db.path(),
1375 Arc::new(SchemaManager::new()),
1376 None,
1377 1,
1378 Arc::new(TelemetryCounters::default()),
1379 )
1380 .expect("coordinator");
1381 let row = coordinator
1382 .read_action("action-a1")
1383 .expect("read_action")
1384 .expect("row exists");
1385 assert_eq!(row.id, "action-a1");
1386 assert_eq!(row.step_id, "step-a1");
1387 assert_eq!(row.kind, "emit");
1388 }
1389
1390 #[test]
1391 fn read_active_runs_excludes_superseded() {
1392 use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
1393
1394 let db = NamedTempFile::new().expect("temporary db");
1395 let writer = WriterActor::start(
1396 db.path(),
1397 Arc::new(SchemaManager::new()),
1398 ProvenanceMode::Warn,
1399 Arc::new(TelemetryCounters::default()),
1400 )
1401 .expect("writer");
1402
1403 writer
1405 .submit(WriteRequest {
1406 label: "v1".to_owned(),
1407 nodes: vec![],
1408 node_retires: vec![],
1409 edges: vec![],
1410 edge_retires: vec![],
1411 chunks: vec![],
1412 runs: vec![RunInsert {
1413 id: "run-v1".to_owned(),
1414 kind: "session".to_owned(),
1415 status: "active".to_owned(),
1416 properties: "{}".to_owned(),
1417 source_ref: Some("src-1".to_owned()),
1418 upsert: false,
1419 supersedes_id: None,
1420 }],
1421 steps: vec![],
1422 actions: vec![],
1423 optional_backfills: vec![],
1424 vec_inserts: vec![],
1425 operational_writes: vec![],
1426 })
1427 .expect("v1 write");
1428
1429 writer
1431 .submit(WriteRequest {
1432 label: "v2".to_owned(),
1433 nodes: vec![],
1434 node_retires: vec![],
1435 edges: vec![],
1436 edge_retires: vec![],
1437 chunks: vec![],
1438 runs: vec![RunInsert {
1439 id: "run-v2".to_owned(),
1440 kind: "session".to_owned(),
1441 status: "completed".to_owned(),
1442 properties: "{}".to_owned(),
1443 source_ref: Some("src-2".to_owned()),
1444 upsert: true,
1445 supersedes_id: Some("run-v1".to_owned()),
1446 }],
1447 steps: vec![],
1448 actions: vec![],
1449 optional_backfills: vec![],
1450 vec_inserts: vec![],
1451 operational_writes: vec![],
1452 })
1453 .expect("v2 write");
1454
1455 let coordinator = ExecutionCoordinator::open(
1456 db.path(),
1457 Arc::new(SchemaManager::new()),
1458 None,
1459 1,
1460 Arc::new(TelemetryCounters::default()),
1461 )
1462 .expect("coordinator");
1463 let active = coordinator.read_active_runs().expect("read_active_runs");
1464
1465 assert_eq!(active.len(), 1, "only the non-superseded run should appear");
1466 assert_eq!(active[0].id, "run-v2");
1467 }
1468
1469 #[allow(clippy::panic)]
1470 fn poison_connection(coordinator: &ExecutionCoordinator) {
1471 let result = catch_unwind(AssertUnwindSafe(|| {
1472 let _guard = coordinator.pool.connections[0]
1473 .lock()
1474 .expect("poison test lock");
1475 panic!("poison coordinator connection mutex");
1476 }));
1477 assert!(
1478 result.is_err(),
1479 "poison test must unwind while holding the connection mutex"
1480 );
1481 }
1482
1483 #[allow(clippy::panic)]
1484 fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
1485 where
1486 F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
1487 {
1488 match op(coordinator) {
1489 Err(EngineError::Bridge(message)) => {
1490 assert_eq!(message, "connection mutex poisoned");
1491 }
1492 Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
1493 Err(error) => panic!("expected poisoned connection error, got {error:?}"),
1494 }
1495 }
1496
1497 #[test]
1498 fn poisoned_connection_returns_bridge_error_for_read_helpers() {
1499 let db = NamedTempFile::new().expect("temporary db");
1500 let coordinator = ExecutionCoordinator::open(
1501 db.path(),
1502 Arc::new(SchemaManager::new()),
1503 None,
1504 1,
1505 Arc::new(TelemetryCounters::default()),
1506 )
1507 .expect("coordinator");
1508
1509 poison_connection(&coordinator);
1510
1511 assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
1512 assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
1513 assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
1514 assert_poisoned_connection_error(
1515 &coordinator,
1516 super::ExecutionCoordinator::read_active_runs,
1517 );
1518 assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
1519 assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
1520 }
1521
1522 #[test]
1525 fn shape_cache_stays_bounded() {
1526 use fathomdb_query::ShapeHash;
1527
1528 let db = NamedTempFile::new().expect("temporary db");
1529 let coordinator = ExecutionCoordinator::open(
1530 db.path(),
1531 Arc::new(SchemaManager::new()),
1532 None,
1533 1,
1534 Arc::new(TelemetryCounters::default()),
1535 )
1536 .expect("coordinator");
1537
1538 {
1540 let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
1541 for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
1542 cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
1543 }
1544 }
1545 let compiled = QueryBuilder::nodes("Meeting")
1550 .text_search("budget", 5)
1551 .limit(10)
1552 .compile()
1553 .expect("compiled query");
1554
1555 coordinator
1556 .execute_compiled_read(&compiled)
1557 .expect("execute read");
1558
1559 assert!(
1560 coordinator.shape_sql_count() <= super::MAX_SHAPE_CACHE_SIZE,
1561 "shape cache must stay bounded: got {} entries, max {}",
1562 coordinator.shape_sql_count(),
1563 super::MAX_SHAPE_CACHE_SIZE
1564 );
1565 }
1566
1567 #[test]
1570 fn read_pool_size_configurable() {
1571 let db = NamedTempFile::new().expect("temporary db");
1572 let coordinator = ExecutionCoordinator::open(
1573 db.path(),
1574 Arc::new(SchemaManager::new()),
1575 None,
1576 2,
1577 Arc::new(TelemetryCounters::default()),
1578 )
1579 .expect("coordinator with pool_size=2");
1580
1581 assert_eq!(coordinator.pool.size(), 2);
1582
1583 let compiled = QueryBuilder::nodes("Meeting")
1585 .text_search("budget", 5)
1586 .limit(10)
1587 .compile()
1588 .expect("compiled query");
1589
1590 let result = coordinator.execute_compiled_read(&compiled);
1591 assert!(result.is_ok(), "read through pool must succeed");
1592 }
1593
1594 #[test]
1597 fn grouped_read_results_match_baseline() {
1598 use fathomdb_query::TraverseDirection;
1599
1600 let db = NamedTempFile::new().expect("temporary db");
1601
1602 let coordinator = ExecutionCoordinator::open(
1604 db.path(),
1605 Arc::new(SchemaManager::new()),
1606 None,
1607 1,
1608 Arc::new(TelemetryCounters::default()),
1609 )
1610 .expect("coordinator");
1611
1612 {
1615 let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
1616 for i in 0..10 {
1617 conn.execute_batch(&format!(
1618 r#"
1619 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
1620 VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
1621 INSERT INTO chunks (id, node_logical_id, text_content, created_at)
1622 VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
1623 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
1624 VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');
1625
1626 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
1627 VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
1628 INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
1629 VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());
1630
1631 INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
1632 VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
1633 INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
1634 VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
1635 "#,
1636 )).expect("seed data");
1637 }
1638 }
1639
1640 let compiled = QueryBuilder::nodes("Meeting")
1641 .text_search("meeting", 10)
1642 .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1)
1643 .limit(10)
1644 .compile_grouped()
1645 .expect("compiled grouped query");
1646
1647 let result = coordinator
1648 .execute_compiled_grouped_read(&compiled)
1649 .expect("grouped read");
1650
1651 assert!(!result.was_degraded, "grouped read should not be degraded");
1652 assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
1653 assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
1654 assert_eq!(result.expansions[0].slot, "tasks");
1655 assert_eq!(
1656 result.expansions[0].roots.len(),
1657 10,
1658 "each expansion slot should have entries for all 10 roots"
1659 );
1660
1661 for root_expansion in &result.expansions[0].roots {
1663 assert_eq!(
1664 root_expansion.nodes.len(),
1665 2,
1666 "root {} should have 2 expansion nodes, got {}",
1667 root_expansion.root_logical_id,
1668 root_expansion.nodes.len()
1669 );
1670 }
1671 }
1672}