grafeo_engine/query/executor/
mod.rs1use crate::config::AdaptiveConfig;
6use crate::database::QueryResult;
7use grafeo_common::types::{LogicalType, Value};
8use grafeo_common::utils::error::{Error, Result};
9use grafeo_core::execution::operators::{Operator, OperatorError};
10use grafeo_core::execution::{
11 AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
12};
13
/// Pulls chunks from an operator and gathers them into a [`QueryResult`].
pub struct Executor {
    /// Column names handed to every `QueryResult` this executor builds.
    columns: Vec<String>,
    /// Column types handed to every `QueryResult` this executor builds.
    /// When all of them are `LogicalType::Any`, the types are taken from the
    /// first chunk that has at least one column.
    column_types: Vec<LogicalType>,
}
21
22impl Executor {
23 #[must_use]
25 pub fn new() -> Self {
26 Self {
27 columns: Vec::new(),
28 column_types: Vec::new(),
29 }
30 }
31
32 #[must_use]
34 pub fn with_columns(columns: Vec<String>) -> Self {
35 let len = columns.len();
36 Self {
37 columns,
38 column_types: vec![LogicalType::Any; len],
39 }
40 }
41
42 #[must_use]
44 pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
45 Self {
46 columns,
47 column_types,
48 }
49 }
50
51 pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
57 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
58 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
59
60 loop {
61 match operator.next() {
62 Ok(Some(chunk)) => {
63 if !types_captured && chunk.column_count() > 0 {
65 self.capture_column_types(&chunk, &mut result);
66 types_captured = true;
67 }
68 self.collect_chunk(&chunk, &mut result)?;
69 }
70 Ok(None) => break,
71 Err(err) => return Err(convert_operator_error(err)),
72 }
73 }
74
75 Ok(result)
76 }
77
78 pub fn execute_with_limit(
84 &self,
85 operator: &mut dyn Operator,
86 limit: usize,
87 ) -> Result<QueryResult> {
88 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
89 let mut collected = 0;
90 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
91
92 loop {
93 if collected >= limit {
94 break;
95 }
96
97 match operator.next() {
98 Ok(Some(chunk)) => {
99 if !types_captured && chunk.column_count() > 0 {
101 self.capture_column_types(&chunk, &mut result);
102 types_captured = true;
103 }
104 let remaining = limit - collected;
105 collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
106 }
107 Ok(None) => break,
108 Err(err) => return Err(convert_operator_error(err)),
109 }
110 }
111
112 Ok(result)
113 }
114
115 fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
117 let col_count = chunk.column_count();
118 result.column_types = Vec::with_capacity(col_count);
119 for col_idx in 0..col_count {
120 let col_type = chunk
121 .column(col_idx)
122 .map(|col| col.data_type().clone())
123 .unwrap_or(LogicalType::Any);
124 result.column_types.push(col_type);
125 }
126 }
127
128 fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
133 let col_count = chunk.column_count();
134 let mut collected = 0;
135
136 for row_idx in chunk.selected_indices() {
137 let mut row = Vec::with_capacity(col_count);
138 for col_idx in 0..col_count {
139 let value = chunk
140 .column(col_idx)
141 .and_then(|col| col.get_value(row_idx))
142 .unwrap_or(Value::Null);
143 row.push(value);
144 }
145 result.rows.push(row);
146 collected += 1;
147 }
148
149 Ok(collected)
150 }
151
152 fn collect_chunk_limited(
157 &self,
158 chunk: &DataChunk,
159 result: &mut QueryResult,
160 limit: usize,
161 ) -> Result<usize> {
162 let col_count = chunk.column_count();
163 let mut collected = 0;
164
165 for row_idx in chunk.selected_indices() {
166 if collected >= limit {
167 break;
168 }
169 let mut row = Vec::with_capacity(col_count);
170 for col_idx in 0..col_count {
171 let value = chunk
172 .column(col_idx)
173 .and_then(|col| col.get_value(row_idx))
174 .unwrap_or(Value::Null);
175 row.push(value);
176 }
177 result.rows.push(row);
178 collected += 1;
179 }
180
181 Ok(collected)
182 }
183
184 pub fn execute_adaptive(
200 &self,
201 operator: Box<dyn Operator>,
202 adaptive_context: Option<AdaptiveContext>,
203 config: &AdaptiveConfig,
204 ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
205 if !config.enabled {
207 let mut op = operator;
208 let result = self.execute(op.as_mut())?;
209 return Ok((result, None));
210 }
211
212 let ctx = match adaptive_context {
213 Some(ctx) => ctx,
214 None => {
215 let mut op = operator;
216 let result = self.execute(op.as_mut())?;
217 return Ok((result, None));
218 }
219 };
220
221 let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
223 config.threshold,
224 config.min_rows,
225 ));
226
227 for (op_id, checkpoint) in ctx.all_checkpoints() {
229 if let Some(mut inner) = shared_ctx.snapshot() {
230 inner.set_estimate(op_id, checkpoint.estimated);
231 }
232 }
233
234 let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
236
237 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
239 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
240 let mut total_rows: u64 = 0;
241 let check_interval = config.min_rows;
242
243 loop {
244 match wrapped.next() {
245 Ok(Some(chunk)) => {
246 let chunk_rows = chunk.row_count();
247 total_rows += chunk_rows as u64;
248
249 if !types_captured && chunk.column_count() > 0 {
251 self.capture_column_types(&chunk, &mut result);
252 types_captured = true;
253 }
254 self.collect_chunk(&chunk, &mut result)?;
255
256 if total_rows >= check_interval && total_rows.is_multiple_of(check_interval) {
258 if shared_ctx.should_reoptimize() {
259 }
263 }
264 }
265 Ok(None) => break,
266 Err(err) => return Err(convert_operator_error(err)),
267 }
268 }
269
270 let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
272
273 Ok((result, summary))
274 }
275}
276
277impl Default for Executor {
278 fn default() -> Self {
279 Self::new()
280 }
281}
282
283fn convert_operator_error(err: OperatorError) -> Error {
285 match err {
286 OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
287 OperatorError::ColumnNotFound(name) => {
288 Error::InvalidValue(format!("Column not found: {name}"))
289 }
290 OperatorError::Execution(msg) => Error::Internal(msg),
291 }
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;
    use grafeo_core::execution::operators::OperatorResult;

    /// Test operator that emits a fixed list of integers as a single
    /// `Int64` column, at most `chunk_size` values per chunk.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        /// Starts at the beginning of `values`.
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> OperatorResult {
            let start = self.position;
            let total = self.values.len();
            if start >= total {
                return Ok(None);
            }

            let stop = (start + self.chunk_size).min(total);
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            let column = chunk.column_mut(0).unwrap();
            for &value in &self.values[start..stop] {
                column.push_int64(value);
            }

            chunk.set_count(stop - start);
            self.position = stop;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Test operator that is exhausted from the start.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    #[test]
    fn test_executor_empty() {
        let mut source = EmptyOperator;
        let executor = Executor::with_columns(vec!["a".to_string()]);

        let result = executor.execute(&mut source).unwrap();

        // No rows, but the declared column is still reported.
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);
        let executor = Executor::with_columns(vec!["value".to_string()]);

        let result = executor.execute(&mut source).unwrap();

        assert_eq!(result.row_count(), 3);
        for (idx, expected) in [1_i64, 2, 3].into_iter().enumerate() {
            assert_eq!(result.rows[idx][0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_with_limit() {
        // Ten values arrive in one chunk; only the first five may be kept.
        let mut source = MockIntOperator::new((0..10).collect(), 100);
        let executor = Executor::with_columns(vec!["value".to_string()]);

        let result = executor.execute_with_limit(&mut source, 5).unwrap();

        assert_eq!(result.row_count(), 5);
    }
}