vibesql_executor/select/late_materialization/
lazy_batch.rs

1//! Lazy Materialized Batch
2//!
3//! A batch that holds source data and selection, materializing rows on demand.
4
5use std::sync::Arc;
6
7use vibesql_storage::Row;
8use vibesql_types::SqlValue;
9
10use super::SelectionVector;
11use crate::{
12    errors::ExecutorError,
13    select::columnar::{ColumnArray, ColumnarBatch},
14};
15
/// Source data format for lazy materialization
///
/// Both variants keep their payload behind an [`Arc`], so cloning a
/// `SourceData` clones the handle rather than the rows or columns it points to.
#[derive(Debug, Clone)]
pub enum SourceData {
    /// Row-based source data (`Vec<Row>`)
    Rows(Arc<Vec<Row>>),
    /// Columnar source data (`ColumnarBatch`)
    Columnar(Arc<ColumnarBatch>),
}
24
25impl SourceData {
26    /// Get the number of rows in the source data
27    pub fn row_count(&self) -> usize {
28        match self {
29            SourceData::Rows(rows) => rows.len(),
30            SourceData::Columnar(batch) => batch.row_count(),
31        }
32    }
33
34    /// Get the number of columns
35    pub fn column_count(&self) -> usize {
36        match self {
37            SourceData::Rows(rows) => rows.first().map(|r| r.len()).unwrap_or(0),
38            SourceData::Columnar(batch) => batch.column_count(),
39        }
40    }
41
42    /// Get a value at a specific position
43    pub fn get_value(&self, row_idx: usize, col_idx: usize) -> Result<SqlValue, ExecutorError> {
44        match self {
45            SourceData::Rows(rows) => rows
46                .get(row_idx)
47                .and_then(|row| row.get(col_idx))
48                .cloned()
49                .ok_or(ExecutorError::ColumnIndexOutOfBounds { index: col_idx }),
50            SourceData::Columnar(batch) => batch.get_value(row_idx, col_idx),
51        }
52    }
53
54    /// Get a row at a specific index (materializes if columnar)
55    pub fn get_row(&self, row_idx: usize) -> Result<Row, ExecutorError> {
56        match self {
57            SourceData::Rows(rows) => rows
58                .get(row_idx)
59                .cloned()
60                .ok_or(ExecutorError::ColumnIndexOutOfBounds { index: row_idx }),
61            SourceData::Columnar(batch) => {
62                let mut values = Vec::with_capacity(batch.column_count());
63                for col_idx in 0..batch.column_count() {
64                    values.push(batch.get_value(row_idx, col_idx)?);
65                }
66                Ok(Row::new(values))
67            }
68        }
69    }
70}
71
/// A lazy materialized batch that defers row materialization
///
/// This is the key abstraction for late materialization. It holds:
/// - Source data (either row-based or columnar)
/// - A selection vector indicating which rows are active
/// - Optional column names
///
/// Rows are only materialized when explicitly requested, typically
/// at the output boundary of query execution.
///
/// # Performance Benefits
///
/// 1. **Memory**: Only stores indices (4 bytes each) instead of full rows
/// 2. **Cache**: Selection vectors are cache-friendly during iteration
/// 3. **Composition**: Multiple operations can share source data via Arc
/// 4. **Lazy Evaluation**: Skips materialization for filtered rows
///
/// # Example
///
/// ```text
/// // Create a lazy batch from source data
/// let source = SourceData::Rows(Arc::new(rows));
/// let lazy_batch = LazyMaterializedBatch::new(source);
///
/// // Apply filter (just updates selection, no materialization)
/// let filtered = lazy_batch.filter(&filter_bitmap);
///
/// // Only materialize at output
/// let result_rows = filtered.materialize_columns(&[0, 2, 3])?; // Only these columns
/// ```
#[derive(Debug, Clone)]
pub struct LazyMaterializedBatch {
    /// The underlying source data
    source: SourceData,
    /// Selection vector (which source rows are active)
    selection: SelectionVector,
    /// Column names (optional metadata); `None` until set via `with_column_names`
    column_names: Option<Vec<String>>,
}
110
111impl LazyMaterializedBatch {
112    /// Create a new lazy batch from source data (selects all rows)
113    pub fn new(source: SourceData) -> Self {
114        let row_count = source.row_count();
115        Self { source, selection: SelectionVector::all(row_count), column_names: None }
116    }
117
118    /// Create a new lazy batch with a specific selection
119    pub fn with_selection(source: SourceData, selection: SelectionVector) -> Self {
120        Self { source, selection, column_names: None }
121    }
122
123    /// Create from row-based data
124    pub fn from_rows(rows: Vec<Row>) -> Self {
125        Self::new(SourceData::Rows(Arc::new(rows)))
126    }
127
128    /// Create from columnar data
129    pub fn from_columnar(batch: ColumnarBatch) -> Self {
130        Self::new(SourceData::Columnar(Arc::new(batch)))
131    }
132
133    /// Set column names
134    pub fn with_column_names(mut self, names: Vec<String>) -> Self {
135        self.column_names = Some(names);
136        self
137    }
138
139    /// Get the source data
140    pub fn source(&self) -> &SourceData {
141        &self.source
142    }
143
144    /// Get the selection vector
145    pub fn selection(&self) -> &SelectionVector {
146        &self.selection
147    }
148
149    /// Get column names if available
150    pub fn column_names(&self) -> Option<&[String]> {
151        self.column_names.as_deref()
152    }
153
154    /// Number of selected (active) rows
155    pub fn len(&self) -> usize {
156        self.selection.len()
157    }
158
159    /// Check if no rows are selected
160    pub fn is_empty(&self) -> bool {
161        self.selection.is_empty()
162    }
163
164    /// Number of columns in the source
165    pub fn column_count(&self) -> usize {
166        self.source.column_count()
167    }
168
169    /// Total rows in source (before selection)
170    pub fn source_row_count(&self) -> usize {
171        self.source.row_count()
172    }
173
174    /// Apply a filter bitmap, returning a new lazy batch with refined selection
175    ///
176    /// This is a lazy operation - it updates the selection vector without
177    /// materializing any row data.
178    pub fn filter(&self, bitmap: &[bool]) -> Self {
179        let new_selection = self.selection.filter(bitmap, |&idx| bitmap[idx as usize]);
180        Self {
181            source: self.source.clone(),
182            selection: new_selection,
183            column_names: self.column_names.clone(),
184        }
185    }
186
187    /// Apply a filter to the current selection
188    ///
189    /// The predicate receives the source row index and should return true
190    /// to keep the row.
191    pub fn filter_with<F>(&self, predicate: F) -> Self
192    where
193        F: Fn(usize) -> bool,
194    {
195        let new_indices: Vec<u32> =
196            self.selection.iter().filter(|&idx| predicate(idx as usize)).collect();
197
198        Self {
199            source: self.source.clone(),
200            selection: SelectionVector::from_indices(new_indices),
201            column_names: self.column_names.clone(),
202        }
203    }
204
205    /// Intersect selection with another selection vector
206    pub fn intersect_selection(&self, other: &SelectionVector) -> Self {
207        Self {
208            source: self.source.clone(),
209            selection: self.selection.intersect(other),
210            column_names: self.column_names.clone(),
211        }
212    }
213
214    /// Union selection with another selection vector
215    pub fn union_selection(&self, other: &SelectionVector) -> Self {
216        Self {
217            source: self.source.clone(),
218            selection: self.selection.union(other),
219            column_names: self.column_names.clone(),
220        }
221    }
222
223    /// Materialize all selected rows
224    ///
225    /// This is typically called at the output boundary when results
226    /// need to be returned to the caller.
227    pub fn materialize(&self) -> Result<Vec<Row>, ExecutorError> {
228        let mut rows = Vec::with_capacity(self.selection.len());
229
230        for idx in self.selection.iter() {
231            let row = self.source.get_row(idx as usize)?;
232            rows.push(row);
233        }
234
235        Ok(rows)
236    }
237
238    /// Materialize only specific columns for selected rows
239    ///
240    /// This is the most efficient output path - only materializes
241    /// columns that appear in the final SELECT projection.
242    pub fn materialize_columns(&self, column_indices: &[usize]) -> Result<Vec<Row>, ExecutorError> {
243        let mut rows = Vec::with_capacity(self.selection.len());
244
245        for idx in self.selection.iter() {
246            let mut values = Vec::with_capacity(column_indices.len());
247            for &col_idx in column_indices {
248                let value = self.source.get_value(idx as usize, col_idx)?;
249                values.push(value);
250            }
251            rows.push(Row::new(values));
252        }
253
254        Ok(rows)
255    }
256
257    /// Materialize a single column for selected rows
258    ///
259    /// Useful for extracting join keys or aggregation inputs.
260    pub fn materialize_column(&self, column_idx: usize) -> Result<Vec<SqlValue>, ExecutorError> {
261        let mut values = Vec::with_capacity(self.selection.len());
262
263        for idx in self.selection.iter() {
264            let value = self.source.get_value(idx as usize, column_idx)?;
265            values.push(value);
266        }
267
268        Ok(values)
269    }
270
271    /// Get a single value from selected row
272    pub fn get_selected_value(
273        &self,
274        selection_idx: usize,
275        column_idx: usize,
276    ) -> Result<SqlValue, ExecutorError> {
277        let row_idx = self
278            .selection
279            .get(selection_idx)
280            .ok_or(ExecutorError::ColumnIndexOutOfBounds { index: selection_idx })?;
281
282        self.source.get_value(row_idx as usize, column_idx)
283    }
284
285    /// Iterate over selected row indices
286    pub fn iter_indices(&self) -> impl Iterator<Item = u32> + '_ {
287        self.selection.iter()
288    }
289
290    /// Create a child batch with remapped selection
291    ///
292    /// Used when chaining operations: if we filter a filtered result,
293    /// we need to maintain the chain of selections.
294    pub fn remap_selection(&self, child_selection: &SelectionVector) -> Self {
295        let remapped = child_selection.remap(&self.selection);
296        Self {
297            source: self.source.clone(),
298            selection: remapped,
299            column_names: self.column_names.clone(),
300        }
301    }
302
303    /// Get the raw column array if source is columnar
304    ///
305    /// Returns None if source is row-based.
306    pub fn column_array(&self, column_idx: usize) -> Option<&ColumnArray> {
307        match &self.source {
308            SourceData::Columnar(batch) => batch.column(column_idx),
309            SourceData::Rows(_) => None,
310        }
311    }
312
313    /// Check if the source is columnar
314    pub fn is_columnar(&self) -> bool {
315        matches!(&self.source, SourceData::Columnar(_))
316    }
317
318    /// Convert to columnar format if not already
319    ///
320    /// This is useful when downstream operations benefit from columnar access.
321    pub fn to_columnar(&self) -> Result<LazyMaterializedBatch, ExecutorError> {
322        match &self.source {
323            SourceData::Columnar(_) => Ok(self.clone()),
324            SourceData::Rows(rows) => {
325                let batch = ColumnarBatch::from_rows(rows)?;
326                Ok(LazyMaterializedBatch {
327                    source: SourceData::Columnar(Arc::new(batch)),
328                    selection: self.selection.clone(),
329                    column_names: self.column_names.clone(),
330                })
331            }
332        }
333    }
334
335    /// Selectivity ratio
336    pub fn selectivity(&self) -> f64 {
337        self.selection.selectivity(self.source_row_count())
338    }
339}
340
341/// Builder for creating lazy batches with joined sources
342///
343/// Used for join operations that combine multiple source tables.
344pub struct JoinedLazyBatchBuilder {
345    /// Left source
346    left_source: SourceData,
347    /// Right source
348    right_source: SourceData,
349    /// Left selection indices
350    left_indices: Vec<u32>,
351    /// Right selection indices (u32::MAX for outer join NULL rows)
352    right_indices: Vec<u32>,
353}
354
355impl JoinedLazyBatchBuilder {
356    /// Create a new builder for joining two sources
357    pub fn new(left_source: SourceData, right_source: SourceData) -> Self {
358        Self { left_source, right_source, left_indices: Vec::new(), right_indices: Vec::new() }
359    }
360
361    /// Add a matched pair of indices
362    pub fn add_match(&mut self, left_idx: u32, right_idx: u32) {
363        self.left_indices.push(left_idx);
364        self.right_indices.push(right_idx);
365    }
366
367    /// Add a left-only row (for LEFT OUTER join)
368    pub fn add_left_only(&mut self, left_idx: u32) {
369        self.left_indices.push(left_idx);
370        self.right_indices.push(u32::MAX); // NULL marker
371    }
372
373    /// Add a right-only row (for RIGHT OUTER join)
374    pub fn add_right_only(&mut self, right_idx: u32) {
375        self.left_indices.push(u32::MAX); // NULL marker
376        self.right_indices.push(right_idx);
377    }
378
379    /// Get left indices
380    pub fn left_indices(&self) -> &[u32] {
381        &self.left_indices
382    }
383
384    /// Get right indices
385    pub fn right_indices(&self) -> &[u32] {
386        &self.right_indices
387    }
388
389    /// Build the result count
390    pub fn result_count(&self) -> usize {
391        self.left_indices.len()
392    }
393
394    /// Materialize the joined result
395    ///
396    /// Combines columns from both sources for each matching pair.
397    pub fn materialize(&self) -> Result<Vec<Row>, ExecutorError> {
398        let left_cols = self.left_source.column_count();
399        let right_cols = self.right_source.column_count();
400        let total_cols = left_cols + right_cols;
401
402        let mut rows = Vec::with_capacity(self.left_indices.len());
403
404        for (&left_idx, &right_idx) in self.left_indices.iter().zip(&self.right_indices) {
405            let mut values = Vec::with_capacity(total_cols);
406
407            // Left columns
408            if left_idx == u32::MAX {
409                // NULL row for RIGHT OUTER join
410                for _ in 0..left_cols {
411                    values.push(SqlValue::Null);
412                }
413            } else {
414                for col_idx in 0..left_cols {
415                    values.push(self.left_source.get_value(left_idx as usize, col_idx)?);
416                }
417            }
418
419            // Right columns
420            if right_idx == u32::MAX {
421                // NULL row for LEFT OUTER join
422                for _ in 0..right_cols {
423                    values.push(SqlValue::Null);
424                }
425            } else {
426                for col_idx in 0..right_cols {
427                    values.push(self.right_source.get_value(right_idx as usize, col_idx)?);
428                }
429            }
430
431            rows.push(Row::new(values));
432        }
433
434        Ok(rows)
435    }
436
437    /// Materialize only specific columns
438    ///
439    /// `left_columns` and `right_columns` specify which columns to include from each source.
440    pub fn materialize_columns(
441        &self,
442        left_columns: &[usize],
443        right_columns: &[usize],
444    ) -> Result<Vec<Row>, ExecutorError> {
445        let total_cols = left_columns.len() + right_columns.len();
446        let mut rows = Vec::with_capacity(self.left_indices.len());
447
448        for (&left_idx, &right_idx) in self.left_indices.iter().zip(&self.right_indices) {
449            let mut values = Vec::with_capacity(total_cols);
450
451            // Selected left columns
452            if left_idx == u32::MAX {
453                for _ in 0..left_columns.len() {
454                    values.push(SqlValue::Null);
455                }
456            } else {
457                for &col_idx in left_columns {
458                    values.push(self.left_source.get_value(left_idx as usize, col_idx)?);
459                }
460            }
461
462            // Selected right columns
463            if right_idx == u32::MAX {
464                for _ in 0..right_columns.len() {
465                    values.push(SqlValue::Null);
466                }
467            } else {
468                for &col_idx in right_columns {
469                    values.push(self.right_source.get_value(right_idx as usize, col_idx)?);
470                }
471            }
472
473            rows.push(Row::new(values));
474        }
475
476        Ok(rows)
477    }
478}
479
#[cfg(test)]
mod lazy_batch_tests {
    use super::*;

    /// Shorthand for a `SqlValue::Varchar` built from a string literal.
    fn text(s: &str) -> SqlValue {
        SqlValue::Varchar(arcstr::ArcStr::from(s))
    }

    /// Five `(id, name)` rows with ids 1 through 5.
    fn sample_rows() -> Vec<Row> {
        vec![
            Row::new(vec![SqlValue::Integer(1), text("Alice")]),
            Row::new(vec![SqlValue::Integer(2), text("Bob")]),
            Row::new(vec![SqlValue::Integer(3), text("Carol")]),
            Row::new(vec![SqlValue::Integer(4), text("Dave")]),
            Row::new(vec![SqlValue::Integer(5), text("Eve")]),
        ]
    }

    #[test]
    fn test_lazy_batch_creation() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        assert_eq!(batch.len(), 5);
        assert_eq!(batch.column_count(), 2);
        assert_eq!(batch.source_row_count(), 5);
    }

    #[test]
    fn test_lazy_batch_filter() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        // Keep rows whose id column is an integer greater than 2.
        let filtered = batch.filter_with(|idx| match batch.source() {
            SourceData::Rows(rows) => {
                matches!(rows[idx].get(0), Some(SqlValue::Integer(id)) if *id > 2)
            }
            SourceData::Columnar(_) => false,
        });

        assert_eq!(filtered.len(), 3); // Carol, Dave, Eve
    }

    #[test]
    fn test_lazy_batch_materialize() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        let out = batch.materialize().unwrap();
        assert_eq!(out.len(), 5);
        assert_eq!(out[0].get(0), Some(&SqlValue::Integer(1)));
    }

    #[test]
    fn test_lazy_batch_materialize_columns() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        // Project the name column only.
        let names = batch.materialize_columns(&[1]).unwrap();
        assert_eq!(names.len(), 5);
        assert_eq!(names[0].len(), 1); // Only 1 column
        assert_eq!(names[0].get(0), Some(&text("Alice")));
    }

    #[test]
    fn test_lazy_batch_with_selection() {
        let source = SourceData::Rows(Arc::new(sample_rows()));
        let picked = SelectionVector::from_indices(vec![0, 2, 4]); // Alice, Carol, Eve
        let batch = LazyMaterializedBatch::with_selection(source, picked);

        assert_eq!(batch.len(), 3);

        let out = batch.materialize().unwrap();
        assert_eq!(out[0].get(1), Some(&text("Alice")));
        assert_eq!(out[1].get(1), Some(&text("Carol")));
        assert_eq!(out[2].get(1), Some(&text("Eve")));
    }

    #[test]
    fn test_joined_batch_builder() {
        let left = vec![
            Row::new(vec![SqlValue::Integer(1), text("A")]),
            Row::new(vec![SqlValue::Integer(2), text("B")]),
        ];
        let right = vec![
            Row::new(vec![SqlValue::Integer(1), text("X")]),
            Row::new(vec![SqlValue::Integer(1), text("Y")]),
        ];

        let mut builder = JoinedLazyBatchBuilder::new(
            SourceData::Rows(Arc::new(left)),
            SourceData::Rows(Arc::new(right)),
        );

        // Left row 0 matches right rows 0 and 1
        builder.add_match(0, 0);
        builder.add_match(0, 1);
        // Left row 1 has no match (left outer join)
        builder.add_left_only(1);

        assert_eq!(builder.result_count(), 3);

        let joined = builder.materialize().unwrap();
        assert_eq!(joined.len(), 3);

        // First match: (1, A, 1, X)
        assert_eq!(joined[0].get(0), Some(&SqlValue::Integer(1)));
        assert_eq!(joined[0].get(2), Some(&SqlValue::Integer(1)));
        assert_eq!(joined[0].get(3), Some(&text("X")));

        // Second match: (1, A, 1, Y)
        assert_eq!(joined[1].get(3), Some(&text("Y")));

        // Left-only: (2, B, NULL, NULL)
        assert_eq!(joined[2].get(0), Some(&SqlValue::Integer(2)));
        assert_eq!(joined[2].get(2), Some(&SqlValue::Null));
        assert_eq!(joined[2].get(3), Some(&SqlValue::Null));
    }

    #[test]
    fn test_selectivity() {
        let source = SourceData::Rows(Arc::new(sample_rows()));
        let picked = SelectionVector::from_indices(vec![0, 2]); // 2 out of 5
        let batch = LazyMaterializedBatch::with_selection(source, picked);

        assert!((batch.selectivity() - 0.4).abs() < 0.001);
    }
}