grafeo_engine/query/executor/
mod.rs1use crate::database::QueryResult;
6use grafeo_common::types::{LogicalType, Value};
7use grafeo_common::utils::error::{Error, Result};
8use grafeo_core::execution::DataChunk;
9use grafeo_core::execution::operators::{Operator, OperatorError};
10
11pub struct Executor {
13 columns: Vec<String>,
15 column_types: Vec<LogicalType>,
17}
18
19impl Executor {
20 #[must_use]
22 pub fn new() -> Self {
23 Self {
24 columns: Vec::new(),
25 column_types: Vec::new(),
26 }
27 }
28
29 #[must_use]
31 pub fn with_columns(columns: Vec<String>) -> Self {
32 let len = columns.len();
33 Self {
34 columns,
35 column_types: vec![LogicalType::Any; len],
36 }
37 }
38
39 #[must_use]
41 pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
42 Self {
43 columns,
44 column_types,
45 }
46 }
47
48 pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
54 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
55 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
56
57 loop {
58 match operator.next() {
59 Ok(Some(chunk)) => {
60 if !types_captured && chunk.column_count() > 0 {
62 self.capture_column_types(&chunk, &mut result);
63 types_captured = true;
64 }
65 self.collect_chunk(&chunk, &mut result)?;
66 }
67 Ok(None) => break,
68 Err(err) => return Err(convert_operator_error(err)),
69 }
70 }
71
72 Ok(result)
73 }
74
75 pub fn execute_with_limit(
81 &self,
82 operator: &mut dyn Operator,
83 limit: usize,
84 ) -> Result<QueryResult> {
85 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
86 let mut collected = 0;
87 let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
88
89 loop {
90 if collected >= limit {
91 break;
92 }
93
94 match operator.next() {
95 Ok(Some(chunk)) => {
96 if !types_captured && chunk.column_count() > 0 {
98 self.capture_column_types(&chunk, &mut result);
99 types_captured = true;
100 }
101 let remaining = limit - collected;
102 collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
103 }
104 Ok(None) => break,
105 Err(err) => return Err(convert_operator_error(err)),
106 }
107 }
108
109 Ok(result)
110 }
111
112 fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
114 let col_count = chunk.column_count();
115 result.column_types = Vec::with_capacity(col_count);
116 for col_idx in 0..col_count {
117 let col_type = chunk
118 .column(col_idx)
119 .map(|col| col.data_type().clone())
120 .unwrap_or(LogicalType::Any);
121 result.column_types.push(col_type);
122 }
123 }
124
125 fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
127 let row_count = chunk.row_count();
128 let col_count = chunk.column_count();
129
130 for row_idx in 0..row_count {
131 let mut row = Vec::with_capacity(col_count);
132 for col_idx in 0..col_count {
133 let value = chunk
134 .column(col_idx)
135 .and_then(|col| col.get_value(row_idx))
136 .unwrap_or(Value::Null);
137 row.push(value);
138 }
139 result.rows.push(row);
140 }
141
142 Ok(row_count)
143 }
144
145 fn collect_chunk_limited(
147 &self,
148 chunk: &DataChunk,
149 result: &mut QueryResult,
150 limit: usize,
151 ) -> Result<usize> {
152 let row_count = chunk.row_count().min(limit);
153 let col_count = chunk.column_count();
154
155 for row_idx in 0..row_count {
156 let mut row = Vec::with_capacity(col_count);
157 for col_idx in 0..col_count {
158 let value = chunk
159 .column(col_idx)
160 .and_then(|col| col.get_value(row_idx))
161 .unwrap_or(Value::Null);
162 row.push(value);
163 }
164 result.rows.push(row);
165 }
166
167 Ok(row_count)
168 }
169}
170
171impl Default for Executor {
172 fn default() -> Self {
173 Self::new()
174 }
175}
176
177fn convert_operator_error(err: OperatorError) -> Error {
179 match err {
180 OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
181 OperatorError::ColumnNotFound(name) => {
182 Error::InvalidValue(format!("Column not found: {name}"))
183 }
184 OperatorError::Execution(msg) => Error::Internal(msg),
185 }
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;
    use grafeo_core::execution::operators::OperatorResult;

    /// Test operator that emits a fixed list of `i64` values as a single
    /// `Int64` column, at most `chunk_size` values per chunk.
    struct MockIntOperator {
        values: Vec<i64>,
        /// Index of the next value to emit.
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position >= self.values.len() {
                return Ok(None);
            }

            let end = (self.position + self.chunk_size).min(self.values.len());
            let batch = &self.values[self.position..end];
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            let col = chunk
                .column_mut(0)
                .expect("chunk was created with exactly one column");
            for &value in batch {
                col.push_int64(value);
            }
            chunk.set_count(batch.len());
            self.position = end;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Test operator that is exhausted from the start.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    /// Builds the single-column executor used by the integer tests.
    fn value_executor() -> Executor {
        Executor::with_columns(vec!["value".to_string()])
    }

    #[test]
    fn test_executor_empty() {
        let executor = Executor::with_columns(vec!["a".to_string()]);
        let mut op = EmptyOperator;

        let result = executor.execute(&mut op).unwrap();
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let executor = value_executor();
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
        assert_eq!(result.rows[0][0], Value::Int64(1));
        assert_eq!(result.rows[1][0], Value::Int64(2));
        assert_eq!(result.rows[2][0], Value::Int64(3));
    }

    #[test]
    fn test_executor_multiple_chunks() {
        let executor = value_executor();
        // Chunk size 3 over 10 values: chunks of 3, 3, 3 and 1 rows.
        let mut op = MockIntOperator::new((0..10).collect(), 3);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 10);
        for (idx, expected) in (0..10_i64).enumerate() {
            assert_eq!(result.rows[idx][0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_with_limit() {
        let executor = value_executor();
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
    }

    #[test]
    fn test_executor_limit_spans_chunks() {
        let executor = value_executor();
        // The limit of 5 is reached in the middle of the second 3-row chunk.
        let mut op = MockIntOperator::new((0..10).collect(), 3);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
        assert_eq!(result.rows[0][0], Value::Int64(0));
        assert_eq!(result.rows[4][0], Value::Int64(4));
    }

    #[test]
    fn test_executor_limit_zero() {
        let executor = value_executor();
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute_with_limit(&mut op, 0).unwrap();
        assert!(result.is_empty());
        // A zero limit must not pull anything from the operator.
        assert_eq!(op.position, 0);
    }

    #[test]
    fn test_executor_limit_exceeds_input() {
        let executor = value_executor();
        let mut op = MockIntOperator::new(vec![1, 2, 3], 2);

        let result = executor.execute_with_limit(&mut op, 100).unwrap();
        assert_eq!(result.row_count(), 3);
        assert_eq!(result.rows[2][0], Value::Int64(3));
    }
}