grafeo_engine/query/executor/
mod.rs1use std::time::Instant;
6
7use crate::config::AdaptiveConfig;
8use crate::database::QueryResult;
9use grafeo_common::types::{LogicalType, Value};
10use grafeo_common::utils::error::{Error, QueryError, Result};
11use grafeo_core::execution::operators::{Operator, OperatorError};
12use grafeo_core::execution::{
13 AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
14};
15
16pub struct Executor {
18 columns: Vec<String>,
20 column_types: Vec<LogicalType>,
22 deadline: Option<Instant>,
24}
25
26impl Executor {
27 #[must_use]
29 pub fn new() -> Self {
30 Self {
31 columns: Vec::new(),
32 column_types: Vec::new(),
33 deadline: None,
34 }
35 }
36
37 #[must_use]
39 pub fn with_columns(columns: Vec<String>) -> Self {
40 let len = columns.len();
41 Self {
42 columns,
43 column_types: vec![LogicalType::Any; len],
44 deadline: None,
45 }
46 }
47
48 #[must_use]
50 pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
51 Self {
52 columns,
53 column_types,
54 deadline: None,
55 }
56 }
57
58 #[must_use]
60 pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
61 self.deadline = deadline;
62 self
63 }
64
65 fn check_deadline(&self) -> Result<()> {
67 if let Some(deadline) = self.deadline
68 && Instant::now() >= deadline
69 {
70 return Err(Error::Query(QueryError::timeout()));
71 }
72 Ok(())
73 }
74
75 pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
81 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
82 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
83
84 loop {
85 self.check_deadline()?;
86
87 match operator.next() {
88 Ok(Some(chunk)) => {
89 if !types_captured && chunk.column_count() > 0 {
91 self.capture_column_types(&chunk, &mut result);
92 types_captured = true;
93 }
94 self.collect_chunk(&chunk, &mut result)?;
95 }
96 Ok(None) => break,
97 Err(err) => return Err(convert_operator_error(err)),
98 }
99 }
100
101 Ok(result)
102 }
103
104 pub fn execute_with_limit(
110 &self,
111 operator: &mut dyn Operator,
112 limit: usize,
113 ) -> Result<QueryResult> {
114 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
115 let mut collected = 0;
116 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
117
118 loop {
119 if collected >= limit {
120 break;
121 }
122
123 self.check_deadline()?;
124
125 match operator.next() {
126 Ok(Some(chunk)) => {
127 if !types_captured && chunk.column_count() > 0 {
129 self.capture_column_types(&chunk, &mut result);
130 types_captured = true;
131 }
132 let remaining = limit - collected;
133 collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
134 }
135 Ok(None) => break,
136 Err(err) => return Err(convert_operator_error(err)),
137 }
138 }
139
140 Ok(result)
141 }
142
143 fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
145 let col_count = chunk.column_count();
146 result.column_types = Vec::with_capacity(col_count);
147 for col_idx in 0..col_count {
148 let col_type = chunk
149 .column(col_idx)
150 .map_or(LogicalType::Any, |col| col.data_type().clone());
151 result.column_types.push(col_type);
152 }
153 }
154
155 fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
160 let col_count = chunk.column_count();
161 let mut collected = 0;
162
163 for row_idx in chunk.selected_indices() {
164 let mut row = Vec::with_capacity(col_count);
165 for col_idx in 0..col_count {
166 let value = chunk
167 .column(col_idx)
168 .and_then(|col| col.get_value(row_idx))
169 .unwrap_or(Value::Null);
170 row.push(value);
171 }
172 result.rows.push(row);
173 collected += 1;
174 }
175
176 Ok(collected)
177 }
178
179 fn collect_chunk_limited(
184 &self,
185 chunk: &DataChunk,
186 result: &mut QueryResult,
187 limit: usize,
188 ) -> Result<usize> {
189 let col_count = chunk.column_count();
190 let mut collected = 0;
191
192 for row_idx in chunk.selected_indices() {
193 if collected >= limit {
194 break;
195 }
196 let mut row = Vec::with_capacity(col_count);
197 for col_idx in 0..col_count {
198 let value = chunk
199 .column(col_idx)
200 .and_then(|col| col.get_value(row_idx))
201 .unwrap_or(Value::Null);
202 row.push(value);
203 }
204 result.rows.push(row);
205 collected += 1;
206 }
207
208 Ok(collected)
209 }
210
211 pub fn execute_adaptive(
227 &self,
228 operator: Box<dyn Operator>,
229 adaptive_context: Option<AdaptiveContext>,
230 config: &AdaptiveConfig,
231 ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
232 if !config.enabled {
234 let mut op = operator;
235 let result = self.execute(op.as_mut())?;
236 return Ok((result, None));
237 }
238
239 let Some(ctx) = adaptive_context else {
240 let mut op = operator;
241 let result = self.execute(op.as_mut())?;
242 return Ok((result, None));
243 };
244
245 let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
247 config.threshold,
248 config.min_rows,
249 ));
250
251 for (op_id, checkpoint) in ctx.all_checkpoints() {
253 if let Some(mut inner) = shared_ctx.snapshot() {
254 inner.set_estimate(op_id, checkpoint.estimated);
255 }
256 }
257
258 let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
260
261 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
263 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
264 let mut total_rows: u64 = 0;
265 let check_interval = config.min_rows;
266
267 loop {
268 self.check_deadline()?;
269
270 match wrapped.next() {
271 Ok(Some(chunk)) => {
272 let chunk_rows = chunk.row_count();
273 total_rows += chunk_rows as u64;
274
275 if !types_captured && chunk.column_count() > 0 {
277 self.capture_column_types(&chunk, &mut result);
278 types_captured = true;
279 }
280 self.collect_chunk(&chunk, &mut result)?;
281
282 if total_rows >= check_interval
284 && total_rows.is_multiple_of(check_interval)
285 && shared_ctx.should_reoptimize()
286 {
287 }
291 }
292 Ok(None) => break,
293 Err(err) => return Err(convert_operator_error(err)),
294 }
295 }
296
297 let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
299
300 Ok((result, summary))
301 }
302}
303
304impl Default for Executor {
305 fn default() -> Self {
306 Self::new()
307 }
308}
309
310fn convert_operator_error(err: OperatorError) -> Error {
312 match err {
313 OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
314 OperatorError::ColumnNotFound(name) => {
315 Error::InvalidValue(format!("Column not found: {name}"))
316 }
317 OperatorError::Execution(msg) => Error::Internal(msg),
318 }
319}
320
#[cfg(test)]
mod tests {
    // `LogicalType`, `DataChunk`, `Value`, `Operator` and `Instant` all come
    // from the parent module's imports via this glob.
    use super::*;

    /// Test operator that yields a fixed list of `Int64` values in a single
    /// `Int64` column, `chunk_size` rows per chunk.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            if self.position >= self.values.len() {
                return Ok(None);
            }

            let end = (self.position + self.chunk_size).min(self.values.len());
            let batch = &self.values[self.position..end];
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            {
                let col = chunk
                    .column_mut(0)
                    .expect("chunk was created with exactly one Int64 column");
                for &value in batch {
                    col.push_int64(value);
                }
            }
            chunk.set_count(batch.len());
            self.position = end;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Operator that is exhausted from the start.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    #[test]
    fn test_executor_empty() {
        let executor = Executor::with_columns(vec!["a".to_string()]);
        let mut op = EmptyOperator;

        let result = executor.execute(&mut op).unwrap();
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
        assert_eq!(result.rows[0][0], Value::Int64(1));
        assert_eq!(result.rows[1][0], Value::Int64(2));
        assert_eq!(result.rows[2][0], Value::Int64(3));
    }

    #[test]
    fn test_executor_multiple_chunks() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 3);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 10);
        for (row, expected) in result.rows.iter().zip(0_i64..) {
            assert_eq!(row[0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_captures_column_types() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.column_types, vec![LogicalType::Int64]);
    }

    #[test]
    fn test_executor_with_limit() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
    }

    #[test]
    fn test_executor_limit_spans_chunks() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 3);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
        assert_eq!(result.rows[4][0], Value::Int64(4));
    }

    #[test]
    fn test_executor_limit_zero() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute_with_limit(&mut op, 0).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_executor_timeout_expired() {
        // `Instant` is monotonic and the deadline check uses `>=`, so a deadline
        // of "now" has always passed by the first check. Unlike subtracting a
        // duration, this cannot underflow shortly after boot.
        let executor =
            Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(Instant::now()));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_no_timeout() {
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(None);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
    }
}