grafeo_engine/query/executor/
mod.rs1pub mod procedure_call;
6
7use std::time::Instant;
8
9use crate::config::AdaptiveConfig;
10use crate::database::QueryResult;
11use grafeo_common::types::{LogicalType, Value};
12use grafeo_common::utils::error::{Error, QueryError, Result};
13use grafeo_core::execution::operators::{Operator, OperatorError};
14use grafeo_core::execution::{
15 AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
16};
17
18pub struct Executor {
20 columns: Vec<String>,
22 column_types: Vec<LogicalType>,
24 deadline: Option<Instant>,
26}
27
28impl Executor {
29 #[must_use]
31 pub fn new() -> Self {
32 Self {
33 columns: Vec::new(),
34 column_types: Vec::new(),
35 deadline: None,
36 }
37 }
38
39 #[must_use]
41 pub fn with_columns(columns: Vec<String>) -> Self {
42 let len = columns.len();
43 Self {
44 columns,
45 column_types: vec![LogicalType::Any; len],
46 deadline: None,
47 }
48 }
49
50 #[must_use]
52 pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
53 Self {
54 columns,
55 column_types,
56 deadline: None,
57 }
58 }
59
60 #[must_use]
62 pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
63 self.deadline = deadline;
64 self
65 }
66
67 fn check_deadline(&self) -> Result<()> {
69 if let Some(deadline) = self.deadline
70 && Instant::now() >= deadline
71 {
72 return Err(Error::Query(QueryError::timeout()));
73 }
74 Ok(())
75 }
76
77 pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
83 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
84 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
85
86 loop {
87 self.check_deadline()?;
88
89 match operator.next() {
90 Ok(Some(chunk)) => {
91 if !types_captured && chunk.column_count() > 0 {
93 self.capture_column_types(&chunk, &mut result);
94 types_captured = true;
95 }
96 self.collect_chunk(&chunk, &mut result)?;
97 }
98 Ok(None) => break,
99 Err(err) => return Err(convert_operator_error(err)),
100 }
101 }
102
103 Ok(result)
104 }
105
106 pub fn execute_with_limit(
112 &self,
113 operator: &mut dyn Operator,
114 limit: usize,
115 ) -> Result<QueryResult> {
116 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
117 let mut collected = 0;
118 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
119
120 loop {
121 if collected >= limit {
122 break;
123 }
124
125 self.check_deadline()?;
126
127 match operator.next() {
128 Ok(Some(chunk)) => {
129 if !types_captured && chunk.column_count() > 0 {
131 self.capture_column_types(&chunk, &mut result);
132 types_captured = true;
133 }
134 let remaining = limit - collected;
135 collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
136 }
137 Ok(None) => break,
138 Err(err) => return Err(convert_operator_error(err)),
139 }
140 }
141
142 Ok(result)
143 }
144
145 fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
147 let col_count = chunk.column_count();
148 result.column_types = Vec::with_capacity(col_count);
149 for col_idx in 0..col_count {
150 let col_type = chunk
151 .column(col_idx)
152 .map_or(LogicalType::Any, |col| col.data_type().clone());
153 result.column_types.push(col_type);
154 }
155 }
156
157 fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
162 let col_count = chunk.column_count();
163 let mut collected = 0;
164
165 for row_idx in chunk.selected_indices() {
166 let mut row = Vec::with_capacity(col_count);
167 for col_idx in 0..col_count {
168 let value = chunk
169 .column(col_idx)
170 .and_then(|col| col.get_value(row_idx))
171 .unwrap_or(Value::Null);
172 row.push(value);
173 }
174 result.rows.push(row);
175 collected += 1;
176 }
177
178 Ok(collected)
179 }
180
181 fn collect_chunk_limited(
186 &self,
187 chunk: &DataChunk,
188 result: &mut QueryResult,
189 limit: usize,
190 ) -> Result<usize> {
191 let col_count = chunk.column_count();
192 let mut collected = 0;
193
194 for row_idx in chunk.selected_indices() {
195 if collected >= limit {
196 break;
197 }
198 let mut row = Vec::with_capacity(col_count);
199 for col_idx in 0..col_count {
200 let value = chunk
201 .column(col_idx)
202 .and_then(|col| col.get_value(row_idx))
203 .unwrap_or(Value::Null);
204 row.push(value);
205 }
206 result.rows.push(row);
207 collected += 1;
208 }
209
210 Ok(collected)
211 }
212
213 pub fn execute_adaptive(
229 &self,
230 operator: Box<dyn Operator>,
231 adaptive_context: Option<AdaptiveContext>,
232 config: &AdaptiveConfig,
233 ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
234 if !config.enabled {
236 let mut op = operator;
237 let result = self.execute(op.as_mut())?;
238 return Ok((result, None));
239 }
240
241 let Some(ctx) = adaptive_context else {
242 let mut op = operator;
243 let result = self.execute(op.as_mut())?;
244 return Ok((result, None));
245 };
246
247 let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
249 config.threshold,
250 config.min_rows,
251 ));
252
253 for (op_id, checkpoint) in ctx.all_checkpoints() {
255 if let Some(mut inner) = shared_ctx.snapshot() {
256 inner.set_estimate(op_id, checkpoint.estimated);
257 }
258 }
259
260 let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
262
263 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
265 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
266 let mut total_rows: u64 = 0;
267 let check_interval = config.min_rows;
268
269 loop {
270 self.check_deadline()?;
271
272 match wrapped.next() {
273 Ok(Some(chunk)) => {
274 let chunk_rows = chunk.row_count();
275 total_rows += chunk_rows as u64;
276
277 if !types_captured && chunk.column_count() > 0 {
279 self.capture_column_types(&chunk, &mut result);
280 types_captured = true;
281 }
282 self.collect_chunk(&chunk, &mut result)?;
283
284 if total_rows >= check_interval
286 && total_rows.is_multiple_of(check_interval)
287 && shared_ctx.should_reoptimize()
288 {
289 }
293 }
294 Ok(None) => break,
295 Err(err) => return Err(convert_operator_error(err)),
296 }
297 }
298
299 let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
301
302 Ok((result, summary))
303 }
304}
305
306impl Default for Executor {
307 fn default() -> Self {
308 Self::new()
309 }
310}
311
312fn convert_operator_error(err: OperatorError) -> Error {
314 match err {
315 OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
316 OperatorError::ColumnNotFound(name) => {
317 Error::InvalidValue(format!("Column not found: {name}"))
318 }
319 OperatorError::Execution(msg) => Error::Internal(msg),
320 }
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// Test operator that emits a fixed list of `i64` values as a single
    /// `Int64` column, at most `chunk_size` values per chunk.
    struct MockIntOperator {
        /// Values to emit, in order.
        values: Vec<i64>,
        /// Index of the next value that has not been emitted yet.
        position: usize,
        /// Upper bound on the number of values per chunk.
        chunk_size: usize,
    }

    impl MockIntOperator {
        /// Creates a mock positioned at the first value.
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            let start = self.position;
            let total = self.values.len();
            if start >= total {
                return Ok(None);
            }

            let stop = total.min(start + self.chunk_size);
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            let column = chunk.column_mut(0).unwrap();
            for &value in &self.values[start..stop] {
                column.push_int64(value);
            }

            chunk.set_count(stop - start);
            self.position = stop;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Test operator that is exhausted from the start.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    /// An exhausted operator yields no rows but keeps the declared column.
    #[test]
    fn test_executor_empty() {
        let mut op = EmptyOperator;
        let executor = Executor::with_columns(vec![String::from("a")]);

        let result = executor.execute(&mut op).unwrap();

        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    /// All values of a single chunk arrive in order.
    #[test]
    fn test_executor_single_chunk() {
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);
        let executor = Executor::with_columns(vec![String::from("value")]);

        let result = executor.execute(&mut op).unwrap();

        assert_eq!(result.row_count(), 3);
        for (idx, expected) in [1_i64, 2, 3].into_iter().enumerate() {
            assert_eq!(result.rows[idx][0], Value::Int64(expected));
        }
    }

    /// A limit smaller than the chunk truncates the result.
    #[test]
    fn test_executor_with_limit() {
        let mut op = MockIntOperator::new((0..10).collect(), 100);
        let executor = Executor::with_columns(vec![String::from("value")]);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();

        assert_eq!(result.row_count(), 5);
    }

    /// A deadline that already lies in the past produces a timeout error.
    #[test]
    fn test_executor_timeout_expired() {
        use std::time::{Duration, Instant};

        let expired = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
        let executor =
            Executor::with_columns(vec![String::from("value")]).with_deadline(Some(expired));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    /// Explicitly clearing the deadline lets execution run to completion.
    #[test]
    fn test_executor_no_timeout() {
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);
        let executor = Executor::with_columns(vec![String::from("value")]).with_deadline(None);

        let result = executor.execute(&mut op).unwrap();

        assert_eq!(result.row_count(), 3);
    }
}