Skip to main content

grafeo_engine/query/executor/
mod.rs

1//! Query executor.
2//!
3//! Executes physical plans and produces results.
4
5#[cfg(feature = "algos")]
6pub mod procedure_call;
7#[cfg(all(feature = "gql", feature = "lpg"))]
8pub mod stream;
9#[cfg(all(feature = "algos", feature = "gql"))]
10pub mod user_procedure;
11
12use std::time::{Duration, Instant};
13
14use crate::config::AdaptiveConfig;
15use crate::database::QueryResult;
16use grafeo_common::grafeo_debug_span;
17use grafeo_common::types::{LogicalType, Value};
18use grafeo_common::utils::error::{Error, QueryError, Result};
19use grafeo_core::execution::operators::{Operator, OperatorError};
20use grafeo_core::execution::{
21    AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, Pipeline,
22    SharedAdaptiveContext,
23};
24
25/// Executes a physical operator tree and collects results.
26pub struct Executor {
27    /// Column names for the result.
28    columns: Vec<String>,
29    /// Column types for the result.
30    column_types: Vec<LogicalType>,
31    /// Wall-clock deadline after which execution is aborted.
32    deadline: Option<Instant>,
33    /// The configured timeout duration (for error messages).
34    query_timeout: Option<Duration>,
35}
36
37impl Executor {
38    /// Creates a new executor.
39    #[must_use]
40    pub fn new() -> Self {
41        Self {
42            columns: Vec::new(),
43            column_types: Vec::new(),
44            deadline: None,
45            query_timeout: None,
46        }
47    }
48
49    /// Creates an executor with specified column names.
50    #[must_use]
51    pub fn with_columns(columns: Vec<String>) -> Self {
52        let len = columns.len();
53        Self {
54            columns,
55            column_types: vec![LogicalType::Any; len],
56            deadline: None,
57            query_timeout: None,
58        }
59    }
60
61    /// Creates an executor with specified column names and types.
62    #[must_use]
63    pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
64        Self {
65            columns,
66            column_types,
67            deadline: None,
68            query_timeout: None,
69        }
70    }
71
72    /// Sets a wall-clock deadline for query execution.
73    #[must_use]
74    pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
75        self.deadline = deadline;
76        self
77    }
78
79    /// Sets the original timeout duration (used for error messages).
80    #[must_use]
81    pub fn with_timeout_duration(mut self, timeout: Option<Duration>) -> Self {
82        self.query_timeout = timeout;
83        self
84    }
85
86    /// Checks whether the deadline has been exceeded.
87    fn check_deadline(&self) -> Result<()> {
88        #[cfg(not(target_arch = "wasm32"))]
89        if let Some(deadline) = self.deadline
90            && Instant::now() >= deadline
91        {
92            return Err(Error::Query(match self.query_timeout {
93                Some(d) => QueryError::timeout_with_limit(d),
94                None => QueryError::timeout(),
95            }));
96        }
97        Ok(())
98    }
99
100    /// Executes a physical operator and collects all results.
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if operator execution fails or the query timeout is exceeded.
105    pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
106        let _span = grafeo_debug_span!("grafeo::query::execute");
107        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
108        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
109
110        loop {
111            self.check_deadline()?;
112
113            match operator.next() {
114                Ok(Some(chunk)) => {
115                    // Capture column types from first non-empty chunk
116                    if !types_captured && chunk.column_count() > 0 {
117                        self.capture_column_types(&chunk, &mut result);
118                        types_captured = true;
119                    }
120                    self.collect_chunk(&chunk, &mut result)?;
121                }
122                Ok(None) => break,
123                Err(err) => return Err(convert_operator_error(err)),
124            }
125        }
126
127        Ok(result)
128    }
129
130    /// Executes a push-based pipeline.
131    ///
132    /// The source operator is wrapped in `OperatorSource`, push operators form
133    /// the pipeline body, and a `ChunkCollector` gathers results.
134    ///
135    /// # Panics
136    ///
137    /// Panics if the internal sink downcast fails (should never happen since we
138    /// create the `ChunkCollector` ourselves).
139    ///
140    /// # Errors
141    ///
142    /// Returns an error if pipeline execution fails or the query timeout is exceeded.
143    pub fn execute_pipeline(
144        &self,
145        source: Box<dyn Operator>,
146        push_ops: Vec<Box<dyn grafeo_core::execution::pipeline::PushOperator>>,
147    ) -> Result<QueryResult> {
148        use grafeo_core::execution::{ChunkCollector, OperatorSource};
149
150        let _span = grafeo_debug_span!("grafeo::query::execute_pipeline");
151
152        let source = Box::new(OperatorSource::new(source));
153        let collector = ChunkCollector::new();
154
155        // Build and execute the pipeline with deadline enforcement
156        let mut pipeline = Pipeline::new(source, push_ops, Box::new(collector));
157        pipeline.set_deadline(self.deadline);
158        pipeline.execute().map_err(convert_operator_error)?;
159
160        // Extract the sink (ChunkCollector) and get the chunks
161        // Safety: we know the sink is a ChunkCollector because we just created it
162        let sink_box = pipeline.into_sink();
163        let any_sink: Box<dyn std::any::Any> = sink_box.into_any();
164        let collector = any_sink
165            .downcast::<ChunkCollector>()
166            .expect("sink should be ChunkCollector");
167        let chunks = collector.into_chunks();
168
169        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
170        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
171
172        for chunk in &chunks {
173            if !types_captured && chunk.column_count() > 0 {
174                self.capture_column_types(chunk, &mut result);
175                types_captured = true;
176            }
177            self.collect_chunk(chunk, &mut result)?;
178        }
179
180        Ok(result)
181    }
182
183    /// Executes and returns at most `limit` rows.
184    ///
185    /// # Errors
186    ///
187    /// Returns an error if operator execution fails or the query timeout is exceeded.
188    pub fn execute_with_limit(
189        &self,
190        operator: &mut dyn Operator,
191        limit: usize,
192    ) -> Result<QueryResult> {
193        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
194        let mut collected = 0;
195        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
196
197        loop {
198            if collected >= limit {
199                break;
200            }
201
202            self.check_deadline()?;
203
204            match operator.next() {
205                Ok(Some(chunk)) => {
206                    // Capture column types from first non-empty chunk
207                    if !types_captured && chunk.column_count() > 0 {
208                        self.capture_column_types(&chunk, &mut result);
209                        types_captured = true;
210                    }
211                    let remaining = limit - collected;
212                    collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
213                }
214                Ok(None) => break,
215                Err(err) => return Err(convert_operator_error(err)),
216            }
217        }
218
219        Ok(result)
220    }
221
222    /// Captures column types from a DataChunk.
223    fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
224        let col_count = chunk.column_count();
225        result.column_types = Vec::with_capacity(col_count);
226        for col_idx in 0..col_count {
227            let col_type = chunk
228                .column(col_idx)
229                .map_or(LogicalType::Any, |col| col.data_type().clone());
230            result.column_types.push(col_type);
231        }
232    }
233
234    /// Collects all rows from a DataChunk into the result.
235    ///
236    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
237    /// (e.g., after filtering operations).
238    fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
239        let col_count = chunk.column_count();
240        let mut collected = 0;
241
242        for row_idx in chunk.selected_indices() {
243            let mut row = Vec::with_capacity(col_count);
244            for col_idx in 0..col_count {
245                let value = chunk
246                    .column(col_idx)
247                    .and_then(|col| col.get_value(row_idx))
248                    .unwrap_or(Value::Null);
249                row.push(value);
250            }
251            result.rows.push(row);
252            collected += 1;
253        }
254
255        Ok(collected)
256    }
257
258    /// Collects up to `limit` rows from a DataChunk.
259    ///
260    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
261    /// (e.g., after filtering operations).
262    fn collect_chunk_limited(
263        &self,
264        chunk: &DataChunk,
265        result: &mut QueryResult,
266        limit: usize,
267    ) -> Result<usize> {
268        let col_count = chunk.column_count();
269        let mut collected = 0;
270
271        for row_idx in chunk.selected_indices() {
272            if collected >= limit {
273                break;
274            }
275            let mut row = Vec::with_capacity(col_count);
276            for col_idx in 0..col_count {
277                let value = chunk
278                    .column(col_idx)
279                    .and_then(|col| col.get_value(row_idx))
280                    .unwrap_or(Value::Null);
281                row.push(value);
282            }
283            result.rows.push(row);
284            collected += 1;
285        }
286
287        Ok(collected)
288    }
289
290    /// Executes a physical operator with adaptive cardinality tracking.
291    ///
292    /// This wraps the operator in a cardinality tracking layer and monitors
293    /// deviation from estimates during execution. The adaptive summary is
294    /// returned alongside the query result.
295    ///
296    /// # Arguments
297    ///
298    /// * `operator` - The root physical operator to execute
299    /// * `adaptive_context` - Context with cardinality estimates from planning
300    /// * `config` - Adaptive execution configuration
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if operator execution fails.
305    pub fn execute_adaptive(
306        &self,
307        operator: Box<dyn Operator>,
308        adaptive_context: Option<AdaptiveContext>,
309        config: &AdaptiveConfig,
310    ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
311        // If adaptive is disabled or no context, fall back to normal execution
312        if !config.enabled {
313            let mut op = operator;
314            let result = self.execute(op.as_mut())?;
315            return Ok((result, None));
316        }
317
318        let Some(ctx) = adaptive_context else {
319            let mut op = operator;
320            let result = self.execute(op.as_mut())?;
321            return Ok((result, None));
322        };
323
324        // Create shared context for tracking
325        let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
326            config.threshold,
327            config.min_rows,
328        ));
329
330        // Copy estimates from the planning context to the shared tracking context
331        for (op_id, checkpoint) in ctx.all_checkpoints() {
332            if let Some(mut inner) = shared_ctx.snapshot() {
333                inner.set_estimate(op_id, checkpoint.estimated);
334            }
335        }
336
337        // Wrap operator with tracking
338        let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
339
340        // Execute with tracking
341        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
342        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
343        let mut total_rows: u64 = 0;
344        let check_interval = config.min_rows;
345
346        loop {
347            self.check_deadline()?;
348
349            match wrapped.next() {
350                Ok(Some(chunk)) => {
351                    let chunk_rows = chunk.row_count();
352                    total_rows += chunk_rows as u64;
353
354                    // Capture column types from first non-empty chunk
355                    if !types_captured && chunk.column_count() > 0 {
356                        self.capture_column_types(&chunk, &mut result);
357                        types_captured = true;
358                    }
359                    self.collect_chunk(&chunk, &mut result)?;
360
361                    // Periodically check for significant deviation
362                    if total_rows >= check_interval
363                        && total_rows.is_multiple_of(check_interval)
364                        && shared_ctx.should_reoptimize()
365                    {
366                        // For now, just log/note that re-optimization would trigger
367                        // Full re-optimization would require plan regeneration
368                        // which is a more invasive change
369                    }
370                }
371                Ok(None) => break,
372                Err(err) => return Err(convert_operator_error(err)),
373            }
374        }
375
376        // Get final summary
377        let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
378
379        Ok((result, summary))
380    }
381}
382
383impl Default for Executor {
384    fn default() -> Self {
385        Self::new()
386    }
387}
388
389/// Converts an operator error to a common error.
390pub(crate) fn convert_operator_error(err: OperatorError) -> Error {
391    match err {
392        OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
393        OperatorError::ColumnNotFound(name) => {
394            Error::InvalidValue(format!("Column not found: {name}"))
395        }
396        OperatorError::Execution(msg) => Error::Internal(msg),
397        OperatorError::ConstraintViolation(msg) => {
398            Error::InvalidValue(format!("Constraint violation: {msg}"))
399        }
400        OperatorError::WriteConflict(msg) => {
401            Error::Transaction(grafeo_common::utils::error::TransactionError::WriteConflict(msg))
402        }
403        _ => Error::Internal(format!("{err}")),
404    }
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// Operator stub that hands out a fixed list of integers, at most
    /// `chunk_size` values per chunk.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            let start = self.position;
            if start >= self.values.len() {
                return Ok(None);
            }

            let end = self.values.len().min(start + self.chunk_size);
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            let col = chunk.column_mut(0).unwrap();
            for &value in &self.values[start..end] {
                col.push_int64(value);
            }
            chunk.set_count(end - start);
            self.position = end;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Operator stub that is exhausted from the start.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Executor with a single untyped column named `value`.
    fn value_executor() -> Executor {
        Executor::with_columns(vec!["value".to_string()])
    }

    /// Executor with a single `value` column explicitly typed as `String`.
    fn string_typed_executor() -> Executor {
        Executor::with_columns_and_types(vec!["value".to_string()], vec![LogicalType::String])
    }

    /// A deadline one second in the past.
    fn expired_deadline() -> std::time::Instant {
        use std::time::{Duration, Instant};

        Instant::now().checked_sub(Duration::from_secs(1)).unwrap()
    }

    #[test]
    fn test_executor_empty() {
        let executor = Executor::with_columns(vec!["a".to_string()]);
        let mut source = EmptyOperator;

        let outcome = executor.execute(&mut source).unwrap();
        assert!(outcome.is_empty());
        assert_eq!(outcome.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = value_executor().execute(&mut source).unwrap();
        assert_eq!(outcome.row_count(), 3);
        assert_eq!(outcome.rows[0][0], Value::Int64(1));
        assert_eq!(outcome.rows[1][0], Value::Int64(2));
        assert_eq!(outcome.rows[2][0], Value::Int64(3));
    }

    #[test]
    fn test_executor_with_limit() {
        let mut source = MockIntOperator::new((0..10).collect(), 100);

        let outcome = value_executor().execute_with_limit(&mut source, 5).unwrap();
        assert_eq!(outcome.row_count(), 5);
    }

    #[test]
    fn test_executor_timeout_expired() {
        // The deadline already lies in the past, so the first check must fail.
        let executor = value_executor().with_deadline(Some(expired_deadline()));
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = executor.execute(&mut source);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_no_timeout() {
        // Without a deadline execution runs to completion.
        let executor = value_executor().with_deadline(None);
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = executor.execute(&mut source).unwrap();
        assert_eq!(outcome.row_count(), 3);
    }

    #[test]
    fn test_executor_type_capture_from_first_chunk() {
        // `with_columns` starts every column as `Any`, so the types must be
        // picked up from the first non-empty chunk.
        let mut source = MockIntOperator::new(vec![42, 99], 10);

        let outcome = value_executor().execute(&mut source).unwrap();
        assert_eq!(outcome.row_count(), 2);
        assert_eq!(outcome.column_types, vec![LogicalType::Int64]);
    }

    #[test]
    fn test_executor_type_capture_with_explicit_types() {
        // Explicit (non-`Any`) column types must NOT be overwritten from chunks.
        let mut source = MockIntOperator::new(vec![1], 10);

        let outcome = string_typed_executor().execute(&mut source).unwrap();
        assert_eq!(outcome.row_count(), 1);
        // Still `String`, even though the data is `Int64`.
        assert_eq!(outcome.column_types, vec![LogicalType::String]);
    }

    #[test]
    fn test_execute_pipeline_basic() {
        let source = Box::new(MockIntOperator::new(vec![10, 20, 30], 10));

        let outcome = value_executor().execute_pipeline(source, vec![]).unwrap();
        assert_eq!(outcome.row_count(), 3);
        assert_eq!(outcome.rows[0][0], Value::Int64(10));
        assert_eq!(outcome.rows[1][0], Value::Int64(20));
        assert_eq!(outcome.rows[2][0], Value::Int64(30));
    }

    #[test]
    fn test_execute_pipeline_empty_source() {
        let source = Box::new(EmptyOperator);

        let outcome = value_executor().execute_pipeline(source, vec![]).unwrap();
        assert!(outcome.is_empty());
    }

    #[test]
    fn test_execute_pipeline_type_capture() {
        // The pipeline path also infers types from the first non-empty chunk
        // when all declared types are `Any`.
        let source = Box::new(MockIntOperator::new(vec![1, 2], 10));

        let outcome = value_executor().execute_pipeline(source, vec![]).unwrap();
        assert_eq!(outcome.column_types, vec![LogicalType::Int64]);
    }

    #[test]
    fn test_execute_pipeline_explicit_types_preserved() {
        // The pipeline path keeps explicitly declared column types.
        let source = Box::new(MockIntOperator::new(vec![1], 10));

        let outcome = string_typed_executor()
            .execute_pipeline(source, vec![])
            .unwrap();
        assert_eq!(outcome.column_types, vec![LogicalType::String]);
    }

    #[test]
    fn test_execute_with_limit_type_capture() {
        // The limited path also infers types from the first chunk.
        let mut source = MockIntOperator::new(vec![1, 2, 3, 4, 5], 2);

        let outcome = value_executor().execute_with_limit(&mut source, 3).unwrap();
        assert_eq!(outcome.row_count(), 3);
        assert_eq!(outcome.column_types, vec![LogicalType::Int64]);
    }

    #[test]
    fn test_execute_with_limit_timeout_expired() {
        let executor = value_executor().with_deadline(Some(expired_deadline()));
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = executor.execute_with_limit(&mut source, 10);
        assert!(outcome.is_err());
        assert!(
            outcome
                .unwrap_err()
                .to_string()
                .contains("Query exceeded timeout")
        );
    }

    #[test]
    fn test_convert_operator_error_variants() {
        // Exercise each explicitly handled `OperatorError` variant.
        let mismatch = convert_operator_error(OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        });
        assert!(matches!(mismatch, Error::TypeMismatch { .. }));

        let missing = convert_operator_error(OperatorError::ColumnNotFound("col_x".to_string()));
        assert!(matches!(missing, Error::InvalidValue(_)));
        assert!(missing.to_string().contains("col_x"));

        let internal =
            convert_operator_error(OperatorError::Execution("internal issue".to_string()));
        assert!(matches!(internal, Error::Internal(_)));

        let constraint =
            convert_operator_error(OperatorError::ConstraintViolation("unique".to_string()));
        assert!(matches!(constraint, Error::InvalidValue(_)));
        assert!(constraint.to_string().contains("unique"));

        let conflict =
            convert_operator_error(OperatorError::WriteConflict("concurrent write".to_string()));
        assert!(matches!(conflict, Error::Transaction(_)));
    }

    #[test]
    fn test_execute_pipeline_timeout_expired() {
        use grafeo_core::execution::pipeline::{Sink as PipelineSink, Source as PipelineSource};

        /// Source that yields a fixed number of empty chunks.
        struct PipelineTestSource {
            remaining: usize,
        }

        impl PipelineSource for PipelineTestSource {
            fn next_chunk(
                &mut self,
                _chunk_size: usize,
            ) -> std::result::Result<Option<DataChunk>, OperatorError> {
                match self.remaining.checked_sub(1) {
                    Some(left) => {
                        self.remaining = left;
                        Ok(Some(DataChunk::empty()))
                    }
                    None => Ok(None),
                }
            }
            fn reset(&mut self) {}
            fn name(&self) -> &'static str {
                "PipelineTestSource"
            }
        }

        /// Sink that accepts and discards every chunk.
        struct PipelineTestSink;

        impl PipelineSink for PipelineTestSink {
            fn consume(&mut self, _chunk: DataChunk) -> std::result::Result<bool, OperatorError> {
                Ok(true)
            }
            fn finalize(&mut self) -> std::result::Result<(), OperatorError> {
                Ok(())
            }
            fn name(&self) -> &'static str {
                "PipelineTestSink"
            }
            fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
                self
            }
        }

        let mut pipeline = Pipeline::simple(
            Box::new(PipelineTestSource { remaining: 10 }),
            Box::new(PipelineTestSink),
        )
        .with_deadline(Some(expired_deadline()));

        let outcome = pipeline.execute().map_err(convert_operator_error);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }
}