Skip to main content

grafeo_core/execution/
chunk.rs

//! DataChunk - the fundamental unit of vectorized execution.
//!
//! A DataChunk holds a batch of rows in columnar format. Processing data in
//! batches (typically 1024-2048 rows) amortizes per-row overhead across the whole
//! batch and gives the compiler tight, uniform loops it can vectorize with SIMD.
5
6use super::selection::SelectionVector;
7use super::vector::ValueVector;
8use grafeo_common::types::LogicalType;
9
/// Number of tuples a chunk is sized for when no explicit capacity is given.
///
/// Used as the capacity by `DataChunk::with_schema`, and as the capacity of a
/// `DataChunk::new` chunk built from zero columns.
pub const DEFAULT_CHUNK_SIZE: usize = 2 * 1024;
12
13/// A batch of rows stored column-wise for vectorized processing.
14///
15/// Instead of storing rows like `[(a1,b1), (a2,b2), ...]`, we store columns
16/// like `[a1,a2,...], [b1,b2,...]`. This is cache-friendly for analytical
17/// queries that touch few columns but many rows.
18///
19/// The optional `SelectionVector` lets you filter rows without copying data -
20/// just mark which row indices are "selected".
21///
22/// # Example
23///
24/// ```
25/// use grafeo_core::execution::DataChunk;
26/// use grafeo_core::execution::ValueVector;
27/// use grafeo_common::types::Value;
28///
29/// // Create columns
30/// let names = ValueVector::from_values(&[Value::from("Alice"), Value::from("Bob")]);
31/// let ages = ValueVector::from_values(&[Value::from(30i64), Value::from(25i64)]);
32///
33/// // Bundle into a chunk
34/// let chunk = DataChunk::new(vec![names, ages]);
35/// assert_eq!(chunk.len(), 2);
36/// ```
37#[derive(Debug)]
38pub struct DataChunk {
39    /// Column vectors.
40    columns: Vec<ValueVector>,
41    /// Selection vector (None means all rows are selected).
42    selection: Option<SelectionVector>,
43    /// Number of rows in this chunk.
44    count: usize,
45    /// Capacity of this chunk.
46    capacity: usize,
47}
48
49impl DataChunk {
50    /// Creates an empty data chunk with no columns.
51    #[must_use]
52    pub fn empty() -> Self {
53        Self {
54            columns: Vec::new(),
55            selection: None,
56            count: 0,
57            capacity: 0,
58        }
59    }
60
61    /// Creates a new data chunk from existing vectors.
62    #[must_use]
63    pub fn new(columns: Vec<ValueVector>) -> Self {
64        let count = columns.first().map_or(0, ValueVector::len);
65        let capacity = columns.first().map_or(DEFAULT_CHUNK_SIZE, |c| c.len());
66        Self {
67            columns,
68            selection: None,
69            count,
70            capacity,
71        }
72    }
73
74    /// Creates a new empty data chunk with the given schema.
75    #[must_use]
76    pub fn with_schema(column_types: &[LogicalType]) -> Self {
77        Self::with_capacity(column_types, DEFAULT_CHUNK_SIZE)
78    }
79
80    /// Creates a new data chunk with the given schema and capacity.
81    #[must_use]
82    pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
83        let columns = column_types
84            .iter()
85            .map(|t| ValueVector::with_capacity(t.clone(), capacity))
86            .collect();
87
88        Self {
89            columns,
90            selection: None,
91            count: 0,
92            capacity,
93        }
94    }
95
96    /// Returns the number of columns.
97    #[must_use]
98    pub fn column_count(&self) -> usize {
99        self.columns.len()
100    }
101
102    /// Returns the number of rows (considering selection).
103    #[must_use]
104    pub fn row_count(&self) -> usize {
105        self.selection.as_ref().map_or(self.count, |s| s.len())
106    }
107
108    /// Alias for row_count().
109    #[must_use]
110    pub fn len(&self) -> usize {
111        self.row_count()
112    }
113
114    /// Returns all columns.
115    #[must_use]
116    pub fn columns(&self) -> &[ValueVector] {
117        &self.columns
118    }
119
120    /// Returns the total number of rows (ignoring selection).
121    #[must_use]
122    pub fn total_row_count(&self) -> usize {
123        self.count
124    }
125
126    /// Returns true if the chunk is empty.
127    #[must_use]
128    pub fn is_empty(&self) -> bool {
129        self.row_count() == 0
130    }
131
132    /// Returns the capacity of this chunk.
133    #[must_use]
134    pub fn capacity(&self) -> usize {
135        self.capacity
136    }
137
138    /// Returns true if the chunk is full.
139    #[must_use]
140    pub fn is_full(&self) -> bool {
141        self.count >= self.capacity
142    }
143
144    /// Gets a column by index.
145    #[must_use]
146    pub fn column(&self, index: usize) -> Option<&ValueVector> {
147        self.columns.get(index)
148    }
149
150    /// Gets a mutable column by index.
151    pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
152        self.columns.get_mut(index)
153    }
154
155    /// Returns the selection vector.
156    #[must_use]
157    pub fn selection(&self) -> Option<&SelectionVector> {
158        self.selection.as_ref()
159    }
160
161    /// Sets the selection vector.
162    pub fn set_selection(&mut self, selection: SelectionVector) {
163        self.selection = Some(selection);
164    }
165
166    /// Clears the selection vector (selects all rows).
167    pub fn clear_selection(&mut self) {
168        self.selection = None;
169    }
170
171    /// Sets the row count.
172    pub fn set_count(&mut self, count: usize) {
173        self.count = count;
174    }
175
176    /// Resets the chunk for reuse.
177    pub fn reset(&mut self) {
178        for col in &mut self.columns {
179            col.clear();
180        }
181        self.selection = None;
182        self.count = 0;
183    }
184
185    /// Flattens the selection by copying only selected rows.
186    ///
187    /// After this operation, selection is None and count equals the
188    /// previously selected row count.
189    pub fn flatten(&mut self) {
190        let selection = match self.selection.take() {
191            Some(sel) => sel,
192            None => return,
193        };
194
195        let selected_count = selection.len();
196
197        // Create new columns with only selected rows, preserving data types
198        let mut new_columns = Vec::with_capacity(self.columns.len());
199
200        for col in &self.columns {
201            // Create new vector with same data type as original
202            let mut new_col = ValueVector::with_type(col.data_type().clone());
203            for idx in selection.iter() {
204                if let Some(val) = col.get(idx) {
205                    new_col.push(val);
206                }
207            }
208            new_columns.push(new_col);
209        }
210
211        self.columns = new_columns;
212        self.count = selected_count;
213        self.capacity = selected_count;
214    }
215
216    /// Returns an iterator over selected row indices.
217    pub fn selected_indices(&self) -> Box<dyn Iterator<Item = usize> + '_> {
218        match &self.selection {
219            Some(sel) => Box::new(sel.iter()),
220            None => Box::new(0..self.count),
221        }
222    }
223
224    /// Concatenates multiple chunks into a single chunk.
225    ///
226    /// All chunks must have the same schema (same number and types of columns).
227    pub fn concat(chunks: &[DataChunk]) -> DataChunk {
228        if chunks.is_empty() {
229            return DataChunk::empty();
230        }
231
232        if chunks.len() == 1 {
233            // Clone the single chunk
234            return DataChunk {
235                columns: chunks[0].columns.clone(),
236                selection: chunks[0].selection.clone(),
237                count: chunks[0].count,
238                capacity: chunks[0].capacity,
239            };
240        }
241
242        let num_columns = chunks[0].column_count();
243        if num_columns == 0 {
244            return DataChunk::empty();
245        }
246
247        let total_rows: usize = chunks.iter().map(|c| c.row_count()).sum();
248
249        // Concatenate each column
250        let mut result_columns = Vec::with_capacity(num_columns);
251
252        for col_idx in 0..num_columns {
253            let mut concat_vector = ValueVector::new();
254
255            for chunk in chunks {
256                if let Some(col) = chunk.column(col_idx) {
257                    // Append all values from this column
258                    for i in chunk.selected_indices() {
259                        if let Some(val) = col.get(i) {
260                            concat_vector.push(val);
261                        }
262                    }
263                }
264            }
265
266            result_columns.push(concat_vector);
267        }
268
269        DataChunk {
270            columns: result_columns,
271            selection: None,
272            count: total_rows,
273            capacity: total_rows,
274        }
275    }
276
277    /// Applies a filter predicate and returns a new chunk with selected rows.
278    pub fn filter(&self, predicate: &SelectionVector) -> DataChunk {
279        // Combine existing selection with predicate
280        let selected: Vec<usize> = predicate
281            .iter()
282            .filter(|&idx| self.selection.as_ref().map_or(true, |s| s.contains(idx)))
283            .collect();
284
285        let mut result_columns = Vec::with_capacity(self.columns.len());
286
287        for col in &self.columns {
288            let mut new_col = ValueVector::new();
289            for &idx in &selected {
290                if let Some(val) = col.get(idx) {
291                    new_col.push(val);
292                }
293            }
294            result_columns.push(new_col);
295        }
296
297        DataChunk {
298            columns: result_columns,
299            selection: None,
300            count: selected.len(),
301            capacity: selected.len(),
302        }
303    }
304
305    /// Returns a slice of this chunk.
306    ///
307    /// Returns a new DataChunk containing rows [offset, offset + count).
308    #[must_use]
309    pub fn slice(&self, offset: usize, count: usize) -> DataChunk {
310        if offset >= self.len() || count == 0 {
311            return DataChunk::empty();
312        }
313
314        let actual_count = count.min(self.len() - offset);
315        let mut result_columns = Vec::with_capacity(self.columns.len());
316
317        for col in &self.columns {
318            let mut new_col = ValueVector::new();
319            for i in offset..(offset + actual_count) {
320                let actual_idx = if let Some(sel) = &self.selection {
321                    sel.get(i).unwrap_or(i)
322                } else {
323                    i
324                };
325                if let Some(val) = col.get(actual_idx) {
326                    new_col.push(val);
327                }
328            }
329            result_columns.push(new_col);
330        }
331
332        DataChunk {
333            columns: result_columns,
334            selection: None,
335            count: actual_count,
336            capacity: actual_count,
337        }
338    }
339
340    /// Returns the number of columns.
341    #[must_use]
342    pub fn num_columns(&self) -> usize {
343        self.columns.len()
344    }
345}
346
347impl Clone for DataChunk {
348    fn clone(&self) -> Self {
349        Self {
350            columns: self.columns.clone(),
351            selection: self.selection.clone(),
352            count: self.count,
353            capacity: self.capacity,
354        }
355    }
356}
357
358/// Builder for creating DataChunks row by row.
359pub struct DataChunkBuilder {
360    chunk: DataChunk,
361}
362
363impl DataChunkBuilder {
364    /// Creates a new builder with the given schema.
365    #[must_use]
366    pub fn with_schema(column_types: &[LogicalType]) -> Self {
367        Self {
368            chunk: DataChunk::with_schema(column_types),
369        }
370    }
371
372    /// Creates a new builder with the given schema and capacity.
373    #[must_use]
374    pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
375        Self {
376            chunk: DataChunk::with_capacity(column_types, capacity),
377        }
378    }
379
380    /// Alias for with_schema for backward compatibility.
381    #[must_use]
382    pub fn new(column_types: &[LogicalType]) -> Self {
383        Self::with_schema(column_types)
384    }
385
386    /// Returns the current row count.
387    #[must_use]
388    pub fn row_count(&self) -> usize {
389        self.chunk.count
390    }
391
392    /// Returns true if the builder is full.
393    #[must_use]
394    pub fn is_full(&self) -> bool {
395        self.chunk.is_full()
396    }
397
398    /// Gets a mutable column for appending values.
399    pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
400        self.chunk.column_mut(index)
401    }
402
403    /// Increments the row count.
404    pub fn advance_row(&mut self) {
405        self.chunk.count += 1;
406    }
407
408    /// Finishes building and returns the chunk.
409    #[must_use]
410    pub fn finish(self) -> DataChunk {
411        self.chunk
412    }
413
414    /// Resets the builder for reuse.
415    pub fn reset(&mut self) {
416        self.chunk.reset();
417    }
418}
419
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a single-column Int64 chunk with one row per value in `values`.
    fn int64_chunk(values: std::ops::Range<i64>) -> DataChunk {
        let mut builder = DataChunkBuilder::with_schema(&[LogicalType::Int64]);
        for v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_chunk_creation() {
        let schema = [LogicalType::Int64, LogicalType::String];
        let chunk = DataChunk::with_schema(&schema);

        assert_eq!(chunk.column_count(), 2);
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_chunk_builder() {
        let schema = [LogicalType::Int64, LogicalType::String];
        let mut builder = DataChunkBuilder::with_schema(&schema);

        // Add first row
        builder.column_mut(0).unwrap().push_int64(1);
        builder.column_mut(1).unwrap().push_string("hello");
        builder.advance_row();

        // Add second row
        builder.column_mut(0).unwrap().push_int64(2);
        builder.column_mut(1).unwrap().push_string("world");
        builder.advance_row();

        let chunk = builder.finish();

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(chunk.column(1).unwrap().get_string(1), Some("world"));
    }

    #[test]
    fn test_chunk_selection() {
        let schema = [LogicalType::Int64];
        let mut builder = DataChunkBuilder::with_schema(&schema);

        for i in 0..10 {
            builder.column_mut(0).unwrap().push_int64(i);
            builder.advance_row();
        }

        let mut chunk = builder.finish();
        assert_eq!(chunk.row_count(), 10);

        // Apply selection for even numbers
        let selection = SelectionVector::from_predicate(10, |i| i % 2 == 0);
        chunk.set_selection(selection);

        assert_eq!(chunk.row_count(), 5); // 0, 2, 4, 6, 8
        assert_eq!(chunk.total_row_count(), 10);
    }

    #[test]
    fn test_chunk_reset() {
        let schema = [LogicalType::Int64];
        let mut builder = DataChunkBuilder::with_schema(&schema);

        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();

        let mut chunk = builder.finish();
        assert_eq!(chunk.row_count(), 1);

        chunk.reset();
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_selected_indices() {
        let schema = [LogicalType::Int64];
        let mut chunk = DataChunk::with_schema(&schema);
        chunk.set_count(5);

        // No selection - should iterate 0..5
        let indices: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(indices, vec![0, 1, 2, 3, 4]);

        // With selection
        let selection = SelectionVector::from_predicate(5, |i| i == 1 || i == 3);
        chunk.set_selection(selection);

        let indices: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(indices, vec![1, 3]);
    }

    #[test]
    fn test_chunk_flatten() {
        let schema = [LogicalType::Int64, LogicalType::String];
        let mut builder = DataChunkBuilder::with_schema(&schema);

        // Add rows: (0, "a"), (1, "b"), (2, "c"), (3, "d"), (4, "e")
        let letters = ["a", "b", "c", "d", "e"];
        for i in 0..5 {
            builder.column_mut(0).unwrap().push_int64(i);
            builder
                .column_mut(1)
                .unwrap()
                .push_string(letters[i as usize]);
            builder.advance_row();
        }

        let mut chunk = builder.finish();

        // Select only odd rows: (1, "b"), (3, "d")
        let selection = SelectionVector::from_predicate(5, |i| i % 2 == 1);
        chunk.set_selection(selection);

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 5);

        // Flatten should copy selected rows
        chunk.flatten();

        // After flatten, total_row_count should equal row_count
        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 2);
        assert!(chunk.selection().is_none());

        // Verify the data is correct
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(chunk.column(0).unwrap().get_int64(1), Some(3));
        assert_eq!(chunk.column(1).unwrap().get_string(0), Some("b"));
        assert_eq!(chunk.column(1).unwrap().get_string(1), Some("d"));
    }

    #[test]
    fn test_chunk_flatten_no_selection() {
        let schema = [LogicalType::Int64];
        let mut builder = DataChunkBuilder::with_schema(&schema);

        builder.column_mut(0).unwrap().push_int64(42);
        builder.advance_row();

        let mut chunk = builder.finish();
        let original_count = chunk.row_count();

        // Flatten with no selection should be a no-op
        chunk.flatten();

        assert_eq!(chunk.row_count(), original_count);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(42));
    }

    #[test]
    fn test_new_without_columns_and_empty() {
        let chunk = DataChunk::new(Vec::new());
        assert_eq!(chunk.column_count(), 0);
        assert!(chunk.is_empty());
        assert_eq!(chunk.capacity(), DEFAULT_CHUNK_SIZE);

        let empty = DataChunk::empty();
        assert!(empty.is_empty());
        assert_eq!(empty.capacity(), 0);
    }

    #[test]
    fn test_chunk_filter() {
        let chunk = int64_chunk(0..10);
        let predicate = SelectionVector::from_predicate(10, |i| i % 2 == 0);

        let filtered = chunk.filter(&predicate);

        assert_eq!(filtered.row_count(), 5);
        assert_eq!(filtered.total_row_count(), 5);
        assert!(filtered.selection().is_none());
        assert_eq!(filtered.column(0).unwrap().len(), 5);
    }

    #[test]
    fn test_chunk_filter_intersects_existing_selection() {
        let mut chunk = int64_chunk(0..10);
        chunk.set_selection(SelectionVector::from_predicate(10, |i| i < 6));
        let predicate = SelectionVector::from_predicate(10, |i| i % 2 == 0);

        let filtered = chunk.filter(&predicate);

        // Only rows 0, 2 and 4 are both selected and matched by the predicate.
        assert_eq!(filtered.row_count(), 3);
        assert_eq!(filtered.column(0).unwrap().len(), 3);
    }

    #[test]
    fn test_chunk_slice() {
        let chunk = int64_chunk(0..10);

        let sliced = chunk.slice(3, 4);
        assert_eq!(sliced.row_count(), 4);
        assert!(sliced.selection().is_none());
        assert_eq!(sliced.column(0).unwrap().len(), 4);

        // A count running past the end is clamped to the remaining rows.
        assert_eq!(chunk.slice(8, 100).row_count(), 2);
        // An out-of-range offset or a zero count yields an empty chunk.
        assert!(chunk.slice(10, 1).is_empty());
        assert!(chunk.slice(0, 0).is_empty());
    }

    #[test]
    fn test_chunk_slice_with_selection() {
        let mut chunk = int64_chunk(0..10);
        chunk.set_selection(SelectionVector::from_predicate(10, |i| i % 2 == 0));

        // Five rows are selected; slicing from logical position 1 leaves four.
        let sliced = chunk.slice(1, 10);
        assert_eq!(sliced.row_count(), 4);
        assert!(sliced.selection().is_none());
        assert_eq!(sliced.column(0).unwrap().len(), 4);

        assert!(chunk.slice(5, 1).is_empty());
    }

    #[test]
    fn test_chunk_concat() {
        assert!(DataChunk::concat(&[]).is_empty());

        let first = int64_chunk(0..3);
        let mut second = int64_chunk(10..14);
        second.set_selection(SelectionVector::from_predicate(4, |i| i < 2));

        let combined = DataChunk::concat(&[first, second]);

        // 3 rows from the first chunk plus the 2 selected rows of the second.
        assert_eq!(combined.column_count(), 1);
        assert_eq!(combined.row_count(), 5);
        assert_eq!(combined.total_row_count(), 5);
        assert!(combined.selection().is_none());
        assert_eq!(combined.column(0).unwrap().len(), 5);
    }

    #[test]
    fn test_chunk_concat_single_keeps_selection() {
        let mut only = int64_chunk(0..4);
        only.set_selection(SelectionVector::from_predicate(4, |i| i < 2));

        let combined = DataChunk::concat(&[only]);

        assert_eq!(combined.row_count(), 2);
        assert_eq!(combined.total_row_count(), 4);
        assert!(combined.selection().is_some());
    }
}