grafeo_engine/query/executor/
mod.rs1#[cfg(feature = "algos")]
6pub mod procedure_call;
7#[cfg(feature = "algos")]
8pub mod user_procedure;
9
10use std::time::Instant;
11
12use crate::config::AdaptiveConfig;
13use crate::database::QueryResult;
14use grafeo_common::types::{LogicalType, Value};
15use grafeo_common::utils::error::{Error, QueryError, Result};
16use grafeo_core::execution::operators::{Operator, OperatorError};
17use grafeo_core::execution::{
18 AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
19};
20
21pub struct Executor {
23 columns: Vec<String>,
25 column_types: Vec<LogicalType>,
27 deadline: Option<Instant>,
29}
30
31impl Executor {
32 #[must_use]
34 pub fn new() -> Self {
35 Self {
36 columns: Vec::new(),
37 column_types: Vec::new(),
38 deadline: None,
39 }
40 }
41
42 #[must_use]
44 pub fn with_columns(columns: Vec<String>) -> Self {
45 let len = columns.len();
46 Self {
47 columns,
48 column_types: vec![LogicalType::Any; len],
49 deadline: None,
50 }
51 }
52
53 #[must_use]
55 pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
56 Self {
57 columns,
58 column_types,
59 deadline: None,
60 }
61 }
62
63 #[must_use]
65 pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
66 self.deadline = deadline;
67 self
68 }
69
70 fn check_deadline(&self) -> Result<()> {
72 if let Some(deadline) = self.deadline
73 && Instant::now() >= deadline
74 {
75 return Err(Error::Query(QueryError::timeout()));
76 }
77 Ok(())
78 }
79
80 pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
86 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
87 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
88
89 loop {
90 self.check_deadline()?;
91
92 match operator.next() {
93 Ok(Some(chunk)) => {
94 if !types_captured && chunk.column_count() > 0 {
96 self.capture_column_types(&chunk, &mut result);
97 types_captured = true;
98 }
99 self.collect_chunk(&chunk, &mut result)?;
100 }
101 Ok(None) => break,
102 Err(err) => return Err(convert_operator_error(err)),
103 }
104 }
105
106 Ok(result)
107 }
108
109 pub fn execute_with_limit(
115 &self,
116 operator: &mut dyn Operator,
117 limit: usize,
118 ) -> Result<QueryResult> {
119 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
120 let mut collected = 0;
121 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
122
123 loop {
124 if collected >= limit {
125 break;
126 }
127
128 self.check_deadline()?;
129
130 match operator.next() {
131 Ok(Some(chunk)) => {
132 if !types_captured && chunk.column_count() > 0 {
134 self.capture_column_types(&chunk, &mut result);
135 types_captured = true;
136 }
137 let remaining = limit - collected;
138 collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
139 }
140 Ok(None) => break,
141 Err(err) => return Err(convert_operator_error(err)),
142 }
143 }
144
145 Ok(result)
146 }
147
148 fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
150 let col_count = chunk.column_count();
151 result.column_types = Vec::with_capacity(col_count);
152 for col_idx in 0..col_count {
153 let col_type = chunk
154 .column(col_idx)
155 .map_or(LogicalType::Any, |col| col.data_type().clone());
156 result.column_types.push(col_type);
157 }
158 }
159
160 fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
165 let col_count = chunk.column_count();
166 let mut collected = 0;
167
168 for row_idx in chunk.selected_indices() {
169 let mut row = Vec::with_capacity(col_count);
170 for col_idx in 0..col_count {
171 let value = chunk
172 .column(col_idx)
173 .and_then(|col| col.get_value(row_idx))
174 .unwrap_or(Value::Null);
175 row.push(value);
176 }
177 result.rows.push(row);
178 collected += 1;
179 }
180
181 Ok(collected)
182 }
183
184 fn collect_chunk_limited(
189 &self,
190 chunk: &DataChunk,
191 result: &mut QueryResult,
192 limit: usize,
193 ) -> Result<usize> {
194 let col_count = chunk.column_count();
195 let mut collected = 0;
196
197 for row_idx in chunk.selected_indices() {
198 if collected >= limit {
199 break;
200 }
201 let mut row = Vec::with_capacity(col_count);
202 for col_idx in 0..col_count {
203 let value = chunk
204 .column(col_idx)
205 .and_then(|col| col.get_value(row_idx))
206 .unwrap_or(Value::Null);
207 row.push(value);
208 }
209 result.rows.push(row);
210 collected += 1;
211 }
212
213 Ok(collected)
214 }
215
216 pub fn execute_adaptive(
232 &self,
233 operator: Box<dyn Operator>,
234 adaptive_context: Option<AdaptiveContext>,
235 config: &AdaptiveConfig,
236 ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
237 if !config.enabled {
239 let mut op = operator;
240 let result = self.execute(op.as_mut())?;
241 return Ok((result, None));
242 }
243
244 let Some(ctx) = adaptive_context else {
245 let mut op = operator;
246 let result = self.execute(op.as_mut())?;
247 return Ok((result, None));
248 };
249
250 let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
252 config.threshold,
253 config.min_rows,
254 ));
255
256 for (op_id, checkpoint) in ctx.all_checkpoints() {
258 if let Some(mut inner) = shared_ctx.snapshot() {
259 inner.set_estimate(op_id, checkpoint.estimated);
260 }
261 }
262
263 let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
265
266 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
268 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
269 let mut total_rows: u64 = 0;
270 let check_interval = config.min_rows;
271
272 loop {
273 self.check_deadline()?;
274
275 match wrapped.next() {
276 Ok(Some(chunk)) => {
277 let chunk_rows = chunk.row_count();
278 total_rows += chunk_rows as u64;
279
280 if !types_captured && chunk.column_count() > 0 {
282 self.capture_column_types(&chunk, &mut result);
283 types_captured = true;
284 }
285 self.collect_chunk(&chunk, &mut result)?;
286
287 if total_rows >= check_interval
289 && total_rows.is_multiple_of(check_interval)
290 && shared_ctx.should_reoptimize()
291 {
292 }
296 }
297 Ok(None) => break,
298 Err(err) => return Err(convert_operator_error(err)),
299 }
300 }
301
302 let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
304
305 Ok((result, summary))
306 }
307}
308
309impl Default for Executor {
310 fn default() -> Self {
311 Self::new()
312 }
313}
314
315fn convert_operator_error(err: OperatorError) -> Error {
317 match err {
318 OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
319 OperatorError::ColumnNotFound(name) => {
320 Error::InvalidValue(format!("Column not found: {name}"))
321 }
322 OperatorError::Execution(msg) => Error::Internal(msg),
323 OperatorError::ConstraintViolation(msg) => {
324 Error::InvalidValue(format!("Constraint violation: {msg}"))
325 }
326 }
327}
328
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// Emits `values` as a single `Int64` column, `chunk_size` rows per chunk.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            // A zero chunk size would never advance `position` and loop forever.
            assert!(chunk_size > 0, "chunk_size must be positive");
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            if self.position >= self.values.len() {
                return Ok(None);
            }

            let end = (self.position + self.chunk_size).min(self.values.len());
            let batch = &self.values[self.position..end];
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);
            {
                let col = chunk
                    .column_mut(0)
                    .expect("chunk was created with exactly one column");
                for &value in batch {
                    col.push_int64(value);
                }
            }
            chunk.set_count(batch.len());
            self.position = end;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Operator that produces no chunks at all.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    /// Asserts that column 0 of `result` holds exactly `expected`, in order.
    fn assert_int_column(result: &QueryResult, expected: &[i64]) {
        assert_eq!(result.row_count(), expected.len());
        for (row, &want) in result.rows.iter().zip(expected) {
            assert_eq!(row[0], Value::Int64(want));
        }
    }

    #[test]
    fn test_executor_empty() {
        let executor = Executor::with_columns(vec!["a".to_string()]);
        let mut op = EmptyOperator;

        let result = executor.execute(&mut op).unwrap();
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_int_column(&result, &[1, 2, 3]);
    }

    #[test]
    fn test_executor_multiple_chunks() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..7).collect(), 3);

        let result = executor.execute(&mut op).unwrap();
        assert_int_column(&result, &[0, 1, 2, 3, 4, 5, 6]);
    }

    #[test]
    fn test_executor_infers_column_types() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1], 4);

        let result = executor.execute(&mut op).unwrap();
        assert!(result.column_types == vec![LogicalType::Int64]);
    }

    #[test]
    fn test_executor_with_limit() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_int_column(&result, &[0, 1, 2, 3, 4]);
    }

    #[test]
    fn test_executor_with_limit_across_chunks() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 3);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_int_column(&result, &[0, 1, 2, 3, 4]);
    }

    #[test]
    fn test_executor_with_zero_limit() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute_with_limit(&mut op, 0).unwrap();
        assert_eq!(result.row_count(), 0);
    }

    #[test]
    fn test_executor_timeout_expired() {
        use std::time::Instant;

        // `check_deadline` uses `>=`, and `Instant` is monotonic, so a deadline
        // of "now" is already reached when execution starts. This avoids
        // clock arithmetic that could underflow.
        let executor =
            Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(Instant::now()));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_with_limit_timeout_expired() {
        use std::time::Instant;

        let executor =
            Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(Instant::now()));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        assert!(executor.execute_with_limit(&mut op, 2).is_err());
    }

    #[test]
    fn test_executor_no_timeout() {
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(None);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
    }
}