Skip to main content

sparrowdb_execution/
chunk.rs

1//! Chunked columnar data representation for the vectorized execution pipeline.
2//!
3//! # Design
4//!
5//! A [`DataChunk`] is a fixed-capacity batch of tuples stored in **columnar layout**.
6//! Each column is a contiguous `Vec<u64>` (SparrowDB stores all values as raw `u64`
7//! internally). Filtering uses a **selection vector** instead of compacting columns,
8//! avoiding memcpy and preserving alignment.
9//!
10//! ## Key Design Points
11//! - `u64` everywhere — SparrowDB already encodes strings, ints, floats, bools as `u64`.
12//! - Selection vectors instead of compaction: filtering updates `sel` (indices of live
13//!   rows) without moving data. Compaction happens only at chunk boundaries.
14//! - Fixed chunk capacity: default `CHUNK_CAPACITY` (2048) rows. Small enough to stay
15//!   in L1/L2 cache, large enough to amortize per-chunk overhead.
16//! - Null bitmaps: one bit per row per column, packed into `u64` words.
17
18/// Default number of rows in a chunk. Tunable per-query via planner hint.
19pub const CHUNK_CAPACITY: usize = 2048;
20
21// ── NullBitmap ────────────────────────────────────────────────────────────────
22
23/// A bit-packed null bitmap. Bit `i` is set (1) when row `i` is NULL.
24///
25/// Stored as a `Vec<u64>` where word `i/64` bit `i%64` represents row `i`.
26#[derive(Debug, Clone, Default)]
27pub struct NullBitmap {
28    words: Vec<u64>,
29    len: usize,
30}
31
32impl NullBitmap {
33    /// Create a bitmap with `len` bits, all clear (non-null).
34    pub fn with_len(len: usize) -> Self {
35        let words = len.div_ceil(64);
36        NullBitmap {
37            words: vec![0u64; words],
38            len,
39        }
40    }
41
42    /// Set bit `i` (mark row `i` as null).
43    #[inline]
44    pub fn set_null(&mut self, i: usize) {
45        self.words[i / 64] |= 1u64 << (i % 64);
46    }
47
48    /// Return true if row `i` is null.
49    #[inline]
50    pub fn is_null(&self, i: usize) -> bool {
51        (self.words[i / 64] >> (i % 64)) & 1 == 1
52    }
53
54    /// Number of rows represented.
55    pub fn len(&self) -> usize {
56        self.len
57    }
58
59    pub fn is_empty(&self) -> bool {
60        self.len == 0
61    }
62}
63
64// ── ColumnVector ──────────────────────────────────────────────────────────────
65
66/// A single column of raw `u64` values with a null bitmap.
67///
68/// SparrowDB encodes every value type (int, float, bool, string pointer) as a
69/// raw `u64`. Decoding happens only at the final projection stage.
70#[derive(Debug, Clone)]
71pub struct ColumnVector {
72    /// Raw encoded values — one entry per row.
73    pub data: Vec<u64>,
74    /// Null bitmap — bit `i` set means `data[i]` is NULL.
75    pub nulls: NullBitmap,
76    /// Which property column this vector represents (matches `col_id_of` encoding).
77    pub col_id: u32,
78}
79
80impl ColumnVector {
81    /// Create a zero-filled column for `len` rows with no nulls.
82    pub fn zeroed(col_id: u32, len: usize) -> Self {
83        ColumnVector {
84            data: vec![0u64; len],
85            nulls: NullBitmap::with_len(len),
86            col_id,
87        }
88    }
89
90    /// Create a column from a pre-populated data vector.
91    pub fn from_data(col_id: u32, data: Vec<u64>) -> Self {
92        let len = data.len();
93        ColumnVector {
94            data,
95            nulls: NullBitmap::with_len(len),
96            col_id,
97        }
98    }
99
100    /// Number of rows in this column.
101    pub fn len(&self) -> usize {
102        self.data.len()
103    }
104
105    pub fn is_empty(&self) -> bool {
106        self.data.is_empty()
107    }
108}
109
110// ── DataChunk ─────────────────────────────────────────────────────────────────
111
112/// A batch of tuples in columnar layout.
113///
114/// Rows 0..`len` hold data; `sel` narrows which rows are "live" after filtering.
115/// When `sel` is `None`, all rows 0..`len` are live (avoids allocation on non-
116/// filtering operators).
117#[derive(Debug, Clone)]
118pub struct DataChunk {
119    /// Per-column vectors. All columns must have the same length (`len`).
120    columns: Vec<ColumnVector>,
121    /// Number of valid rows (may be ≤ `CHUNK_CAPACITY`).
122    len: usize,
123    /// Selection vector: indices of "live" rows after filtering.
124    /// `None` means all rows 0..len are live.
125    sel: Option<Vec<u32>>,
126}
127
128impl DataChunk {
129    /// Create an empty chunk with no columns and no rows.
130    pub fn empty() -> Self {
131        DataChunk {
132            columns: Vec::new(),
133            len: 0,
134            sel: None,
135        }
136    }
137
138    /// Create a chunk from a set of column vectors.
139    ///
140    /// # Panics
141    /// Panics in debug mode if columns have different lengths.
142    pub fn from_columns(columns: Vec<ColumnVector>) -> Self {
143        let len = columns.first().map(|c| c.len()).unwrap_or(0);
144        debug_assert!(
145            columns.iter().all(|c| c.len() == len),
146            "all columns must have the same length"
147        );
148        DataChunk {
149            columns,
150            len,
151            sel: None,
152        }
153    }
154
155    /// Create a chunk with exactly two u64 columns (typically src_slot, dst_slot).
156    ///
157    /// Convenience constructor used by `GetNeighbors`.
158    pub fn from_two_vecs(col0_id: u32, col0: Vec<u64>, col1_id: u32, col1: Vec<u64>) -> Self {
159        debug_assert_eq!(col0.len(), col1.len());
160        let len = col0.len();
161        DataChunk {
162            columns: vec![
163                ColumnVector::from_data(col0_id, col0),
164                ColumnVector::from_data(col1_id, col1),
165            ],
166            len,
167            sel: None,
168        }
169    }
170
171    /// Number of physical rows (before selection).
172    pub fn len(&self) -> usize {
173        self.len
174    }
175
176    pub fn is_empty(&self) -> bool {
177        self.len == 0
178    }
179
180    /// Number of "live" rows after applying the selection vector.
181    pub fn live_len(&self) -> usize {
182        match &self.sel {
183            None => self.len,
184            Some(sel) => sel.len(),
185        }
186    }
187
188    /// Get a reference to a column by position.
189    pub fn column(&self, pos: usize) -> &ColumnVector {
190        &self.columns[pos]
191    }
192
193    /// Get a mutable reference to a column by position.
194    pub fn column_mut(&mut self, pos: usize) -> &mut ColumnVector {
195        &mut self.columns[pos]
196    }
197
198    /// Number of columns.
199    pub fn num_columns(&self) -> usize {
200        self.columns.len()
201    }
202
203    /// Find a column by `col_id`. Returns `None` if not present.
204    pub fn find_column(&self, col_id: u32) -> Option<&ColumnVector> {
205        self.columns.iter().find(|c| c.col_id == col_id)
206    }
207
208    /// Add a column to the chunk.
209    ///
210    /// # Panics
211    /// Panics in debug mode if the new column length differs from `self.len`.
212    pub fn push_column(&mut self, col: ColumnVector) {
213        debug_assert!(
214            self.columns.is_empty() || col.len() == self.len,
215            "column length mismatch: expected {}, got {}",
216            self.len,
217            col.len()
218        );
219        if self.columns.is_empty() {
220            self.len = col.len();
221        }
222        self.columns.push(col);
223    }
224
225    /// Return the current selection vector, or `None` if all rows are live.
226    pub fn sel(&self) -> Option<&[u32]> {
227        self.sel.as_deref()
228    }
229
230    /// Apply a predicate to each live row and update the selection vector.
231    ///
232    /// `pred` receives the row index `i` and returns `true` if the row should
233    /// remain live. Rows that return `false` are removed from `sel`.
234    ///
235    /// This is the core **Filter** operation: it never copies column data,
236    /// only updates the selection vector. O(live_len) time, O(1) extra memory
237    /// (modifies sel in place).
238    pub fn filter_sel<F>(&mut self, mut pred: F)
239    where
240        F: FnMut(usize) -> bool,
241    {
242        match self.sel.take() {
243            None => {
244                // All rows were live — build a new selection vector keeping only
245                // rows that pass the predicate.
246                let mut new_sel = Vec::with_capacity(self.len);
247                for i in 0..self.len {
248                    if pred(i) {
249                        new_sel.push(i as u32);
250                    }
251                }
252                // Optimization: if all rows passed, keep sel=None (no allocation needed).
253                if new_sel.len() == self.len {
254                    self.sel = None;
255                } else {
256                    self.sel = Some(new_sel);
257                }
258            }
259            Some(old_sel) => {
260                // Narrow the existing selection vector.
261                let new_sel: Vec<u32> = old_sel.into_iter().filter(|&i| pred(i as usize)).collect();
262                if new_sel.len() == self.len {
263                    self.sel = None;
264                } else {
265                    self.sel = Some(new_sel);
266                }
267            }
268        }
269    }
270
271    /// Materialize this chunk into an iterator over live row indices.
272    ///
273    /// When `sel` is `None`, yields `0..len`. When `sel` is `Some(sel)`,
274    /// yields indices from the selection vector.
275    pub fn live_rows(&self) -> LiveRows<'_> {
276        LiveRows {
277            chunk: self,
278            pos: 0,
279        }
280    }
281
282    /// Consume the chunk and extract its column vectors.
283    pub fn into_columns(self) -> Vec<ColumnVector> {
284        self.columns
285    }
286}
287
288/// Iterator over live row indices of a [`DataChunk`].
289pub struct LiveRows<'a> {
290    chunk: &'a DataChunk,
291    pos: usize,
292}
293
294impl<'a> Iterator for LiveRows<'a> {
295    type Item = usize;
296
297    fn next(&mut self) -> Option<Self::Item> {
298        match &self.chunk.sel {
299            None => {
300                if self.pos < self.chunk.len {
301                    let i = self.pos;
302                    self.pos += 1;
303                    Some(i)
304                } else {
305                    None
306                }
307            }
308            Some(sel) => {
309                if self.pos < sel.len() {
310                    let i = sel[self.pos] as usize;
311                    self.pos += 1;
312                    Some(i)
313                } else {
314                    None
315                }
316            }
317        }
318    }
319}
320
321// ── Column-ID constants used by pipeline operators ───────────────────────────
322
323/// Synthetic column-id used for "slot" (node position within a label's column file).
324/// Chosen to be in a range that won't collide with real property col_ids
325/// (which are `col_id_of(name)` = djb2 hash, always ≥ 1).
326pub const COL_ID_SLOT: u32 = 0;
327
328/// Synthetic column-id for "source slot" in `GetNeighbors` output.
329pub const COL_ID_SRC_SLOT: u32 = u32::MAX - 1;
330
331/// Synthetic column-id for "destination slot" in `GetNeighbors` output.
332pub const COL_ID_DST_SLOT: u32 = u32::MAX - 2;
333
334// ── Tests ─────────────────────────────────────────────────────────────────────
335
336#[cfg(test)]
337mod tests {
338    use super::*;
339
340    #[test]
341    fn null_bitmap_set_and_query() {
342        let mut bm = NullBitmap::with_len(130);
343        assert!(!bm.is_null(0));
344        assert!(!bm.is_null(63));
345        assert!(!bm.is_null(64));
346        assert!(!bm.is_null(129));
347
348        bm.set_null(0);
349        bm.set_null(63);
350        bm.set_null(64);
351        bm.set_null(129);
352
353        assert!(bm.is_null(0));
354        assert!(bm.is_null(63));
355        assert!(bm.is_null(64));
356        assert!(bm.is_null(129));
357        assert!(!bm.is_null(1));
358        assert!(!bm.is_null(65));
359    }
360
361    #[test]
362    fn data_chunk_filter_reduces_sel_vector() {
363        // Build a chunk with one column containing values 0..10.
364        let data: Vec<u64> = (0u64..10).collect();
365        let col = ColumnVector::from_data(COL_ID_SLOT, data);
366        let mut chunk = DataChunk::from_columns(vec![col]);
367
368        assert_eq!(chunk.live_len(), 10);
369        assert!(
370            chunk.sel().is_none(),
371            "sel should be None before any filter"
372        );
373
374        // Filter: keep only even rows (indices 0, 2, 4, 6, 8).
375        // Pre-compute keep vec to avoid simultaneous &chunk / &mut chunk borrow.
376        let keep: Vec<bool> = (0..chunk.len())
377            .map(|i| chunk.column(0).data[i].is_multiple_of(2))
378            .collect();
379        chunk.filter_sel(|i| keep[i]);
380
381        assert_eq!(chunk.live_len(), 5);
382        let sel = chunk.sel().expect("sel should be Some after filtering");
383        assert_eq!(sel, &[0u32, 2, 4, 6, 8]);
384
385        // Apply a second filter: keep only values > 4 (i.e. indices 6, 8).
386        let keep2: Vec<bool> = (0..chunk.len())
387            .map(|i| chunk.column(0).data[i] > 4)
388            .collect();
389        chunk.filter_sel(|i| keep2[i]);
390        assert_eq!(chunk.live_len(), 2);
391        let sel2 = chunk.sel().unwrap();
392        assert_eq!(sel2, &[6u32, 8]);
393    }
394
395    #[test]
396    fn data_chunk_filter_all_pass_keeps_sel_none() {
397        let data: Vec<u64> = vec![10, 20, 30];
398        let col = ColumnVector::from_data(COL_ID_SLOT, data);
399        let mut chunk = DataChunk::from_columns(vec![col]);
400        // Filter that keeps all rows.
401        chunk.filter_sel(|_| true);
402        assert!(
403            chunk.sel().is_none(),
404            "sel should remain None when all rows pass"
405        );
406    }
407
408    #[test]
409    fn live_rows_iteration() {
410        let data: Vec<u64> = (0u64..5).collect();
411        let col = ColumnVector::from_data(COL_ID_SLOT, data);
412        let mut chunk = DataChunk::from_columns(vec![col]);
413        // Keep only odd indices (1, 3).
414        chunk.filter_sel(|i| i % 2 == 1);
415        let live: Vec<usize> = chunk.live_rows().collect();
416        assert_eq!(live, vec![1, 3]);
417    }
418
419    #[test]
420    fn chunk_capacity_constant() {
421        assert_eq!(CHUNK_CAPACITY, 2048);
422    }
423}