vibesql_executor/select/late_materialization/
lazy_batch.rs

1//! Lazy Materialized Batch
2//!
3//! A batch that holds source data and selection, materializing rows on demand.
4
5use std::sync::Arc;
6use vibesql_storage::Row;
7use vibesql_types::SqlValue;
8
9use super::SelectionVector;
10use crate::errors::ExecutorError;
11use crate::select::columnar::{ColumnArray, ColumnarBatch};
12
/// Source data format for lazy materialization.
///
/// Both variants hold their payload behind an [`Arc`], so cloning a
/// `SourceData` only bumps a reference count and never copies row data.
#[derive(Debug, Clone)]
pub enum SourceData {
    /// Row-based source data: a shared vector of fully formed rows.
    Rows(Arc<Vec<Row>>),
    /// Columnar source data: a shared `ColumnarBatch`.
    Columnar(Arc<ColumnarBatch>),
}
21
22impl SourceData {
23    /// Get the number of rows in the source data
24    pub fn row_count(&self) -> usize {
25        match self {
26            SourceData::Rows(rows) => rows.len(),
27            SourceData::Columnar(batch) => batch.row_count(),
28        }
29    }
30
31    /// Get the number of columns
32    pub fn column_count(&self) -> usize {
33        match self {
34            SourceData::Rows(rows) => rows.first().map(|r| r.len()).unwrap_or(0),
35            SourceData::Columnar(batch) => batch.column_count(),
36        }
37    }
38
39    /// Get a value at a specific position
40    pub fn get_value(&self, row_idx: usize, col_idx: usize) -> Result<SqlValue, ExecutorError> {
41        match self {
42            SourceData::Rows(rows) => rows
43                .get(row_idx)
44                .and_then(|row| row.get(col_idx))
45                .cloned()
46                .ok_or(ExecutorError::ColumnIndexOutOfBounds { index: col_idx }),
47            SourceData::Columnar(batch) => batch.get_value(row_idx, col_idx),
48        }
49    }
50
51    /// Get a row at a specific index (materializes if columnar)
52    pub fn get_row(&self, row_idx: usize) -> Result<Row, ExecutorError> {
53        match self {
54            SourceData::Rows(rows) => rows
55                .get(row_idx)
56                .cloned()
57                .ok_or(ExecutorError::ColumnIndexOutOfBounds { index: row_idx }),
58            SourceData::Columnar(batch) => {
59                let mut values = Vec::with_capacity(batch.column_count());
60                for col_idx in 0..batch.column_count() {
61                    values.push(batch.get_value(row_idx, col_idx)?);
62                }
63                Ok(Row::new(values))
64            }
65        }
66    }
67}
68
/// A lazy materialized batch that defers row materialization
///
/// This is the key abstraction for late materialization. It holds:
/// - Source data (either row-based or columnar)
/// - A selection vector indicating which rows are active
///
/// Rows are only materialized when explicitly requested, typically
/// at the output boundary of query execution.
///
/// # Performance Benefits
///
/// 1. **Memory**: Only stores indices (4 bytes each) instead of full rows
/// 2. **Cache**: Selection vectors are cache-friendly during iteration
/// 3. **Composition**: Multiple operations can share source data via Arc
/// 4. **Lazy Evaluation**: Skips materialization for filtered rows
///
/// # Example
///
/// ```text
/// // Create a lazy batch from source data
/// let source = SourceData::Rows(Arc::new(rows));
/// let lazy_batch = LazyMaterializedBatch::new(source);
///
/// // Apply filter (just updates selection, no materialization)
/// let filtered = lazy_batch.filter(&filter_bitmap);
///
/// // Only materialize at output
/// let result_rows = filtered.materialize_columns(&[0, 2, 3])?; // Only these columns
/// ```
#[derive(Debug, Clone)]
pub struct LazyMaterializedBatch {
    /// The underlying source data (shared, never mutated by this type)
    source: SourceData,
    /// Selection vector (which source rows are active)
    selection: SelectionVector,
    /// Column names (optional metadata; `None` until set via `with_column_names`)
    column_names: Option<Vec<String>>,
}
107
108impl LazyMaterializedBatch {
109    /// Create a new lazy batch from source data (selects all rows)
110    pub fn new(source: SourceData) -> Self {
111        let row_count = source.row_count();
112        Self { source, selection: SelectionVector::all(row_count), column_names: None }
113    }
114
115    /// Create a new lazy batch with a specific selection
116    pub fn with_selection(source: SourceData, selection: SelectionVector) -> Self {
117        Self { source, selection, column_names: None }
118    }
119
120    /// Create from row-based data
121    pub fn from_rows(rows: Vec<Row>) -> Self {
122        Self::new(SourceData::Rows(Arc::new(rows)))
123    }
124
125    /// Create from columnar data
126    pub fn from_columnar(batch: ColumnarBatch) -> Self {
127        Self::new(SourceData::Columnar(Arc::new(batch)))
128    }
129
130    /// Set column names
131    pub fn with_column_names(mut self, names: Vec<String>) -> Self {
132        self.column_names = Some(names);
133        self
134    }
135
136    /// Get the source data
137    pub fn source(&self) -> &SourceData {
138        &self.source
139    }
140
141    /// Get the selection vector
142    pub fn selection(&self) -> &SelectionVector {
143        &self.selection
144    }
145
146    /// Get column names if available
147    pub fn column_names(&self) -> Option<&[String]> {
148        self.column_names.as_deref()
149    }
150
151    /// Number of selected (active) rows
152    pub fn len(&self) -> usize {
153        self.selection.len()
154    }
155
156    /// Check if no rows are selected
157    pub fn is_empty(&self) -> bool {
158        self.selection.is_empty()
159    }
160
161    /// Number of columns in the source
162    pub fn column_count(&self) -> usize {
163        self.source.column_count()
164    }
165
166    /// Total rows in source (before selection)
167    pub fn source_row_count(&self) -> usize {
168        self.source.row_count()
169    }
170
171    /// Apply a filter bitmap, returning a new lazy batch with refined selection
172    ///
173    /// This is a lazy operation - it updates the selection vector without
174    /// materializing any row data.
175    pub fn filter(&self, bitmap: &[bool]) -> Self {
176        let new_selection = self.selection.filter(bitmap, |&idx| bitmap[idx as usize]);
177        Self {
178            source: self.source.clone(),
179            selection: new_selection,
180            column_names: self.column_names.clone(),
181        }
182    }
183
184    /// Apply a filter to the current selection
185    ///
186    /// The predicate receives the source row index and should return true
187    /// to keep the row.
188    pub fn filter_with<F>(&self, predicate: F) -> Self
189    where
190        F: Fn(usize) -> bool,
191    {
192        let new_indices: Vec<u32> =
193            self.selection.iter().filter(|&idx| predicate(idx as usize)).collect();
194
195        Self {
196            source: self.source.clone(),
197            selection: SelectionVector::from_indices(new_indices),
198            column_names: self.column_names.clone(),
199        }
200    }
201
202    /// Intersect selection with another selection vector
203    pub fn intersect_selection(&self, other: &SelectionVector) -> Self {
204        Self {
205            source: self.source.clone(),
206            selection: self.selection.intersect(other),
207            column_names: self.column_names.clone(),
208        }
209    }
210
211    /// Union selection with another selection vector
212    pub fn union_selection(&self, other: &SelectionVector) -> Self {
213        Self {
214            source: self.source.clone(),
215            selection: self.selection.union(other),
216            column_names: self.column_names.clone(),
217        }
218    }
219
220    /// Materialize all selected rows
221    ///
222    /// This is typically called at the output boundary when results
223    /// need to be returned to the caller.
224    pub fn materialize(&self) -> Result<Vec<Row>, ExecutorError> {
225        let mut rows = Vec::with_capacity(self.selection.len());
226
227        for idx in self.selection.iter() {
228            let row = self.source.get_row(idx as usize)?;
229            rows.push(row);
230        }
231
232        Ok(rows)
233    }
234
235    /// Materialize only specific columns for selected rows
236    ///
237    /// This is the most efficient output path - only materializes
238    /// columns that appear in the final SELECT projection.
239    pub fn materialize_columns(&self, column_indices: &[usize]) -> Result<Vec<Row>, ExecutorError> {
240        let mut rows = Vec::with_capacity(self.selection.len());
241
242        for idx in self.selection.iter() {
243            let mut values = Vec::with_capacity(column_indices.len());
244            for &col_idx in column_indices {
245                let value = self.source.get_value(idx as usize, col_idx)?;
246                values.push(value);
247            }
248            rows.push(Row::new(values));
249        }
250
251        Ok(rows)
252    }
253
254    /// Materialize a single column for selected rows
255    ///
256    /// Useful for extracting join keys or aggregation inputs.
257    pub fn materialize_column(&self, column_idx: usize) -> Result<Vec<SqlValue>, ExecutorError> {
258        let mut values = Vec::with_capacity(self.selection.len());
259
260        for idx in self.selection.iter() {
261            let value = self.source.get_value(idx as usize, column_idx)?;
262            values.push(value);
263        }
264
265        Ok(values)
266    }
267
268    /// Get a single value from selected row
269    pub fn get_selected_value(
270        &self,
271        selection_idx: usize,
272        column_idx: usize,
273    ) -> Result<SqlValue, ExecutorError> {
274        let row_idx = self
275            .selection
276            .get(selection_idx)
277            .ok_or(ExecutorError::ColumnIndexOutOfBounds { index: selection_idx })?;
278
279        self.source.get_value(row_idx as usize, column_idx)
280    }
281
282    /// Iterate over selected row indices
283    pub fn iter_indices(&self) -> impl Iterator<Item = u32> + '_ {
284        self.selection.iter()
285    }
286
287    /// Create a child batch with remapped selection
288    ///
289    /// Used when chaining operations: if we filter a filtered result,
290    /// we need to maintain the chain of selections.
291    pub fn remap_selection(&self, child_selection: &SelectionVector) -> Self {
292        let remapped = child_selection.remap(&self.selection);
293        Self {
294            source: self.source.clone(),
295            selection: remapped,
296            column_names: self.column_names.clone(),
297        }
298    }
299
300    /// Get the raw column array if source is columnar
301    ///
302    /// Returns None if source is row-based.
303    pub fn column_array(&self, column_idx: usize) -> Option<&ColumnArray> {
304        match &self.source {
305            SourceData::Columnar(batch) => batch.column(column_idx),
306            SourceData::Rows(_) => None,
307        }
308    }
309
310    /// Check if the source is columnar
311    pub fn is_columnar(&self) -> bool {
312        matches!(&self.source, SourceData::Columnar(_))
313    }
314
315    /// Convert to columnar format if not already
316    ///
317    /// This is useful when downstream operations benefit from columnar access.
318    pub fn to_columnar(&self) -> Result<LazyMaterializedBatch, ExecutorError> {
319        match &self.source {
320            SourceData::Columnar(_) => Ok(self.clone()),
321            SourceData::Rows(rows) => {
322                let batch = ColumnarBatch::from_rows(rows)?;
323                Ok(LazyMaterializedBatch {
324                    source: SourceData::Columnar(Arc::new(batch)),
325                    selection: self.selection.clone(),
326                    column_names: self.column_names.clone(),
327                })
328            }
329        }
330    }
331
332    /// Selectivity ratio
333    pub fn selectivity(&self) -> f64 {
334        self.selection.selectivity(self.source_row_count())
335    }
336}
337
338/// Builder for creating lazy batches with joined sources
339///
340/// Used for join operations that combine multiple source tables.
341pub struct JoinedLazyBatchBuilder {
342    /// Left source
343    left_source: SourceData,
344    /// Right source
345    right_source: SourceData,
346    /// Left selection indices
347    left_indices: Vec<u32>,
348    /// Right selection indices (u32::MAX for outer join NULL rows)
349    right_indices: Vec<u32>,
350}
351
352impl JoinedLazyBatchBuilder {
353    /// Create a new builder for joining two sources
354    pub fn new(left_source: SourceData, right_source: SourceData) -> Self {
355        Self { left_source, right_source, left_indices: Vec::new(), right_indices: Vec::new() }
356    }
357
358    /// Add a matched pair of indices
359    pub fn add_match(&mut self, left_idx: u32, right_idx: u32) {
360        self.left_indices.push(left_idx);
361        self.right_indices.push(right_idx);
362    }
363
364    /// Add a left-only row (for LEFT OUTER join)
365    pub fn add_left_only(&mut self, left_idx: u32) {
366        self.left_indices.push(left_idx);
367        self.right_indices.push(u32::MAX); // NULL marker
368    }
369
370    /// Add a right-only row (for RIGHT OUTER join)
371    pub fn add_right_only(&mut self, right_idx: u32) {
372        self.left_indices.push(u32::MAX); // NULL marker
373        self.right_indices.push(right_idx);
374    }
375
376    /// Get left indices
377    pub fn left_indices(&self) -> &[u32] {
378        &self.left_indices
379    }
380
381    /// Get right indices
382    pub fn right_indices(&self) -> &[u32] {
383        &self.right_indices
384    }
385
386    /// Build the result count
387    pub fn result_count(&self) -> usize {
388        self.left_indices.len()
389    }
390
391    /// Materialize the joined result
392    ///
393    /// Combines columns from both sources for each matching pair.
394    pub fn materialize(&self) -> Result<Vec<Row>, ExecutorError> {
395        let left_cols = self.left_source.column_count();
396        let right_cols = self.right_source.column_count();
397        let total_cols = left_cols + right_cols;
398
399        let mut rows = Vec::with_capacity(self.left_indices.len());
400
401        for (&left_idx, &right_idx) in self.left_indices.iter().zip(&self.right_indices) {
402            let mut values = Vec::with_capacity(total_cols);
403
404            // Left columns
405            if left_idx == u32::MAX {
406                // NULL row for RIGHT OUTER join
407                for _ in 0..left_cols {
408                    values.push(SqlValue::Null);
409                }
410            } else {
411                for col_idx in 0..left_cols {
412                    values.push(self.left_source.get_value(left_idx as usize, col_idx)?);
413                }
414            }
415
416            // Right columns
417            if right_idx == u32::MAX {
418                // NULL row for LEFT OUTER join
419                for _ in 0..right_cols {
420                    values.push(SqlValue::Null);
421                }
422            } else {
423                for col_idx in 0..right_cols {
424                    values.push(self.right_source.get_value(right_idx as usize, col_idx)?);
425                }
426            }
427
428            rows.push(Row::new(values));
429        }
430
431        Ok(rows)
432    }
433
434    /// Materialize only specific columns
435    ///
436    /// `left_columns` and `right_columns` specify which columns to include from each source.
437    pub fn materialize_columns(
438        &self,
439        left_columns: &[usize],
440        right_columns: &[usize],
441    ) -> Result<Vec<Row>, ExecutorError> {
442        let total_cols = left_columns.len() + right_columns.len();
443        let mut rows = Vec::with_capacity(self.left_indices.len());
444
445        for (&left_idx, &right_idx) in self.left_indices.iter().zip(&self.right_indices) {
446            let mut values = Vec::with_capacity(total_cols);
447
448            // Selected left columns
449            if left_idx == u32::MAX {
450                for _ in 0..left_columns.len() {
451                    values.push(SqlValue::Null);
452                }
453            } else {
454                for &col_idx in left_columns {
455                    values.push(self.left_source.get_value(left_idx as usize, col_idx)?);
456                }
457            }
458
459            // Selected right columns
460            if right_idx == u32::MAX {
461                for _ in 0..right_columns.len() {
462                    values.push(SqlValue::Null);
463                }
464            } else {
465                for &col_idx in right_columns {
466                    values.push(self.right_source.get_value(right_idx as usize, col_idx)?);
467                }
468            }
469
470            rows.push(Row::new(values));
471        }
472
473        Ok(rows)
474    }
475}
476
#[cfg(test)]
mod lazy_batch_tests {
    use super::*;

    /// Five two-column rows of (id, name), ids 1 through 5.
    fn sample_rows() -> Vec<Row> {
        [(1, "Alice"), (2, "Bob"), (3, "Carol"), (4, "Dave"), (5, "Eve")]
            .iter()
            .map(|&(id, name)| Row::new(vec![SqlValue::Integer(id), SqlValue::Varchar(name.into())]))
            .collect()
    }

    /// Shorthand for the expected text cell in assertions.
    fn text(s: &str) -> SqlValue {
        SqlValue::Varchar(s.into())
    }

    #[test]
    fn test_lazy_batch_creation() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        // A fresh batch selects every source row.
        assert_eq!(batch.len(), 5);
        assert_eq!(batch.column_count(), 2);
        assert_eq!(batch.source_row_count(), 5);
    }

    #[test]
    fn test_lazy_batch_filter() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        // Keep rows where id > 2
        let kept = batch.filter_with(|idx| match batch.source() {
            SourceData::Rows(rows) => {
                matches!(rows[idx].get(0), Some(SqlValue::Integer(id)) if *id > 2)
            }
            SourceData::Columnar(_) => false,
        });

        assert_eq!(kept.len(), 3); // Carol, Dave, Eve
    }

    #[test]
    fn test_lazy_batch_materialize() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        let out = batch.materialize().unwrap();
        assert_eq!(out.len(), 5);
        assert_eq!(out[0].get(0), Some(&SqlValue::Integer(1)));
    }

    #[test]
    fn test_lazy_batch_materialize_columns() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        // Project just the name column
        let projected = batch.materialize_columns(&[1]).unwrap();
        assert_eq!(projected.len(), 5);
        assert_eq!(projected[0].len(), 1); // Only 1 column
        assert_eq!(projected[0].get(0), Some(&text("Alice")));
    }

    #[test]
    fn test_lazy_batch_with_selection() {
        // Alice, Carol, Eve
        let picked = SelectionVector::from_indices(vec![0, 2, 4]);
        let source = SourceData::Rows(Arc::new(sample_rows()));
        let batch = LazyMaterializedBatch::with_selection(source, picked);

        assert_eq!(batch.len(), 3);

        let out = batch.materialize().unwrap();
        assert_eq!(out[0].get(1), Some(&text("Alice")));
        assert_eq!(out[1].get(1), Some(&text("Carol")));
        assert_eq!(out[2].get(1), Some(&text("Eve")));
    }

    #[test]
    fn test_joined_batch_builder() {
        let left = vec![
            Row::new(vec![SqlValue::Integer(1), text("A")]),
            Row::new(vec![SqlValue::Integer(2), text("B")]),
        ];
        let right = vec![
            Row::new(vec![SqlValue::Integer(1), text("X")]),
            Row::new(vec![SqlValue::Integer(1), text("Y")]),
        ];

        let mut builder = JoinedLazyBatchBuilder::new(
            SourceData::Rows(Arc::new(left)),
            SourceData::Rows(Arc::new(right)),
        );

        // Left row 0 pairs with both right rows
        builder.add_match(0, 0);
        builder.add_match(0, 1);
        // Left row 1 is unmatched (left outer join)
        builder.add_left_only(1);

        assert_eq!(builder.result_count(), 3);

        let joined = builder.materialize().unwrap();
        assert_eq!(joined.len(), 3);

        // First match: (1, A, 1, X)
        assert_eq!(joined[0].get(0), Some(&SqlValue::Integer(1)));
        assert_eq!(joined[0].get(2), Some(&SqlValue::Integer(1)));
        assert_eq!(joined[0].get(3), Some(&text("X")));

        // Second match: (1, A, 1, Y)
        assert_eq!(joined[1].get(3), Some(&text("Y")));

        // Left-only: (2, B, NULL, NULL)
        assert_eq!(joined[2].get(0), Some(&SqlValue::Integer(2)));
        assert_eq!(joined[2].get(2), Some(&SqlValue::Null));
        assert_eq!(joined[2].get(3), Some(&SqlValue::Null));
    }

    #[test]
    fn test_selectivity() {
        // 2 of 5 rows selected
        let picked = SelectionVector::from_indices(vec![0, 2]);
        let source = SourceData::Rows(Arc::new(sample_rows()));
        let batch = LazyMaterializedBatch::with_selection(source, picked);

        let ratio = batch.selectivity();
        assert!((ratio - 0.4).abs() < 0.001);
    }
}