Skip to main content

grafeo_core/execution/
chunk.rs

1//! DataChunk for batched tuple processing.
2
3use super::selection::SelectionVector;
4use super::vector::ValueVector;
5use grafeo_common::types::LogicalType;
6
7/// Default chunk size (number of tuples).
8pub const DEFAULT_CHUNK_SIZE: usize = 2048;
9
10/// A chunk of data containing multiple columns.
11///
12/// DataChunk is the fundamental unit of data processing in vectorized execution.
13/// It holds multiple ValueVectors (columns) and an optional SelectionVector
14/// for filtering without copying.
15#[derive(Debug)]
16pub struct DataChunk {
17    /// Column vectors.
18    columns: Vec<ValueVector>,
19    /// Selection vector (None means all rows are selected).
20    selection: Option<SelectionVector>,
21    /// Number of rows in this chunk.
22    count: usize,
23    /// Capacity of this chunk.
24    capacity: usize,
25}
26
27impl DataChunk {
28    /// Creates an empty data chunk with no columns.
29    #[must_use]
30    pub fn empty() -> Self {
31        Self {
32            columns: Vec::new(),
33            selection: None,
34            count: 0,
35            capacity: 0,
36        }
37    }
38
39    /// Creates a new data chunk from existing vectors.
40    #[must_use]
41    pub fn new(columns: Vec<ValueVector>) -> Self {
42        let count = columns.first().map_or(0, ValueVector::len);
43        let capacity = columns.first().map_or(DEFAULT_CHUNK_SIZE, |c| c.len());
44        Self {
45            columns,
46            selection: None,
47            count,
48            capacity,
49        }
50    }
51
52    /// Creates a new empty data chunk with the given schema.
53    #[must_use]
54    pub fn with_schema(column_types: &[LogicalType]) -> Self {
55        Self::with_capacity(column_types, DEFAULT_CHUNK_SIZE)
56    }
57
58    /// Creates a new data chunk with the given schema and capacity.
59    #[must_use]
60    pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
61        let columns = column_types
62            .iter()
63            .map(|t| ValueVector::with_capacity(t.clone(), capacity))
64            .collect();
65
66        Self {
67            columns,
68            selection: None,
69            count: 0,
70            capacity,
71        }
72    }
73
74    /// Returns the number of columns.
75    #[must_use]
76    pub fn column_count(&self) -> usize {
77        self.columns.len()
78    }
79
80    /// Returns the number of rows (considering selection).
81    #[must_use]
82    pub fn row_count(&self) -> usize {
83        self.selection.as_ref().map_or(self.count, |s| s.len())
84    }
85
86    /// Alias for row_count().
87    #[must_use]
88    pub fn len(&self) -> usize {
89        self.row_count()
90    }
91
92    /// Returns all columns.
93    #[must_use]
94    pub fn columns(&self) -> &[ValueVector] {
95        &self.columns
96    }
97
98    /// Returns the total number of rows (ignoring selection).
99    #[must_use]
100    pub fn total_row_count(&self) -> usize {
101        self.count
102    }
103
104    /// Returns true if the chunk is empty.
105    #[must_use]
106    pub fn is_empty(&self) -> bool {
107        self.row_count() == 0
108    }
109
110    /// Returns the capacity of this chunk.
111    #[must_use]
112    pub fn capacity(&self) -> usize {
113        self.capacity
114    }
115
116    /// Returns true if the chunk is full.
117    #[must_use]
118    pub fn is_full(&self) -> bool {
119        self.count >= self.capacity
120    }
121
122    /// Gets a column by index.
123    #[must_use]
124    pub fn column(&self, index: usize) -> Option<&ValueVector> {
125        self.columns.get(index)
126    }
127
128    /// Gets a mutable column by index.
129    pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
130        self.columns.get_mut(index)
131    }
132
133    /// Returns the selection vector.
134    #[must_use]
135    pub fn selection(&self) -> Option<&SelectionVector> {
136        self.selection.as_ref()
137    }
138
139    /// Sets the selection vector.
140    pub fn set_selection(&mut self, selection: SelectionVector) {
141        self.selection = Some(selection);
142    }
143
144    /// Clears the selection vector (selects all rows).
145    pub fn clear_selection(&mut self) {
146        self.selection = None;
147    }
148
149    /// Sets the row count.
150    pub fn set_count(&mut self, count: usize) {
151        self.count = count;
152    }
153
154    /// Resets the chunk for reuse.
155    pub fn reset(&mut self) {
156        for col in &mut self.columns {
157            col.clear();
158        }
159        self.selection = None;
160        self.count = 0;
161    }
162
163    /// Flattens the selection by copying only selected rows.
164    ///
165    /// After this operation, selection is None and count equals the
166    /// previously selected row count.
167    pub fn flatten(&mut self) {
168        let selection = match self.selection.take() {
169            Some(sel) => sel,
170            None => return,
171        };
172
173        let selected_count = selection.len();
174
175        // Create new columns with only selected rows, preserving data types
176        let mut new_columns = Vec::with_capacity(self.columns.len());
177
178        for col in &self.columns {
179            // Create new vector with same data type as original
180            let mut new_col = ValueVector::with_type(col.data_type().clone());
181            for idx in selection.iter() {
182                if let Some(val) = col.get(idx) {
183                    new_col.push(val);
184                }
185            }
186            new_columns.push(new_col);
187        }
188
189        self.columns = new_columns;
190        self.count = selected_count;
191        self.capacity = selected_count;
192    }
193
194    /// Returns an iterator over selected row indices.
195    pub fn selected_indices(&self) -> Box<dyn Iterator<Item = usize> + '_> {
196        match &self.selection {
197            Some(sel) => Box::new(sel.iter()),
198            None => Box::new(0..self.count),
199        }
200    }
201
202    /// Concatenates multiple chunks into a single chunk.
203    ///
204    /// All chunks must have the same schema (same number and types of columns).
205    pub fn concat(chunks: &[DataChunk]) -> DataChunk {
206        if chunks.is_empty() {
207            return DataChunk::empty();
208        }
209
210        if chunks.len() == 1 {
211            // Clone the single chunk
212            return DataChunk {
213                columns: chunks[0].columns.clone(),
214                selection: chunks[0].selection.clone(),
215                count: chunks[0].count,
216                capacity: chunks[0].capacity,
217            };
218        }
219
220        let num_columns = chunks[0].column_count();
221        if num_columns == 0 {
222            return DataChunk::empty();
223        }
224
225        let total_rows: usize = chunks.iter().map(|c| c.row_count()).sum();
226
227        // Concatenate each column
228        let mut result_columns = Vec::with_capacity(num_columns);
229
230        for col_idx in 0..num_columns {
231            let mut concat_vector = ValueVector::new();
232
233            for chunk in chunks {
234                if let Some(col) = chunk.column(col_idx) {
235                    // Append all values from this column
236                    for i in chunk.selected_indices() {
237                        if let Some(val) = col.get(i) {
238                            concat_vector.push(val);
239                        }
240                    }
241                }
242            }
243
244            result_columns.push(concat_vector);
245        }
246
247        DataChunk {
248            columns: result_columns,
249            selection: None,
250            count: total_rows,
251            capacity: total_rows,
252        }
253    }
254
255    /// Applies a filter predicate and returns a new chunk with selected rows.
256    pub fn filter(&self, predicate: &SelectionVector) -> DataChunk {
257        // Combine existing selection with predicate
258        let selected: Vec<usize> = predicate
259            .iter()
260            .filter(|&idx| self.selection.as_ref().map_or(true, |s| s.contains(idx)))
261            .collect();
262
263        let mut result_columns = Vec::with_capacity(self.columns.len());
264
265        for col in &self.columns {
266            let mut new_col = ValueVector::new();
267            for &idx in &selected {
268                if let Some(val) = col.get(idx) {
269                    new_col.push(val);
270                }
271            }
272            result_columns.push(new_col);
273        }
274
275        DataChunk {
276            columns: result_columns,
277            selection: None,
278            count: selected.len(),
279            capacity: selected.len(),
280        }
281    }
282
283    /// Returns a slice of this chunk.
284    ///
285    /// Returns a new DataChunk containing rows [offset, offset + count).
286    #[must_use]
287    pub fn slice(&self, offset: usize, count: usize) -> DataChunk {
288        if offset >= self.len() || count == 0 {
289            return DataChunk::empty();
290        }
291
292        let actual_count = count.min(self.len() - offset);
293        let mut result_columns = Vec::with_capacity(self.columns.len());
294
295        for col in &self.columns {
296            let mut new_col = ValueVector::new();
297            for i in offset..(offset + actual_count) {
298                let actual_idx = if let Some(sel) = &self.selection {
299                    sel.get(i).unwrap_or(i)
300                } else {
301                    i
302                };
303                if let Some(val) = col.get(actual_idx) {
304                    new_col.push(val);
305                }
306            }
307            result_columns.push(new_col);
308        }
309
310        DataChunk {
311            columns: result_columns,
312            selection: None,
313            count: actual_count,
314            capacity: actual_count,
315        }
316    }
317
318    /// Returns the number of columns.
319    #[must_use]
320    pub fn num_columns(&self) -> usize {
321        self.columns.len()
322    }
323}
324
325impl Clone for DataChunk {
326    fn clone(&self) -> Self {
327        Self {
328            columns: self.columns.clone(),
329            selection: self.selection.clone(),
330            count: self.count,
331            capacity: self.capacity,
332        }
333    }
334}
335
336/// Builder for creating DataChunks row by row.
337pub struct DataChunkBuilder {
338    chunk: DataChunk,
339}
340
341impl DataChunkBuilder {
342    /// Creates a new builder with the given schema.
343    #[must_use]
344    pub fn with_schema(column_types: &[LogicalType]) -> Self {
345        Self {
346            chunk: DataChunk::with_schema(column_types),
347        }
348    }
349
350    /// Creates a new builder with the given schema and capacity.
351    #[must_use]
352    pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
353        Self {
354            chunk: DataChunk::with_capacity(column_types, capacity),
355        }
356    }
357
358    /// Alias for with_schema for backward compatibility.
359    #[must_use]
360    pub fn new(column_types: &[LogicalType]) -> Self {
361        Self::with_schema(column_types)
362    }
363
364    /// Returns the current row count.
365    #[must_use]
366    pub fn row_count(&self) -> usize {
367        self.chunk.count
368    }
369
370    /// Returns true if the builder is full.
371    #[must_use]
372    pub fn is_full(&self) -> bool {
373        self.chunk.is_full()
374    }
375
376    /// Gets a mutable column for appending values.
377    pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
378        self.chunk.column_mut(index)
379    }
380
381    /// Increments the row count.
382    pub fn advance_row(&mut self) {
383        self.chunk.count += 1;
384    }
385
386    /// Finishes building and returns the chunk.
387    #[must_use]
388    pub fn finish(self) -> DataChunk {
389        self.chunk
390    }
391
392    /// Resets the builder for reuse.
393    pub fn reset(&mut self) {
394        self.chunk.reset();
395    }
396}
397
398#[cfg(test)]
399mod tests {
400    use super::*;
401
402    #[test]
403    fn test_chunk_creation() {
404        let schema = [LogicalType::Int64, LogicalType::String];
405        let chunk = DataChunk::with_schema(&schema);
406
407        assert_eq!(chunk.column_count(), 2);
408        assert_eq!(chunk.row_count(), 0);
409        assert!(chunk.is_empty());
410    }
411
412    #[test]
413    fn test_chunk_builder() {
414        let schema = [LogicalType::Int64, LogicalType::String];
415        let mut builder = DataChunkBuilder::with_schema(&schema);
416
417        // Add first row
418        builder.column_mut(0).unwrap().push_int64(1);
419        builder.column_mut(1).unwrap().push_string("hello");
420        builder.advance_row();
421
422        // Add second row
423        builder.column_mut(0).unwrap().push_int64(2);
424        builder.column_mut(1).unwrap().push_string("world");
425        builder.advance_row();
426
427        let chunk = builder.finish();
428
429        assert_eq!(chunk.row_count(), 2);
430        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(1));
431        assert_eq!(chunk.column(1).unwrap().get_string(1), Some("world"));
432    }
433
434    #[test]
435    fn test_chunk_selection() {
436        let schema = [LogicalType::Int64];
437        let mut builder = DataChunkBuilder::with_schema(&schema);
438
439        for i in 0..10 {
440            builder.column_mut(0).unwrap().push_int64(i);
441            builder.advance_row();
442        }
443
444        let mut chunk = builder.finish();
445        assert_eq!(chunk.row_count(), 10);
446
447        // Apply selection for even numbers
448        let selection = SelectionVector::from_predicate(10, |i| i % 2 == 0);
449        chunk.set_selection(selection);
450
451        assert_eq!(chunk.row_count(), 5); // 0, 2, 4, 6, 8
452        assert_eq!(chunk.total_row_count(), 10);
453    }
454
455    #[test]
456    fn test_chunk_reset() {
457        let schema = [LogicalType::Int64];
458        let mut builder = DataChunkBuilder::with_schema(&schema);
459
460        builder.column_mut(0).unwrap().push_int64(1);
461        builder.advance_row();
462
463        let mut chunk = builder.finish();
464        assert_eq!(chunk.row_count(), 1);
465
466        chunk.reset();
467        assert_eq!(chunk.row_count(), 0);
468        assert!(chunk.is_empty());
469    }
470
471    #[test]
472    fn test_selected_indices() {
473        let schema = [LogicalType::Int64];
474        let mut chunk = DataChunk::with_schema(&schema);
475        chunk.set_count(5);
476
477        // No selection - should iterate 0..5
478        let indices: Vec<_> = chunk.selected_indices().collect();
479        assert_eq!(indices, vec![0, 1, 2, 3, 4]);
480
481        // With selection
482        let selection = SelectionVector::from_predicate(5, |i| i == 1 || i == 3);
483        chunk.set_selection(selection);
484
485        let indices: Vec<_> = chunk.selected_indices().collect();
486        assert_eq!(indices, vec![1, 3]);
487    }
488
489    #[test]
490    fn test_chunk_flatten() {
491        let schema = [LogicalType::Int64, LogicalType::String];
492        let mut builder = DataChunkBuilder::with_schema(&schema);
493
494        // Add rows: (0, "a"), (1, "b"), (2, "c"), (3, "d"), (4, "e")
495        let letters = ["a", "b", "c", "d", "e"];
496        for i in 0..5 {
497            builder.column_mut(0).unwrap().push_int64(i);
498            builder
499                .column_mut(1)
500                .unwrap()
501                .push_string(letters[i as usize]);
502            builder.advance_row();
503        }
504
505        let mut chunk = builder.finish();
506
507        // Select only odd rows: (1, "b"), (3, "d")
508        let selection = SelectionVector::from_predicate(5, |i| i % 2 == 1);
509        chunk.set_selection(selection);
510
511        assert_eq!(chunk.row_count(), 2);
512        assert_eq!(chunk.total_row_count(), 5);
513
514        // Flatten should copy selected rows
515        chunk.flatten();
516
517        // After flatten, total_row_count should equal row_count
518        assert_eq!(chunk.row_count(), 2);
519        assert_eq!(chunk.total_row_count(), 2);
520        assert!(chunk.selection().is_none());
521
522        // Verify the data is correct
523        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(1));
524        assert_eq!(chunk.column(0).unwrap().get_int64(1), Some(3));
525        assert_eq!(chunk.column(1).unwrap().get_string(0), Some("b"));
526        assert_eq!(chunk.column(1).unwrap().get_string(1), Some("d"));
527    }
528
529    #[test]
530    fn test_chunk_flatten_no_selection() {
531        let schema = [LogicalType::Int64];
532        let mut builder = DataChunkBuilder::with_schema(&schema);
533
534        builder.column_mut(0).unwrap().push_int64(42);
535        builder.advance_row();
536
537        let mut chunk = builder.finish();
538        let original_count = chunk.row_count();
539
540        // Flatten with no selection should be a no-op
541        chunk.flatten();
542
543        assert_eq!(chunk.row_count(), original_count);
544        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(42));
545    }
546}