vibesql_executor/select/late_materialization/
lazy_batch.rs1use std::sync::Arc;
6
7use vibesql_storage::Row;
8use vibesql_types::SqlValue;
9
10use super::SelectionVector;
11use crate::{
12 errors::ExecutorError,
13 select::columnar::{ColumnArray, ColumnarBatch},
14};
15
16#[derive(Debug, Clone)]
18pub enum SourceData {
19 Rows(Arc<Vec<Row>>),
21 Columnar(Arc<ColumnarBatch>),
23}
24
25impl SourceData {
26 pub fn row_count(&self) -> usize {
28 match self {
29 SourceData::Rows(rows) => rows.len(),
30 SourceData::Columnar(batch) => batch.row_count(),
31 }
32 }
33
34 pub fn column_count(&self) -> usize {
36 match self {
37 SourceData::Rows(rows) => rows.first().map(|r| r.len()).unwrap_or(0),
38 SourceData::Columnar(batch) => batch.column_count(),
39 }
40 }
41
42 pub fn get_value(&self, row_idx: usize, col_idx: usize) -> Result<SqlValue, ExecutorError> {
44 match self {
45 SourceData::Rows(rows) => rows
46 .get(row_idx)
47 .and_then(|row| row.get(col_idx))
48 .cloned()
49 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index: col_idx }),
50 SourceData::Columnar(batch) => batch.get_value(row_idx, col_idx),
51 }
52 }
53
54 pub fn get_row(&self, row_idx: usize) -> Result<Row, ExecutorError> {
56 match self {
57 SourceData::Rows(rows) => rows
58 .get(row_idx)
59 .cloned()
60 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index: row_idx }),
61 SourceData::Columnar(batch) => {
62 let mut values = Vec::with_capacity(batch.column_count());
63 for col_idx in 0..batch.column_count() {
64 values.push(batch.get_value(row_idx, col_idx)?);
65 }
66 Ok(Row::new(values))
67 }
68 }
69 }
70}
71
72#[derive(Debug, Clone)]
102pub struct LazyMaterializedBatch {
103 source: SourceData,
105 selection: SelectionVector,
107 column_names: Option<Vec<String>>,
109}
110
111impl LazyMaterializedBatch {
112 pub fn new(source: SourceData) -> Self {
114 let row_count = source.row_count();
115 Self { source, selection: SelectionVector::all(row_count), column_names: None }
116 }
117
118 pub fn with_selection(source: SourceData, selection: SelectionVector) -> Self {
120 Self { source, selection, column_names: None }
121 }
122
123 pub fn from_rows(rows: Vec<Row>) -> Self {
125 Self::new(SourceData::Rows(Arc::new(rows)))
126 }
127
128 pub fn from_columnar(batch: ColumnarBatch) -> Self {
130 Self::new(SourceData::Columnar(Arc::new(batch)))
131 }
132
133 pub fn with_column_names(mut self, names: Vec<String>) -> Self {
135 self.column_names = Some(names);
136 self
137 }
138
139 pub fn source(&self) -> &SourceData {
141 &self.source
142 }
143
144 pub fn selection(&self) -> &SelectionVector {
146 &self.selection
147 }
148
149 pub fn column_names(&self) -> Option<&[String]> {
151 self.column_names.as_deref()
152 }
153
154 pub fn len(&self) -> usize {
156 self.selection.len()
157 }
158
159 pub fn is_empty(&self) -> bool {
161 self.selection.is_empty()
162 }
163
164 pub fn column_count(&self) -> usize {
166 self.source.column_count()
167 }
168
169 pub fn source_row_count(&self) -> usize {
171 self.source.row_count()
172 }
173
174 pub fn filter(&self, bitmap: &[bool]) -> Self {
179 let new_selection = self.selection.filter(bitmap, |&idx| bitmap[idx as usize]);
180 Self {
181 source: self.source.clone(),
182 selection: new_selection,
183 column_names: self.column_names.clone(),
184 }
185 }
186
187 pub fn filter_with<F>(&self, predicate: F) -> Self
192 where
193 F: Fn(usize) -> bool,
194 {
195 let new_indices: Vec<u32> =
196 self.selection.iter().filter(|&idx| predicate(idx as usize)).collect();
197
198 Self {
199 source: self.source.clone(),
200 selection: SelectionVector::from_indices(new_indices),
201 column_names: self.column_names.clone(),
202 }
203 }
204
205 pub fn intersect_selection(&self, other: &SelectionVector) -> Self {
207 Self {
208 source: self.source.clone(),
209 selection: self.selection.intersect(other),
210 column_names: self.column_names.clone(),
211 }
212 }
213
214 pub fn union_selection(&self, other: &SelectionVector) -> Self {
216 Self {
217 source: self.source.clone(),
218 selection: self.selection.union(other),
219 column_names: self.column_names.clone(),
220 }
221 }
222
223 pub fn materialize(&self) -> Result<Vec<Row>, ExecutorError> {
228 let mut rows = Vec::with_capacity(self.selection.len());
229
230 for idx in self.selection.iter() {
231 let row = self.source.get_row(idx as usize)?;
232 rows.push(row);
233 }
234
235 Ok(rows)
236 }
237
238 pub fn materialize_columns(&self, column_indices: &[usize]) -> Result<Vec<Row>, ExecutorError> {
243 let mut rows = Vec::with_capacity(self.selection.len());
244
245 for idx in self.selection.iter() {
246 let mut values = Vec::with_capacity(column_indices.len());
247 for &col_idx in column_indices {
248 let value = self.source.get_value(idx as usize, col_idx)?;
249 values.push(value);
250 }
251 rows.push(Row::new(values));
252 }
253
254 Ok(rows)
255 }
256
257 pub fn materialize_column(&self, column_idx: usize) -> Result<Vec<SqlValue>, ExecutorError> {
261 let mut values = Vec::with_capacity(self.selection.len());
262
263 for idx in self.selection.iter() {
264 let value = self.source.get_value(idx as usize, column_idx)?;
265 values.push(value);
266 }
267
268 Ok(values)
269 }
270
271 pub fn get_selected_value(
273 &self,
274 selection_idx: usize,
275 column_idx: usize,
276 ) -> Result<SqlValue, ExecutorError> {
277 let row_idx = self
278 .selection
279 .get(selection_idx)
280 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index: selection_idx })?;
281
282 self.source.get_value(row_idx as usize, column_idx)
283 }
284
285 pub fn iter_indices(&self) -> impl Iterator<Item = u32> + '_ {
287 self.selection.iter()
288 }
289
290 pub fn remap_selection(&self, child_selection: &SelectionVector) -> Self {
295 let remapped = child_selection.remap(&self.selection);
296 Self {
297 source: self.source.clone(),
298 selection: remapped,
299 column_names: self.column_names.clone(),
300 }
301 }
302
303 pub fn column_array(&self, column_idx: usize) -> Option<&ColumnArray> {
307 match &self.source {
308 SourceData::Columnar(batch) => batch.column(column_idx),
309 SourceData::Rows(_) => None,
310 }
311 }
312
313 pub fn is_columnar(&self) -> bool {
315 matches!(&self.source, SourceData::Columnar(_))
316 }
317
318 pub fn to_columnar(&self) -> Result<LazyMaterializedBatch, ExecutorError> {
322 match &self.source {
323 SourceData::Columnar(_) => Ok(self.clone()),
324 SourceData::Rows(rows) => {
325 let batch = ColumnarBatch::from_rows(rows)?;
326 Ok(LazyMaterializedBatch {
327 source: SourceData::Columnar(Arc::new(batch)),
328 selection: self.selection.clone(),
329 column_names: self.column_names.clone(),
330 })
331 }
332 }
333 }
334
335 pub fn selectivity(&self) -> f64 {
337 self.selection.selectivity(self.source_row_count())
338 }
339}
340
341pub struct JoinedLazyBatchBuilder {
345 left_source: SourceData,
347 right_source: SourceData,
349 left_indices: Vec<u32>,
351 right_indices: Vec<u32>,
353}
354
355impl JoinedLazyBatchBuilder {
356 pub fn new(left_source: SourceData, right_source: SourceData) -> Self {
358 Self { left_source, right_source, left_indices: Vec::new(), right_indices: Vec::new() }
359 }
360
361 pub fn add_match(&mut self, left_idx: u32, right_idx: u32) {
363 self.left_indices.push(left_idx);
364 self.right_indices.push(right_idx);
365 }
366
367 pub fn add_left_only(&mut self, left_idx: u32) {
369 self.left_indices.push(left_idx);
370 self.right_indices.push(u32::MAX); }
372
373 pub fn add_right_only(&mut self, right_idx: u32) {
375 self.left_indices.push(u32::MAX); self.right_indices.push(right_idx);
377 }
378
379 pub fn left_indices(&self) -> &[u32] {
381 &self.left_indices
382 }
383
384 pub fn right_indices(&self) -> &[u32] {
386 &self.right_indices
387 }
388
389 pub fn result_count(&self) -> usize {
391 self.left_indices.len()
392 }
393
394 pub fn materialize(&self) -> Result<Vec<Row>, ExecutorError> {
398 let left_cols = self.left_source.column_count();
399 let right_cols = self.right_source.column_count();
400 let total_cols = left_cols + right_cols;
401
402 let mut rows = Vec::with_capacity(self.left_indices.len());
403
404 for (&left_idx, &right_idx) in self.left_indices.iter().zip(&self.right_indices) {
405 let mut values = Vec::with_capacity(total_cols);
406
407 if left_idx == u32::MAX {
409 for _ in 0..left_cols {
411 values.push(SqlValue::Null);
412 }
413 } else {
414 for col_idx in 0..left_cols {
415 values.push(self.left_source.get_value(left_idx as usize, col_idx)?);
416 }
417 }
418
419 if right_idx == u32::MAX {
421 for _ in 0..right_cols {
423 values.push(SqlValue::Null);
424 }
425 } else {
426 for col_idx in 0..right_cols {
427 values.push(self.right_source.get_value(right_idx as usize, col_idx)?);
428 }
429 }
430
431 rows.push(Row::new(values));
432 }
433
434 Ok(rows)
435 }
436
437 pub fn materialize_columns(
441 &self,
442 left_columns: &[usize],
443 right_columns: &[usize],
444 ) -> Result<Vec<Row>, ExecutorError> {
445 let total_cols = left_columns.len() + right_columns.len();
446 let mut rows = Vec::with_capacity(self.left_indices.len());
447
448 for (&left_idx, &right_idx) in self.left_indices.iter().zip(&self.right_indices) {
449 let mut values = Vec::with_capacity(total_cols);
450
451 if left_idx == u32::MAX {
453 for _ in 0..left_columns.len() {
454 values.push(SqlValue::Null);
455 }
456 } else {
457 for &col_idx in left_columns {
458 values.push(self.left_source.get_value(left_idx as usize, col_idx)?);
459 }
460 }
461
462 if right_idx == u32::MAX {
464 for _ in 0..right_columns.len() {
465 values.push(SqlValue::Null);
466 }
467 } else {
468 for &col_idx in right_columns {
469 values.push(self.right_source.get_value(right_idx as usize, col_idx)?);
470 }
471 }
472
473 rows.push(Row::new(values));
474 }
475
476 Ok(rows)
477 }
478}
479
#[cfg(test)]
mod lazy_batch_tests {
    use super::*;

    /// Wraps a string slice in a `Varchar` value.
    fn text(s: &str) -> SqlValue {
        SqlValue::Varchar(arcstr::ArcStr::from(s))
    }

    /// Five `(id, name)` rows with ids 1 through 5.
    fn sample_rows() -> Vec<Row> {
        [(1, "Alice"), (2, "Bob"), (3, "Carol"), (4, "Dave"), (5, "Eve")]
            .iter()
            .map(|&(id, name)| Row::new(vec![SqlValue::Integer(id), text(name)]))
            .collect()
    }

    #[test]
    fn test_lazy_batch_creation() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        assert_eq!(batch.len(), 5);
        assert_eq!(batch.column_count(), 2);
        assert_eq!(batch.source_row_count(), 5);
    }

    #[test]
    fn test_lazy_batch_filter() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        // Keep only rows whose first column is an integer greater than 2.
        let filtered = batch.filter_with(|idx| match batch.source() {
            SourceData::Rows(rows) => {
                matches!(rows[idx].get(0), Some(SqlValue::Integer(id)) if *id > 2)
            }
            SourceData::Columnar(_) => false,
        });

        // Ids 3, 4 and 5 pass the predicate.
        assert_eq!(filtered.len(), 3);
    }

    #[test]
    fn test_lazy_batch_materialize() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        let materialized = batch.materialize().unwrap();
        assert_eq!(materialized.len(), 5);
        assert_eq!(materialized[0].get(0), Some(&SqlValue::Integer(1)));
    }

    #[test]
    fn test_lazy_batch_materialize_columns() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        // Project just the name column (index 1).
        let names = batch.materialize_columns(&[1]).unwrap();
        assert_eq!(names.len(), 5);
        assert_eq!(names[0].len(), 1);
        assert_eq!(names[0].get(0), Some(&text("Alice")));
    }

    #[test]
    fn test_lazy_batch_with_selection() {
        let source = SourceData::Rows(Arc::new(sample_rows()));
        let selection = SelectionVector::from_indices(vec![0, 2, 4]);
        let batch = LazyMaterializedBatch::with_selection(source, selection);

        assert_eq!(batch.len(), 3);

        // Source rows 0, 2 and 4 come back in selection order.
        let materialized = batch.materialize().unwrap();
        assert_eq!(materialized[0].get(1), Some(&text("Alice")));
        assert_eq!(materialized[1].get(1), Some(&text("Carol")));
        assert_eq!(materialized[2].get(1), Some(&text("Eve")));
    }

    #[test]
    fn test_joined_batch_builder() {
        let left_rows = vec![
            Row::new(vec![SqlValue::Integer(1), text("A")]),
            Row::new(vec![SqlValue::Integer(2), text("B")]),
        ];
        let right_rows = vec![
            Row::new(vec![SqlValue::Integer(1), text("X")]),
            Row::new(vec![SqlValue::Integer(1), text("Y")]),
        ];

        let mut builder = JoinedLazyBatchBuilder::new(
            SourceData::Rows(Arc::new(left_rows)),
            SourceData::Rows(Arc::new(right_rows)),
        );

        // Left row 0 pairs with both right rows; left row 1 has no partner.
        builder.add_match(0, 0);
        builder.add_match(0, 1);
        builder.add_left_only(1);

        assert_eq!(builder.result_count(), 3);

        let rows = builder.materialize().unwrap();
        assert_eq!(rows.len(), 3);

        // Output row 0: left row 0 followed by right row 0.
        assert_eq!(rows[0].get(0), Some(&SqlValue::Integer(1)));
        assert_eq!(rows[0].get(2), Some(&SqlValue::Integer(1)));
        assert_eq!(rows[0].get(3), Some(&text("X")));

        // Output row 1: left row 0 followed by right row 1.
        assert_eq!(rows[1].get(3), Some(&text("Y")));

        // Output row 2: left row 1 with the right side padded with NULLs.
        assert_eq!(rows[2].get(0), Some(&SqlValue::Integer(2)));
        assert_eq!(rows[2].get(2), Some(&SqlValue::Null));
        assert_eq!(rows[2].get(3), Some(&SqlValue::Null));
    }

    #[test]
    fn test_selectivity() {
        let source = SourceData::Rows(Arc::new(sample_rows()));
        let selection = SelectionVector::from_indices(vec![0, 2]);
        let batch = LazyMaterializedBatch::with_selection(source, selection);

        // Two selected rows out of five source rows.
        assert!((batch.selectivity() - 0.4).abs() < 0.001);
    }
}