ruvector_graph/executor/
pipeline.rs1use crate::executor::operators::Operator;
6use crate::executor::plan::Value;
7use crate::executor::plan::{PhysicalPlan, QuerySchema};
8use crate::executor::{ExecutionError, Result};
9use std::collections::HashMap;
10
/// Default number of rows per batch: the capacity reserved by `RowBatch::new`, the
/// threshold checked by `RowBatch::is_full`, and the initial `ExecutionContext::batch_size`.
const DEFAULT_BATCH_SIZE: usize = 1024;
13
14#[derive(Debug, Clone)]
16pub struct RowBatch {
17 pub rows: Vec<HashMap<String, Value>>,
18 pub schema: QuerySchema,
19}
20
21impl RowBatch {
22 pub fn new(schema: QuerySchema) -> Self {
24 Self {
25 rows: Vec::with_capacity(DEFAULT_BATCH_SIZE),
26 schema,
27 }
28 }
29
30 pub fn with_rows(rows: Vec<HashMap<String, Value>>, schema: QuerySchema) -> Self {
32 Self { rows, schema }
33 }
34
35 pub fn add_row(&mut self, row: HashMap<String, Value>) {
37 self.rows.push(row);
38 }
39
40 pub fn is_full(&self) -> bool {
42 self.rows.len() >= DEFAULT_BATCH_SIZE
43 }
44
45 pub fn len(&self) -> usize {
47 self.rows.len()
48 }
49
50 pub fn is_empty(&self) -> bool {
52 self.rows.is_empty()
53 }
54
55 pub fn clear(&mut self) {
57 self.rows.clear();
58 }
59
60 pub fn merge(&mut self, other: RowBatch) {
62 self.rows.extend(other.rows);
63 }
64}
65
66pub struct ExecutionContext {
68 pub memory_limit: usize,
70 pub memory_used: usize,
72 pub batch_size: usize,
74 pub enable_profiling: bool,
76}
77
78impl ExecutionContext {
79 pub fn new() -> Self {
81 Self {
82 memory_limit: 1024 * 1024 * 1024, memory_used: 0,
84 batch_size: DEFAULT_BATCH_SIZE,
85 enable_profiling: false,
86 }
87 }
88
89 pub fn with_memory_limit(memory_limit: usize) -> Self {
91 Self {
92 memory_limit,
93 memory_used: 0,
94 batch_size: DEFAULT_BATCH_SIZE,
95 enable_profiling: false,
96 }
97 }
98
99 pub fn check_memory(&self) -> Result<()> {
101 if self.memory_used > self.memory_limit {
102 Err(ExecutionError::ResourceExhausted(format!(
103 "Memory limit exceeded: {} > {}",
104 self.memory_used, self.memory_limit
105 )))
106 } else {
107 Ok(())
108 }
109 }
110
111 pub fn allocate(&mut self, bytes: usize) -> Result<()> {
113 self.memory_used += bytes;
114 self.check_memory()
115 }
116
117 pub fn free(&mut self, bytes: usize) {
119 self.memory_used = self.memory_used.saturating_sub(bytes);
120 }
121}
122
123impl Default for ExecutionContext {
124 fn default() -> Self {
125 Self::new()
126 }
127}
128
129pub struct Pipeline {
131 plan: PhysicalPlan,
132 operators: Vec<Box<dyn Operator>>,
133 current_operator: usize,
134 context: ExecutionContext,
135 finished: bool,
136}
137
138impl Pipeline {
139 pub fn new(mut plan: PhysicalPlan) -> Self {
141 let operators = std::mem::take(&mut plan.operators);
142 Self {
143 operators,
144 plan,
145 current_operator: 0,
146 context: ExecutionContext::new(),
147 finished: false,
148 }
149 }
150
151 pub fn with_context(mut plan: PhysicalPlan, context: ExecutionContext) -> Self {
153 let operators = std::mem::take(&mut plan.operators);
154 Self {
155 operators,
156 plan,
157 current_operator: 0,
158 context,
159 finished: false,
160 }
161 }
162
163 pub fn next(&mut self) -> Result<Option<RowBatch>> {
165 if self.finished {
166 return Ok(None);
167 }
168
169 let result = self.execute_pipeline()?;
171
172 if result.is_none() {
173 self.finished = true;
174 }
175
176 Ok(result)
177 }
178
179 fn execute_pipeline(&mut self) -> Result<Option<RowBatch>> {
181 if self.operators.is_empty() {
182 return Ok(None);
183 }
184
185 let mut current_batch = self.operators[0].execute(None)?;
187
188 for operator in &mut self.operators[1..] {
190 if let Some(batch) = current_batch {
191 current_batch = operator.execute(Some(batch))?;
192 } else {
193 return Ok(None);
194 }
195 }
196
197 Ok(current_batch)
198 }
199
200 pub fn reset(&mut self) {
202 self.current_operator = 0;
203 self.finished = false;
204 self.context = ExecutionContext::new();
205 }
206
207 pub fn context(&self) -> &ExecutionContext {
209 &self.context
210 }
211
212 pub fn context_mut(&mut self) -> &mut ExecutionContext {
214 &mut self.context
215 }
216}
217
218pub struct PipelineBuilder {
220 operators: Vec<Box<dyn Operator>>,
221 context: ExecutionContext,
222}
223
224impl PipelineBuilder {
225 pub fn new() -> Self {
227 Self {
228 operators: Vec::new(),
229 context: ExecutionContext::new(),
230 }
231 }
232
233 pub fn add_operator(mut self, operator: Box<dyn Operator>) -> Self {
235 self.operators.push(operator);
236 self
237 }
238
239 pub fn with_context(mut self, context: ExecutionContext) -> Self {
241 self.context = context;
242 self
243 }
244
245 pub fn build(self) -> Pipeline {
247 let plan = PhysicalPlan {
248 operators: self.operators,
249 pipeline_breakers: Vec::new(),
250 parallelism: 1,
251 };
252
253 Pipeline::with_context(plan, self.context)
254 }
255}
256
257impl Default for PipelineBuilder {
258 fn default() -> Self {
259 Self::new()
260 }
261}
262
263pub struct PipelineIterator {
265 pipeline: Pipeline,
266}
267
268impl PipelineIterator {
269 pub fn new(pipeline: Pipeline) -> Self {
270 Self { pipeline }
271 }
272}
273
274impl Iterator for PipelineIterator {
275 type Item = Result<RowBatch>;
276
277 fn next(&mut self) -> Option<Self::Item> {
278 match self.pipeline.next() {
279 Ok(Some(batch)) => Some(Ok(batch)),
280 Ok(None) => None,
281 Err(e) => Some(Err(e)),
282 }
283 }
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::plan::{ColumnDef, DataType};

    /// A fresh batch is empty; after one `add_row` it has length 1 and is no longer empty.
    #[test]
    fn test_row_batch() {
        let id_column = ColumnDef {
            name: "id".to_string(),
            data_type: DataType::Int64,
            nullable: false,
        };
        let mut batch = RowBatch::new(QuerySchema::new(vec![id_column]));
        assert!(batch.is_empty());

        let row: HashMap<String, Value> =
            std::iter::once(("id".to_string(), Value::Int64(1))).collect();
        batch.add_row(row);

        assert_eq!(batch.len(), 1);
        assert!(!batch.is_empty());
    }

    /// `allocate` and `free` move `memory_used` up and down by the given byte counts.
    #[test]
    fn test_execution_context() {
        let mut ctx = ExecutionContext::new();
        assert_eq!(ctx.memory_used, 0);

        assert!(ctx.allocate(1024).is_ok());
        assert_eq!(ctx.memory_used, 1024);

        ctx.free(512);
        assert_eq!(ctx.memory_used, 512);
    }

    /// An allocation that pushes usage past the configured limit is rejected.
    #[test]
    fn test_memory_limit() {
        let mut ctx = ExecutionContext::with_memory_limit(1000);

        let within_limit = ctx.allocate(500);
        assert!(within_limit.is_ok());

        let over_limit = ctx.allocate(600);
        assert!(over_limit.is_err());
    }

    /// A builder without operators produces a pipeline without operators.
    #[test]
    fn test_pipeline_builder() {
        let pipeline = PipelineBuilder::new().build();
        assert_eq!(pipeline.operators.len(), 0);
    }
}