ruvector_graph/executor/pipeline.rs

//! Pipeline execution model with Volcano-style iterators
//!
//! Implements pull-based query execution with row batching
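//!
//! A minimal usage sketch (marked `ignore`, so it is not compiled as a
//! doctest). `NodeScanOperator` stands in for any `Operator` implementation
//! and is an assumption, not something this module provides:
//!
//! ```ignore
//! // Build a single-operator pipeline and drain it batch by batch.
//! let pipeline = PipelineBuilder::new()
//!     .add_operator(Box::new(NodeScanOperator::new("Person")))
//!     .build();
//!
//! for batch in PipelineIterator::new(pipeline) {
//!     let batch = batch?;
//!     println!("fetched {} rows", batch.len());
//! }
//! ```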

use crate::executor::operators::Operator;
use crate::executor::plan::{PhysicalPlan, QuerySchema, Value};
use crate::executor::{ExecutionError, Result};
use std::collections::HashMap;

/// Batch size for vectorized execution
const DEFAULT_BATCH_SIZE: usize = 1024;

/// Row batch for vectorized processing
#[derive(Debug, Clone)]
pub struct RowBatch {
    pub rows: Vec<HashMap<String, Value>>,
    pub schema: QuerySchema,
}

impl RowBatch {
    /// Create a new row batch
    pub fn new(schema: QuerySchema) -> Self {
        Self {
            rows: Vec::with_capacity(DEFAULT_BATCH_SIZE),
            schema,
        }
    }

    /// Create batch with rows
    pub fn with_rows(rows: Vec<HashMap<String, Value>>, schema: QuerySchema) -> Self {
        Self { rows, schema }
    }

    /// Add a row to the batch
    pub fn add_row(&mut self, row: HashMap<String, Value>) {
        self.rows.push(row);
    }

    /// Check if batch is full
    pub fn is_full(&self) -> bool {
        self.rows.len() >= DEFAULT_BATCH_SIZE
    }

    /// Get number of rows
    pub fn len(&self) -> usize {
        self.rows.len()
    }

    /// Check if batch is empty
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }

    /// Clear the batch
    pub fn clear(&mut self) {
        self.rows.clear();
    }

    /// Merge another batch into this one
    pub fn merge(&mut self, other: RowBatch) {
        self.rows.extend(other.rows);
    }
}

/// Execution context for query pipeline
pub struct ExecutionContext {
    /// Memory limit for execution
    pub memory_limit: usize,
    /// Current memory usage
    pub memory_used: usize,
    /// Batch size
    pub batch_size: usize,
    /// Enable query profiling
    pub enable_profiling: bool,
}

impl ExecutionContext {
    /// Create new execution context
    pub fn new() -> Self {
        Self {
            memory_limit: 1024 * 1024 * 1024, // 1GB default
            memory_used: 0,
            batch_size: DEFAULT_BATCH_SIZE,
            enable_profiling: false,
        }
    }

    /// Create with custom memory limit
    pub fn with_memory_limit(memory_limit: usize) -> Self {
        Self {
            memory_limit,
            memory_used: 0,
            batch_size: DEFAULT_BATCH_SIZE,
            enable_profiling: false,
        }
    }

    /// Check if memory limit exceeded
    pub fn check_memory(&self) -> Result<()> {
        if self.memory_used > self.memory_limit {
            Err(ExecutionError::ResourceExhausted(format!(
                "Memory limit exceeded: {} > {}",
                self.memory_used, self.memory_limit
            )))
        } else {
            Ok(())
        }
    }

    /// Allocate memory, leaving usage unchanged if the limit would be exceeded
    pub fn allocate(&mut self, bytes: usize) -> Result<()> {
        self.memory_used = self.memory_used.saturating_add(bytes);
        if let Err(e) = self.check_memory() {
            // Roll back so a rejected allocation does not count against the budget
            self.memory_used = self.memory_used.saturating_sub(bytes);
            return Err(e);
        }
        Ok(())
    }

    /// Free memory
    pub fn free(&mut self, bytes: usize) {
        self.memory_used = self.memory_used.saturating_sub(bytes);
    }
}

impl Default for ExecutionContext {
    fn default() -> Self {
        Self::new()
    }
}

/// Pipeline executor using Volcano iterator model
pub struct Pipeline {
    plan: PhysicalPlan,
    operators: Vec<Box<dyn Operator>>,
    current_operator: usize,
    context: ExecutionContext,
    finished: bool,
}

impl Pipeline {
    /// Create a new pipeline from physical plan (takes ownership of operators)
    pub fn new(mut plan: PhysicalPlan) -> Self {
        let operators = std::mem::take(&mut plan.operators);
        Self {
            operators,
            plan,
            current_operator: 0,
            context: ExecutionContext::new(),
            finished: false,
        }
    }

    /// Create pipeline with custom context (takes ownership of operators)
    pub fn with_context(mut plan: PhysicalPlan, context: ExecutionContext) -> Self {
        let operators = std::mem::take(&mut plan.operators);
        Self {
            operators,
            plan,
            current_operator: 0,
            context,
            finished: false,
        }
    }

    /// Get next batch from pipeline
    pub fn next(&mut self) -> Result<Option<RowBatch>> {
        if self.finished {
            return Ok(None);
        }

        // Execute pipeline in pull-based fashion
        let result = self.execute_pipeline()?;

        if result.is_none() {
            self.finished = true;
        }

        Ok(result)
    }

    /// Execute the full pipeline
    fn execute_pipeline(&mut self) -> Result<Option<RowBatch>> {
        if self.operators.is_empty() {
            return Ok(None);
        }

        // Start with the first operator (scan)
        let mut current_batch = self.operators[0].execute(None)?;

        // Pipeline the batch through remaining operators
        for operator in &mut self.operators[1..] {
            if let Some(batch) = current_batch {
                current_batch = operator.execute(Some(batch))?;
            } else {
                return Ok(None);
            }
        }

        Ok(current_batch)
    }

    /// Reset pipeline for re-execution, preserving the configured context
    pub fn reset(&mut self) {
        self.current_operator = 0;
        self.finished = false;
        // Keep the configured limits; only clear the usage counter
        self.context.memory_used = 0;
    }

    /// Get execution context
    pub fn context(&self) -> &ExecutionContext {
        &self.context
    }

    /// Get mutable execution context
    pub fn context_mut(&mut self) -> &mut ExecutionContext {
        &mut self.context
    }
}

/// Pipeline builder for constructing execution pipelines
pub struct PipelineBuilder {
    operators: Vec<Box<dyn Operator>>,
    context: ExecutionContext,
}

impl PipelineBuilder {
    /// Create a new pipeline builder
    pub fn new() -> Self {
        Self {
            operators: Vec::new(),
            context: ExecutionContext::new(),
        }
    }

    /// Add an operator to the pipeline
    pub fn add_operator(mut self, operator: Box<dyn Operator>) -> Self {
        self.operators.push(operator);
        self
    }

    /// Set execution context
    pub fn with_context(mut self, context: ExecutionContext) -> Self {
        self.context = context;
        self
    }

    /// Build the pipeline
    pub fn build(self) -> Pipeline {
        let plan = PhysicalPlan {
            operators: self.operators,
            pipeline_breakers: Vec::new(),
            parallelism: 1,
        };

        Pipeline::with_context(plan, self.context)
    }
}

impl Default for PipelineBuilder {
    fn default() -> Self {
        Self::new()
    }
}

/// Iterator adapter for pipeline
pub struct PipelineIterator {
    pipeline: Pipeline,
}

impl PipelineIterator {
    /// Wrap a pipeline in a standard iterator
    pub fn new(pipeline: Pipeline) -> Self {
        Self { pipeline }
    }
}

impl Iterator for PipelineIterator {
    type Item = Result<RowBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.pipeline.next() {
            Ok(Some(batch)) => Some(Ok(batch)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::plan::{ColumnDef, DataType};

    #[test]
    fn test_row_batch() {
        let schema = QuerySchema::new(vec![ColumnDef {
            name: "id".to_string(),
            data_type: DataType::Int64,
            nullable: false,
        }]);

        let mut batch = RowBatch::new(schema);
        assert!(batch.is_empty());

        let mut row = HashMap::new();
        row.insert("id".to_string(), Value::Int64(1));
        batch.add_row(row);

        assert_eq!(batch.len(), 1);
        assert!(!batch.is_empty());
    }

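    // A small sketch exercising `merge`, `clear`, and `is_empty`; the helper
    // closure and test name are new here, but the schema mirrors the one in
    // `test_row_batch` above.
    #[test]
    fn test_row_batch_merge_and_clear() {
        let make_schema = || {
            QuerySchema::new(vec![ColumnDef {
                name: "id".to_string(),
                data_type: DataType::Int64,
                nullable: false,
            }])
        };

        let mut left = RowBatch::new(make_schema());
        let mut right = RowBatch::new(make_schema());

        let mut row = HashMap::new();
        row.insert("id".to_string(), Value::Int64(1));
        left.add_row(row.clone());
        right.add_row(row);

        left.merge(right);
        assert_eq!(left.len(), 2);

        left.clear();
        assert!(left.is_empty());
    }
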
    #[test]
    fn test_execution_context() {
        let mut ctx = ExecutionContext::new();
        assert_eq!(ctx.memory_used, 0);

        ctx.allocate(1024).unwrap();
        assert_eq!(ctx.memory_used, 1024);

        ctx.free(512);
        assert_eq!(ctx.memory_used, 512);
    }

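    // Sketch confirming that `free` saturates at zero instead of underflowing,
    // matching the `saturating_sub` used in `ExecutionContext::free`.
    #[test]
    fn test_free_saturates_at_zero() {
        let mut ctx = ExecutionContext::new();
        ctx.allocate(100).unwrap();
        ctx.free(1_000);
        assert_eq!(ctx.memory_used, 0);
    }
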
    #[test]
    fn test_memory_limit() {
        let mut ctx = ExecutionContext::with_memory_limit(1000);
        assert!(ctx.allocate(500).is_ok());
        assert!(ctx.allocate(600).is_err());
    }

    #[test]
    fn test_pipeline_builder() {
        let builder = PipelineBuilder::new();
        let pipeline = builder.build();
        assert_eq!(pipeline.operators.len(), 0);
    }
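
    // Sketch of driving a pipeline through the `Iterator` adapter; with no
    // operators the pipeline yields no batches, so the iterator is empty.
    #[test]
    fn test_pipeline_iterator_empty() {
        let pipeline = PipelineBuilder::new().build();
        let mut iter = PipelineIterator::new(pipeline);
        assert!(iter.next().is_none());
    }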
}