// vibesql_executor/select/late_materialization/lazy_batch.rs
use std::sync::Arc;
6use vibesql_storage::Row;
7use vibesql_types::SqlValue;
8
9use super::SelectionVector;
10use crate::errors::ExecutorError;
11use crate::select::columnar::{ColumnArray, ColumnarBatch};
12
13#[derive(Debug, Clone)]
15pub enum SourceData {
16 Rows(Arc<Vec<Row>>),
18 Columnar(Arc<ColumnarBatch>),
20}
21
22impl SourceData {
23 pub fn row_count(&self) -> usize {
25 match self {
26 SourceData::Rows(rows) => rows.len(),
27 SourceData::Columnar(batch) => batch.row_count(),
28 }
29 }
30
31 pub fn column_count(&self) -> usize {
33 match self {
34 SourceData::Rows(rows) => rows.first().map(|r| r.len()).unwrap_or(0),
35 SourceData::Columnar(batch) => batch.column_count(),
36 }
37 }
38
39 pub fn get_value(&self, row_idx: usize, col_idx: usize) -> Result<SqlValue, ExecutorError> {
41 match self {
42 SourceData::Rows(rows) => rows
43 .get(row_idx)
44 .and_then(|row| row.get(col_idx))
45 .cloned()
46 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index: col_idx }),
47 SourceData::Columnar(batch) => batch.get_value(row_idx, col_idx),
48 }
49 }
50
51 pub fn get_row(&self, row_idx: usize) -> Result<Row, ExecutorError> {
53 match self {
54 SourceData::Rows(rows) => rows
55 .get(row_idx)
56 .cloned()
57 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index: row_idx }),
58 SourceData::Columnar(batch) => {
59 let mut values = Vec::with_capacity(batch.column_count());
60 for col_idx in 0..batch.column_count() {
61 values.push(batch.get_value(row_idx, col_idx)?);
62 }
63 Ok(Row::new(values))
64 }
65 }
66 }
67}
68
69#[derive(Debug, Clone)]
99pub struct LazyMaterializedBatch {
100 source: SourceData,
102 selection: SelectionVector,
104 column_names: Option<Vec<String>>,
106}
107
108impl LazyMaterializedBatch {
109 pub fn new(source: SourceData) -> Self {
111 let row_count = source.row_count();
112 Self { source, selection: SelectionVector::all(row_count), column_names: None }
113 }
114
115 pub fn with_selection(source: SourceData, selection: SelectionVector) -> Self {
117 Self { source, selection, column_names: None }
118 }
119
120 pub fn from_rows(rows: Vec<Row>) -> Self {
122 Self::new(SourceData::Rows(Arc::new(rows)))
123 }
124
125 pub fn from_columnar(batch: ColumnarBatch) -> Self {
127 Self::new(SourceData::Columnar(Arc::new(batch)))
128 }
129
130 pub fn with_column_names(mut self, names: Vec<String>) -> Self {
132 self.column_names = Some(names);
133 self
134 }
135
136 pub fn source(&self) -> &SourceData {
138 &self.source
139 }
140
141 pub fn selection(&self) -> &SelectionVector {
143 &self.selection
144 }
145
146 pub fn column_names(&self) -> Option<&[String]> {
148 self.column_names.as_deref()
149 }
150
151 pub fn len(&self) -> usize {
153 self.selection.len()
154 }
155
156 pub fn is_empty(&self) -> bool {
158 self.selection.is_empty()
159 }
160
161 pub fn column_count(&self) -> usize {
163 self.source.column_count()
164 }
165
166 pub fn source_row_count(&self) -> usize {
168 self.source.row_count()
169 }
170
171 pub fn filter(&self, bitmap: &[bool]) -> Self {
176 let new_selection = self.selection.filter(bitmap, |&idx| bitmap[idx as usize]);
177 Self {
178 source: self.source.clone(),
179 selection: new_selection,
180 column_names: self.column_names.clone(),
181 }
182 }
183
184 pub fn filter_with<F>(&self, predicate: F) -> Self
189 where
190 F: Fn(usize) -> bool,
191 {
192 let new_indices: Vec<u32> =
193 self.selection.iter().filter(|&idx| predicate(idx as usize)).collect();
194
195 Self {
196 source: self.source.clone(),
197 selection: SelectionVector::from_indices(new_indices),
198 column_names: self.column_names.clone(),
199 }
200 }
201
202 pub fn intersect_selection(&self, other: &SelectionVector) -> Self {
204 Self {
205 source: self.source.clone(),
206 selection: self.selection.intersect(other),
207 column_names: self.column_names.clone(),
208 }
209 }
210
211 pub fn union_selection(&self, other: &SelectionVector) -> Self {
213 Self {
214 source: self.source.clone(),
215 selection: self.selection.union(other),
216 column_names: self.column_names.clone(),
217 }
218 }
219
220 pub fn materialize(&self) -> Result<Vec<Row>, ExecutorError> {
225 let mut rows = Vec::with_capacity(self.selection.len());
226
227 for idx in self.selection.iter() {
228 let row = self.source.get_row(idx as usize)?;
229 rows.push(row);
230 }
231
232 Ok(rows)
233 }
234
235 pub fn materialize_columns(&self, column_indices: &[usize]) -> Result<Vec<Row>, ExecutorError> {
240 let mut rows = Vec::with_capacity(self.selection.len());
241
242 for idx in self.selection.iter() {
243 let mut values = Vec::with_capacity(column_indices.len());
244 for &col_idx in column_indices {
245 let value = self.source.get_value(idx as usize, col_idx)?;
246 values.push(value);
247 }
248 rows.push(Row::new(values));
249 }
250
251 Ok(rows)
252 }
253
254 pub fn materialize_column(&self, column_idx: usize) -> Result<Vec<SqlValue>, ExecutorError> {
258 let mut values = Vec::with_capacity(self.selection.len());
259
260 for idx in self.selection.iter() {
261 let value = self.source.get_value(idx as usize, column_idx)?;
262 values.push(value);
263 }
264
265 Ok(values)
266 }
267
268 pub fn get_selected_value(
270 &self,
271 selection_idx: usize,
272 column_idx: usize,
273 ) -> Result<SqlValue, ExecutorError> {
274 let row_idx = self
275 .selection
276 .get(selection_idx)
277 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index: selection_idx })?;
278
279 self.source.get_value(row_idx as usize, column_idx)
280 }
281
282 pub fn iter_indices(&self) -> impl Iterator<Item = u32> + '_ {
284 self.selection.iter()
285 }
286
287 pub fn remap_selection(&self, child_selection: &SelectionVector) -> Self {
292 let remapped = child_selection.remap(&self.selection);
293 Self {
294 source: self.source.clone(),
295 selection: remapped,
296 column_names: self.column_names.clone(),
297 }
298 }
299
300 pub fn column_array(&self, column_idx: usize) -> Option<&ColumnArray> {
304 match &self.source {
305 SourceData::Columnar(batch) => batch.column(column_idx),
306 SourceData::Rows(_) => None,
307 }
308 }
309
310 pub fn is_columnar(&self) -> bool {
312 matches!(&self.source, SourceData::Columnar(_))
313 }
314
315 pub fn to_columnar(&self) -> Result<LazyMaterializedBatch, ExecutorError> {
319 match &self.source {
320 SourceData::Columnar(_) => Ok(self.clone()),
321 SourceData::Rows(rows) => {
322 let batch = ColumnarBatch::from_rows(rows)?;
323 Ok(LazyMaterializedBatch {
324 source: SourceData::Columnar(Arc::new(batch)),
325 selection: self.selection.clone(),
326 column_names: self.column_names.clone(),
327 })
328 }
329 }
330 }
331
332 pub fn selectivity(&self) -> f64 {
334 self.selection.selectivity(self.source_row_count())
335 }
336}
337
338pub struct JoinedLazyBatchBuilder {
342 left_source: SourceData,
344 right_source: SourceData,
346 left_indices: Vec<u32>,
348 right_indices: Vec<u32>,
350}
351
352impl JoinedLazyBatchBuilder {
353 pub fn new(left_source: SourceData, right_source: SourceData) -> Self {
355 Self { left_source, right_source, left_indices: Vec::new(), right_indices: Vec::new() }
356 }
357
358 pub fn add_match(&mut self, left_idx: u32, right_idx: u32) {
360 self.left_indices.push(left_idx);
361 self.right_indices.push(right_idx);
362 }
363
364 pub fn add_left_only(&mut self, left_idx: u32) {
366 self.left_indices.push(left_idx);
367 self.right_indices.push(u32::MAX); }
369
370 pub fn add_right_only(&mut self, right_idx: u32) {
372 self.left_indices.push(u32::MAX); self.right_indices.push(right_idx);
374 }
375
376 pub fn left_indices(&self) -> &[u32] {
378 &self.left_indices
379 }
380
381 pub fn right_indices(&self) -> &[u32] {
383 &self.right_indices
384 }
385
386 pub fn result_count(&self) -> usize {
388 self.left_indices.len()
389 }
390
391 pub fn materialize(&self) -> Result<Vec<Row>, ExecutorError> {
395 let left_cols = self.left_source.column_count();
396 let right_cols = self.right_source.column_count();
397 let total_cols = left_cols + right_cols;
398
399 let mut rows = Vec::with_capacity(self.left_indices.len());
400
401 for (&left_idx, &right_idx) in self.left_indices.iter().zip(&self.right_indices) {
402 let mut values = Vec::with_capacity(total_cols);
403
404 if left_idx == u32::MAX {
406 for _ in 0..left_cols {
408 values.push(SqlValue::Null);
409 }
410 } else {
411 for col_idx in 0..left_cols {
412 values.push(self.left_source.get_value(left_idx as usize, col_idx)?);
413 }
414 }
415
416 if right_idx == u32::MAX {
418 for _ in 0..right_cols {
420 values.push(SqlValue::Null);
421 }
422 } else {
423 for col_idx in 0..right_cols {
424 values.push(self.right_source.get_value(right_idx as usize, col_idx)?);
425 }
426 }
427
428 rows.push(Row::new(values));
429 }
430
431 Ok(rows)
432 }
433
434 pub fn materialize_columns(
438 &self,
439 left_columns: &[usize],
440 right_columns: &[usize],
441 ) -> Result<Vec<Row>, ExecutorError> {
442 let total_cols = left_columns.len() + right_columns.len();
443 let mut rows = Vec::with_capacity(self.left_indices.len());
444
445 for (&left_idx, &right_idx) in self.left_indices.iter().zip(&self.right_indices) {
446 let mut values = Vec::with_capacity(total_cols);
447
448 if left_idx == u32::MAX {
450 for _ in 0..left_columns.len() {
451 values.push(SqlValue::Null);
452 }
453 } else {
454 for &col_idx in left_columns {
455 values.push(self.left_source.get_value(left_idx as usize, col_idx)?);
456 }
457 }
458
459 if right_idx == u32::MAX {
461 for _ in 0..right_columns.len() {
462 values.push(SqlValue::Null);
463 }
464 } else {
465 for &col_idx in right_columns {
466 values.push(self.right_source.get_value(right_idx as usize, col_idx)?);
467 }
468 }
469
470 rows.push(Row::new(values));
471 }
472
473 Ok(rows)
474 }
475}
476
#[cfg(test)]
mod lazy_batch_tests {
    use super::*;

    /// Five two-column rows: integer ids 1 through 5, each with a name.
    fn sample_rows() -> Vec<Row> {
        let person = |id, name: &'static str| {
            Row::new(vec![SqlValue::Integer(id), SqlValue::Varchar(name.into())])
        };
        vec![
            person(1, "Alice"),
            person(2, "Bob"),
            person(3, "Carol"),
            person(4, "Dave"),
            person(5, "Eve"),
        ]
    }

    #[test]
    fn test_lazy_batch_creation() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        assert_eq!(batch.len(), 5);
        assert_eq!(batch.column_count(), 2);
        assert_eq!(batch.source_row_count(), 5);
    }

    #[test]
    fn test_lazy_batch_filter() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        // Keep only rows whose first column is an integer greater than 2.
        let filtered = batch.filter_with(|idx| match batch.source() {
            SourceData::Rows(rows) => {
                matches!(rows[idx].get(0), Some(SqlValue::Integer(id)) if *id > 2)
            }
            SourceData::Columnar(_) => false,
        });

        // Ids 3, 4 and 5 pass the predicate.
        assert_eq!(filtered.len(), 3);
    }

    #[test]
    fn test_lazy_batch_materialize() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        let materialized = batch.materialize().unwrap();
        assert_eq!(materialized.len(), 5);
        assert_eq!(materialized[0].get(0), Some(&SqlValue::Integer(1)));
    }

    #[test]
    fn test_lazy_batch_materialize_columns() {
        let batch = LazyMaterializedBatch::from_rows(sample_rows());

        // Project just the name column.
        let names = batch.materialize_columns(&[1]).unwrap();
        assert_eq!(names.len(), 5);
        assert_eq!(names[0].len(), 1);
        assert_eq!(names[0].get(0), Some(&SqlValue::Varchar("Alice".into())));
    }

    #[test]
    fn test_lazy_batch_with_selection() {
        let source = SourceData::Rows(Arc::new(sample_rows()));
        let every_other = SelectionVector::from_indices(vec![0, 2, 4]);
        let batch = LazyMaterializedBatch::with_selection(source, every_other);

        assert_eq!(batch.len(), 3);

        let materialized = batch.materialize().unwrap();
        let expected_names: [&'static str; 3] = ["Alice", "Carol", "Eve"];
        for (pos, expected) in expected_names.iter().enumerate() {
            assert_eq!(materialized[pos].get(1), Some(&SqlValue::Varchar((*expected).into())));
        }
    }

    #[test]
    fn test_joined_batch_builder() {
        let pair = |id, tag: &'static str| {
            Row::new(vec![SqlValue::Integer(id), SqlValue::Varchar(tag.into())])
        };
        let left_rows = vec![pair(1, "A"), pair(2, "B")];
        let right_rows = vec![pair(1, "X"), pair(1, "Y")];

        let mut builder = JoinedLazyBatchBuilder::new(
            SourceData::Rows(Arc::new(left_rows)),
            SourceData::Rows(Arc::new(right_rows)),
        );

        // Left row 0 matches both right rows; left row 1 has no match.
        builder.add_match(0, 0);
        builder.add_match(0, 1);
        builder.add_left_only(1);

        assert_eq!(builder.result_count(), 3);

        let rows = builder.materialize().unwrap();
        assert_eq!(rows.len(), 3);

        // First match: left (1, "A") joined with right (1, "X").
        assert_eq!(rows[0].get(0), Some(&SqlValue::Integer(1)));
        assert_eq!(rows[0].get(2), Some(&SqlValue::Integer(1)));
        assert_eq!(rows[0].get(3), Some(&SqlValue::Varchar("X".into())));

        // Second match: same left row joined with right (1, "Y").
        assert_eq!(rows[1].get(3), Some(&SqlValue::Varchar("Y".into())));

        // Left-only row: right-hand columns are NULL.
        assert_eq!(rows[2].get(0), Some(&SqlValue::Integer(2)));
        assert_eq!(rows[2].get(2), Some(&SqlValue::Null));
        assert_eq!(rows[2].get(3), Some(&SqlValue::Null));
    }

    #[test]
    fn test_selectivity() {
        let source = SourceData::Rows(Arc::new(sample_rows()));
        // Two of the five source rows are selected.
        let two_rows = SelectionVector::from_indices(vec![0, 2]);
        let batch = LazyMaterializedBatch::with_selection(source, two_rows);

        assert!((batch.selectivity() - 0.4).abs() < 0.001);
    }
}