Skip to main content

grafeo_core/execution/
chunk.rs

1//! DataChunk - the fundamental unit of vectorized execution.
2//!
3//! A DataChunk holds a batch of rows in columnar format. Processing data in
4//! batches (typically 1024-2048 rows) lets the CPU stay busy and enables SIMD.
5
6use super::selection::SelectionVector;
7use super::vector::ValueVector;
8use crate::index::ZoneMapEntry;
9use grafeo_common::types::LogicalType;
10use grafeo_common::utils::hash::FxHashMap;
11
/// Number of tuples a chunk is sized for when no explicit capacity is given.
pub const DEFAULT_CHUNK_SIZE: usize = 2_048;
14
15/// Zone map hints for chunk-level predicate pruning.
16///
17/// When a scan operator loads a chunk, it can attach zone map statistics
18/// (min/max values) for each column. The filter operator can then skip
19/// the entire chunk without evaluating rows when the zone map proves
20/// no rows can match the predicate.
21///
22/// # Example
23///
24/// If filtering `age > 100` and the chunk's max age is 80, the entire
25/// chunk can be skipped because no row can possibly match.
26#[derive(Debug, Clone, Default)]
27pub struct ChunkZoneHints {
28    /// Zone map entries keyed by column index.
29    pub column_hints: FxHashMap<usize, ZoneMapEntry>,
30}
31
32/// A batch of rows stored column-wise for vectorized processing.
33///
34/// Instead of storing rows like `[(a1,b1), (a2,b2), ...]`, we store columns
35/// like `[a1,a2,...], [b1,b2,...]`. This is cache-friendly for analytical
36/// queries that touch few columns but many rows.
37///
38/// The optional `SelectionVector` lets you filter rows without copying data -
39/// just mark which row indices are "selected".
40///
41/// # Example
42///
43/// ```
44/// use grafeo_core::execution::DataChunk;
45/// use grafeo_core::execution::ValueVector;
46/// use grafeo_common::types::Value;
47///
48/// // Create columns
49/// let names = ValueVector::from_values(&[Value::from("Alix"), Value::from("Gus")]);
50/// let ages = ValueVector::from_values(&[Value::from(30i64), Value::from(25i64)]);
51///
52/// // Bundle into a chunk
53/// let chunk = DataChunk::new(vec![names, ages]);
54/// assert_eq!(chunk.len(), 2);
55/// ```
56#[derive(Debug)]
57pub struct DataChunk {
58    /// Column vectors.
59    columns: Vec<ValueVector>,
60    /// Selection vector (None means all rows are selected).
61    selection: Option<SelectionVector>,
62    /// Number of rows in this chunk.
63    count: usize,
64    /// Capacity of this chunk.
65    capacity: usize,
66    /// Zone map hints for chunk-level filtering (optional).
67    zone_hints: Option<ChunkZoneHints>,
68}
69
70impl DataChunk {
71    /// Creates an empty data chunk with no columns.
72    #[must_use]
73    pub fn empty() -> Self {
74        Self {
75            columns: Vec::new(),
76            selection: None,
77            count: 0,
78            capacity: 0,
79            zone_hints: None,
80        }
81    }
82
83    /// Creates a new data chunk from existing vectors.
84    #[must_use]
85    pub fn new(columns: Vec<ValueVector>) -> Self {
86        let count = columns.first().map_or(0, ValueVector::len);
87        let capacity = columns.first().map_or(DEFAULT_CHUNK_SIZE, |c| c.len());
88        Self {
89            columns,
90            selection: None,
91            count,
92            capacity,
93            zone_hints: None,
94        }
95    }
96
97    /// Creates a new empty data chunk with the given schema.
98    #[must_use]
99    pub fn with_schema(column_types: &[LogicalType]) -> Self {
100        Self::with_capacity(column_types, DEFAULT_CHUNK_SIZE)
101    }
102
103    /// Creates a new data chunk with the given schema and capacity.
104    #[must_use]
105    pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
106        let columns = column_types
107            .iter()
108            .map(|t| ValueVector::with_capacity(t.clone(), capacity))
109            .collect();
110
111        Self {
112            columns,
113            selection: None,
114            count: 0,
115            capacity,
116            zone_hints: None,
117        }
118    }
119
120    /// Returns the number of columns.
121    #[must_use]
122    pub fn column_count(&self) -> usize {
123        self.columns.len()
124    }
125
126    /// Returns the number of rows (considering selection).
127    #[must_use]
128    pub fn row_count(&self) -> usize {
129        self.selection.as_ref().map_or(self.count, |s| s.len())
130    }
131
132    /// Alias for row_count().
133    #[must_use]
134    pub fn len(&self) -> usize {
135        self.row_count()
136    }
137
138    /// Returns all columns.
139    #[must_use]
140    pub fn columns(&self) -> &[ValueVector] {
141        &self.columns
142    }
143
144    /// Returns the total number of rows (ignoring selection).
145    #[must_use]
146    pub fn total_row_count(&self) -> usize {
147        self.count
148    }
149
150    /// Returns true if the chunk is empty.
151    #[must_use]
152    pub fn is_empty(&self) -> bool {
153        self.row_count() == 0
154    }
155
156    /// Returns the capacity of this chunk.
157    #[must_use]
158    pub fn capacity(&self) -> usize {
159        self.capacity
160    }
161
162    /// Returns true if the chunk is full.
163    #[must_use]
164    pub fn is_full(&self) -> bool {
165        self.count >= self.capacity
166    }
167
168    /// Gets a column by index.
169    #[must_use]
170    pub fn column(&self, index: usize) -> Option<&ValueVector> {
171        self.columns.get(index)
172    }
173
174    /// Gets a mutable column by index.
175    pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
176        self.columns.get_mut(index)
177    }
178
179    /// Returns the selection vector.
180    #[must_use]
181    pub fn selection(&self) -> Option<&SelectionVector> {
182        self.selection.as_ref()
183    }
184
185    /// Sets the selection vector.
186    pub fn set_selection(&mut self, selection: SelectionVector) {
187        self.selection = Some(selection);
188    }
189
190    /// Clears the selection vector (selects all rows).
191    pub fn clear_selection(&mut self) {
192        self.selection = None;
193    }
194
195    /// Sets zone map hints for this chunk.
196    ///
197    /// Zone map hints enable the filter operator to skip entire chunks
198    /// when predicates can't possibly match based on min/max statistics.
199    pub fn set_zone_hints(&mut self, hints: ChunkZoneHints) {
200        self.zone_hints = Some(hints);
201    }
202
203    /// Returns zone map hints if available.
204    ///
205    /// Used by the filter operator for chunk-level predicate pruning.
206    #[must_use]
207    pub fn zone_hints(&self) -> Option<&ChunkZoneHints> {
208        self.zone_hints.as_ref()
209    }
210
211    /// Clears zone map hints.
212    pub fn clear_zone_hints(&mut self) {
213        self.zone_hints = None;
214    }
215
216    /// Sets the row count.
217    pub fn set_count(&mut self, count: usize) {
218        self.count = count;
219    }
220
221    /// Resets the chunk for reuse.
222    pub fn reset(&mut self) {
223        for col in &mut self.columns {
224            col.clear();
225        }
226        self.selection = None;
227        self.zone_hints = None;
228        self.count = 0;
229    }
230
231    /// Flattens the selection by copying only selected rows.
232    ///
233    /// After this operation, selection is None and count equals the
234    /// previously selected row count.
235    pub fn flatten(&mut self) {
236        let Some(selection) = self.selection.take() else {
237            return;
238        };
239
240        let selected_count = selection.len();
241
242        // Create new columns with only selected rows, preserving data types
243        let mut new_columns = Vec::with_capacity(self.columns.len());
244
245        for col in &self.columns {
246            // Create new vector with same data type as original
247            let mut new_col = ValueVector::with_type(col.data_type().clone());
248            for idx in selection.iter() {
249                if let Some(val) = col.get(idx) {
250                    new_col.push(val);
251                }
252            }
253            new_columns.push(new_col);
254        }
255
256        self.columns = new_columns;
257        self.count = selected_count;
258        self.capacity = selected_count;
259    }
260
261    /// Returns an iterator over selected row indices.
262    pub fn selected_indices(&self) -> Box<dyn Iterator<Item = usize> + '_> {
263        match &self.selection {
264            Some(sel) => Box::new(sel.iter()),
265            None => Box::new(0..self.count),
266        }
267    }
268
269    /// Concatenates multiple chunks into a single chunk.
270    ///
271    /// All chunks must have the same schema (same number and types of columns).
272    pub fn concat(chunks: &[DataChunk]) -> DataChunk {
273        if chunks.is_empty() {
274            return DataChunk::empty();
275        }
276
277        if chunks.len() == 1 {
278            // Clone the single chunk
279            return DataChunk {
280                columns: chunks[0].columns.clone(),
281                selection: chunks[0].selection.clone(),
282                count: chunks[0].count,
283                capacity: chunks[0].capacity,
284                zone_hints: chunks[0].zone_hints.clone(),
285            };
286        }
287
288        let num_columns = chunks[0].column_count();
289        if num_columns == 0 {
290            return DataChunk::empty();
291        }
292
293        let total_rows: usize = chunks.iter().map(|c| c.row_count()).sum();
294
295        // Concatenate each column
296        let mut result_columns = Vec::with_capacity(num_columns);
297
298        for col_idx in 0..num_columns {
299            let mut concat_vector = ValueVector::new();
300
301            for chunk in chunks {
302                if let Some(col) = chunk.column(col_idx) {
303                    // Append all values from this column
304                    for i in chunk.selected_indices() {
305                        if let Some(val) = col.get(i) {
306                            concat_vector.push(val);
307                        }
308                    }
309                }
310            }
311
312            result_columns.push(concat_vector);
313        }
314
315        DataChunk {
316            columns: result_columns,
317            selection: None,
318            count: total_rows,
319            capacity: total_rows,
320            zone_hints: None,
321        }
322    }
323
324    /// Applies a filter predicate and returns a new chunk with selected rows.
325    pub fn filter(&self, predicate: &SelectionVector) -> DataChunk {
326        // Combine existing selection with predicate
327        let selected: Vec<usize> = predicate
328            .iter()
329            .filter(|&idx| self.selection.as_ref().map_or(true, |s| s.contains(idx)))
330            .collect();
331
332        let mut result_columns = Vec::with_capacity(self.columns.len());
333
334        for col in &self.columns {
335            let mut new_col = ValueVector::new();
336            for &idx in &selected {
337                if let Some(val) = col.get(idx) {
338                    new_col.push(val);
339                }
340            }
341            result_columns.push(new_col);
342        }
343
344        DataChunk {
345            columns: result_columns,
346            selection: None,
347            count: selected.len(),
348            capacity: selected.len(),
349            zone_hints: None,
350        }
351    }
352
353    /// Returns a slice of this chunk.
354    ///
355    /// Returns a new DataChunk containing rows [offset, offset + count).
356    #[must_use]
357    pub fn slice(&self, offset: usize, count: usize) -> DataChunk {
358        if offset >= self.len() || count == 0 {
359            return DataChunk::empty();
360        }
361
362        let actual_count = count.min(self.len() - offset);
363        let mut result_columns = Vec::with_capacity(self.columns.len());
364
365        for col in &self.columns {
366            let mut new_col = ValueVector::new();
367            for i in offset..(offset + actual_count) {
368                let actual_idx = if let Some(sel) = &self.selection {
369                    sel.get(i).unwrap_or(i)
370                } else {
371                    i
372                };
373                if let Some(val) = col.get(actual_idx) {
374                    new_col.push(val);
375                }
376            }
377            result_columns.push(new_col);
378        }
379
380        DataChunk {
381            columns: result_columns,
382            selection: None,
383            count: actual_count,
384            capacity: actual_count,
385            zone_hints: None,
386        }
387    }
388
389    /// Returns the number of columns.
390    #[must_use]
391    pub fn num_columns(&self) -> usize {
392        self.columns.len()
393    }
394}
395
396impl Clone for DataChunk {
397    fn clone(&self) -> Self {
398        Self {
399            columns: self.columns.clone(),
400            selection: self.selection.clone(),
401            count: self.count,
402            capacity: self.capacity,
403            zone_hints: self.zone_hints.clone(),
404        }
405    }
406}
407
408/// Builder for creating DataChunks row by row.
409pub struct DataChunkBuilder {
410    chunk: DataChunk,
411}
412
413impl DataChunkBuilder {
414    /// Creates a new builder with the given schema.
415    #[must_use]
416    pub fn with_schema(column_types: &[LogicalType]) -> Self {
417        Self {
418            chunk: DataChunk::with_schema(column_types),
419        }
420    }
421
422    /// Creates a new builder with the given schema and capacity.
423    #[must_use]
424    pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
425        Self {
426            chunk: DataChunk::with_capacity(column_types, capacity),
427        }
428    }
429
430    /// Alias for with_schema for backward compatibility.
431    #[must_use]
432    pub fn new(column_types: &[LogicalType]) -> Self {
433        Self::with_schema(column_types)
434    }
435
436    /// Returns the current row count.
437    #[must_use]
438    pub fn row_count(&self) -> usize {
439        self.chunk.count
440    }
441
442    /// Returns true if the builder is full.
443    #[must_use]
444    pub fn is_full(&self) -> bool {
445        self.chunk.is_full()
446    }
447
448    /// Gets a mutable column for appending values.
449    pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
450        self.chunk.column_mut(index)
451    }
452
453    /// Increments the row count.
454    pub fn advance_row(&mut self) {
455        self.chunk.count += 1;
456    }
457
458    /// Finishes building and returns the chunk.
459    #[must_use]
460    pub fn finish(self) -> DataChunk {
461        self.chunk
462    }
463
464    /// Resets the builder for reuse.
465    pub fn reset(&mut self) {
466        self.chunk.reset();
467    }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a single-column Int64 chunk holding `values` in order.
    fn int64_chunk(values: impl IntoIterator<Item = i64>) -> DataChunk {
        let mut builder = DataChunkBuilder::with_schema(&[LogicalType::Int64]);
        for value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Builds hints with one Int64 min/max entry registered for column 0.
    fn int64_zone_hints(min: i64, max: i64) -> ChunkZoneHints {
        let mut hints = ChunkZoneHints::default();
        hints.column_hints.insert(
            0,
            crate::index::ZoneMapEntry::with_min_max(
                grafeo_common::types::Value::Int64(min),
                grafeo_common::types::Value::Int64(max),
                0,
                10,
            ),
        );
        hints
    }

    #[test]
    fn test_chunk_creation() {
        let schema = [LogicalType::Int64, LogicalType::String];
        let chunk = DataChunk::with_schema(&schema);

        assert_eq!(chunk.column_count(), 2);
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_chunk_builder() {
        let schema = [LogicalType::Int64, LogicalType::String];
        let mut builder = DataChunkBuilder::with_schema(&schema);

        // Two rows: (1, "hello") and (2, "world")
        for (number, word) in [(1, "hello"), (2, "world")] {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.column_mut(1).unwrap().push_string(word);
            builder.advance_row();
        }

        let chunk = builder.finish();

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(chunk.column(1).unwrap().get_string(1), Some("world"));
    }

    #[test]
    fn test_chunk_selection() {
        let mut chunk = int64_chunk(0..10);
        assert_eq!(chunk.row_count(), 10);

        // Keep only the even positions
        chunk.set_selection(SelectionVector::from_predicate(10, |i| i % 2 == 0));

        assert_eq!(chunk.row_count(), 5); // 0, 2, 4, 6, 8
        assert_eq!(chunk.total_row_count(), 10);
    }

    #[test]
    fn test_chunk_reset() {
        let mut chunk = int64_chunk([1]);
        assert_eq!(chunk.row_count(), 1);

        chunk.reset();
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_selected_indices() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);
        chunk.set_count(5);

        // Without a selection every index in 0..5 is produced
        let all: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(all, vec![0, 1, 2, 3, 4]);

        // With a selection only the chosen indices are produced
        chunk.set_selection(SelectionVector::from_predicate(5, |i| i == 1 || i == 3));

        let chosen: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(chosen, vec![1, 3]);
    }

    #[test]
    fn test_chunk_flatten() {
        let schema = [LogicalType::Int64, LogicalType::String];
        let mut builder = DataChunkBuilder::with_schema(&schema);

        // Rows: (0, "a"), (1, "b"), (2, "c"), (3, "d"), (4, "e")
        for (number, letter) in (0_i64..).zip(["a", "b", "c", "d", "e"]) {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.column_mut(1).unwrap().push_string(letter);
            builder.advance_row();
        }

        let mut chunk = builder.finish();

        // Keep the odd rows: (1, "b"), (3, "d")
        chunk.set_selection(SelectionVector::from_predicate(5, |i| i % 2 == 1));

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 5);

        // Flattening copies just the selected rows
        chunk.flatten();

        // Physical and logical row counts now agree
        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 2);
        assert!(chunk.selection().is_none());

        // The surviving data is the selected data, in order
        let numbers = chunk.column(0).unwrap();
        assert_eq!(numbers.get_int64(0), Some(1));
        assert_eq!(numbers.get_int64(1), Some(3));
        let letters = chunk.column(1).unwrap();
        assert_eq!(letters.get_string(0), Some("b"));
        assert_eq!(letters.get_string(1), Some("d"));
    }

    #[test]
    fn test_chunk_flatten_no_selection() {
        let mut chunk = int64_chunk([42]);
        let original_count = chunk.row_count();

        // Nothing is selected, so flatten must leave the chunk alone
        chunk.flatten();

        assert_eq!(chunk.row_count(), original_count);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(42));
    }

    #[test]
    fn test_chunk_zone_hints_default() {
        let hints = ChunkZoneHints::default();
        assert!(hints.column_hints.is_empty());
    }

    #[test]
    fn test_chunk_zone_hints_set_and_get() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);

        // A fresh chunk carries no hints
        assert!(chunk.zone_hints().is_none());

        chunk.set_zone_hints(int64_zone_hints(10, 100));

        // The hints are now retrievable
        assert!(chunk.zone_hints().is_some());
        let retrieved = chunk.zone_hints().unwrap();
        assert_eq!(retrieved.column_hints.len(), 1);
        assert!(retrieved.column_hints.contains_key(&0));
    }

    #[test]
    fn test_chunk_zone_hints_clear() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);

        chunk.set_zone_hints(ChunkZoneHints::default());
        assert!(chunk.zone_hints().is_some());

        chunk.clear_zone_hints();
        assert!(chunk.zone_hints().is_none());
    }

    #[test]
    fn test_chunk_zone_hints_preserved_on_clone() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);
        chunk.set_zone_hints(int64_zone_hints(1, 10));

        // A clone must carry the same hints
        let cloned = chunk.clone();
        assert!(cloned.zone_hints().is_some());
        assert_eq!(cloned.zone_hints().unwrap().column_hints.len(), 1);
    }

    #[test]
    fn test_chunk_reset_clears_zone_hints() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);

        chunk.set_zone_hints(ChunkZoneHints::default());
        assert!(chunk.zone_hints().is_some());

        // Resetting drops the hints along with the data
        chunk.reset();
        assert!(chunk.zone_hints().is_none());
    }
}