grafeo_engine/query/executor/
mod.rs1#[cfg(feature = "algos")]
6pub mod procedure_call;
7
8use std::time::Instant;
9
10use crate::config::AdaptiveConfig;
11use crate::database::QueryResult;
12use grafeo_common::types::{LogicalType, Value};
13use grafeo_common::utils::error::{Error, QueryError, Result};
14use grafeo_core::execution::operators::{Operator, OperatorError};
15use grafeo_core::execution::{
16 AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
17};
18
19pub struct Executor {
21 columns: Vec<String>,
23 column_types: Vec<LogicalType>,
25 deadline: Option<Instant>,
27}
28
29impl Executor {
30 #[must_use]
32 pub fn new() -> Self {
33 Self {
34 columns: Vec::new(),
35 column_types: Vec::new(),
36 deadline: None,
37 }
38 }
39
40 #[must_use]
42 pub fn with_columns(columns: Vec<String>) -> Self {
43 let len = columns.len();
44 Self {
45 columns,
46 column_types: vec![LogicalType::Any; len],
47 deadline: None,
48 }
49 }
50
51 #[must_use]
53 pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
54 Self {
55 columns,
56 column_types,
57 deadline: None,
58 }
59 }
60
61 #[must_use]
63 pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
64 self.deadline = deadline;
65 self
66 }
67
68 fn check_deadline(&self) -> Result<()> {
70 if let Some(deadline) = self.deadline
71 && Instant::now() >= deadline
72 {
73 return Err(Error::Query(QueryError::timeout()));
74 }
75 Ok(())
76 }
77
78 pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
84 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
85 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
86
87 loop {
88 self.check_deadline()?;
89
90 match operator.next() {
91 Ok(Some(chunk)) => {
92 if !types_captured && chunk.column_count() > 0 {
94 self.capture_column_types(&chunk, &mut result);
95 types_captured = true;
96 }
97 self.collect_chunk(&chunk, &mut result)?;
98 }
99 Ok(None) => break,
100 Err(err) => return Err(convert_operator_error(err)),
101 }
102 }
103
104 Ok(result)
105 }
106
107 pub fn execute_with_limit(
113 &self,
114 operator: &mut dyn Operator,
115 limit: usize,
116 ) -> Result<QueryResult> {
117 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
118 let mut collected = 0;
119 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
120
121 loop {
122 if collected >= limit {
123 break;
124 }
125
126 self.check_deadline()?;
127
128 match operator.next() {
129 Ok(Some(chunk)) => {
130 if !types_captured && chunk.column_count() > 0 {
132 self.capture_column_types(&chunk, &mut result);
133 types_captured = true;
134 }
135 let remaining = limit - collected;
136 collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
137 }
138 Ok(None) => break,
139 Err(err) => return Err(convert_operator_error(err)),
140 }
141 }
142
143 Ok(result)
144 }
145
146 fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
148 let col_count = chunk.column_count();
149 result.column_types = Vec::with_capacity(col_count);
150 for col_idx in 0..col_count {
151 let col_type = chunk
152 .column(col_idx)
153 .map_or(LogicalType::Any, |col| col.data_type().clone());
154 result.column_types.push(col_type);
155 }
156 }
157
158 fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
163 let col_count = chunk.column_count();
164 let mut collected = 0;
165
166 for row_idx in chunk.selected_indices() {
167 let mut row = Vec::with_capacity(col_count);
168 for col_idx in 0..col_count {
169 let value = chunk
170 .column(col_idx)
171 .and_then(|col| col.get_value(row_idx))
172 .unwrap_or(Value::Null);
173 row.push(value);
174 }
175 result.rows.push(row);
176 collected += 1;
177 }
178
179 Ok(collected)
180 }
181
182 fn collect_chunk_limited(
187 &self,
188 chunk: &DataChunk,
189 result: &mut QueryResult,
190 limit: usize,
191 ) -> Result<usize> {
192 let col_count = chunk.column_count();
193 let mut collected = 0;
194
195 for row_idx in chunk.selected_indices() {
196 if collected >= limit {
197 break;
198 }
199 let mut row = Vec::with_capacity(col_count);
200 for col_idx in 0..col_count {
201 let value = chunk
202 .column(col_idx)
203 .and_then(|col| col.get_value(row_idx))
204 .unwrap_or(Value::Null);
205 row.push(value);
206 }
207 result.rows.push(row);
208 collected += 1;
209 }
210
211 Ok(collected)
212 }
213
214 pub fn execute_adaptive(
230 &self,
231 operator: Box<dyn Operator>,
232 adaptive_context: Option<AdaptiveContext>,
233 config: &AdaptiveConfig,
234 ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
235 if !config.enabled {
237 let mut op = operator;
238 let result = self.execute(op.as_mut())?;
239 return Ok((result, None));
240 }
241
242 let Some(ctx) = adaptive_context else {
243 let mut op = operator;
244 let result = self.execute(op.as_mut())?;
245 return Ok((result, None));
246 };
247
248 let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
250 config.threshold,
251 config.min_rows,
252 ));
253
254 for (op_id, checkpoint) in ctx.all_checkpoints() {
256 if let Some(mut inner) = shared_ctx.snapshot() {
257 inner.set_estimate(op_id, checkpoint.estimated);
258 }
259 }
260
261 let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
263
264 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
266 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
267 let mut total_rows: u64 = 0;
268 let check_interval = config.min_rows;
269
270 loop {
271 self.check_deadline()?;
272
273 match wrapped.next() {
274 Ok(Some(chunk)) => {
275 let chunk_rows = chunk.row_count();
276 total_rows += chunk_rows as u64;
277
278 if !types_captured && chunk.column_count() > 0 {
280 self.capture_column_types(&chunk, &mut result);
281 types_captured = true;
282 }
283 self.collect_chunk(&chunk, &mut result)?;
284
285 if total_rows >= check_interval
287 && total_rows.is_multiple_of(check_interval)
288 && shared_ctx.should_reoptimize()
289 {
290 }
294 }
295 Ok(None) => break,
296 Err(err) => return Err(convert_operator_error(err)),
297 }
298 }
299
300 let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
302
303 Ok((result, summary))
304 }
305}
306
307impl Default for Executor {
308 fn default() -> Self {
309 Self::new()
310 }
311}
312
313fn convert_operator_error(err: OperatorError) -> Error {
315 match err {
316 OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
317 OperatorError::ColumnNotFound(name) => {
318 Error::InvalidValue(format!("Column not found: {name}"))
319 }
320 OperatorError::Execution(msg) => Error::Internal(msg),
321 }
322}
323
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;
    use grafeo_core::execution::operators::OperatorResult;

    /// Test operator that emits a fixed list of integers as a single `Int64` column,
    /// at most `chunk_size` values per chunk.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> OperatorResult {
            let start = self.position;
            if start >= self.values.len() {
                return Ok(None);
            }

            // Slice out the next batch, clamped to the end of the value list.
            let stop = self.values.len().min(start + self.chunk_size);
            let batch = &self.values[start..stop];

            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);
            let column = chunk.column_mut(0).unwrap();
            for &value in batch {
                column.push_int64(value);
            }
            chunk.set_count(batch.len());
            self.position = stop;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Test operator that is exhausted from the start.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    /// Executor with a single output column called `value`.
    fn value_executor() -> Executor {
        Executor::with_columns(vec!["value".to_string()])
    }

    #[test]
    fn test_executor_empty() {
        let executor = Executor::with_columns(vec!["a".to_string()]);

        let result = executor.execute(&mut EmptyOperator).unwrap();

        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = value_executor().execute(&mut op).unwrap();

        assert_eq!(result.row_count(), 3);
        for (row, expected) in result.rows.iter().zip([1_i64, 2, 3]) {
            assert_eq!(row[0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_with_limit() {
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = value_executor().execute_with_limit(&mut op, 5).unwrap();

        assert_eq!(result.row_count(), 5);
    }

    #[test]
    fn test_executor_timeout_expired() {
        use std::time::{Duration, Instant};

        // A deadline one second in the past is expired before the first chunk is requested.
        let expired = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
        let executor = value_executor().with_deadline(Some(expired));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = executor.execute(&mut op);

        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_no_timeout() {
        let executor = value_executor().with_deadline(None);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();

        assert_eq!(result.row_count(), 3);
    }
}