// kyu_executor/data_chunk.rs

//! DataChunk — columnar batch of rows flowing between physical operators.
//!
//! Backed by `ValueVector` columns (flat byte buffers, packed bits, or owned
//! TypedValues) with a `SelectionVector` for zero-copy filtering.

6use kyu_types::TypedValue;
7
8use crate::value_vector::{SelectionVector, ValueVector};
9
10/// A batch of rows in column-major format.
11///
12/// Columns are `ValueVector`s — either flat byte buffers from storage or
13/// owned TypedValue vecs from operators. A `SelectionVector` maps logical
14/// row indices to physical positions, enabling zero-copy filtering.
15#[derive(Clone, Debug)]
16pub struct DataChunk {
17    columns: Vec<ValueVector>,
18    selection: SelectionVector,
19}
20
21impl DataChunk {
22    /// Create from ValueVector columns + SelectionVector (scan path).
23    pub fn from_vectors(columns: Vec<ValueVector>, selection: SelectionVector) -> Self {
24        Self { columns, selection }
25    }
26
27    /// Create a new DataChunk from owned TypedValue columns (backward compat).
28    pub fn new(columns: Vec<Vec<TypedValue>>) -> Self {
29        let num_rows = columns.first().map_or(0, |c| c.len());
30        debug_assert!(columns.iter().all(|c| c.len() == num_rows));
31        let vectors = columns.into_iter().map(ValueVector::Owned).collect();
32        Self {
33            columns: vectors,
34            selection: SelectionVector::identity(num_rows),
35        }
36    }
37
38    /// Create a DataChunk with a specific row count (for zero-column chunks).
39    pub fn new_with_row_count(columns: Vec<Vec<TypedValue>>, num_rows: usize) -> Self {
40        let vectors = columns.into_iter().map(ValueVector::Owned).collect();
41        Self {
42            columns: vectors,
43            selection: SelectionVector::identity(num_rows),
44        }
45    }
46
47    /// Create an empty DataChunk with the given number of columns.
48    pub fn empty(num_columns: usize) -> Self {
49        Self {
50            columns: (0..num_columns)
51                .map(|_| ValueVector::Owned(Vec::new()))
52                .collect(),
53            selection: SelectionVector::identity(0),
54        }
55    }
56
57    /// Create an empty DataChunk with pre-allocated column capacity.
58    pub fn with_capacity(num_columns: usize, row_capacity: usize) -> Self {
59        Self {
60            columns: (0..num_columns)
61                .map(|_| ValueVector::Owned(Vec::with_capacity(row_capacity)))
62                .collect(),
63            selection: SelectionVector::identity(0),
64        }
65    }
66
67    /// Create a DataChunk with a single row of Null values.
68    pub fn single_empty_row(num_columns: usize) -> Self {
69        Self {
70            columns: (0..num_columns)
71                .map(|_| ValueVector::Owned(vec![TypedValue::Null]))
72                .collect(),
73            selection: SelectionVector::identity(1),
74        }
75    }
76
77    /// Create a DataChunk from rows (row-major → column-major).
78    pub fn from_rows(rows: &[Vec<TypedValue>], num_columns: usize) -> Self {
79        let mut columns: Vec<Vec<TypedValue>> = (0..num_columns).map(|_| Vec::new()).collect();
80        for row in rows {
81            for (col_idx, val) in row.iter().enumerate() {
82                if col_idx < num_columns {
83                    columns[col_idx].push(val.clone());
84                }
85            }
86        }
87        let num_rows = rows.len();
88        Self {
89            columns: columns.into_iter().map(ValueVector::Owned).collect(),
90            selection: SelectionVector::identity(num_rows),
91        }
92    }
93
94    #[inline]
95    pub fn num_columns(&self) -> usize {
96        self.columns.len()
97    }
98
99    #[inline]
100    pub fn num_rows(&self) -> usize {
101        self.selection.len()
102    }
103
104    #[inline]
105    pub fn is_empty(&self) -> bool {
106        self.selection.is_empty()
107    }
108
109    /// Get a single value by (logical) row and column index.
110    #[inline]
111    pub fn get_value(&self, row_idx: usize, col_idx: usize) -> TypedValue {
112        let physical = self.selection.get(row_idx);
113        self.columns[col_idx].get_value(physical)
114    }
115
116    /// Get a row as a vector across all columns.
117    pub fn get_row(&self, row_idx: usize) -> Vec<TypedValue> {
118        (0..self.columns.len())
119            .map(|col| self.get_value(row_idx, col))
120            .collect()
121    }
122
123    /// Append a single row. Only works when columns are Owned.
124    pub fn append_row(&mut self, row: &[TypedValue]) {
125        debug_assert_eq!(row.len(), self.columns.len());
126        for (col_idx, val) in row.iter().enumerate() {
127            self.columns[col_idx].push(val.clone());
128        }
129        self.selection = SelectionVector::identity(self.selection.len() + 1);
130    }
131
132    /// Append all rows from another chunk.
133    /// Materializes self into Owned columns if needed (e.g., when self has
134    /// Flat columns from storage).
135    pub fn append(&mut self, other: &DataChunk) {
136        debug_assert_eq!(self.num_columns(), other.num_columns());
137        self.ensure_owned();
138        for row_idx in 0..other.num_rows() {
139            self.append_row_from_chunk(other, row_idx);
140        }
141    }
142
143    /// Convert all non-Owned columns to Owned by materializing values.
144    fn ensure_owned(&mut self) {
145        let needs_materialize = self.columns.iter().any(|c| !matches!(c, ValueVector::Owned(_)));
146        if !needs_materialize {
147            return;
148        }
149        let num_rows = self.num_rows();
150        let num_cols = self.num_columns();
151        let owned_columns: Vec<Vec<TypedValue>> = (0..num_cols)
152            .map(|col| {
153                (0..num_rows)
154                    .map(|row| self.get_value(row, col))
155                    .collect()
156            })
157            .collect();
158        self.columns = owned_columns.into_iter().map(ValueVector::Owned).collect();
159        self.selection = SelectionVector::identity(num_rows);
160    }
161
162    /// Access the selection vector.
163    #[inline]
164    pub fn selection(&self) -> &SelectionVector {
165        &self.selection
166    }
167
168    /// Direct access to a column's ValueVector.
169    #[inline]
170    pub fn column(&self, col_idx: usize) -> &ValueVector {
171        &self.columns[col_idx]
172    }
173
174    /// Extract a column compacted through the selection vector.
175    /// When the selection is identity, clones the column directly (cheap for
176    /// FlatVector/Owned). When non-identity, materializes selected values.
177    pub fn compact_column(&self, col_idx: usize) -> ValueVector {
178        if self.selection.is_identity() {
179            self.columns[col_idx].clone()
180        } else {
181            let n = self.selection.len();
182            let mut result = Vec::with_capacity(n);
183            for i in 0..n {
184                result.push(self.columns[col_idx].get_value(self.selection.get(i)));
185            }
186            ValueVector::Owned(result)
187        }
188    }
189
190    /// Create a new DataChunk containing only the specified columns (by move).
191    pub fn select_columns(mut self, indices: &[usize]) -> Self {
192        let columns = indices
193            .iter()
194            .map(|&i| std::mem::replace(&mut self.columns[i], ValueVector::Owned(Vec::new())))
195            .collect();
196        Self {
197            columns,
198            selection: self.selection,
199        }
200    }
201
202    /// Move a column out of the chunk (replacing it with an empty Owned vec).
203    /// For identity selection, avoids cloning the underlying data.
204    /// For non-identity, materializes selected values.
205    pub fn take_column(&mut self, col_idx: usize) -> ValueVector {
206        if self.selection.is_identity() {
207            std::mem::replace(
208                &mut self.columns[col_idx],
209                ValueVector::Owned(Vec::new()),
210            )
211        } else {
212            let n = self.selection.len();
213            let mut result = Vec::with_capacity(n);
214            for i in 0..n {
215                result.push(self.columns[col_idx].get_value(self.selection.get(i)));
216            }
217            ValueVector::Owned(result)
218        }
219    }
220
221    /// Replace selection, keeping columns intact. Consumes self.
222    #[inline]
223    pub fn with_selection(self, selection: SelectionVector) -> Self {
224        Self {
225            columns: self.columns,
226            selection,
227        }
228    }
229
230    /// Get a zero-copy row reference for expression evaluation.
231    #[inline]
232    pub fn row_ref(&self, row_idx: usize) -> RowRef<'_> {
233        RowRef {
234            chunk: self,
235            row_idx,
236        }
237    }
238
239    /// Copy a single row from a source chunk. Target must use Owned columns.
240    pub fn append_row_from_chunk(&mut self, source: &DataChunk, row_idx: usize) {
241        debug_assert_eq!(self.num_columns(), source.num_columns());
242        for col_idx in 0..self.columns.len() {
243            let val = source.get_value(row_idx, col_idx);
244            self.columns[col_idx].push(val);
245        }
246        self.selection = SelectionVector::identity(self.selection.len() + 1);
247    }
248
249    /// Copy a row from source chunk and append an extra value as the last column.
250    pub fn append_row_from_chunk_with_extra(
251        &mut self,
252        source: &DataChunk,
253        row_idx: usize,
254        extra: TypedValue,
255    ) {
256        debug_assert_eq!(self.num_columns(), source.num_columns() + 1);
257        for col_idx in 0..source.num_columns() {
258            let val = source.get_value(row_idx, col_idx);
259            self.columns[col_idx].push(val);
260        }
261        self.columns[source.num_columns()].push(extra);
262        self.selection = SelectionVector::identity(self.selection.len() + 1);
263    }
264}
265
266/// Row reference into a DataChunk for expression evaluation.
267///
268/// Values are extracted on-demand from the underlying ValueVector columns
269/// via the SelectionVector, avoiding upfront materialization.
270pub struct RowRef<'a> {
271    chunk: &'a DataChunk,
272    row_idx: usize,
273}
274
275impl kyu_expression::Tuple for RowRef<'_> {
276    #[inline]
277    fn value_at(&self, col_idx: usize) -> Option<TypedValue> {
278        if col_idx < self.chunk.num_columns() {
279            Some(self.chunk.get_value(self.row_idx, col_idx))
280        } else {
281            None
282        }
283    }
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use smol_str::SmolStr;

    /// Build a column (or row) of `Int64` values.
    fn ints(values: &[i64]) -> Vec<TypedValue> {
        values.iter().map(|&v| TypedValue::Int64(v)).collect()
    }

    #[test]
    fn new_chunk() {
        let strings: Vec<TypedValue> = ["a", "b"]
            .iter()
            .map(|&s| TypedValue::String(SmolStr::new(s)))
            .collect();
        let chunk = DataChunk::new(vec![ints(&[1, 2]), strings]);
        assert_eq!(chunk.num_columns(), 2);
        assert_eq!(chunk.num_rows(), 2);
    }

    #[test]
    fn empty_chunk() {
        let chunk = DataChunk::empty(3);
        assert!(chunk.is_empty());
        assert_eq!((chunk.num_rows(), chunk.num_columns()), (0, 3));
    }

    #[test]
    fn single_empty_row() {
        let chunk = DataChunk::single_empty_row(2);
        assert_eq!(chunk.num_rows(), 1);
        for col in 0..2 {
            assert_eq!(chunk.get_value(0, col), TypedValue::Null);
        }
    }

    #[test]
    fn from_rows() {
        let rows = vec![ints(&[1, 10]), ints(&[2, 20])];
        let chunk = DataChunk::from_rows(&rows, 2);
        assert_eq!(chunk.num_rows(), 2);
        let expected = [[1, 10], [2, 20]];
        for (row_idx, row) in expected.iter().enumerate() {
            for (col_idx, &v) in row.iter().enumerate() {
                assert_eq!(chunk.get_value(row_idx, col_idx), TypedValue::Int64(v));
            }
        }
    }

    #[test]
    fn get_row() {
        let chunk = DataChunk::new(vec![ints(&[1, 2]), ints(&[10, 20])]);
        assert_eq!(chunk.get_row(0), ints(&[1, 10]));
        assert_eq!(chunk.get_row(1), ints(&[2, 20]));
    }

    #[test]
    fn append_row() {
        let mut chunk = DataChunk::empty(2);
        chunk.append_row(&ints(&[1, 2]));
        assert_eq!(chunk.num_rows(), 1);
        chunk.append_row(&ints(&[3, 4]));
        assert_eq!(chunk.num_rows(), 2);
        assert_eq!(chunk.get_row(1), ints(&[3, 4]));
    }

    #[test]
    fn row_ref_value_at() {
        use kyu_expression::Tuple;
        let chunk = DataChunk::new(vec![ints(&[1, 2]), ints(&[10, 20])]);

        let first = chunk.row_ref(0);
        assert_eq!(first.value_at(0), Some(TypedValue::Int64(1)));
        assert_eq!(first.value_at(1), Some(TypedValue::Int64(10)));
        assert_eq!(first.value_at(2), None);

        let second = chunk.row_ref(1);
        assert_eq!(second.value_at(0), Some(TypedValue::Int64(2)));
        assert_eq!(second.value_at(1), Some(TypedValue::Int64(20)));
    }

    #[test]
    fn append_row_from_chunk() {
        let source = DataChunk::new(vec![ints(&[1, 2]), ints(&[10, 20])]);
        let mut target = DataChunk::with_capacity(2, 2);
        target.append_row_from_chunk(&source, 1);
        assert_eq!(target.num_rows(), 1);
        assert_eq!(target.get_row(0), ints(&[2, 20]));
    }

    #[test]
    fn append_chunks() {
        let mut head = DataChunk::new(vec![ints(&[1])]);
        let tail = DataChunk::new(vec![ints(&[2, 3])]);
        head.append(&tail);
        assert_eq!(head.num_rows(), 3);
        for (row_idx, &expected) in [1, 2, 3].iter().enumerate() {
            assert_eq!(head.get_value(row_idx, 0), TypedValue::Int64(expected));
        }
    }

    #[test]
    fn with_selection_filters() {
        let chunk = DataChunk::new(vec![ints(&[10, 20, 30])]);
        let filtered = chunk.with_selection(SelectionVector::from_indices(vec![0, 2]));
        assert_eq!(filtered.num_rows(), 2);
        assert_eq!(filtered.get_value(0, 0), TypedValue::Int64(10));
        assert_eq!(filtered.get_value(1, 0), TypedValue::Int64(30));
    }
}