1#[cfg(feature = "algos")]
6pub mod procedure_call;
7#[cfg(all(feature = "algos", feature = "gql"))]
8pub mod user_procedure;
9
10use std::time::{Duration, Instant};
11
12use crate::config::AdaptiveConfig;
13use crate::database::QueryResult;
14use grafeo_common::grafeo_debug_span;
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, QueryError, Result};
17use grafeo_core::execution::operators::{Operator, OperatorError};
18use grafeo_core::execution::{
19 AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, Pipeline,
20 SharedAdaptiveContext,
21};
22
23pub struct Executor {
25 columns: Vec<String>,
27 column_types: Vec<LogicalType>,
29 deadline: Option<Instant>,
31 query_timeout: Option<Duration>,
33}
34
35impl Executor {
36 #[must_use]
38 pub fn new() -> Self {
39 Self {
40 columns: Vec::new(),
41 column_types: Vec::new(),
42 deadline: None,
43 query_timeout: None,
44 }
45 }
46
47 #[must_use]
49 pub fn with_columns(columns: Vec<String>) -> Self {
50 let len = columns.len();
51 Self {
52 columns,
53 column_types: vec![LogicalType::Any; len],
54 deadline: None,
55 query_timeout: None,
56 }
57 }
58
59 #[must_use]
61 pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
62 Self {
63 columns,
64 column_types,
65 deadline: None,
66 query_timeout: None,
67 }
68 }
69
70 #[must_use]
72 pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
73 self.deadline = deadline;
74 self
75 }
76
77 #[must_use]
79 pub fn with_timeout_duration(mut self, timeout: Option<Duration>) -> Self {
80 self.query_timeout = timeout;
81 self
82 }
83
84 fn check_deadline(&self) -> Result<()> {
86 #[cfg(not(target_arch = "wasm32"))]
87 if let Some(deadline) = self.deadline
88 && Instant::now() >= deadline
89 {
90 return Err(Error::Query(match self.query_timeout {
91 Some(d) => QueryError::timeout_with_limit(d),
92 None => QueryError::timeout(),
93 }));
94 }
95 Ok(())
96 }
97
98 pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
104 let _span = grafeo_debug_span!("grafeo::query::execute");
105 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
106 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
107
108 loop {
109 self.check_deadline()?;
110
111 match operator.next() {
112 Ok(Some(chunk)) => {
113 if !types_captured && chunk.column_count() > 0 {
115 self.capture_column_types(&chunk, &mut result);
116 types_captured = true;
117 }
118 self.collect_chunk(&chunk, &mut result)?;
119 }
120 Ok(None) => break,
121 Err(err) => return Err(convert_operator_error(err)),
122 }
123 }
124
125 Ok(result)
126 }
127
128 pub fn execute_pipeline(
142 &self,
143 source: Box<dyn Operator>,
144 push_ops: Vec<Box<dyn grafeo_core::execution::pipeline::PushOperator>>,
145 ) -> Result<QueryResult> {
146 use grafeo_core::execution::{ChunkCollector, OperatorSource};
147
148 let _span = grafeo_debug_span!("grafeo::query::execute_pipeline");
149
150 let source = Box::new(OperatorSource::new(source));
151 let collector = ChunkCollector::new();
152
153 let mut pipeline = Pipeline::new(source, push_ops, Box::new(collector));
155 pipeline.set_deadline(self.deadline);
156 pipeline.execute().map_err(convert_operator_error)?;
157
158 let sink_box = pipeline.into_sink();
161 let any_sink: Box<dyn std::any::Any> = sink_box.into_any();
162 let collector = any_sink
163 .downcast::<ChunkCollector>()
164 .expect("sink should be ChunkCollector");
165 let chunks = collector.into_chunks();
166
167 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
168 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
169
170 for chunk in &chunks {
171 if !types_captured && chunk.column_count() > 0 {
172 self.capture_column_types(chunk, &mut result);
173 types_captured = true;
174 }
175 self.collect_chunk(chunk, &mut result)?;
176 }
177
178 Ok(result)
179 }
180
181 pub fn execute_with_limit(
187 &self,
188 operator: &mut dyn Operator,
189 limit: usize,
190 ) -> Result<QueryResult> {
191 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
192 let mut collected = 0;
193 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
194
195 loop {
196 if collected >= limit {
197 break;
198 }
199
200 self.check_deadline()?;
201
202 match operator.next() {
203 Ok(Some(chunk)) => {
204 if !types_captured && chunk.column_count() > 0 {
206 self.capture_column_types(&chunk, &mut result);
207 types_captured = true;
208 }
209 let remaining = limit - collected;
210 collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
211 }
212 Ok(None) => break,
213 Err(err) => return Err(convert_operator_error(err)),
214 }
215 }
216
217 Ok(result)
218 }
219
220 fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
222 let col_count = chunk.column_count();
223 result.column_types = Vec::with_capacity(col_count);
224 for col_idx in 0..col_count {
225 let col_type = chunk
226 .column(col_idx)
227 .map_or(LogicalType::Any, |col| col.data_type().clone());
228 result.column_types.push(col_type);
229 }
230 }
231
232 fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
237 let col_count = chunk.column_count();
238 let mut collected = 0;
239
240 for row_idx in chunk.selected_indices() {
241 let mut row = Vec::with_capacity(col_count);
242 for col_idx in 0..col_count {
243 let value = chunk
244 .column(col_idx)
245 .and_then(|col| col.get_value(row_idx))
246 .unwrap_or(Value::Null);
247 row.push(value);
248 }
249 result.rows.push(row);
250 collected += 1;
251 }
252
253 Ok(collected)
254 }
255
256 fn collect_chunk_limited(
261 &self,
262 chunk: &DataChunk,
263 result: &mut QueryResult,
264 limit: usize,
265 ) -> Result<usize> {
266 let col_count = chunk.column_count();
267 let mut collected = 0;
268
269 for row_idx in chunk.selected_indices() {
270 if collected >= limit {
271 break;
272 }
273 let mut row = Vec::with_capacity(col_count);
274 for col_idx in 0..col_count {
275 let value = chunk
276 .column(col_idx)
277 .and_then(|col| col.get_value(row_idx))
278 .unwrap_or(Value::Null);
279 row.push(value);
280 }
281 result.rows.push(row);
282 collected += 1;
283 }
284
285 Ok(collected)
286 }
287
288 pub fn execute_adaptive(
304 &self,
305 operator: Box<dyn Operator>,
306 adaptive_context: Option<AdaptiveContext>,
307 config: &AdaptiveConfig,
308 ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
309 if !config.enabled {
311 let mut op = operator;
312 let result = self.execute(op.as_mut())?;
313 return Ok((result, None));
314 }
315
316 let Some(ctx) = adaptive_context else {
317 let mut op = operator;
318 let result = self.execute(op.as_mut())?;
319 return Ok((result, None));
320 };
321
322 let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
324 config.threshold,
325 config.min_rows,
326 ));
327
328 for (op_id, checkpoint) in ctx.all_checkpoints() {
330 if let Some(mut inner) = shared_ctx.snapshot() {
331 inner.set_estimate(op_id, checkpoint.estimated);
332 }
333 }
334
335 let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
337
338 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
340 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
341 let mut total_rows: u64 = 0;
342 let check_interval = config.min_rows;
343
344 loop {
345 self.check_deadline()?;
346
347 match wrapped.next() {
348 Ok(Some(chunk)) => {
349 let chunk_rows = chunk.row_count();
350 total_rows += chunk_rows as u64;
351
352 if !types_captured && chunk.column_count() > 0 {
354 self.capture_column_types(&chunk, &mut result);
355 types_captured = true;
356 }
357 self.collect_chunk(&chunk, &mut result)?;
358
359 if total_rows >= check_interval
361 && total_rows.is_multiple_of(check_interval)
362 && shared_ctx.should_reoptimize()
363 {
364 }
368 }
369 Ok(None) => break,
370 Err(err) => return Err(convert_operator_error(err)),
371 }
372 }
373
374 let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
376
377 Ok((result, summary))
378 }
379}
380
381impl Default for Executor {
382 fn default() -> Self {
383 Self::new()
384 }
385}
386
387fn convert_operator_error(err: OperatorError) -> Error {
389 match err {
390 OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
391 OperatorError::ColumnNotFound(name) => {
392 Error::InvalidValue(format!("Column not found: {name}"))
393 }
394 OperatorError::Execution(msg) => Error::Internal(msg),
395 OperatorError::ConstraintViolation(msg) => {
396 Error::InvalidValue(format!("Constraint violation: {msg}"))
397 }
398 OperatorError::WriteConflict(msg) => {
399 Error::Transaction(grafeo_common::utils::error::TransactionError::WriteConflict(msg))
400 }
401 _ => Error::Internal(format!("{err}")),
402 }
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// Test operator that emits a fixed list of integers as one `Int64`
    /// column, at most `chunk_size` values per chunk.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            let total = self.values.len();
            if self.position >= total {
                return Ok(None);
            }

            let start = self.position;
            let stop = total.min(start + self.chunk_size);
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            let column = chunk.column_mut(0).unwrap();
            for &value in &self.values[start..stop] {
                column.push_int64(value);
            }
            chunk.set_count(stop - start);
            self.position = stop;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Test operator that is exhausted from the start.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Executor with a single untyped column called `value`.
    fn value_executor() -> Executor {
        Executor::with_columns(vec!["value".to_string()])
    }

    /// Executor with a single `value` column explicitly typed as `String`.
    fn string_typed_executor() -> Executor {
        Executor::with_columns_and_types(vec!["value".to_string()], vec![LogicalType::String])
    }

    /// An instant one second in the past, i.e. an already-expired deadline.
    fn expired_deadline() -> std::time::Instant {
        use std::time::{Duration, Instant};

        Instant::now().checked_sub(Duration::from_secs(1)).unwrap()
    }

    #[test]
    fn test_executor_empty() {
        let mut source = EmptyOperator;
        let executor = Executor::with_columns(vec!["a".to_string()]);

        let outcome = executor.execute(&mut source).unwrap();
        assert!(outcome.is_empty());
        assert_eq!(outcome.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = value_executor().execute(&mut source).unwrap();
        assert_eq!(outcome.row_count(), 3);
        for (idx, expected) in [1_i64, 2, 3].into_iter().enumerate() {
            assert_eq!(outcome.rows[idx][0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_with_limit() {
        let mut source = MockIntOperator::new((0..10).collect(), 100);

        let outcome = value_executor().execute_with_limit(&mut source, 5).unwrap();
        assert_eq!(outcome.row_count(), 5);
    }

    #[test]
    fn test_executor_timeout_expired() {
        let executor = value_executor().with_deadline(Some(expired_deadline()));
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = executor.execute(&mut source);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_no_timeout() {
        let executor = value_executor().with_deadline(None);
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = executor.execute(&mut source).unwrap();
        assert_eq!(outcome.row_count(), 3);
    }

    #[test]
    fn test_executor_type_capture_from_first_chunk() {
        // All declared types are `Any`, so they must come from the data.
        let mut source = MockIntOperator::new(vec![42, 99], 10);

        let outcome = value_executor().execute(&mut source).unwrap();
        assert_eq!(outcome.row_count(), 2);
        assert_eq!(outcome.column_types, vec![LogicalType::Int64]);
    }

    #[test]
    fn test_executor_type_capture_with_explicit_types() {
        // Explicit types win over what the chunks report.
        let mut source = MockIntOperator::new(vec![1], 10);

        let outcome = string_typed_executor().execute(&mut source).unwrap();
        assert_eq!(outcome.row_count(), 1);
        assert_eq!(outcome.column_types, vec![LogicalType::String]);
    }

    #[test]
    fn test_execute_pipeline_basic() {
        let source = Box::new(MockIntOperator::new(vec![10, 20, 30], 10));

        let outcome = value_executor().execute_pipeline(source, vec![]).unwrap();
        assert_eq!(outcome.row_count(), 3);
        for (idx, expected) in [10_i64, 20, 30].into_iter().enumerate() {
            assert_eq!(outcome.rows[idx][0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_execute_pipeline_empty_source() {
        let source = Box::new(EmptyOperator);

        let outcome = value_executor().execute_pipeline(source, vec![]).unwrap();
        assert!(outcome.is_empty());
    }

    #[test]
    fn test_execute_pipeline_type_capture() {
        let source = Box::new(MockIntOperator::new(vec![1, 2], 10));

        let outcome = value_executor().execute_pipeline(source, vec![]).unwrap();
        assert_eq!(outcome.column_types, vec![LogicalType::Int64]);
    }

    #[test]
    fn test_execute_pipeline_explicit_types_preserved() {
        let source = Box::new(MockIntOperator::new(vec![1], 10));

        let outcome = string_typed_executor()
            .execute_pipeline(source, vec![])
            .unwrap();
        assert_eq!(outcome.column_types, vec![LogicalType::String]);
    }

    #[test]
    fn test_execute_with_limit_type_capture() {
        // Chunks of two rows, so the limit of three cuts the second chunk short.
        let mut source = MockIntOperator::new(vec![1, 2, 3, 4, 5], 2);

        let outcome = value_executor().execute_with_limit(&mut source, 3).unwrap();
        assert_eq!(outcome.row_count(), 3);
        assert_eq!(outcome.column_types, vec![LogicalType::Int64]);
    }

    #[test]
    fn test_execute_with_limit_timeout_expired() {
        let executor = value_executor().with_deadline(Some(expired_deadline()));
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = executor.execute_with_limit(&mut source, 10);
        assert!(outcome.is_err());
        assert!(
            outcome
                .unwrap_err()
                .to_string()
                .contains("Query exceeded timeout")
        );
    }

    #[test]
    fn test_convert_operator_error_variants() {
        let type_mismatch = convert_operator_error(OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        });
        assert!(matches!(type_mismatch, Error::TypeMismatch { .. }));

        let missing_column =
            convert_operator_error(OperatorError::ColumnNotFound("col_x".to_string()));
        assert!(matches!(missing_column, Error::InvalidValue(_)));
        assert!(missing_column.to_string().contains("col_x"));

        let execution =
            convert_operator_error(OperatorError::Execution("internal issue".to_string()));
        assert!(matches!(execution, Error::Internal(_)));

        let constraint =
            convert_operator_error(OperatorError::ConstraintViolation("unique".to_string()));
        assert!(matches!(constraint, Error::InvalidValue(_)));
        assert!(constraint.to_string().contains("unique"));

        let conflict =
            convert_operator_error(OperatorError::WriteConflict("concurrent write".to_string()));
        assert!(matches!(conflict, Error::Transaction(_)));
    }

    #[test]
    fn test_execute_pipeline_timeout_expired() {
        use grafeo_core::execution::pipeline::{Sink as PipelineSink, Source as PipelineSource};

        /// Source that yields `remaining` empty chunks and then ends.
        struct PipelineTestSource {
            remaining: usize,
        }

        impl PipelineSource for PipelineTestSource {
            fn next_chunk(
                &mut self,
                _chunk_size: usize,
            ) -> std::result::Result<Option<DataChunk>, OperatorError> {
                if self.remaining == 0 {
                    return Ok(None);
                }
                self.remaining -= 1;
                Ok(Some(DataChunk::empty()))
            }
            fn reset(&mut self) {}
            fn name(&self) -> &'static str {
                "PipelineTestSource"
            }
        }

        /// Sink that accepts every chunk and discards it.
        struct PipelineTestSink;

        impl PipelineSink for PipelineTestSink {
            fn consume(&mut self, _chunk: DataChunk) -> std::result::Result<bool, OperatorError> {
                Ok(true)
            }
            fn finalize(&mut self) -> std::result::Result<(), OperatorError> {
                Ok(())
            }
            fn name(&self) -> &'static str {
                "PipelineTestSink"
            }
            fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
                self
            }
        }

        let mut pipeline = Pipeline::simple(
            Box::new(PipelineTestSource { remaining: 10 }),
            Box::new(PipelineTestSink),
        )
        .with_deadline(Some(expired_deadline()));

        let outcome = pipeline.execute().map_err(convert_operator_error);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }
}