sparrowdb_execution/chunk.rs
1//! Chunked columnar data representation for the vectorized execution pipeline.
2//!
3//! # Design
4//!
5//! A [`DataChunk`] is a fixed-capacity batch of tuples stored in **columnar layout**.
6//! Each column is a contiguous `Vec<u64>` (SparrowDB stores all values as raw `u64`
7//! internally). Filtering uses a **selection vector** instead of compacting columns,
8//! avoiding memcpy and preserving alignment.
9//!
10//! ## Key Design Points
11//! - `u64` everywhere — SparrowDB already encodes strings, ints, floats, bools as `u64`.
12//! - Selection vectors instead of compaction: filtering updates `sel` (indices of live
13//! rows) without moving data. Compaction happens only at chunk boundaries.
14//! - Fixed chunk capacity: default `CHUNK_CAPACITY` (2048) rows. Small enough to stay
15//! in L1/L2 cache, large enough to amortize per-chunk overhead.
16//! - Null bitmaps: one bit per row per column, packed into `u64` words.
17
/// Default number of rows in a chunk. Tunable per-query via planner hint.
///
/// NOTE(review): none of the constructors visible in this file check a chunk's
/// length against this value, so any cap is presumably enforced by the code
/// that builds chunks — confirm against callers.
pub const CHUNK_CAPACITY: usize = 2048;
20
21// ── NullBitmap ────────────────────────────────────────────────────────────────
22
/// A bit-packed null bitmap. Bit `i` is set (1) when row `i` is NULL.
///
/// Row `i` lives in word `i / 64`, bit `i % 64`, of the backing `Vec<u64>`.
/// The backing store is rounded up to whole words, so the last word may carry
/// slack bits past `len`; those bits do not correspond to any row.
#[derive(Debug, Clone, Default)]
pub struct NullBitmap {
    /// Packed null flags, 64 rows per word.
    words: Vec<u64>,
    /// Number of rows (bits) the bitmap logically covers.
    len: usize,
}

impl NullBitmap {
    /// Create a bitmap with `len` bits, all clear (non-null).
    pub fn with_len(len: usize) -> Self {
        NullBitmap {
            words: vec![0u64; len.div_ceil(64)],
            len,
        }
    }

    /// Set bit `i` (mark row `i` as null).
    ///
    /// # Panics
    /// Panics if `i` addresses a word past the end of the backing store. In
    /// debug builds it also panics when `i >= len()`, so that writes into the
    /// slack bits of the last word are caught instead of silently accepted.
    #[inline]
    pub fn set_null(&mut self, i: usize) {
        debug_assert!(
            i < self.len,
            "row index {} out of range for NullBitmap of length {}",
            i,
            self.len
        );
        self.words[i / 64] |= 1u64 << (i % 64);
    }

    /// Return true if row `i` is null.
    ///
    /// # Panics
    /// Panics if `i` addresses a word past the end of the backing store. In
    /// debug builds it also panics when `i >= len()`.
    #[inline]
    pub fn is_null(&self, i: usize) -> bool {
        debug_assert!(
            i < self.len,
            "row index {} out of range for NullBitmap of length {}",
            i,
            self.len
        );
        (self.words[i / 64] >> (i % 64)) & 1 == 1
    }

    /// Number of rows represented.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Return true if the bitmap covers zero rows.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}
63
64// ── ColumnVector ──────────────────────────────────────────────────────────────
65
66/// A single column of raw `u64` values with a null bitmap.
67///
68/// SparrowDB encodes every value type (int, float, bool, string pointer) as a
69/// raw `u64`. Decoding happens only at the final projection stage.
70#[derive(Debug, Clone)]
71pub struct ColumnVector {
72 /// Raw encoded values — one entry per row.
73 pub data: Vec<u64>,
74 /// Null bitmap — bit `i` set means `data[i]` is NULL.
75 pub nulls: NullBitmap,
76 /// Which property column this vector represents (matches `col_id_of` encoding).
77 pub col_id: u32,
78}
79
80impl ColumnVector {
81 /// Create a zero-filled column for `len` rows with no nulls.
82 pub fn zeroed(col_id: u32, len: usize) -> Self {
83 ColumnVector {
84 data: vec![0u64; len],
85 nulls: NullBitmap::with_len(len),
86 col_id,
87 }
88 }
89
90 /// Create a column from a pre-populated data vector.
91 pub fn from_data(col_id: u32, data: Vec<u64>) -> Self {
92 let len = data.len();
93 ColumnVector {
94 data,
95 nulls: NullBitmap::with_len(len),
96 col_id,
97 }
98 }
99
100 /// Number of rows in this column.
101 pub fn len(&self) -> usize {
102 self.data.len()
103 }
104
105 pub fn is_empty(&self) -> bool {
106 self.data.is_empty()
107 }
108}
109
110// ── DataChunk ─────────────────────────────────────────────────────────────────
111
112/// A batch of tuples in columnar layout.
113///
114/// Rows 0..`len` hold data; `sel` narrows which rows are "live" after filtering.
115/// When `sel` is `None`, all rows 0..`len` are live (avoids allocation on non-
116/// filtering operators).
117#[derive(Debug, Clone)]
118pub struct DataChunk {
119 /// Per-column vectors. All columns must have the same length (`len`).
120 columns: Vec<ColumnVector>,
121 /// Number of valid rows (may be ≤ `CHUNK_CAPACITY`).
122 len: usize,
123 /// Selection vector: indices of "live" rows after filtering.
124 /// `None` means all rows 0..len are live.
125 sel: Option<Vec<u32>>,
126}
127
128impl DataChunk {
129 /// Create an empty chunk with no columns and no rows.
130 pub fn empty() -> Self {
131 DataChunk {
132 columns: Vec::new(),
133 len: 0,
134 sel: None,
135 }
136 }
137
138 /// Create a chunk from a set of column vectors.
139 ///
140 /// # Panics
141 /// Panics in debug mode if columns have different lengths.
142 pub fn from_columns(columns: Vec<ColumnVector>) -> Self {
143 let len = columns.first().map(|c| c.len()).unwrap_or(0);
144 debug_assert!(
145 columns.iter().all(|c| c.len() == len),
146 "all columns must have the same length"
147 );
148 DataChunk {
149 columns,
150 len,
151 sel: None,
152 }
153 }
154
155 /// Create a chunk with exactly two u64 columns (typically src_slot, dst_slot).
156 ///
157 /// Convenience constructor used by `GetNeighbors`.
158 pub fn from_two_vecs(col0_id: u32, col0: Vec<u64>, col1_id: u32, col1: Vec<u64>) -> Self {
159 debug_assert_eq!(col0.len(), col1.len());
160 let len = col0.len();
161 DataChunk {
162 columns: vec![
163 ColumnVector::from_data(col0_id, col0),
164 ColumnVector::from_data(col1_id, col1),
165 ],
166 len,
167 sel: None,
168 }
169 }
170
171 /// Number of physical rows (before selection).
172 pub fn len(&self) -> usize {
173 self.len
174 }
175
176 pub fn is_empty(&self) -> bool {
177 self.len == 0
178 }
179
180 /// Number of "live" rows after applying the selection vector.
181 pub fn live_len(&self) -> usize {
182 match &self.sel {
183 None => self.len,
184 Some(sel) => sel.len(),
185 }
186 }
187
188 /// Get a reference to a column by position.
189 pub fn column(&self, pos: usize) -> &ColumnVector {
190 &self.columns[pos]
191 }
192
193 /// Get a mutable reference to a column by position.
194 pub fn column_mut(&mut self, pos: usize) -> &mut ColumnVector {
195 &mut self.columns[pos]
196 }
197
198 /// Number of columns.
199 pub fn num_columns(&self) -> usize {
200 self.columns.len()
201 }
202
203 /// Find a column by `col_id`. Returns `None` if not present.
204 pub fn find_column(&self, col_id: u32) -> Option<&ColumnVector> {
205 self.columns.iter().find(|c| c.col_id == col_id)
206 }
207
208 /// Add a column to the chunk.
209 ///
210 /// # Panics
211 /// Panics in debug mode if the new column length differs from `self.len`.
212 pub fn push_column(&mut self, col: ColumnVector) {
213 debug_assert!(
214 self.columns.is_empty() || col.len() == self.len,
215 "column length mismatch: expected {}, got {}",
216 self.len,
217 col.len()
218 );
219 if self.columns.is_empty() {
220 self.len = col.len();
221 }
222 self.columns.push(col);
223 }
224
225 /// Return the current selection vector, or `None` if all rows are live.
226 pub fn sel(&self) -> Option<&[u32]> {
227 self.sel.as_deref()
228 }
229
230 /// Apply a predicate to each live row and update the selection vector.
231 ///
232 /// `pred` receives the row index `i` and returns `true` if the row should
233 /// remain live. Rows that return `false` are removed from `sel`.
234 ///
235 /// This is the core **Filter** operation: it never copies column data,
236 /// only updates the selection vector. O(live_len) time, O(1) extra memory
237 /// (modifies sel in place).
238 pub fn filter_sel<F>(&mut self, mut pred: F)
239 where
240 F: FnMut(usize) -> bool,
241 {
242 match self.sel.take() {
243 None => {
244 // All rows were live — build a new selection vector keeping only
245 // rows that pass the predicate.
246 let mut new_sel = Vec::with_capacity(self.len);
247 for i in 0..self.len {
248 if pred(i) {
249 new_sel.push(i as u32);
250 }
251 }
252 // Optimization: if all rows passed, keep sel=None (no allocation needed).
253 if new_sel.len() == self.len {
254 self.sel = None;
255 } else {
256 self.sel = Some(new_sel);
257 }
258 }
259 Some(old_sel) => {
260 // Narrow the existing selection vector.
261 let new_sel: Vec<u32> = old_sel.into_iter().filter(|&i| pred(i as usize)).collect();
262 if new_sel.len() == self.len {
263 self.sel = None;
264 } else {
265 self.sel = Some(new_sel);
266 }
267 }
268 }
269 }
270
271 /// Materialize this chunk into an iterator over live row indices.
272 ///
273 /// When `sel` is `None`, yields `0..len`. When `sel` is `Some(sel)`,
274 /// yields indices from the selection vector.
275 pub fn live_rows(&self) -> LiveRows<'_> {
276 LiveRows {
277 chunk: self,
278 pos: 0,
279 }
280 }
281
282 /// Consume the chunk and extract its column vectors.
283 pub fn into_columns(self) -> Vec<ColumnVector> {
284 self.columns
285 }
286}
287
288/// Iterator over live row indices of a [`DataChunk`].
289pub struct LiveRows<'a> {
290 chunk: &'a DataChunk,
291 pos: usize,
292}
293
294impl<'a> Iterator for LiveRows<'a> {
295 type Item = usize;
296
297 fn next(&mut self) -> Option<Self::Item> {
298 match &self.chunk.sel {
299 None => {
300 if self.pos < self.chunk.len {
301 let i = self.pos;
302 self.pos += 1;
303 Some(i)
304 } else {
305 None
306 }
307 }
308 Some(sel) => {
309 if self.pos < sel.len() {
310 let i = sel[self.pos] as usize;
311 self.pos += 1;
312 Some(i)
313 } else {
314 None
315 }
316 }
317 }
318 }
319}
320
321// ── Column-ID constants used by pipeline operators ───────────────────────────
322
/// Synthetic column-id used for "slot" (node position within a label's column file).
/// Chosen to be in a range that won't collide with real property col_ids
/// (which are `col_id_of(name)` = djb2 hash, always ≥ 1).
pub const COL_ID_SLOT: u32 = 0;

/// Synthetic column-id for "source slot" in `GetNeighbors` output.
///
/// NOTE(review): unlike `COL_ID_SLOT`, this value is ≥ 1 and therefore lies
/// inside the range the comment above gives for hashed property ids — confirm
/// that `col_id_of` can never produce it.
pub const COL_ID_SRC_SLOT: u32 = u32::MAX - 1;

/// Synthetic column-id for "destination slot" in `GetNeighbors` output.
///
/// NOTE(review): same caveat as `COL_ID_SRC_SLOT` — confirm it cannot collide
/// with a hashed property id.
pub const COL_ID_DST_SLOT: u32 = u32::MAX - 2;
333
334// ── Tests ─────────────────────────────────────────────────────────────────────
335
#[cfg(test)]
mod tests {
    use super::*;

    /// Bits on both sides of a 64-bit word boundary can be set and read back
    /// without disturbing their neighbours.
    #[test]
    fn null_bitmap_set_and_query() {
        let mut bitmap = NullBitmap::with_len(130);
        let marked = [0usize, 63, 64, 129];

        for &row in &marked {
            assert!(!bitmap.is_null(row));
        }

        for &row in &marked {
            bitmap.set_null(row);
        }

        for &row in &marked {
            assert!(bitmap.is_null(row));
        }
        assert!(!bitmap.is_null(1));
        assert!(!bitmap.is_null(65));
    }

    /// Two successive filters shrink the selection vector step by step.
    #[test]
    fn data_chunk_filter_reduces_sel_vector() {
        // Single column holding the values 0..10.
        let column = ColumnVector::from_data(COL_ID_SLOT, (0u64..10).collect());
        let mut chunk = DataChunk::from_columns(vec![column]);

        assert_eq!(chunk.live_len(), 10);
        assert!(
            chunk.sel().is_none(),
            "sel should be None before any filter"
        );

        // First pass keeps the even values (rows 0, 2, 4, 6, 8). The verdicts
        // are computed up front because the predicate cannot read `chunk`
        // while `filter_sel` holds it mutably.
        let is_even: Vec<bool> = chunk
            .column(0)
            .data
            .iter()
            .map(|value| value.is_multiple_of(2))
            .collect();
        chunk.filter_sel(|row| is_even[row]);

        assert_eq!(chunk.live_len(), 5);
        let after_first = chunk.sel().expect("sel should be Some after filtering");
        assert_eq!(after_first, &[0u32, 2, 4, 6, 8]);

        // Second pass keeps values greater than 4, leaving rows 6 and 8.
        let above_four: Vec<bool> = chunk.column(0).data.iter().map(|&value| value > 4).collect();
        chunk.filter_sel(|row| above_four[row]);

        assert_eq!(chunk.live_len(), 2);
        let after_second = chunk.sel().unwrap();
        assert_eq!(after_second, &[6u32, 8]);
    }

    /// A filter that rejects nothing must not materialize a selection vector.
    #[test]
    fn data_chunk_filter_all_pass_keeps_sel_none() {
        let column = ColumnVector::from_data(COL_ID_SLOT, vec![10, 20, 30]);
        let mut chunk = DataChunk::from_columns(vec![column]);

        chunk.filter_sel(|_| true);

        assert!(
            chunk.sel().is_none(),
            "sel should remain None when all rows pass"
        );
    }

    /// `live_rows` yields exactly the rows that survived the filter.
    #[test]
    fn live_rows_iteration() {
        let column = ColumnVector::from_data(COL_ID_SLOT, (0u64..5).collect());
        let mut chunk = DataChunk::from_columns(vec![column]);

        // Only the odd rows (1 and 3) survive.
        chunk.filter_sel(|row| row % 2 == 1);

        let survivors: Vec<usize> = chunk.live_rows().collect();
        assert_eq!(survivors, vec![1, 3]);
    }

    /// Pins the default chunk capacity.
    #[test]
    fn chunk_capacity_constant() {
        assert_eq!(CHUNK_CAPACITY, 2048);
    }
}
423}