Skip to main content

kyu_executor/
data_chunk.rs

//! DataChunk — columnar batch of rows flowing between physical operators.
//!
//! Backed by `ValueVector` columns (flat byte buffers, packed bits, or owned
//! `TypedValue`s) with a `SelectionVector` for zero-copy filtering.
5
6use kyu_types::TypedValue;
7
8use crate::value_vector::{SelectionVector, ValueVector};
9
10/// A batch of rows in column-major format.
11///
12/// Columns are `ValueVector`s — either flat byte buffers from storage or
13/// owned TypedValue vecs from operators. A `SelectionVector` maps logical
14/// row indices to physical positions, enabling zero-copy filtering.
15#[derive(Clone, Debug)]
16pub struct DataChunk {
17    columns: Vec<ValueVector>,
18    selection: SelectionVector,
19}
20
21impl DataChunk {
22    /// Create from ValueVector columns + SelectionVector (scan path).
23    pub fn from_vectors(columns: Vec<ValueVector>, selection: SelectionVector) -> Self {
24        Self { columns, selection }
25    }
26
27    /// Create a new DataChunk from owned TypedValue columns (backward compat).
28    pub fn new(columns: Vec<Vec<TypedValue>>) -> Self {
29        let num_rows = columns.first().map_or(0, |c| c.len());
30        debug_assert!(columns.iter().all(|c| c.len() == num_rows));
31        let vectors = columns.into_iter().map(ValueVector::Owned).collect();
32        Self {
33            columns: vectors,
34            selection: SelectionVector::identity(num_rows),
35        }
36    }
37
38    /// Create a DataChunk with a specific row count (for zero-column chunks).
39    pub fn new_with_row_count(columns: Vec<Vec<TypedValue>>, num_rows: usize) -> Self {
40        let vectors = columns.into_iter().map(ValueVector::Owned).collect();
41        Self {
42            columns: vectors,
43            selection: SelectionVector::identity(num_rows),
44        }
45    }
46
47    /// Create an empty DataChunk with the given number of columns.
48    pub fn empty(num_columns: usize) -> Self {
49        Self {
50            columns: (0..num_columns)
51                .map(|_| ValueVector::Owned(Vec::new()))
52                .collect(),
53            selection: SelectionVector::identity(0),
54        }
55    }
56
57    /// Create an empty DataChunk with pre-allocated column capacity.
58    pub fn with_capacity(num_columns: usize, row_capacity: usize) -> Self {
59        Self {
60            columns: (0..num_columns)
61                .map(|_| ValueVector::Owned(Vec::with_capacity(row_capacity)))
62                .collect(),
63            selection: SelectionVector::identity(0),
64        }
65    }
66
67    /// Create a DataChunk with a single row of Null values.
68    pub fn single_empty_row(num_columns: usize) -> Self {
69        Self {
70            columns: (0..num_columns)
71                .map(|_| ValueVector::Owned(vec![TypedValue::Null]))
72                .collect(),
73            selection: SelectionVector::identity(1),
74        }
75    }
76
77    /// Create a DataChunk from rows (row-major → column-major).
78    pub fn from_rows(rows: &[Vec<TypedValue>], num_columns: usize) -> Self {
79        let mut columns: Vec<Vec<TypedValue>> = (0..num_columns).map(|_| Vec::new()).collect();
80        for row in rows {
81            for (col_idx, val) in row.iter().enumerate() {
82                if col_idx < num_columns {
83                    columns[col_idx].push(val.clone());
84                }
85            }
86        }
87        let num_rows = rows.len();
88        Self {
89            columns: columns.into_iter().map(ValueVector::Owned).collect(),
90            selection: SelectionVector::identity(num_rows),
91        }
92    }
93
94    #[inline]
95    pub fn num_columns(&self) -> usize {
96        self.columns.len()
97    }
98
99    #[inline]
100    pub fn num_rows(&self) -> usize {
101        self.selection.len()
102    }
103
104    #[inline]
105    pub fn is_empty(&self) -> bool {
106        self.selection.is_empty()
107    }
108
109    /// Get a single value by (logical) row and column index.
110    #[inline]
111    pub fn get_value(&self, row_idx: usize, col_idx: usize) -> TypedValue {
112        let physical = self.selection.get(row_idx);
113        self.columns[col_idx].get_value(physical)
114    }
115
116    /// Get a row as a vector across all columns.
117    pub fn get_row(&self, row_idx: usize) -> Vec<TypedValue> {
118        (0..self.columns.len())
119            .map(|col| self.get_value(row_idx, col))
120            .collect()
121    }
122
123    /// Append a single row. Only works when columns are Owned.
124    pub fn append_row(&mut self, row: &[TypedValue]) {
125        debug_assert_eq!(row.len(), self.columns.len());
126        for (col_idx, val) in row.iter().enumerate() {
127            self.columns[col_idx].push(val.clone());
128        }
129        self.selection = SelectionVector::identity(self.selection.len() + 1);
130    }
131
132    /// Append all rows from another chunk.
133    /// Materializes self into Owned columns if needed (e.g., when self has
134    /// Flat columns from storage).
135    pub fn append(&mut self, other: &DataChunk) {
136        debug_assert_eq!(self.num_columns(), other.num_columns());
137        self.ensure_owned();
138        for row_idx in 0..other.num_rows() {
139            self.append_row_from_chunk(other, row_idx);
140        }
141    }
142
143    /// Convert all non-Owned columns to Owned by materializing values.
144    fn ensure_owned(&mut self) {
145        let needs_materialize = self
146            .columns
147            .iter()
148            .any(|c| !matches!(c, ValueVector::Owned(_)));
149        if !needs_materialize {
150            return;
151        }
152        let num_rows = self.num_rows();
153        let num_cols = self.num_columns();
154        let owned_columns: Vec<Vec<TypedValue>> = (0..num_cols)
155            .map(|col| (0..num_rows).map(|row| self.get_value(row, col)).collect())
156            .collect();
157        self.columns = owned_columns.into_iter().map(ValueVector::Owned).collect();
158        self.selection = SelectionVector::identity(num_rows);
159    }
160
161    /// Access the selection vector.
162    #[inline]
163    pub fn selection(&self) -> &SelectionVector {
164        &self.selection
165    }
166
167    /// Direct access to a column's ValueVector.
168    #[inline]
169    pub fn column(&self, col_idx: usize) -> &ValueVector {
170        &self.columns[col_idx]
171    }
172
173    /// Extract a column compacted through the selection vector.
174    /// When the selection is identity, clones the column directly (cheap for
175    /// FlatVector/Owned). When non-identity, materializes selected values.
176    pub fn compact_column(&self, col_idx: usize) -> ValueVector {
177        if self.selection.is_identity() {
178            self.columns[col_idx].clone()
179        } else {
180            let n = self.selection.len();
181            let mut result = Vec::with_capacity(n);
182            for i in 0..n {
183                result.push(self.columns[col_idx].get_value(self.selection.get(i)));
184            }
185            ValueVector::Owned(result)
186        }
187    }
188
189    /// Create a new DataChunk containing only the specified columns (by move).
190    pub fn select_columns(mut self, indices: &[usize]) -> Self {
191        let columns = indices
192            .iter()
193            .map(|&i| std::mem::replace(&mut self.columns[i], ValueVector::Owned(Vec::new())))
194            .collect();
195        Self {
196            columns,
197            selection: self.selection,
198        }
199    }
200
201    /// Move a column out of the chunk (replacing it with an empty Owned vec).
202    /// For identity selection, avoids cloning the underlying data.
203    /// For non-identity, materializes selected values.
204    pub fn take_column(&mut self, col_idx: usize) -> ValueVector {
205        if self.selection.is_identity() {
206            std::mem::replace(&mut self.columns[col_idx], ValueVector::Owned(Vec::new()))
207        } else {
208            let n = self.selection.len();
209            let mut result = Vec::with_capacity(n);
210            for i in 0..n {
211                result.push(self.columns[col_idx].get_value(self.selection.get(i)));
212            }
213            ValueVector::Owned(result)
214        }
215    }
216
217    /// Replace selection, keeping columns intact. Consumes self.
218    #[inline]
219    pub fn with_selection(self, selection: SelectionVector) -> Self {
220        Self {
221            columns: self.columns,
222            selection,
223        }
224    }
225
226    /// Get a zero-copy row reference for expression evaluation.
227    #[inline]
228    pub fn row_ref(&self, row_idx: usize) -> RowRef<'_> {
229        RowRef {
230            chunk: self,
231            row_idx,
232        }
233    }
234
235    /// Copy a single row from a source chunk. Target must use Owned columns.
236    pub fn append_row_from_chunk(&mut self, source: &DataChunk, row_idx: usize) {
237        debug_assert_eq!(self.num_columns(), source.num_columns());
238        for col_idx in 0..self.columns.len() {
239            let val = source.get_value(row_idx, col_idx);
240            self.columns[col_idx].push(val);
241        }
242        self.selection = SelectionVector::identity(self.selection.len() + 1);
243    }
244
245    /// Copy a row from source chunk and append an extra value as the last column.
246    pub fn append_row_from_chunk_with_extra(
247        &mut self,
248        source: &DataChunk,
249        row_idx: usize,
250        extra: TypedValue,
251    ) {
252        debug_assert_eq!(self.num_columns(), source.num_columns() + 1);
253        for col_idx in 0..source.num_columns() {
254            let val = source.get_value(row_idx, col_idx);
255            self.columns[col_idx].push(val);
256        }
257        self.columns[source.num_columns()].push(extra);
258        self.selection = SelectionVector::identity(self.selection.len() + 1);
259    }
260}
261
262/// Row reference into a DataChunk for expression evaluation.
263///
264/// Values are extracted on-demand from the underlying ValueVector columns
265/// via the SelectionVector, avoiding upfront materialization.
266pub struct RowRef<'a> {
267    chunk: &'a DataChunk,
268    row_idx: usize,
269}
270
271impl kyu_expression::Tuple for RowRef<'_> {
272    #[inline]
273    fn value_at(&self, col_idx: usize) -> Option<TypedValue> {
274        if col_idx < self.chunk.num_columns() {
275            Some(self.chunk.get_value(self.row_idx, col_idx))
276        } else {
277            None
278        }
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use smol_str::SmolStr;

    /// Shorthand for an `Int64` value.
    fn int(v: i64) -> TypedValue {
        TypedValue::Int64(v)
    }

    #[test]
    fn new_chunk() {
        let ints = vec![int(1), int(2)];
        let strings = vec![
            TypedValue::String(SmolStr::new("a")),
            TypedValue::String(SmolStr::new("b")),
        ];
        let chunk = DataChunk::new(vec![ints, strings]);
        assert_eq!(chunk.num_columns(), 2);
        assert_eq!(chunk.num_rows(), 2);
    }

    #[test]
    fn empty_chunk() {
        let chunk = DataChunk::empty(3);
        assert!(chunk.is_empty());
        assert_eq!((chunk.num_rows(), chunk.num_columns()), (0, 3));
    }

    #[test]
    fn single_empty_row() {
        let chunk = DataChunk::single_empty_row(2);
        assert_eq!(chunk.num_rows(), 1);
        for col in 0..2 {
            assert_eq!(chunk.get_value(0, col), TypedValue::Null);
        }
    }

    #[test]
    fn from_rows() {
        let rows = vec![vec![int(1), int(10)], vec![int(2), int(20)]];
        let chunk = DataChunk::from_rows(&rows, 2);
        assert_eq!(chunk.num_rows(), 2);
        for (r, row) in rows.iter().enumerate() {
            for (c, expected) in row.iter().enumerate() {
                assert_eq!(&chunk.get_value(r, c), expected);
            }
        }
    }

    #[test]
    fn get_row() {
        let chunk = DataChunk::new(vec![vec![int(1), int(2)], vec![int(10), int(20)]]);
        let expected = [vec![int(1), int(10)], vec![int(2), int(20)]];
        for (r, want) in expected.iter().enumerate() {
            assert_eq!(&chunk.get_row(r), want);
        }
    }

    #[test]
    fn append_row() {
        let mut chunk = DataChunk::empty(2);
        for (n, pair) in [[int(1), int(2)], [int(3), int(4)]].iter().enumerate() {
            chunk.append_row(pair);
            assert_eq!(chunk.num_rows(), n + 1);
        }
        assert_eq!(chunk.get_row(1), vec![int(3), int(4)]);
    }

    #[test]
    fn row_ref_value_at() {
        use kyu_expression::Tuple;
        let chunk = DataChunk::new(vec![vec![int(1), int(2)], vec![int(10), int(20)]]);

        let first = chunk.row_ref(0);
        assert_eq!(first.value_at(0), Some(int(1)));
        assert_eq!(first.value_at(1), Some(int(10)));
        assert_eq!(first.value_at(2), None);

        let second = chunk.row_ref(1);
        assert_eq!(second.value_at(0), Some(int(2)));
        assert_eq!(second.value_at(1), Some(int(20)));
    }

    #[test]
    fn append_row_from_chunk() {
        let source = DataChunk::new(vec![vec![int(1), int(2)], vec![int(10), int(20)]]);
        let mut target = DataChunk::with_capacity(2, 2);
        target.append_row_from_chunk(&source, 1);
        assert_eq!(target.num_rows(), 1);
        assert_eq!(target.get_row(0), vec![int(2), int(20)]);
    }

    #[test]
    fn append_chunks() {
        let mut head = DataChunk::new(vec![vec![int(1)]]);
        let tail = DataChunk::new(vec![vec![int(2), int(3)]]);
        head.append(&tail);
        assert_eq!(head.num_rows(), 3);
        let values: Vec<TypedValue> = (0..3).map(|r| head.get_value(r, 0)).collect();
        assert_eq!(values, vec![int(1), int(2), int(3)]);
    }

    #[test]
    fn with_selection_filters() {
        let chunk = DataChunk::new(vec![vec![int(10), int(20), int(30)]]);
        let filtered = chunk.with_selection(SelectionVector::from_indices(vec![0, 2]));
        assert_eq!(filtered.num_rows(), 2);
        assert_eq!(filtered.get_value(0, 0), int(10));
        assert_eq!(filtered.get_value(1, 0), int(30));
    }
}