grafeo_engine/query/executor/
mod.rs1#[cfg(feature = "algos")]
6pub mod procedure_call;
7#[cfg(feature = "algos")]
8pub mod user_procedure;
9
10use std::time::Instant;
11
12use crate::config::AdaptiveConfig;
13use crate::database::QueryResult;
14use grafeo_common::types::{LogicalType, Value};
15use grafeo_common::utils::error::{Error, QueryError, Result};
16use grafeo_core::execution::operators::{Operator, OperatorError};
17use grafeo_core::execution::{
18 AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
19};
20
21pub struct Executor {
23 columns: Vec<String>,
25 column_types: Vec<LogicalType>,
27 deadline: Option<Instant>,
29}
30
31impl Executor {
32 #[must_use]
34 pub fn new() -> Self {
35 Self {
36 columns: Vec::new(),
37 column_types: Vec::new(),
38 deadline: None,
39 }
40 }
41
42 #[must_use]
44 pub fn with_columns(columns: Vec<String>) -> Self {
45 let len = columns.len();
46 Self {
47 columns,
48 column_types: vec![LogicalType::Any; len],
49 deadline: None,
50 }
51 }
52
53 #[must_use]
55 pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
56 Self {
57 columns,
58 column_types,
59 deadline: None,
60 }
61 }
62
63 #[must_use]
65 pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
66 self.deadline = deadline;
67 self
68 }
69
70 fn check_deadline(&self) -> Result<()> {
72 #[cfg(not(target_arch = "wasm32"))]
73 if let Some(deadline) = self.deadline
74 && Instant::now() >= deadline
75 {
76 return Err(Error::Query(QueryError::timeout()));
77 }
78 Ok(())
79 }
80
81 pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
87 let _span = tracing::debug_span!("grafeo::query::execute").entered();
88 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
89 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
90
91 loop {
92 self.check_deadline()?;
93
94 match operator.next() {
95 Ok(Some(chunk)) => {
96 if !types_captured && chunk.column_count() > 0 {
98 self.capture_column_types(&chunk, &mut result);
99 types_captured = true;
100 }
101 self.collect_chunk(&chunk, &mut result)?;
102 }
103 Ok(None) => break,
104 Err(err) => return Err(convert_operator_error(err)),
105 }
106 }
107
108 Ok(result)
109 }
110
111 pub fn execute_with_limit(
117 &self,
118 operator: &mut dyn Operator,
119 limit: usize,
120 ) -> Result<QueryResult> {
121 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
122 let mut collected = 0;
123 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
124
125 loop {
126 if collected >= limit {
127 break;
128 }
129
130 self.check_deadline()?;
131
132 match operator.next() {
133 Ok(Some(chunk)) => {
134 if !types_captured && chunk.column_count() > 0 {
136 self.capture_column_types(&chunk, &mut result);
137 types_captured = true;
138 }
139 let remaining = limit - collected;
140 collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
141 }
142 Ok(None) => break,
143 Err(err) => return Err(convert_operator_error(err)),
144 }
145 }
146
147 Ok(result)
148 }
149
150 fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
152 let col_count = chunk.column_count();
153 result.column_types = Vec::with_capacity(col_count);
154 for col_idx in 0..col_count {
155 let col_type = chunk
156 .column(col_idx)
157 .map_or(LogicalType::Any, |col| col.data_type().clone());
158 result.column_types.push(col_type);
159 }
160 }
161
162 fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
167 let col_count = chunk.column_count();
168 let mut collected = 0;
169
170 for row_idx in chunk.selected_indices() {
171 let mut row = Vec::with_capacity(col_count);
172 for col_idx in 0..col_count {
173 let value = chunk
174 .column(col_idx)
175 .and_then(|col| col.get_value(row_idx))
176 .unwrap_or(Value::Null);
177 row.push(value);
178 }
179 result.rows.push(row);
180 collected += 1;
181 }
182
183 Ok(collected)
184 }
185
186 fn collect_chunk_limited(
191 &self,
192 chunk: &DataChunk,
193 result: &mut QueryResult,
194 limit: usize,
195 ) -> Result<usize> {
196 let col_count = chunk.column_count();
197 let mut collected = 0;
198
199 for row_idx in chunk.selected_indices() {
200 if collected >= limit {
201 break;
202 }
203 let mut row = Vec::with_capacity(col_count);
204 for col_idx in 0..col_count {
205 let value = chunk
206 .column(col_idx)
207 .and_then(|col| col.get_value(row_idx))
208 .unwrap_or(Value::Null);
209 row.push(value);
210 }
211 result.rows.push(row);
212 collected += 1;
213 }
214
215 Ok(collected)
216 }
217
218 pub fn execute_adaptive(
234 &self,
235 operator: Box<dyn Operator>,
236 adaptive_context: Option<AdaptiveContext>,
237 config: &AdaptiveConfig,
238 ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
239 if !config.enabled {
241 let mut op = operator;
242 let result = self.execute(op.as_mut())?;
243 return Ok((result, None));
244 }
245
246 let Some(ctx) = adaptive_context else {
247 let mut op = operator;
248 let result = self.execute(op.as_mut())?;
249 return Ok((result, None));
250 };
251
252 let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
254 config.threshold,
255 config.min_rows,
256 ));
257
258 for (op_id, checkpoint) in ctx.all_checkpoints() {
260 if let Some(mut inner) = shared_ctx.snapshot() {
261 inner.set_estimate(op_id, checkpoint.estimated);
262 }
263 }
264
265 let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
267
268 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
270 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
271 let mut total_rows: u64 = 0;
272 let check_interval = config.min_rows;
273
274 loop {
275 self.check_deadline()?;
276
277 match wrapped.next() {
278 Ok(Some(chunk)) => {
279 let chunk_rows = chunk.row_count();
280 total_rows += chunk_rows as u64;
281
282 if !types_captured && chunk.column_count() > 0 {
284 self.capture_column_types(&chunk, &mut result);
285 types_captured = true;
286 }
287 self.collect_chunk(&chunk, &mut result)?;
288
289 if total_rows >= check_interval
291 && total_rows.is_multiple_of(check_interval)
292 && shared_ctx.should_reoptimize()
293 {
294 }
298 }
299 Ok(None) => break,
300 Err(err) => return Err(convert_operator_error(err)),
301 }
302 }
303
304 let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
306
307 Ok((result, summary))
308 }
309}
310
311impl Default for Executor {
312 fn default() -> Self {
313 Self::new()
314 }
315}
316
317fn convert_operator_error(err: OperatorError) -> Error {
319 match err {
320 OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
321 OperatorError::ColumnNotFound(name) => {
322 Error::InvalidValue(format!("Column not found: {name}"))
323 }
324 OperatorError::Execution(msg) => Error::Internal(msg),
325 OperatorError::ConstraintViolation(msg) => {
326 Error::InvalidValue(format!("Constraint violation: {msg}"))
327 }
328 OperatorError::WriteConflict(msg) => {
329 Error::Transaction(grafeo_common::utils::error::TransactionError::WriteConflict(msg))
330 }
331 }
332}
333
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// Emits a fixed list of `Int64` values in chunks of at most `chunk_size` rows.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            if self.position >= self.values.len() {
                return Ok(None);
            }

            let end = (self.position + self.chunk_size).min(self.values.len());
            let batch = &self.values[self.position..end];
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            {
                let col = chunk
                    .column_mut(0)
                    .expect("mock chunk is created with one Int64 column");
                for &value in batch {
                    col.push_int64(value);
                }
            }
            chunk.set_count(batch.len());
            self.position = end;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// An operator that never produces a chunk.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    #[test]
    fn test_executor_empty() {
        let executor = Executor::with_columns(vec!["a".to_string()]);
        let mut op = EmptyOperator;

        let result = executor.execute(&mut op).unwrap();
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
        assert_eq!(result.rows[0][0], Value::Int64(1));
        assert_eq!(result.rows[1][0], Value::Int64(2));
        assert_eq!(result.rows[2][0], Value::Int64(3));
    }

    #[test]
    fn test_executor_multiple_chunks() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..7).collect(), 3);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 7);
        assert_eq!(result.rows[6][0], Value::Int64(6));
    }

    #[test]
    fn test_executor_infers_column_types() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.column_types, vec![LogicalType::Int64]);
    }

    #[test]
    fn test_executor_with_limit() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
    }

    #[test]
    fn test_executor_limit_spans_chunks() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 3);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
        for (expected, row) in (0..5).zip(&result.rows) {
            assert_eq!(row[0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_limit_zero() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute_with_limit(&mut op, 0).unwrap();
        assert!(result.is_empty());
        assert_eq!(op.position, 0, "operator must not be pulled when limit is 0");
    }

    #[test]
    fn test_executor_timeout_expired() {
        use std::time::{Duration, Instant};

        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(
            Instant::now().checked_sub(Duration::from_secs(1)).unwrap(),
        ));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_no_timeout() {
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(None);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
    }
}