Skip to main content

grafeo_core/execution/
chunk.rs

1//! DataChunk - the fundamental unit of vectorized execution.
2//!
3//! A DataChunk holds a batch of rows in columnar format. Processing data in
4//! batches (typically 1024-2048 rows) lets the CPU stay busy and enables SIMD.
5
6use super::selection::SelectionVector;
7use super::vector::ValueVector;
8use crate::index::ZoneMapEntry;
9use grafeo_common::types::LogicalType;
10use grafeo_common::utils::hash::FxHashMap;
11
/// Number of tuples a chunk is sized for when no explicit capacity is given.
///
/// [`DataChunk::with_schema`] allocates columns with this capacity, and
/// [`DataChunk::new`] reports it as the capacity when called with no columns.
pub const DEFAULT_CHUNK_SIZE: usize = 2_048;
14
15/// Zone map hints for chunk-level predicate pruning.
16///
17/// When a scan operator loads a chunk, it can attach zone map statistics
18/// (min/max values) for each column. The filter operator can then skip
19/// the entire chunk without evaluating rows when the zone map proves
20/// no rows can match the predicate.
21///
22/// # Example
23///
24/// If filtering `age > 100` and the chunk's max age is 80, the entire
25/// chunk can be skipped because no row can possibly match.
26#[derive(Debug, Clone, Default)]
27pub struct ChunkZoneHints {
28    /// Zone map entries keyed by column index.
29    pub column_hints: FxHashMap<usize, ZoneMapEntry>,
30}
31
32/// A batch of rows stored column-wise for vectorized processing.
33///
34/// Instead of storing rows like `[(a1,b1), (a2,b2), ...]`, we store columns
35/// like `[a1,a2,...], [b1,b2,...]`. This is cache-friendly for analytical
36/// queries that touch few columns but many rows.
37///
38/// The optional `SelectionVector` lets you filter rows without copying data -
39/// just mark which row indices are "selected".
40///
41/// # Example
42///
43/// ```
44/// use grafeo_core::execution::DataChunk;
45/// use grafeo_core::execution::ValueVector;
46/// use grafeo_common::types::Value;
47///
48/// // Create columns
49/// let names = ValueVector::from_values(&[Value::from("Alice"), Value::from("Bob")]);
50/// let ages = ValueVector::from_values(&[Value::from(30i64), Value::from(25i64)]);
51///
52/// // Bundle into a chunk
53/// let chunk = DataChunk::new(vec![names, ages]);
54/// assert_eq!(chunk.len(), 2);
55/// ```
56#[derive(Debug)]
57pub struct DataChunk {
58    /// Column vectors.
59    columns: Vec<ValueVector>,
60    /// Selection vector (None means all rows are selected).
61    selection: Option<SelectionVector>,
62    /// Number of rows in this chunk.
63    count: usize,
64    /// Capacity of this chunk.
65    capacity: usize,
66    /// Zone map hints for chunk-level filtering (optional).
67    zone_hints: Option<ChunkZoneHints>,
68}
69
70impl DataChunk {
71    /// Creates an empty data chunk with no columns.
72    #[must_use]
73    pub fn empty() -> Self {
74        Self {
75            columns: Vec::new(),
76            selection: None,
77            count: 0,
78            capacity: 0,
79            zone_hints: None,
80        }
81    }
82
83    /// Creates a new data chunk from existing vectors.
84    #[must_use]
85    pub fn new(columns: Vec<ValueVector>) -> Self {
86        let count = columns.first().map_or(0, ValueVector::len);
87        let capacity = columns.first().map_or(DEFAULT_CHUNK_SIZE, |c| c.len());
88        Self {
89            columns,
90            selection: None,
91            count,
92            capacity,
93            zone_hints: None,
94        }
95    }
96
97    /// Creates a new empty data chunk with the given schema.
98    #[must_use]
99    pub fn with_schema(column_types: &[LogicalType]) -> Self {
100        Self::with_capacity(column_types, DEFAULT_CHUNK_SIZE)
101    }
102
103    /// Creates a new data chunk with the given schema and capacity.
104    #[must_use]
105    pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
106        let columns = column_types
107            .iter()
108            .map(|t| ValueVector::with_capacity(t.clone(), capacity))
109            .collect();
110
111        Self {
112            columns,
113            selection: None,
114            count: 0,
115            capacity,
116            zone_hints: None,
117        }
118    }
119
120    /// Returns the number of columns.
121    #[must_use]
122    pub fn column_count(&self) -> usize {
123        self.columns.len()
124    }
125
126    /// Returns the number of rows (considering selection).
127    #[must_use]
128    pub fn row_count(&self) -> usize {
129        self.selection.as_ref().map_or(self.count, |s| s.len())
130    }
131
132    /// Alias for row_count().
133    #[must_use]
134    pub fn len(&self) -> usize {
135        self.row_count()
136    }
137
138    /// Returns all columns.
139    #[must_use]
140    pub fn columns(&self) -> &[ValueVector] {
141        &self.columns
142    }
143
144    /// Returns the total number of rows (ignoring selection).
145    #[must_use]
146    pub fn total_row_count(&self) -> usize {
147        self.count
148    }
149
150    /// Returns true if the chunk is empty.
151    #[must_use]
152    pub fn is_empty(&self) -> bool {
153        self.row_count() == 0
154    }
155
156    /// Returns the capacity of this chunk.
157    #[must_use]
158    pub fn capacity(&self) -> usize {
159        self.capacity
160    }
161
162    /// Returns true if the chunk is full.
163    #[must_use]
164    pub fn is_full(&self) -> bool {
165        self.count >= self.capacity
166    }
167
168    /// Gets a column by index.
169    #[must_use]
170    pub fn column(&self, index: usize) -> Option<&ValueVector> {
171        self.columns.get(index)
172    }
173
174    /// Gets a mutable column by index.
175    pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
176        self.columns.get_mut(index)
177    }
178
179    /// Returns the selection vector.
180    #[must_use]
181    pub fn selection(&self) -> Option<&SelectionVector> {
182        self.selection.as_ref()
183    }
184
185    /// Sets the selection vector.
186    pub fn set_selection(&mut self, selection: SelectionVector) {
187        self.selection = Some(selection);
188    }
189
190    /// Clears the selection vector (selects all rows).
191    pub fn clear_selection(&mut self) {
192        self.selection = None;
193    }
194
195    /// Sets zone map hints for this chunk.
196    ///
197    /// Zone map hints enable the filter operator to skip entire chunks
198    /// when predicates can't possibly match based on min/max statistics.
199    pub fn set_zone_hints(&mut self, hints: ChunkZoneHints) {
200        self.zone_hints = Some(hints);
201    }
202
203    /// Returns zone map hints if available.
204    ///
205    /// Used by the filter operator for chunk-level predicate pruning.
206    #[must_use]
207    pub fn zone_hints(&self) -> Option<&ChunkZoneHints> {
208        self.zone_hints.as_ref()
209    }
210
211    /// Clears zone map hints.
212    pub fn clear_zone_hints(&mut self) {
213        self.zone_hints = None;
214    }
215
216    /// Sets the row count.
217    pub fn set_count(&mut self, count: usize) {
218        self.count = count;
219    }
220
221    /// Resets the chunk for reuse.
222    pub fn reset(&mut self) {
223        for col in &mut self.columns {
224            col.clear();
225        }
226        self.selection = None;
227        self.zone_hints = None;
228        self.count = 0;
229    }
230
231    /// Flattens the selection by copying only selected rows.
232    ///
233    /// After this operation, selection is None and count equals the
234    /// previously selected row count.
235    pub fn flatten(&mut self) {
236        let selection = match self.selection.take() {
237            Some(sel) => sel,
238            None => return,
239        };
240
241        let selected_count = selection.len();
242
243        // Create new columns with only selected rows, preserving data types
244        let mut new_columns = Vec::with_capacity(self.columns.len());
245
246        for col in &self.columns {
247            // Create new vector with same data type as original
248            let mut new_col = ValueVector::with_type(col.data_type().clone());
249            for idx in selection.iter() {
250                if let Some(val) = col.get(idx) {
251                    new_col.push(val);
252                }
253            }
254            new_columns.push(new_col);
255        }
256
257        self.columns = new_columns;
258        self.count = selected_count;
259        self.capacity = selected_count;
260    }
261
262    /// Returns an iterator over selected row indices.
263    pub fn selected_indices(&self) -> Box<dyn Iterator<Item = usize> + '_> {
264        match &self.selection {
265            Some(sel) => Box::new(sel.iter()),
266            None => Box::new(0..self.count),
267        }
268    }
269
270    /// Concatenates multiple chunks into a single chunk.
271    ///
272    /// All chunks must have the same schema (same number and types of columns).
273    pub fn concat(chunks: &[DataChunk]) -> DataChunk {
274        if chunks.is_empty() {
275            return DataChunk::empty();
276        }
277
278        if chunks.len() == 1 {
279            // Clone the single chunk
280            return DataChunk {
281                columns: chunks[0].columns.clone(),
282                selection: chunks[0].selection.clone(),
283                count: chunks[0].count,
284                capacity: chunks[0].capacity,
285                zone_hints: chunks[0].zone_hints.clone(),
286            };
287        }
288
289        let num_columns = chunks[0].column_count();
290        if num_columns == 0 {
291            return DataChunk::empty();
292        }
293
294        let total_rows: usize = chunks.iter().map(|c| c.row_count()).sum();
295
296        // Concatenate each column
297        let mut result_columns = Vec::with_capacity(num_columns);
298
299        for col_idx in 0..num_columns {
300            let mut concat_vector = ValueVector::new();
301
302            for chunk in chunks {
303                if let Some(col) = chunk.column(col_idx) {
304                    // Append all values from this column
305                    for i in chunk.selected_indices() {
306                        if let Some(val) = col.get(i) {
307                            concat_vector.push(val);
308                        }
309                    }
310                }
311            }
312
313            result_columns.push(concat_vector);
314        }
315
316        DataChunk {
317            columns: result_columns,
318            selection: None,
319            count: total_rows,
320            capacity: total_rows,
321            zone_hints: None,
322        }
323    }
324
325    /// Applies a filter predicate and returns a new chunk with selected rows.
326    pub fn filter(&self, predicate: &SelectionVector) -> DataChunk {
327        // Combine existing selection with predicate
328        let selected: Vec<usize> = predicate
329            .iter()
330            .filter(|&idx| self.selection.as_ref().map_or(true, |s| s.contains(idx)))
331            .collect();
332
333        let mut result_columns = Vec::with_capacity(self.columns.len());
334
335        for col in &self.columns {
336            let mut new_col = ValueVector::new();
337            for &idx in &selected {
338                if let Some(val) = col.get(idx) {
339                    new_col.push(val);
340                }
341            }
342            result_columns.push(new_col);
343        }
344
345        DataChunk {
346            columns: result_columns,
347            selection: None,
348            count: selected.len(),
349            capacity: selected.len(),
350            zone_hints: None,
351        }
352    }
353
354    /// Returns a slice of this chunk.
355    ///
356    /// Returns a new DataChunk containing rows [offset, offset + count).
357    #[must_use]
358    pub fn slice(&self, offset: usize, count: usize) -> DataChunk {
359        if offset >= self.len() || count == 0 {
360            return DataChunk::empty();
361        }
362
363        let actual_count = count.min(self.len() - offset);
364        let mut result_columns = Vec::with_capacity(self.columns.len());
365
366        for col in &self.columns {
367            let mut new_col = ValueVector::new();
368            for i in offset..(offset + actual_count) {
369                let actual_idx = if let Some(sel) = &self.selection {
370                    sel.get(i).unwrap_or(i)
371                } else {
372                    i
373                };
374                if let Some(val) = col.get(actual_idx) {
375                    new_col.push(val);
376                }
377            }
378            result_columns.push(new_col);
379        }
380
381        DataChunk {
382            columns: result_columns,
383            selection: None,
384            count: actual_count,
385            capacity: actual_count,
386            zone_hints: None,
387        }
388    }
389
390    /// Returns the number of columns.
391    #[must_use]
392    pub fn num_columns(&self) -> usize {
393        self.columns.len()
394    }
395}
396
397impl Clone for DataChunk {
398    fn clone(&self) -> Self {
399        Self {
400            columns: self.columns.clone(),
401            selection: self.selection.clone(),
402            count: self.count,
403            capacity: self.capacity,
404            zone_hints: self.zone_hints.clone(),
405        }
406    }
407}
408
409/// Builder for creating DataChunks row by row.
410pub struct DataChunkBuilder {
411    chunk: DataChunk,
412}
413
414impl DataChunkBuilder {
415    /// Creates a new builder with the given schema.
416    #[must_use]
417    pub fn with_schema(column_types: &[LogicalType]) -> Self {
418        Self {
419            chunk: DataChunk::with_schema(column_types),
420        }
421    }
422
423    /// Creates a new builder with the given schema and capacity.
424    #[must_use]
425    pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
426        Self {
427            chunk: DataChunk::with_capacity(column_types, capacity),
428        }
429    }
430
431    /// Alias for with_schema for backward compatibility.
432    #[must_use]
433    pub fn new(column_types: &[LogicalType]) -> Self {
434        Self::with_schema(column_types)
435    }
436
437    /// Returns the current row count.
438    #[must_use]
439    pub fn row_count(&self) -> usize {
440        self.chunk.count
441    }
442
443    /// Returns true if the builder is full.
444    #[must_use]
445    pub fn is_full(&self) -> bool {
446        self.chunk.is_full()
447    }
448
449    /// Gets a mutable column for appending values.
450    pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
451        self.chunk.column_mut(index)
452    }
453
454    /// Increments the row count.
455    pub fn advance_row(&mut self) {
456        self.chunk.count += 1;
457    }
458
459    /// Finishes building and returns the chunk.
460    #[must_use]
461    pub fn finish(self) -> DataChunk {
462        self.chunk
463    }
464
465    /// Resets the builder for reuse.
466    pub fn reset(&mut self) {
467        self.chunk.reset();
468    }
469}
470
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::Value;

    /// Builds a single-column `Int64` chunk holding `values` in order.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::with_schema(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Builds an empty single-column `Int64` chunk.
    fn empty_int64_chunk() -> DataChunk {
        DataChunk::with_schema(&[LogicalType::Int64])
    }

    /// Builds hints carrying one `[min, max]` entry for column 0.
    fn hints_for_first_column(min: i64, max: i64) -> ChunkZoneHints {
        let mut hints = ChunkZoneHints::default();
        hints.column_hints.insert(
            0,
            ZoneMapEntry::with_min_max(Value::Int64(min), Value::Int64(max), 0, 10),
        );
        hints
    }

    #[test]
    fn test_chunk_creation() {
        let chunk = DataChunk::with_schema(&[LogicalType::Int64, LogicalType::String]);

        assert_eq!(chunk.column_count(), 2);
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_chunk_builder() {
        let mut builder =
            DataChunkBuilder::with_schema(&[LogicalType::Int64, LogicalType::String]);

        for (id, word) in [(1, "hello"), (2, "world")] {
            builder.column_mut(0).unwrap().push_int64(id);
            builder.column_mut(1).unwrap().push_string(word);
            builder.advance_row();
        }

        let chunk = builder.finish();

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(chunk.column(1).unwrap().get_string(1), Some("world"));
    }

    #[test]
    fn test_chunk_selection() {
        let values: Vec<i64> = (0..10).collect();
        let mut chunk = int64_chunk(&values);
        assert_eq!(chunk.row_count(), 10);

        // Keep the even positions: 0, 2, 4, 6, 8.
        chunk.set_selection(SelectionVector::from_predicate(10, |i| i % 2 == 0));

        assert_eq!(chunk.row_count(), 5);
        assert_eq!(chunk.total_row_count(), 10);
    }

    #[test]
    fn test_chunk_reset() {
        let mut chunk = int64_chunk(&[1]);
        assert_eq!(chunk.row_count(), 1);

        chunk.reset();

        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_selected_indices() {
        let mut chunk = empty_int64_chunk();
        chunk.set_count(5);

        // Without a selection every physical index is produced.
        let all: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(all, vec![0, 1, 2, 3, 4]);

        // With a selection only the selected indices are produced.
        chunk.set_selection(SelectionVector::from_predicate(5, |i| i == 1 || i == 3));

        let picked: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(picked, vec![1, 3]);
    }

    #[test]
    fn test_chunk_flatten() {
        let mut builder =
            DataChunkBuilder::with_schema(&[LogicalType::Int64, LogicalType::String]);

        // Rows: (0, "a"), (1, "b"), (2, "c"), (3, "d"), (4, "e")
        for (number, letter) in (0..).zip(["a", "b", "c", "d", "e"]) {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.column_mut(1).unwrap().push_string(letter);
            builder.advance_row();
        }

        let mut chunk = builder.finish();

        // Keep the odd rows only: (1, "b") and (3, "d").
        chunk.set_selection(SelectionVector::from_predicate(5, |i| i % 2 == 1));

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 5);

        chunk.flatten();

        // The selection is gone and the physical count matches the visible one.
        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 2);
        assert!(chunk.selection().is_none());

        // The surviving data is exactly the selected rows, in order.
        let numbers = chunk.column(0).unwrap();
        assert_eq!(numbers.get_int64(0), Some(1));
        assert_eq!(numbers.get_int64(1), Some(3));
        let letters = chunk.column(1).unwrap();
        assert_eq!(letters.get_string(0), Some("b"));
        assert_eq!(letters.get_string(1), Some("d"));
    }

    #[test]
    fn test_chunk_flatten_no_selection() {
        let mut chunk = int64_chunk(&[42]);
        let rows_before = chunk.row_count();

        // Without a selection, flatten leaves the chunk untouched.
        chunk.flatten();

        assert_eq!(chunk.row_count(), rows_before);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(42));
    }

    #[test]
    fn test_chunk_zone_hints_default() {
        assert!(ChunkZoneHints::default().column_hints.is_empty());
    }

    #[test]
    fn test_chunk_zone_hints_set_and_get() {
        let mut chunk = empty_int64_chunk();

        // A fresh chunk carries no hints.
        assert!(chunk.zone_hints().is_none());

        chunk.set_zone_hints(hints_for_first_column(10, 100));

        // The hints are now retrievable.
        assert!(chunk.zone_hints().is_some());
        let stored = chunk.zone_hints().unwrap();
        assert_eq!(stored.column_hints.len(), 1);
        assert!(stored.column_hints.contains_key(&0));
    }

    #[test]
    fn test_chunk_zone_hints_clear() {
        let mut chunk = empty_int64_chunk();

        chunk.set_zone_hints(ChunkZoneHints::default());
        assert!(chunk.zone_hints().is_some());

        chunk.clear_zone_hints();
        assert!(chunk.zone_hints().is_none());
    }

    #[test]
    fn test_chunk_zone_hints_preserved_on_clone() {
        let mut chunk = empty_int64_chunk();
        chunk.set_zone_hints(hints_for_first_column(1, 10));

        // Cloning carries the hints over.
        let copy = chunk.clone();
        assert!(copy.zone_hints().is_some());
        assert_eq!(copy.zone_hints().unwrap().column_hints.len(), 1);
    }

    #[test]
    fn test_chunk_reset_clears_zone_hints() {
        let mut chunk = empty_int64_chunk();

        chunk.set_zone_hints(ChunkZoneHints::default());
        assert!(chunk.zone_hints().is_some());

        // Resetting drops the hints.
        chunk.reset();
        assert!(chunk.zone_hints().is_none());
    }
}