1#[cfg(feature = "algos")]
6pub mod procedure_call;
7#[cfg(all(feature = "gql", feature = "lpg"))]
8pub mod stream;
9#[cfg(all(feature = "algos", feature = "gql"))]
10pub mod user_procedure;
11
12use std::time::{Duration, Instant};
13
14use crate::config::AdaptiveConfig;
15use crate::database::QueryResult;
16use grafeo_common::grafeo_debug_span;
17use grafeo_common::types::{LogicalType, Value};
18use grafeo_common::utils::error::{Error, QueryError, Result};
19use grafeo_core::execution::operators::{Operator, OperatorError};
20use grafeo_core::execution::{
21 AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, Pipeline,
22 SharedAdaptiveContext,
23};
24
/// Drives an operator tree to completion and materializes its output as a
/// [`QueryResult`].
pub struct Executor {
    /// Names of the result columns, copied into every result this executor builds.
    columns: Vec<String>,
    /// Types of the result columns. When every entry is [`LogicalType::Any`]
    /// the types are replaced by those of the first non-empty chunk.
    column_types: Vec<LogicalType>,
    /// Instant after which execution fails with a timeout error; `None` disables the check.
    deadline: Option<Instant>,
    /// Configured timeout, used only to build the timeout error when the deadline passes.
    query_timeout: Option<Duration>,
}
36
37impl Executor {
38 #[must_use]
40 pub fn new() -> Self {
41 Self {
42 columns: Vec::new(),
43 column_types: Vec::new(),
44 deadline: None,
45 query_timeout: None,
46 }
47 }
48
49 #[must_use]
51 pub fn with_columns(columns: Vec<String>) -> Self {
52 let len = columns.len();
53 Self {
54 columns,
55 column_types: vec![LogicalType::Any; len],
56 deadline: None,
57 query_timeout: None,
58 }
59 }
60
61 #[must_use]
63 pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
64 Self {
65 columns,
66 column_types,
67 deadline: None,
68 query_timeout: None,
69 }
70 }
71
72 #[must_use]
74 pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
75 self.deadline = deadline;
76 self
77 }
78
79 #[must_use]
81 pub fn with_timeout_duration(mut self, timeout: Option<Duration>) -> Self {
82 self.query_timeout = timeout;
83 self
84 }
85
86 fn check_deadline(&self) -> Result<()> {
88 #[cfg(not(target_arch = "wasm32"))]
89 if let Some(deadline) = self.deadline
90 && Instant::now() >= deadline
91 {
92 return Err(Error::Query(match self.query_timeout {
93 Some(d) => QueryError::timeout_with_limit(d),
94 None => QueryError::timeout(),
95 }));
96 }
97 Ok(())
98 }
99
100 pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
106 let _span = grafeo_debug_span!("grafeo::query::execute");
107 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
108 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
109
110 loop {
111 self.check_deadline()?;
112
113 match operator.next() {
114 Ok(Some(chunk)) => {
115 if !types_captured && chunk.column_count() > 0 {
117 self.capture_column_types(&chunk, &mut result);
118 types_captured = true;
119 }
120 self.collect_chunk(&chunk, &mut result)?;
121 }
122 Ok(None) => break,
123 Err(err) => return Err(convert_operator_error(err)),
124 }
125 }
126
127 Ok(result)
128 }
129
130 pub fn execute_pipeline(
144 &self,
145 source: Box<dyn Operator>,
146 push_ops: Vec<Box<dyn grafeo_core::execution::pipeline::PushOperator>>,
147 ) -> Result<QueryResult> {
148 use grafeo_core::execution::{ChunkCollector, OperatorSource};
149
150 let _span = grafeo_debug_span!("grafeo::query::execute_pipeline");
151
152 let source = Box::new(OperatorSource::new(source));
153 let collector = ChunkCollector::new();
154
155 let mut pipeline = Pipeline::new(source, push_ops, Box::new(collector));
157 pipeline.set_deadline(self.deadline);
158 pipeline.execute().map_err(convert_operator_error)?;
159
160 let sink_box = pipeline.into_sink();
163 let any_sink: Box<dyn std::any::Any> = sink_box.into_any();
164 let collector = any_sink
165 .downcast::<ChunkCollector>()
166 .expect("sink should be ChunkCollector");
167 let chunks = collector.into_chunks();
168
169 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
170 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
171
172 for chunk in &chunks {
173 if !types_captured && chunk.column_count() > 0 {
174 self.capture_column_types(chunk, &mut result);
175 types_captured = true;
176 }
177 self.collect_chunk(chunk, &mut result)?;
178 }
179
180 Ok(result)
181 }
182
183 pub fn execute_with_limit(
189 &self,
190 operator: &mut dyn Operator,
191 limit: usize,
192 ) -> Result<QueryResult> {
193 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
194 let mut collected = 0;
195 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
196
197 loop {
198 if collected >= limit {
199 break;
200 }
201
202 self.check_deadline()?;
203
204 match operator.next() {
205 Ok(Some(chunk)) => {
206 if !types_captured && chunk.column_count() > 0 {
208 self.capture_column_types(&chunk, &mut result);
209 types_captured = true;
210 }
211 let remaining = limit - collected;
212 collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
213 }
214 Ok(None) => break,
215 Err(err) => return Err(convert_operator_error(err)),
216 }
217 }
218
219 Ok(result)
220 }
221
222 fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
224 let col_count = chunk.column_count();
225 result.column_types = Vec::with_capacity(col_count);
226 for col_idx in 0..col_count {
227 let col_type = chunk
228 .column(col_idx)
229 .map_or(LogicalType::Any, |col| col.data_type().clone());
230 result.column_types.push(col_type);
231 }
232 }
233
234 fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
239 let col_count = chunk.column_count();
240 let mut collected = 0;
241
242 for row_idx in chunk.selected_indices() {
243 let mut row = Vec::with_capacity(col_count);
244 for col_idx in 0..col_count {
245 let value = chunk
246 .column(col_idx)
247 .and_then(|col| col.get_value(row_idx))
248 .unwrap_or(Value::Null);
249 row.push(value);
250 }
251 result.rows.push(row);
252 collected += 1;
253 }
254
255 Ok(collected)
256 }
257
258 fn collect_chunk_limited(
263 &self,
264 chunk: &DataChunk,
265 result: &mut QueryResult,
266 limit: usize,
267 ) -> Result<usize> {
268 let col_count = chunk.column_count();
269 let mut collected = 0;
270
271 for row_idx in chunk.selected_indices() {
272 if collected >= limit {
273 break;
274 }
275 let mut row = Vec::with_capacity(col_count);
276 for col_idx in 0..col_count {
277 let value = chunk
278 .column(col_idx)
279 .and_then(|col| col.get_value(row_idx))
280 .unwrap_or(Value::Null);
281 row.push(value);
282 }
283 result.rows.push(row);
284 collected += 1;
285 }
286
287 Ok(collected)
288 }
289
290 pub fn execute_adaptive(
306 &self,
307 operator: Box<dyn Operator>,
308 adaptive_context: Option<AdaptiveContext>,
309 config: &AdaptiveConfig,
310 ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
311 if !config.enabled {
313 let mut op = operator;
314 let result = self.execute(op.as_mut())?;
315 return Ok((result, None));
316 }
317
318 let Some(ctx) = adaptive_context else {
319 let mut op = operator;
320 let result = self.execute(op.as_mut())?;
321 return Ok((result, None));
322 };
323
324 let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
326 config.threshold,
327 config.min_rows,
328 ));
329
330 for (op_id, checkpoint) in ctx.all_checkpoints() {
332 if let Some(mut inner) = shared_ctx.snapshot() {
333 inner.set_estimate(op_id, checkpoint.estimated);
334 }
335 }
336
337 let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
339
340 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
342 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
343 let mut total_rows: u64 = 0;
344 let check_interval = config.min_rows;
345
346 loop {
347 self.check_deadline()?;
348
349 match wrapped.next() {
350 Ok(Some(chunk)) => {
351 let chunk_rows = chunk.row_count();
352 total_rows += chunk_rows as u64;
353
354 if !types_captured && chunk.column_count() > 0 {
356 self.capture_column_types(&chunk, &mut result);
357 types_captured = true;
358 }
359 self.collect_chunk(&chunk, &mut result)?;
360
361 if total_rows >= check_interval
363 && total_rows.is_multiple_of(check_interval)
364 && shared_ctx.should_reoptimize()
365 {
366 }
370 }
371 Ok(None) => break,
372 Err(err) => return Err(convert_operator_error(err)),
373 }
374 }
375
376 let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
378
379 Ok((result, summary))
380 }
381}
382
383impl Default for Executor {
384 fn default() -> Self {
385 Self::new()
386 }
387}
388
389pub(crate) fn convert_operator_error(err: OperatorError) -> Error {
391 match err {
392 OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
393 OperatorError::ColumnNotFound(name) => {
394 Error::InvalidValue(format!("Column not found: {name}"))
395 }
396 OperatorError::Execution(msg) => Error::Internal(msg),
397 OperatorError::ConstraintViolation(msg) => {
398 Error::InvalidValue(format!("Constraint violation: {msg}"))
399 }
400 OperatorError::WriteConflict(msg) => {
401 Error::Transaction(grafeo_common::utils::error::TransactionError::WriteConflict(msg))
402 }
403 _ => Error::Internal(format!("{err}")),
404 }
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;
    use std::time::{Duration, Instant};

    /// Test operator that emits a fixed list of integers as a single
    /// `Int64` column, at most `chunk_size` values per chunk.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        /// Creates an operator positioned at the first value.
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            let total = self.values.len();
            if self.position >= total {
                return Ok(None);
            }

            let stop = total.min(self.position + self.chunk_size);
            let batch = &self.values[self.position..stop];

            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);
            let column = chunk.column_mut(0).unwrap();
            for &value in batch {
                column.push_int64(value);
            }
            chunk.set_count(batch.len());
            self.position = stop;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Test operator that never produces a chunk.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Executor with a single column named `value` and inferred types.
    fn value_executor() -> Executor {
        Executor::with_columns(vec!["value".to_string()])
    }

    /// Executor with a single `value` column explicitly typed as `String`.
    fn string_typed_executor() -> Executor {
        Executor::with_columns_and_types(vec!["value".to_string()], vec![LogicalType::String])
    }

    /// A deadline one second in the past.
    fn expired_deadline() -> Instant {
        Instant::now().checked_sub(Duration::from_secs(1)).unwrap()
    }

    #[test]
    fn test_executor_empty() {
        let mut op = EmptyOperator;

        let result = Executor::with_columns(vec!["a".to_string()])
            .execute(&mut op)
            .unwrap();

        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = value_executor().execute(&mut op).unwrap();

        assert_eq!(result.row_count(), 3);
        for (idx, expected) in [1_i64, 2, 3].into_iter().enumerate() {
            assert_eq!(result.rows[idx][0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_with_limit() {
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = value_executor().execute_with_limit(&mut op, 5).unwrap();

        assert_eq!(result.row_count(), 5);
    }

    #[test]
    fn test_executor_timeout_expired() {
        let executor = value_executor().with_deadline(Some(expired_deadline()));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = executor.execute(&mut op);

        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_no_timeout() {
        let executor = value_executor().with_deadline(None);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();

        assert_eq!(result.row_count(), 3);
    }

    #[test]
    fn test_executor_type_capture_from_first_chunk() {
        // All declared types are `Any`, so the chunk's `Int64` type is adopted.
        let mut op = MockIntOperator::new(vec![42, 99], 10);

        let result = value_executor().execute(&mut op).unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_types, vec![LogicalType::Int64]);
    }

    #[test]
    fn test_executor_type_capture_with_explicit_types() {
        // Explicit types must survive even though the chunk carries `Int64`.
        let mut op = MockIntOperator::new(vec![1], 10);

        let result = string_typed_executor().execute(&mut op).unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.column_types, vec![LogicalType::String]);
    }

    #[test]
    fn test_execute_pipeline_basic() {
        let source = Box::new(MockIntOperator::new(vec![10, 20, 30], 10));

        let result = value_executor().execute_pipeline(source, vec![]).unwrap();

        assert_eq!(result.row_count(), 3);
        for (idx, expected) in [10_i64, 20, 30].into_iter().enumerate() {
            assert_eq!(result.rows[idx][0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_execute_pipeline_empty_source() {
        let source = Box::new(EmptyOperator);

        let result = value_executor().execute_pipeline(source, vec![]).unwrap();

        assert!(result.is_empty());
    }

    #[test]
    fn test_execute_pipeline_type_capture() {
        let source = Box::new(MockIntOperator::new(vec![1, 2], 10));

        let result = value_executor().execute_pipeline(source, vec![]).unwrap();

        assert_eq!(result.column_types, vec![LogicalType::Int64]);
    }

    #[test]
    fn test_execute_pipeline_explicit_types_preserved() {
        let source = Box::new(MockIntOperator::new(vec![1], 10));

        let result = string_typed_executor()
            .execute_pipeline(source, vec![])
            .unwrap();

        assert_eq!(result.column_types, vec![LogicalType::String]);
    }

    #[test]
    fn test_execute_with_limit_type_capture() {
        // Chunks of two rows: the limit of three cuts the second chunk short.
        let mut op = MockIntOperator::new(vec![1, 2, 3, 4, 5], 2);

        let result = value_executor().execute_with_limit(&mut op, 3).unwrap();

        assert_eq!(result.row_count(), 3);
        assert_eq!(result.column_types, vec![LogicalType::Int64]);
    }

    #[test]
    fn test_execute_with_limit_timeout_expired() {
        let executor = value_executor().with_deadline(Some(expired_deadline()));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = executor.execute_with_limit(&mut op, 10);

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Query exceeded timeout"));
    }

    #[test]
    fn test_convert_operator_error_variants() {
        let mismatch = convert_operator_error(OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        });
        assert!(matches!(mismatch, Error::TypeMismatch { .. }));

        let missing = convert_operator_error(OperatorError::ColumnNotFound("col_x".to_string()));
        assert!(matches!(missing, Error::InvalidValue(_)));
        assert!(missing.to_string().contains("col_x"));

        let internal =
            convert_operator_error(OperatorError::Execution("internal issue".to_string()));
        assert!(matches!(internal, Error::Internal(_)));

        let violation =
            convert_operator_error(OperatorError::ConstraintViolation("unique".to_string()));
        assert!(matches!(violation, Error::InvalidValue(_)));
        assert!(violation.to_string().contains("unique"));

        let conflict =
            convert_operator_error(OperatorError::WriteConflict("concurrent write".to_string()));
        assert!(matches!(conflict, Error::Transaction(_)));
    }

    #[test]
    fn test_execute_pipeline_timeout_expired() {
        use grafeo_core::execution::pipeline::{Sink as PipelineSink, Source as PipelineSource};

        /// Source that yields `remaining` empty chunks and then finishes.
        struct PipelineTestSource {
            remaining: usize,
        }

        impl PipelineSource for PipelineTestSource {
            fn next_chunk(
                &mut self,
                _chunk_size: usize,
            ) -> std::result::Result<Option<DataChunk>, OperatorError> {
                match self.remaining {
                    0 => Ok(None),
                    _ => {
                        self.remaining -= 1;
                        Ok(Some(DataChunk::empty()))
                    }
                }
            }
            fn reset(&mut self) {}
            fn name(&self) -> &'static str {
                "PipelineTestSource"
            }
        }

        /// Sink that accepts and discards every chunk.
        struct PipelineTestSink;

        impl PipelineSink for PipelineTestSink {
            fn consume(&mut self, _chunk: DataChunk) -> std::result::Result<bool, OperatorError> {
                Ok(true)
            }
            fn finalize(&mut self) -> std::result::Result<(), OperatorError> {
                Ok(())
            }
            fn name(&self) -> &'static str {
                "PipelineTestSink"
            }
            fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
                self
            }
        }

        let source = Box::new(PipelineTestSource { remaining: 10 });
        let sink = Box::new(PipelineTestSink);
        let mut pipeline = Pipeline::simple(source, sink).with_deadline(Some(expired_deadline()));

        let outcome = pipeline.execute().map_err(convert_operator_error);

        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }
}