Skip to main content

reddb_server/storage/query/batch/
column_batch.rs

//! Columnar batch representation.
//!
//! A [`ColumnBatch`] is an immutable slice of up to [`BATCH_SIZE`]
//! rows stored as one typed [`ColumnVector`] per column. Compared
//! with the row-at-a-time `Binding` trail the Volcano iterators
//! produce, the columnar layout unlocks tight inner loops the
//! compiler can auto-vectorise and, eventually, explicit SIMD.
8
9use std::sync::Arc;
10
/// Default number of rows per batch (the vector width).
///
/// The value lines up with the lower bound of ClickHouse's
/// `max_block_size`, and a single `f64` column of this many rows
/// occupies 2048 × 8 B = 16 KiB, which fits comfortably in L2 cache.
pub const BATCH_SIZE: usize = 1 << 11; // 2048
15
/// Column type markers. Kept deliberately small — the batch layer
/// doesn't need the full `schema::Value` enum to execute arithmetic.
///
/// The enum carries no payload, so it is `Copy` (pass it by value
/// instead of cloning) and `Hash` (usable as a map or set key) on top
/// of being comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ColumnKind {
    /// Column whose payload is stored as `i64`.
    Int64,
    /// Column whose payload is stored as `f64`.
    Float64,
    /// Column whose payload is stored as `bool`.
    Bool,
    /// Column whose payload is stored as `String`.
    Text,
}
25
26#[derive(Debug, Clone)]
27pub struct Field {
28    pub name: String,
29    pub kind: ColumnKind,
30    /// `true` when `ColumnVector` slots of this column may contain
31    /// `None`. False lets the operator layer skip null checks.
32    pub nullable: bool,
33}
34
35#[derive(Debug, Clone)]
36pub struct Schema {
37    fields: Vec<Field>,
38}
39
40impl Schema {
41    pub fn new(fields: Vec<Field>) -> Self {
42        Self { fields }
43    }
44
45    pub fn fields(&self) -> &[Field] {
46        &self.fields
47    }
48
49    pub fn index_of(&self, name: &str) -> Option<usize> {
50        self.fields.iter().position(|f| f.name == name)
51    }
52
53    pub fn field(&self, idx: usize) -> Option<&Field> {
54        self.fields.get(idx)
55    }
56
57    pub fn len(&self) -> usize {
58        self.fields.len()
59    }
60
61    pub fn is_empty(&self) -> bool {
62        self.fields.is_empty()
63    }
64
65    pub fn with_subset(&self, indices: &[usize]) -> Self {
66        Self {
67            fields: indices
68                .iter()
69                .filter_map(|i| self.fields.get(*i).cloned())
70                .collect(),
71        }
72    }
73}
74
75/// Typed column storage. Nullable columns carry the validity bits
76/// alongside the payload so `Option` isn't materialised row-by-row.
77#[derive(Debug, Clone)]
78pub enum ColumnVector {
79    Int64 {
80        data: Vec<i64>,
81        validity: Option<Vec<bool>>,
82    },
83    Float64 {
84        data: Vec<f64>,
85        validity: Option<Vec<bool>>,
86    },
87    Bool {
88        data: Vec<bool>,
89        validity: Option<Vec<bool>>,
90    },
91    Text {
92        data: Vec<String>,
93        validity: Option<Vec<bool>>,
94    },
95}
96
97impl ColumnVector {
98    pub fn len(&self) -> usize {
99        match self {
100            ColumnVector::Int64 { data, .. } => data.len(),
101            ColumnVector::Float64 { data, .. } => data.len(),
102            ColumnVector::Bool { data, .. } => data.len(),
103            ColumnVector::Text { data, .. } => data.len(),
104        }
105    }
106
107    pub fn is_empty(&self) -> bool {
108        self.len() == 0
109    }
110
111    pub fn kind(&self) -> ColumnKind {
112        match self {
113            ColumnVector::Int64 { .. } => ColumnKind::Int64,
114            ColumnVector::Float64 { .. } => ColumnKind::Float64,
115            ColumnVector::Bool { .. } => ColumnKind::Bool,
116            ColumnVector::Text { .. } => ColumnKind::Text,
117        }
118    }
119
120    pub fn is_valid(&self, idx: usize) -> bool {
121        let validity = match self {
122            ColumnVector::Int64 { validity, .. } => validity.as_ref(),
123            ColumnVector::Float64 { validity, .. } => validity.as_ref(),
124            ColumnVector::Bool { validity, .. } => validity.as_ref(),
125            ColumnVector::Text { validity, .. } => validity.as_ref(),
126        };
127        validity
128            .map(|v| v.get(idx).copied().unwrap_or(false))
129            .unwrap_or(true)
130    }
131
132    pub fn take_indices(&self, indices: &[usize]) -> ColumnVector {
133        match self {
134            ColumnVector::Int64 { data, validity } => {
135                let new_data: Vec<i64> = indices.iter().map(|i| data[*i]).collect();
136                let new_validity = validity.as_ref().map(|v| {
137                    indices
138                        .iter()
139                        .map(|i| *v.get(*i).unwrap_or(&true))
140                        .collect()
141                });
142                ColumnVector::Int64 {
143                    data: new_data,
144                    validity: new_validity,
145                }
146            }
147            ColumnVector::Float64 { data, validity } => {
148                let new_data: Vec<f64> = indices.iter().map(|i| data[*i]).collect();
149                let new_validity = validity.as_ref().map(|v| {
150                    indices
151                        .iter()
152                        .map(|i| *v.get(*i).unwrap_or(&true))
153                        .collect()
154                });
155                ColumnVector::Float64 {
156                    data: new_data,
157                    validity: new_validity,
158                }
159            }
160            ColumnVector::Bool { data, validity } => {
161                let new_data: Vec<bool> = indices.iter().map(|i| data[*i]).collect();
162                let new_validity = validity.as_ref().map(|v| {
163                    indices
164                        .iter()
165                        .map(|i| *v.get(*i).unwrap_or(&true))
166                        .collect()
167                });
168                ColumnVector::Bool {
169                    data: new_data,
170                    validity: new_validity,
171                }
172            }
173            ColumnVector::Text { data, validity } => {
174                let new_data: Vec<String> = indices.iter().map(|i| data[*i].clone()).collect();
175                let new_validity = validity.as_ref().map(|v| {
176                    indices
177                        .iter()
178                        .map(|i| *v.get(*i).unwrap_or(&true))
179                        .collect()
180                });
181                ColumnVector::Text {
182                    data: new_data,
183                    validity: new_validity,
184                }
185            }
186        }
187    }
188}
189
/// Value pulled out of a batch at `(column, row)` — kept tiny so
/// operator predicates can consume it without allocating.
///
/// Every payload is either a plain scalar or a borrowed `&str`, so the
/// type is `Copy`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ValueRef<'a> {
    Int64(i64),
    Float64(f64),
    Bool(bool),
    /// Text borrowed from the column for lifetime `'a`.
    Text(&'a str),
    /// The cell is null or could not be addressed.
    Null,
}

impl<'a> ValueRef<'a> {
    /// The integer payload, or `None` for any other variant.
    pub fn as_i64(&self) -> Option<i64> {
        match self {
            ValueRef::Int64(v) => Some(*v),
            _ => None,
        }
    }

    /// The value as `f64`. `Float64` is returned as-is and `Int64` is
    /// converted with an `as f64` cast; every other variant yields `None`.
    pub fn as_f64(&self) -> Option<f64> {
        match self {
            ValueRef::Float64(v) => Some(*v),
            ValueRef::Int64(v) => Some(*v as f64),
            _ => None,
        }
    }

    /// The boolean payload, or `None` for any other variant.
    pub fn as_bool(&self) -> Option<bool> {
        match self {
            ValueRef::Bool(v) => Some(*v),
            _ => None,
        }
    }

    /// The text payload, or `None` for any other variant.
    ///
    /// The returned slice borrows from the underlying column data
    /// (lifetime `'a`), not from this `ValueRef`, so it stays usable
    /// after a temporary `ValueRef` has been dropped.
    pub fn as_str(&self) -> Option<&'a str> {
        match self {
            // Copy the inner `&'a str` out instead of reborrowing through `&self`.
            ValueRef::Text(s) => Some(*s),
            _ => None,
        }
    }

    /// `true` only for [`ValueRef::Null`].
    pub fn is_null(&self) -> bool {
        matches!(self, ValueRef::Null)
    }
}
235
236#[derive(Debug, Clone)]
237pub struct ColumnBatch {
238    pub schema: Arc<Schema>,
239    pub columns: Vec<ColumnVector>,
240    pub len: usize,
241}
242
243impl ColumnBatch {
244    pub fn new(schema: Arc<Schema>, columns: Vec<ColumnVector>) -> Self {
245        let len = columns.first().map(|c| c.len()).unwrap_or(0);
246        debug_assert!(
247            columns.iter().all(|c| c.len() == len),
248            "column lengths diverge in batch construction"
249        );
250        debug_assert_eq!(
251            schema.len(),
252            columns.len(),
253            "schema / column count mismatch"
254        );
255        Self {
256            schema,
257            columns,
258            len,
259        }
260    }
261
262    pub fn empty(schema: Arc<Schema>) -> Self {
263        let columns = schema
264            .fields()
265            .iter()
266            .map(|f| match f.kind {
267                ColumnKind::Int64 => ColumnVector::Int64 {
268                    data: Vec::new(),
269                    validity: None,
270                },
271                ColumnKind::Float64 => ColumnVector::Float64 {
272                    data: Vec::new(),
273                    validity: None,
274                },
275                ColumnKind::Bool => ColumnVector::Bool {
276                    data: Vec::new(),
277                    validity: None,
278                },
279                ColumnKind::Text => ColumnVector::Text {
280                    data: Vec::new(),
281                    validity: None,
282                },
283            })
284            .collect();
285        Self {
286            schema,
287            columns,
288            len: 0,
289        }
290    }
291
292    pub fn len(&self) -> usize {
293        self.len
294    }
295
296    pub fn is_empty(&self) -> bool {
297        self.len == 0
298    }
299
300    /// Fetch a single cell. Used by predicates that aren't SIMD-able.
301    pub fn value(&self, row: usize, column: usize) -> ValueRef<'_> {
302        if row >= self.len || column >= self.columns.len() {
303            return ValueRef::Null;
304        }
305        let col = &self.columns[column];
306        if !col.is_valid(row) {
307            return ValueRef::Null;
308        }
309        match col {
310            ColumnVector::Int64 { data, .. } => ValueRef::Int64(data[row]),
311            ColumnVector::Float64 { data, .. } => ValueRef::Float64(data[row]),
312            ColumnVector::Bool { data, .. } => ValueRef::Bool(data[row]),
313            ColumnVector::Text { data, .. } => ValueRef::Text(data[row].as_str()),
314        }
315    }
316
317    /// Build a new batch keeping only the rows at `indices`.
318    pub fn take(&self, indices: &[usize]) -> ColumnBatch {
319        let columns = self
320            .columns
321            .iter()
322            .map(|c| c.take_indices(indices))
323            .collect();
324        ColumnBatch {
325            schema: Arc::clone(&self.schema),
326            columns,
327            len: indices.len(),
328        }
329    }
330
331    /// Project a subset of columns (by index) into a narrower batch.
332    pub fn project(&self, indices: &[usize]) -> ColumnBatch {
333        let new_schema = Arc::new(self.schema.with_subset(indices));
334        let columns = indices
335            .iter()
336            .filter_map(|i| self.columns.get(*i).cloned())
337            .collect();
338        ColumnBatch {
339            schema: new_schema,
340            columns,
341            len: self.len,
342        }
343    }
344}
345
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building one schema field.
    fn field(name: &str, kind: ColumnKind, nullable: bool) -> Field {
        Field {
            name: name.to_string(),
            kind,
            nullable,
        }
    }

    /// Three-column schema shared by the tests: `id`, `value`, `name`.
    fn simple_schema() -> Arc<Schema> {
        let fields = vec![
            field("id", ColumnKind::Int64, false),
            field("value", ColumnKind::Float64, false),
            field("name", ColumnKind::Text, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Batch of `n` rows where row `i` is `(i, i * 1.5, "row-i")`.
    fn batch_of(n: usize) -> ColumnBatch {
        let mut ids = Vec::with_capacity(n);
        let mut values = Vec::with_capacity(n);
        let mut names = Vec::with_capacity(n);
        for i in 0..n {
            ids.push(i as i64);
            values.push(i as f64 * 1.5);
            names.push(format!("row-{i}"));
        }
        let columns = vec![
            ColumnVector::Int64 {
                data: ids,
                validity: None,
            },
            ColumnVector::Float64 {
                data: values,
                validity: None,
            },
            ColumnVector::Text {
                data: names,
                validity: None,
            },
        ];
        ColumnBatch::new(simple_schema(), columns)
    }

    #[test]
    fn schema_lookup_by_name_returns_index() {
        let schema = simple_schema();
        assert_eq!(schema.index_of("id"), Some(0));
        assert_eq!(schema.index_of("value"), Some(1));
        assert_eq!(schema.index_of("missing"), None);
    }

    #[test]
    fn value_access_by_row_and_column() {
        let batch = batch_of(5);
        assert_eq!(batch.value(0, 0), ValueRef::Int64(0));
        assert_eq!(batch.value(3, 1), ValueRef::Float64(4.5));
        assert_eq!(batch.value(4, 2), ValueRef::Text("row-4"));
    }

    #[test]
    fn value_out_of_range_yields_null() {
        let batch = batch_of(3);
        assert!(batch.value(99, 0).is_null());
        assert!(batch.value(0, 99).is_null());
    }

    #[test]
    fn take_produces_reduced_batch_preserving_schema() {
        let picked = batch_of(10).take(&[0, 2, 4]);
        assert_eq!(picked.len(), 3);
        assert_eq!(picked.value(1, 0), ValueRef::Int64(2));
        assert_eq!(picked.value(2, 1), ValueRef::Float64(6.0));
    }

    #[test]
    fn project_drops_unwanted_columns() {
        let narrowed = batch_of(4).project(&[0, 2]);
        assert_eq!(narrowed.schema.len(), 2);
        assert_eq!(narrowed.schema.index_of("value"), None);
        assert_eq!(narrowed.value(2, 0), ValueRef::Int64(2));
    }

    #[test]
    fn validity_bits_mask_nulls() {
        let column = ColumnVector::Int64 {
            data: vec![1, 2, 3],
            validity: Some(vec![true, false, true]),
        };
        assert!(column.is_valid(0));
        assert!(!column.is_valid(1));
        assert!(column.is_valid(2));
    }

    #[test]
    fn batch_size_constant_is_power_of_two() {
        assert_eq!(BATCH_SIZE & (BATCH_SIZE - 1), 0);
        assert!(BATCH_SIZE >= 1024);
    }
}