grafeo_engine/query/executor/
mod.rs1#[cfg(feature = "algos")]
6pub mod procedure_call;
7#[cfg(feature = "algos")]
8pub mod user_procedure;
9
10use std::time::Instant;
11
12use crate::config::AdaptiveConfig;
13use crate::database::QueryResult;
14use grafeo_common::grafeo_debug_span;
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, QueryError, Result};
17use grafeo_core::execution::operators::{Operator, OperatorError};
18use grafeo_core::execution::{
19 AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
20};
21
/// Pulls chunks from a root operator and materialises them into a [`QueryResult`].
pub struct Executor {
    /// Output column names; copied into every `QueryResult` this executor builds.
    columns: Vec<String>,
    /// Declared output column types. While every entry is `LogicalType::Any`,
    /// the execute methods replace the result's types with those of the first
    /// chunk that has at least one column.
    column_types: Vec<LogicalType>,
    /// Optional point in time after which execution fails with a timeout error.
    /// `None` disables the check.
    deadline: Option<Instant>,
}
31
32impl Executor {
33 #[must_use]
35 pub fn new() -> Self {
36 Self {
37 columns: Vec::new(),
38 column_types: Vec::new(),
39 deadline: None,
40 }
41 }
42
43 #[must_use]
45 pub fn with_columns(columns: Vec<String>) -> Self {
46 let len = columns.len();
47 Self {
48 columns,
49 column_types: vec![LogicalType::Any; len],
50 deadline: None,
51 }
52 }
53
54 #[must_use]
56 pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
57 Self {
58 columns,
59 column_types,
60 deadline: None,
61 }
62 }
63
64 #[must_use]
66 pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
67 self.deadline = deadline;
68 self
69 }
70
71 fn check_deadline(&self) -> Result<()> {
73 #[cfg(not(target_arch = "wasm32"))]
74 if let Some(deadline) = self.deadline
75 && Instant::now() >= deadline
76 {
77 return Err(Error::Query(QueryError::timeout()));
78 }
79 Ok(())
80 }
81
82 pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
88 let _span = grafeo_debug_span!("grafeo::query::execute");
89 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
90 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
91
92 loop {
93 self.check_deadline()?;
94
95 match operator.next() {
96 Ok(Some(chunk)) => {
97 if !types_captured && chunk.column_count() > 0 {
99 self.capture_column_types(&chunk, &mut result);
100 types_captured = true;
101 }
102 self.collect_chunk(&chunk, &mut result)?;
103 }
104 Ok(None) => break,
105 Err(err) => return Err(convert_operator_error(err)),
106 }
107 }
108
109 Ok(result)
110 }
111
112 pub fn execute_with_limit(
118 &self,
119 operator: &mut dyn Operator,
120 limit: usize,
121 ) -> Result<QueryResult> {
122 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
123 let mut collected = 0;
124 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
125
126 loop {
127 if collected >= limit {
128 break;
129 }
130
131 self.check_deadline()?;
132
133 match operator.next() {
134 Ok(Some(chunk)) => {
135 if !types_captured && chunk.column_count() > 0 {
137 self.capture_column_types(&chunk, &mut result);
138 types_captured = true;
139 }
140 let remaining = limit - collected;
141 collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
142 }
143 Ok(None) => break,
144 Err(err) => return Err(convert_operator_error(err)),
145 }
146 }
147
148 Ok(result)
149 }
150
151 fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
153 let col_count = chunk.column_count();
154 result.column_types = Vec::with_capacity(col_count);
155 for col_idx in 0..col_count {
156 let col_type = chunk
157 .column(col_idx)
158 .map_or(LogicalType::Any, |col| col.data_type().clone());
159 result.column_types.push(col_type);
160 }
161 }
162
163 fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
168 let col_count = chunk.column_count();
169 let mut collected = 0;
170
171 for row_idx in chunk.selected_indices() {
172 let mut row = Vec::with_capacity(col_count);
173 for col_idx in 0..col_count {
174 let value = chunk
175 .column(col_idx)
176 .and_then(|col| col.get_value(row_idx))
177 .unwrap_or(Value::Null);
178 row.push(value);
179 }
180 result.rows.push(row);
181 collected += 1;
182 }
183
184 Ok(collected)
185 }
186
187 fn collect_chunk_limited(
192 &self,
193 chunk: &DataChunk,
194 result: &mut QueryResult,
195 limit: usize,
196 ) -> Result<usize> {
197 let col_count = chunk.column_count();
198 let mut collected = 0;
199
200 for row_idx in chunk.selected_indices() {
201 if collected >= limit {
202 break;
203 }
204 let mut row = Vec::with_capacity(col_count);
205 for col_idx in 0..col_count {
206 let value = chunk
207 .column(col_idx)
208 .and_then(|col| col.get_value(row_idx))
209 .unwrap_or(Value::Null);
210 row.push(value);
211 }
212 result.rows.push(row);
213 collected += 1;
214 }
215
216 Ok(collected)
217 }
218
219 pub fn execute_adaptive(
235 &self,
236 operator: Box<dyn Operator>,
237 adaptive_context: Option<AdaptiveContext>,
238 config: &AdaptiveConfig,
239 ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
240 if !config.enabled {
242 let mut op = operator;
243 let result = self.execute(op.as_mut())?;
244 return Ok((result, None));
245 }
246
247 let Some(ctx) = adaptive_context else {
248 let mut op = operator;
249 let result = self.execute(op.as_mut())?;
250 return Ok((result, None));
251 };
252
253 let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
255 config.threshold,
256 config.min_rows,
257 ));
258
259 for (op_id, checkpoint) in ctx.all_checkpoints() {
261 if let Some(mut inner) = shared_ctx.snapshot() {
262 inner.set_estimate(op_id, checkpoint.estimated);
263 }
264 }
265
266 let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
268
269 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
271 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
272 let mut total_rows: u64 = 0;
273 let check_interval = config.min_rows;
274
275 loop {
276 self.check_deadline()?;
277
278 match wrapped.next() {
279 Ok(Some(chunk)) => {
280 let chunk_rows = chunk.row_count();
281 total_rows += chunk_rows as u64;
282
283 if !types_captured && chunk.column_count() > 0 {
285 self.capture_column_types(&chunk, &mut result);
286 types_captured = true;
287 }
288 self.collect_chunk(&chunk, &mut result)?;
289
290 if total_rows >= check_interval
292 && total_rows.is_multiple_of(check_interval)
293 && shared_ctx.should_reoptimize()
294 {
295 }
299 }
300 Ok(None) => break,
301 Err(err) => return Err(convert_operator_error(err)),
302 }
303 }
304
305 let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
307
308 Ok((result, summary))
309 }
310}
311
312impl Default for Executor {
313 fn default() -> Self {
314 Self::new()
315 }
316}
317
318fn convert_operator_error(err: OperatorError) -> Error {
320 match err {
321 OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
322 OperatorError::ColumnNotFound(name) => {
323 Error::InvalidValue(format!("Column not found: {name}"))
324 }
325 OperatorError::Execution(msg) => Error::Internal(msg),
326 OperatorError::ConstraintViolation(msg) => {
327 Error::InvalidValue(format!("Constraint violation: {msg}"))
328 }
329 OperatorError::WriteConflict(msg) => {
330 Error::Transaction(grafeo_common::utils::error::TransactionError::WriteConflict(msg))
331 }
332 }
333}
334
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;
    use grafeo_core::execution::operators::OperatorResult;

    /// Test operator that emits a fixed list of integers as a single
    /// `Int64` column, at most `chunk_size` values per chunk.
    struct MockIntOperator {
        values: Vec<i64>,
        /// Index of the next value to emit.
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> OperatorResult {
            let total = self.values.len();
            if self.position >= total {
                return Ok(None);
            }

            let start = self.position;
            let end = total.min(start + self.chunk_size);
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            {
                // Keep the mutable column borrow confined to this scope.
                let column = chunk.column_mut(0).unwrap();
                for &value in &self.values[start..end] {
                    column.push_int64(value);
                }
            }
            chunk.set_count(end - start);
            self.position = end;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Test operator that is exhausted from the start.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    /// An exhausted operator yields no rows but keeps the declared columns.
    #[test]
    fn test_executor_empty() {
        let mut source = EmptyOperator;
        let executor = Executor::with_columns(vec!["a".to_string()]);

        let result = executor.execute(&mut source).unwrap();

        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    /// All values of a single chunk are returned in order.
    #[test]
    fn test_executor_single_chunk() {
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);
        let executor = Executor::with_columns(vec!["value".to_string()]);

        let result = executor.execute(&mut source).unwrap();

        assert_eq!(result.row_count(), 3);
        for (row_idx, expected) in [1, 2, 3].into_iter().enumerate() {
            assert_eq!(result.rows[row_idx][0], Value::Int64(expected));
        }
    }

    /// The limit caps the row count even when one chunk holds more rows.
    #[test]
    fn test_executor_with_limit() {
        let mut source = MockIntOperator::new((0..10).collect(), 100);
        let executor = Executor::with_columns(vec!["value".to_string()]);

        let result = executor.execute_with_limit(&mut source, 5).unwrap();

        assert_eq!(result.row_count(), 5);
    }

    /// A deadline that is already in the past aborts execution with a timeout.
    #[test]
    fn test_executor_timeout_expired() {
        use std::time::{Duration, Instant};

        let already_passed = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
        let executor = Executor::with_columns(vec!["value".to_string()])
            .with_deadline(Some(already_passed));
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = executor.execute(&mut source);

        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    /// Passing `None` as the deadline leaves execution unrestricted.
    #[test]
    fn test_executor_no_timeout() {
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(None);

        let result = executor.execute(&mut source).unwrap();

        assert_eq!(result.row_count(), 3);
    }
}