grafeo_engine/query/executor/
mod.rs1#[cfg(feature = "algos")]
6pub mod procedure_call;
7#[cfg(feature = "algos")]
8pub mod user_procedure;
9
10use std::time::Instant;
11
12use crate::config::AdaptiveConfig;
13use crate::database::QueryResult;
14use grafeo_common::types::{LogicalType, Value};
15use grafeo_common::utils::error::{Error, QueryError, Result};
16use grafeo_core::execution::operators::{Operator, OperatorError};
17use grafeo_core::execution::{
18 AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
19};
20
21pub struct Executor {
23 columns: Vec<String>,
25 column_types: Vec<LogicalType>,
27 deadline: Option<Instant>,
29}
30
31impl Executor {
32 #[must_use]
34 pub fn new() -> Self {
35 Self {
36 columns: Vec::new(),
37 column_types: Vec::new(),
38 deadline: None,
39 }
40 }
41
42 #[must_use]
44 pub fn with_columns(columns: Vec<String>) -> Self {
45 let len = columns.len();
46 Self {
47 columns,
48 column_types: vec![LogicalType::Any; len],
49 deadline: None,
50 }
51 }
52
53 #[must_use]
55 pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
56 Self {
57 columns,
58 column_types,
59 deadline: None,
60 }
61 }
62
63 #[must_use]
65 pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
66 self.deadline = deadline;
67 self
68 }
69
70 fn check_deadline(&self) -> Result<()> {
72 #[cfg(not(target_arch = "wasm32"))]
73 if let Some(deadline) = self.deadline
74 && Instant::now() >= deadline
75 {
76 return Err(Error::Query(QueryError::timeout()));
77 }
78 Ok(())
79 }
80
81 pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
87 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
88 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
89
90 loop {
91 self.check_deadline()?;
92
93 match operator.next() {
94 Ok(Some(chunk)) => {
95 if !types_captured && chunk.column_count() > 0 {
97 self.capture_column_types(&chunk, &mut result);
98 types_captured = true;
99 }
100 self.collect_chunk(&chunk, &mut result)?;
101 }
102 Ok(None) => break,
103 Err(err) => return Err(convert_operator_error(err)),
104 }
105 }
106
107 Ok(result)
108 }
109
110 pub fn execute_with_limit(
116 &self,
117 operator: &mut dyn Operator,
118 limit: usize,
119 ) -> Result<QueryResult> {
120 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
121 let mut collected = 0;
122 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
123
124 loop {
125 if collected >= limit {
126 break;
127 }
128
129 self.check_deadline()?;
130
131 match operator.next() {
132 Ok(Some(chunk)) => {
133 if !types_captured && chunk.column_count() > 0 {
135 self.capture_column_types(&chunk, &mut result);
136 types_captured = true;
137 }
138 let remaining = limit - collected;
139 collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
140 }
141 Ok(None) => break,
142 Err(err) => return Err(convert_operator_error(err)),
143 }
144 }
145
146 Ok(result)
147 }
148
149 fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
151 let col_count = chunk.column_count();
152 result.column_types = Vec::with_capacity(col_count);
153 for col_idx in 0..col_count {
154 let col_type = chunk
155 .column(col_idx)
156 .map_or(LogicalType::Any, |col| col.data_type().clone());
157 result.column_types.push(col_type);
158 }
159 }
160
161 fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
166 let col_count = chunk.column_count();
167 let mut collected = 0;
168
169 for row_idx in chunk.selected_indices() {
170 let mut row = Vec::with_capacity(col_count);
171 for col_idx in 0..col_count {
172 let value = chunk
173 .column(col_idx)
174 .and_then(|col| col.get_value(row_idx))
175 .unwrap_or(Value::Null);
176 row.push(value);
177 }
178 result.rows.push(row);
179 collected += 1;
180 }
181
182 Ok(collected)
183 }
184
185 fn collect_chunk_limited(
190 &self,
191 chunk: &DataChunk,
192 result: &mut QueryResult,
193 limit: usize,
194 ) -> Result<usize> {
195 let col_count = chunk.column_count();
196 let mut collected = 0;
197
198 for row_idx in chunk.selected_indices() {
199 if collected >= limit {
200 break;
201 }
202 let mut row = Vec::with_capacity(col_count);
203 for col_idx in 0..col_count {
204 let value = chunk
205 .column(col_idx)
206 .and_then(|col| col.get_value(row_idx))
207 .unwrap_or(Value::Null);
208 row.push(value);
209 }
210 result.rows.push(row);
211 collected += 1;
212 }
213
214 Ok(collected)
215 }
216
217 pub fn execute_adaptive(
233 &self,
234 operator: Box<dyn Operator>,
235 adaptive_context: Option<AdaptiveContext>,
236 config: &AdaptiveConfig,
237 ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
238 if !config.enabled {
240 let mut op = operator;
241 let result = self.execute(op.as_mut())?;
242 return Ok((result, None));
243 }
244
245 let Some(ctx) = adaptive_context else {
246 let mut op = operator;
247 let result = self.execute(op.as_mut())?;
248 return Ok((result, None));
249 };
250
251 let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
253 config.threshold,
254 config.min_rows,
255 ));
256
257 for (op_id, checkpoint) in ctx.all_checkpoints() {
259 if let Some(mut inner) = shared_ctx.snapshot() {
260 inner.set_estimate(op_id, checkpoint.estimated);
261 }
262 }
263
264 let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
266
267 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
269 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
270 let mut total_rows: u64 = 0;
271 let check_interval = config.min_rows;
272
273 loop {
274 self.check_deadline()?;
275
276 match wrapped.next() {
277 Ok(Some(chunk)) => {
278 let chunk_rows = chunk.row_count();
279 total_rows += chunk_rows as u64;
280
281 if !types_captured && chunk.column_count() > 0 {
283 self.capture_column_types(&chunk, &mut result);
284 types_captured = true;
285 }
286 self.collect_chunk(&chunk, &mut result)?;
287
288 if total_rows >= check_interval
290 && total_rows.is_multiple_of(check_interval)
291 && shared_ctx.should_reoptimize()
292 {
293 }
297 }
298 Ok(None) => break,
299 Err(err) => return Err(convert_operator_error(err)),
300 }
301 }
302
303 let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
305
306 Ok((result, summary))
307 }
308}
309
310impl Default for Executor {
311 fn default() -> Self {
312 Self::new()
313 }
314}
315
316fn convert_operator_error(err: OperatorError) -> Error {
318 match err {
319 OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
320 OperatorError::ColumnNotFound(name) => {
321 Error::InvalidValue(format!("Column not found: {name}"))
322 }
323 OperatorError::Execution(msg) => Error::Internal(msg),
324 OperatorError::ConstraintViolation(msg) => {
325 Error::InvalidValue(format!("Constraint violation: {msg}"))
326 }
327 }
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// Test operator that emits a fixed list of `i64` values as a single
    /// `Int64` column, handing out at most `chunk_size` values per call.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        /// Starts at the first value.
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            let start = self.position;
            let total = self.values.len();
            if start >= total {
                return Ok(None);
            }

            let end = (start + self.chunk_size).min(total);
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            let col = chunk.column_mut(0).unwrap();
            for &value in &self.values[start..end] {
                col.push_int64(value);
            }

            chunk.set_count(end - start);
            self.position = end;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Test operator that never produces a chunk.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    /// An exhausted operator yields no rows but keeps the configured column.
    #[test]
    fn test_executor_empty() {
        let mut source = EmptyOperator;
        let executor = Executor::with_columns(vec!["a".to_string()]);

        let result = executor.execute(&mut source).unwrap();

        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    /// All values of a single chunk arrive in order.
    #[test]
    fn test_executor_single_chunk() {
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);
        let executor = Executor::with_columns(vec!["value".to_string()]);

        let result = executor.execute(&mut source).unwrap();

        assert_eq!(result.row_count(), 3);
        for (row_idx, expected) in [1_i64, 2, 3].into_iter().enumerate() {
            assert_eq!(result.rows[row_idx][0], Value::Int64(expected));
        }
    }

    /// A limit smaller than the chunk truncates the output.
    #[test]
    fn test_executor_with_limit() {
        let mut source = MockIntOperator::new((0..10).collect(), 100);
        let executor = Executor::with_columns(vec!["value".to_string()]);

        let result = executor.execute_with_limit(&mut source, 5).unwrap();

        assert_eq!(result.row_count(), 5);
    }

    /// A deadline that already lies in the past aborts with a timeout error.
    #[test]
    fn test_executor_timeout_expired() {
        use std::time::{Duration, Instant};

        let past = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
        let executor =
            Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(past));
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = executor.execute(&mut source);

        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    /// Without a deadline the query runs to completion.
    #[test]
    fn test_executor_no_timeout() {
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(None);

        let result = executor.execute(&mut source).unwrap();

        assert_eq!(result.row_count(), 3);
    }
}