// grafeo_engine/query/executor/mod.rs

// Both submodules below are compiled only when the `algos` cargo feature is enabled.
#[cfg(feature = "algos")]
pub mod procedure_call;
#[cfg(feature = "algos")]
pub mod user_procedure;
9
10use std::time::Instant;
11
12use crate::config::AdaptiveConfig;
13use crate::database::QueryResult;
14use grafeo_common::grafeo_debug_span;
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, QueryError, Result};
17use grafeo_core::execution::operators::{Operator, OperatorError};
18use grafeo_core::execution::{
19 AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
20};
21
22pub struct Executor {
24 columns: Vec<String>,
26 column_types: Vec<LogicalType>,
28 deadline: Option<Instant>,
30}
31
32impl Executor {
33 #[must_use]
35 pub fn new() -> Self {
36 Self {
37 columns: Vec::new(),
38 column_types: Vec::new(),
39 deadline: None,
40 }
41 }
42
43 #[must_use]
45 pub fn with_columns(columns: Vec<String>) -> Self {
46 let len = columns.len();
47 Self {
48 columns,
49 column_types: vec![LogicalType::Any; len],
50 deadline: None,
51 }
52 }
53
54 #[must_use]
56 pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
57 Self {
58 columns,
59 column_types,
60 deadline: None,
61 }
62 }
63
64 #[must_use]
66 pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
67 self.deadline = deadline;
68 self
69 }
70
71 fn check_deadline(&self) -> Result<()> {
73 #[cfg(not(target_arch = "wasm32"))]
74 if let Some(deadline) = self.deadline
75 && Instant::now() >= deadline
76 {
77 return Err(Error::Query(QueryError::timeout()));
78 }
79 Ok(())
80 }
81
82 pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
88 let _span = grafeo_debug_span!("grafeo::query::execute");
89 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
90 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
91
92 loop {
93 self.check_deadline()?;
94
95 match operator.next() {
96 Ok(Some(chunk)) => {
97 if !types_captured && chunk.column_count() > 0 {
99 self.capture_column_types(&chunk, &mut result);
100 types_captured = true;
101 }
102 self.collect_chunk(&chunk, &mut result)?;
103 }
104 Ok(None) => break,
105 Err(err) => return Err(convert_operator_error(err)),
106 }
107 }
108
109 Ok(result)
110 }
111
112 pub fn execute_with_limit(
118 &self,
119 operator: &mut dyn Operator,
120 limit: usize,
121 ) -> Result<QueryResult> {
122 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
123 let mut collected = 0;
124 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
125
126 loop {
127 if collected >= limit {
128 break;
129 }
130
131 self.check_deadline()?;
132
133 match operator.next() {
134 Ok(Some(chunk)) => {
135 if !types_captured && chunk.column_count() > 0 {
137 self.capture_column_types(&chunk, &mut result);
138 types_captured = true;
139 }
140 let remaining = limit - collected;
141 collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
142 }
143 Ok(None) => break,
144 Err(err) => return Err(convert_operator_error(err)),
145 }
146 }
147
148 Ok(result)
149 }
150
151 fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
153 let col_count = chunk.column_count();
154 result.column_types = Vec::with_capacity(col_count);
155 for col_idx in 0..col_count {
156 let col_type = chunk
157 .column(col_idx)
158 .map_or(LogicalType::Any, |col| col.data_type().clone());
159 result.column_types.push(col_type);
160 }
161 }
162
163 fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
168 let col_count = chunk.column_count();
169 let mut collected = 0;
170
171 for row_idx in chunk.selected_indices() {
172 let mut row = Vec::with_capacity(col_count);
173 for col_idx in 0..col_count {
174 let value = chunk
175 .column(col_idx)
176 .and_then(|col| col.get_value(row_idx))
177 .unwrap_or(Value::Null);
178 row.push(value);
179 }
180 result.rows.push(row);
181 collected += 1;
182 }
183
184 Ok(collected)
185 }
186
187 fn collect_chunk_limited(
192 &self,
193 chunk: &DataChunk,
194 result: &mut QueryResult,
195 limit: usize,
196 ) -> Result<usize> {
197 let col_count = chunk.column_count();
198 let mut collected = 0;
199
200 for row_idx in chunk.selected_indices() {
201 if collected >= limit {
202 break;
203 }
204 let mut row = Vec::with_capacity(col_count);
205 for col_idx in 0..col_count {
206 let value = chunk
207 .column(col_idx)
208 .and_then(|col| col.get_value(row_idx))
209 .unwrap_or(Value::Null);
210 row.push(value);
211 }
212 result.rows.push(row);
213 collected += 1;
214 }
215
216 Ok(collected)
217 }
218
219 pub fn execute_adaptive(
235 &self,
236 operator: Box<dyn Operator>,
237 adaptive_context: Option<AdaptiveContext>,
238 config: &AdaptiveConfig,
239 ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
240 if !config.enabled {
242 let mut op = operator;
243 let result = self.execute(op.as_mut())?;
244 return Ok((result, None));
245 }
246
247 let Some(ctx) = adaptive_context else {
248 let mut op = operator;
249 let result = self.execute(op.as_mut())?;
250 return Ok((result, None));
251 };
252
253 let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
255 config.threshold,
256 config.min_rows,
257 ));
258
259 for (op_id, checkpoint) in ctx.all_checkpoints() {
261 if let Some(mut inner) = shared_ctx.snapshot() {
262 inner.set_estimate(op_id, checkpoint.estimated);
263 }
264 }
265
266 let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
268
269 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
271 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
272 let mut total_rows: u64 = 0;
273 let check_interval = config.min_rows;
274
275 loop {
276 self.check_deadline()?;
277
278 match wrapped.next() {
279 Ok(Some(chunk)) => {
280 let chunk_rows = chunk.row_count();
281 total_rows += chunk_rows as u64;
282
283 if !types_captured && chunk.column_count() > 0 {
285 self.capture_column_types(&chunk, &mut result);
286 types_captured = true;
287 }
288 self.collect_chunk(&chunk, &mut result)?;
289
290 if total_rows >= check_interval
292 && total_rows.is_multiple_of(check_interval)
293 && shared_ctx.should_reoptimize()
294 {
295 }
299 }
300 Ok(None) => break,
301 Err(err) => return Err(convert_operator_error(err)),
302 }
303 }
304
305 let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
307
308 Ok((result, summary))
309 }
310}
311
312impl Default for Executor {
313 fn default() -> Self {
314 Self::new()
315 }
316}
317
318fn convert_operator_error(err: OperatorError) -> Error {
320 match err {
321 OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
322 OperatorError::ColumnNotFound(name) => {
323 Error::InvalidValue(format!("Column not found: {name}"))
324 }
325 OperatorError::Execution(msg) => Error::Internal(msg),
326 OperatorError::ConstraintViolation(msg) => {
327 Error::InvalidValue(format!("Constraint violation: {msg}"))
328 }
329 OperatorError::WriteConflict(msg) => {
330 Error::Transaction(grafeo_common::utils::error::TransactionError::WriteConflict(msg))
331 }
332 _ => Error::Internal(format!("{err}")),
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// Test operator that emits a fixed list of `i64` values as a single
    /// `Int64` column, at most `chunk_size` values per chunk.
    struct MockIntOperator {
        values: Vec<i64>,
        /// Index of the next value to emit.
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            if self.position >= self.values.len() {
                return Ok(None);
            }

            let end = (self.position + self.chunk_size).min(self.values.len());
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            {
                let col = chunk.column_mut(0).unwrap();
                for value in &self.values[self.position..end] {
                    col.push_int64(*value);
                }
            }
            chunk.set_count(end - self.position);
            self.position = end;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Test operator that is exhausted from the start.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    #[test]
    fn test_executor_empty() {
        let executor = Executor::with_columns(vec!["a".to_string()]);
        let mut op = EmptyOperator;

        let result = executor.execute(&mut op).unwrap();
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
        assert_eq!(result.rows[0][0], Value::Int64(1));
        assert_eq!(result.rows[1][0], Value::Int64(2));
        assert_eq!(result.rows[2][0], Value::Int64(3));
    }

    #[test]
    fn test_executor_with_limit() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
    }

    #[test]
    fn test_executor_with_limit_across_chunks() {
        // Chunks of 3 force the limit to be reached in the middle of the
        // second chunk, exercising the per-chunk remaining-row budget.
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 3);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
        assert_eq!(result.rows[0][0], Value::Int64(0));
        assert_eq!(result.rows[4][0], Value::Int64(4));
    }

    #[test]
    fn test_executor_with_limit_zero() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute_with_limit(&mut op, 0).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_executor_timeout_expired() {
        use std::time::{Duration, Instant};

        // A deadline one second in the past must fail on the first check.
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(
            Instant::now().checked_sub(Duration::from_secs(1)).unwrap(),
        ));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_no_timeout() {
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(None);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
    }
}