Skip to main content

reddb_server/storage/query/batch/
columnar_scan.rs

1//! Columnar chunk → `ColumnBatch` bridge (#856, PRD #850 Phase 2).
2//!
3//! Connects the sealed columnar **chunk** decode (`RDCC` `ColumnBlock`,
4//! #852) to the vectorised [`ColumnBatch`](super::ColumnBatch) reader that
5//! has lived self-contained in `storage/query/batch/` since the batch
6//! sprint. An analytical scan over a columnar `Metrics`/`TimeSeries` chunk
7//! now decodes straight into one typed [`ColumnVector`] per referenced
8//! column — the same column-at-a-time layout the batch operators consume —
9//! instead of the row-at-a-time `Vec<TimeSeriesPoint>` accumulation
10//! [`points_from_column_block`](crate::storage::timeseries::chunk::points_from_column_block)
11//! produces.
12//!
13//! Two properties are load-bearing for the Phase 2 gate:
14//!
//! * **Parity.** The values materialised here are bit-for-bit the values the
//!   row path materialises — the batch path reinterprets the *same* raw
//!   little-endian column bytes that `read_column_block` hands back. No
//!   second decoder, no divergent rounding.
19//! * **Projection pushdown.** Only the columns named in `projection` are
20//!   decoded; unreferenced column streams are never run through the codec
21//!   chain (via [`read_column_block_projected`]). A scan that touches one
22//!   column out of N pays for one column.
23//!
24//! Scope: this is the read/decode wiring only. The live INSERT→seal runtime
25//! call-site is owned by #861; full operator vectorisation across the SQL
26//! executor is explicitly out of scope (PRD #850).
27
28use std::sync::Arc;
29
30use super::column_batch::{ColumnBatch, ColumnKind, ColumnVector, Field, Schema};
31use crate::storage::schema::types::DataType;
32use crate::storage::unified::column_block::{read_column_block_projected, ColumnBlockError};
33
34/// Failures decoding a columnar chunk into a [`ColumnBatch`].
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum ColumnarScanError {
37    /// The underlying `RDCC` block was malformed (bad magic, CRC, directory…).
38    Block(ColumnBlockError),
39    /// A requested column id was not present in the chunk's directory.
40    MissingColumn(u32),
41    /// The column's logical type has no `ColumnBatch` representation
42    /// (the batch layer carries only Int64/Float64/Bool/Text). Carries the
43    /// `DataType::to_byte()` tag that was rejected.
44    UnsupportedLogicalType(u8),
45    /// A fixed-width numeric stream's byte length was not a multiple of the
46    /// element width — the chunk is corrupt.
47    RaggedStream { column_id: u32, len: usize },
48}
49
50impl From<ColumnBlockError> for ColumnarScanError {
51    fn from(e: ColumnBlockError) -> Self {
52        ColumnarScanError::Block(e)
53    }
54}
55
56/// Map a stored `RDCC` logical-type tag to the batch layer's column kind.
57/// The batch executor reasons over four physical kinds; every fixed-width
58/// integer-family type collapses to `Int64` (8-byte LE reinterpret) and the
59/// float types to `Float64`. `None` for any type the batch layer can't hold.
60fn kind_for_logical_type(tag: u8) -> Option<ColumnKind> {
61    match DataType::from_byte(tag)? {
62        // 8-byte integer-family streams. Unsigned values are reinterpreted
63        // through the same little-endian bytes; the row path does the same.
64        DataType::Integer
65        | DataType::UnsignedInteger
66        | DataType::Timestamp
67        | DataType::Duration => Some(ColumnKind::Int64),
68        DataType::Float => Some(ColumnKind::Float64),
69        DataType::Boolean => Some(ColumnKind::Bool),
70        DataType::Text => Some(ColumnKind::Text),
71        _ => None,
72    }
73}
74
75/// Decode the fixed-width numeric raw bytes of one column into a typed
76/// [`ColumnVector`]. Caller guarantees `kind` is a numeric kind.
77fn numeric_vector(
78    column_id: u32,
79    kind: &ColumnKind,
80    raw: &[u8],
81) -> Result<ColumnVector, ColumnarScanError> {
82    if !raw.len().is_multiple_of(8) {
83        return Err(ColumnarScanError::RaggedStream {
84            column_id,
85            len: raw.len(),
86        });
87    }
88    Ok(match kind {
89        ColumnKind::Int64 => ColumnVector::Int64 {
90            data: raw
91                .chunks_exact(8)
92                .map(|b| i64::from_le_bytes(b.try_into().unwrap()))
93                .collect(),
94            validity: None,
95        },
96        ColumnKind::Float64 => ColumnVector::Float64 {
97            data: raw
98                .chunks_exact(8)
99                .map(|b| f64::from_le_bytes(b.try_into().unwrap()))
100                .collect(),
101            validity: None,
102        },
103        // Bool/Text never reach here (the caller routes them away); keep the
104        // match exhaustive without inventing a decode.
105        other => {
106            return Err(ColumnarScanError::UnsupportedLogicalType(match other {
107                ColumnKind::Bool => DataType::Boolean.to_byte(),
108                ColumnKind::Text => DataType::Text.to_byte(),
109                _ => unreachable!(),
110            }))
111        }
112    })
113}
114
115/// Decode a sealed columnar chunk (`RDCC` bytes) into a [`ColumnBatch`],
116/// materialising **only** the columns in `projection` (by stable column id,
117/// in the given order). This is the vectorised counterpart to the
118/// row-at-a-time `points_from_column_block`: identical values, column-major
119/// layout, and unreferenced columns are never decoded.
120///
121/// Field names are synthesised as `col_{id}` — the `RDCC` block keys columns
122/// by stable id, not by name; the batch operators address columns by index
123/// or by the schema's `index_of`, so the synthetic name is purely a handle.
124///
125/// Errors if a requested id is absent ([`ColumnarScanError::MissingColumn`])
126/// or carries a logical type the batch layer can't represent
127/// ([`ColumnarScanError::UnsupportedLogicalType`]). Only numeric chunks
128/// (Metrics/TimeSeries timestamp+value) are exercised today; Bool/Text map
129/// to a kind but their stream decode is out of this slice's scope and is
130/// rejected rather than silently mis-decoded.
131pub fn column_batch_from_block(
132    bytes: &[u8],
133    projection: &[u32],
134) -> Result<ColumnBatch, ColumnarScanError> {
135    let block = read_column_block_projected(bytes, projection)?;
136
137    let mut fields = Vec::with_capacity(projection.len());
138    let mut columns = Vec::with_capacity(projection.len());
139
140    // Honour the caller's projection order (the projected reader returns
141    // columns in directory order, which may differ from the query order).
142    for &id in projection {
143        let col = block
144            .columns
145            .iter()
146            .find(|c| c.column_id == id)
147            .ok_or(ColumnarScanError::MissingColumn(id))?;
148        let kind = kind_for_logical_type(col.logical_type)
149            .ok_or(ColumnarScanError::UnsupportedLogicalType(col.logical_type))?;
150        let vector = match kind {
151            ColumnKind::Int64 | ColumnKind::Float64 => numeric_vector(id, &kind, &col.data)?,
152            ColumnKind::Bool | ColumnKind::Text => {
153                return Err(ColumnarScanError::UnsupportedLogicalType(col.logical_type))
154            }
155        };
156        fields.push(Field {
157            name: format!("col_{id}"),
158            kind,
159            nullable: false,
160        });
161        columns.push(vector);
162    }
163
164    let schema = Arc::new(Schema::new(fields));
165    Ok(ColumnBatch::new(schema, columns))
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::timeseries::chunk::{
        points_from_column_block, TimeSeriesChunk, COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID,
    };

    /// Seal a synthetic columnar chunk of `n` points and return its `RDCC`
    /// bytes — the same path Metrics/TimeSeries collections seal through.
    fn sealed_columnar_chunk(n: usize) -> Vec<u8> {
        // `with_max_points` so large measurement chunks aren't capped by the
        // default 1024-point auto-seal threshold.
        let mut chunk = TimeSeriesChunk::with_max_points("cpu.idle", Default::default(), n.max(1));
        for i in 0..n {
            assert!(chunk.append(
                1_700_000_000_000 + i as u64 * 1_000_000,
                95.0 + (i % 7) as f64 * 0.25
            ));
        }
        chunk.seal_columnar(7, 1).expect("seal columnar chunk")
    }

    /// The synthetic field name `column_batch_from_block` assigns to `id`.
    fn field_name(id: u32) -> String {
        format!("col_{id}")
    }

    #[test]
    fn scan_produces_results_through_the_column_batch_path() {
        // AC1: a scan over a columnar chunk yields results via ColumnBatch.
        let block = sealed_columnar_chunk(300);
        let batch =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID])
                .expect("decode into ColumnBatch");
        assert_eq!(batch.len(), 300);
        assert_eq!(batch.schema.len(), 2);
        // Timestamp column is Int64 (u64 reinterpret), value column is Float64.
        assert!(matches!(batch.columns[0], ColumnVector::Int64 { .. }));
        assert!(matches!(batch.columns[1], ColumnVector::Float64 { .. }));
    }

    #[test]
    fn batch_path_is_value_for_value_identical_to_the_row_path() {
        // AC2: behavioural parity with the row-at-a-time path.
        let block = sealed_columnar_chunk(257);
        let row_points = points_from_column_block(&block).expect("row path");
        let batch =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID])
                .expect("batch path");

        assert_eq!(batch.len(), row_points.len());

        // Destructure each column once, outside the per-row loop.
        let ColumnVector::Int64 {
            data: timestamps, ..
        } = &batch.columns[0]
        else {
            panic!("timestamp column should decode as Int64");
        };
        let ColumnVector::Float64 { data: values, .. } = &batch.columns[1] else {
            panic!("value column should decode as Float64");
        };

        for (i, p) in row_points.iter().enumerate() {
            // u64 timestamp survives the i64 reinterpret round-trip.
            assert_eq!(timestamps[i] as u64, p.timestamp_ns, "timestamp parity at row {i}");
            // Compare bit patterns: the module promises *bit-for-bit* parity,
            // which f64 `==` would not enforce (e.g. -0.0 == 0.0).
            assert_eq!(values[i].to_bits(), p.value.to_bits(), "value parity at row {i}");
        }
    }

    #[test]
    fn projection_decodes_only_referenced_columns() {
        // AC3: only the projected column is materialised.
        let block = sealed_columnar_chunk(128);
        // Derive names from the id constants rather than hard-coding
        // `col_0`/`col_1`, so the test tracks the real column ids.
        let ts_name = field_name(COLUMNAR_TS_COLUMN_ID);
        let value_name = field_name(COLUMNAR_VALUE_COLUMN_ID);

        let ts_only =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID]).expect("ts-only projection");
        assert_eq!(ts_only.schema.len(), 1);
        assert_eq!(ts_only.columns.len(), 1);
        assert_eq!(ts_only.schema.index_of(&ts_name), Some(0));
        assert_eq!(ts_only.schema.index_of(&value_name), None);

        let val_only = column_batch_from_block(&block, &[COLUMNAR_VALUE_COLUMN_ID])
            .expect("value-only projection");
        assert_eq!(val_only.schema.len(), 1);
        assert_eq!(val_only.schema.index_of(&value_name), Some(0));
        assert!(matches!(val_only.columns[0], ColumnVector::Float64 { .. }));
    }

    #[test]
    fn missing_column_is_an_error() {
        let block = sealed_columnar_chunk(16);
        // `ColumnBatch` isn't `PartialEq`, so match on the error arm directly
        // rather than comparing the whole `Result`.
        assert_eq!(
            column_batch_from_block(&block, &[42]).unwrap_err(),
            ColumnarScanError::MissingColumn(42)
        );
    }

    #[test]
    fn measured_row_vs_batch_decode_comparison() {
        // AC4 (Phase 2 gate): record a measured comparison of the columnar
        // batch decode vs the row-at-a-time decode over the same chunk.
        // This test never asserts on timing (wall-clock is machine- and
        // load-dependent and would be flaky); it asserts parity and prints
        // the measurement so the Phase 2 go/no-go has a number to read. The
        // figure is also captured in the issue envelope / commit message.
        use std::hint::black_box;
        use std::time::Instant;

        let n = 50_000;
        let block = sealed_columnar_chunk(n);
        let projection = [COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID];

        // Warm both paths once (codec setup, allocator) before timing.
        let _ = points_from_column_block(&block).unwrap();
        let _ = column_batch_from_block(&block, &projection).unwrap();

        // `black_box` on inputs and outputs discourages the optimiser from
        // hoisting or eliding the repeated decodes being timed.
        let reps = 20;
        let t_row = Instant::now();
        let mut row_rows = 0usize;
        for _ in 0..reps {
            row_rows = black_box(points_from_column_block(black_box(&block)).unwrap()).len();
        }
        let row_elapsed = t_row.elapsed();

        let t_batch = Instant::now();
        let mut batch_rows = 0usize;
        for _ in 0..reps {
            batch_rows = black_box(
                column_batch_from_block(black_box(&block), black_box(&projection)).unwrap(),
            )
            .len();
        }
        let batch_elapsed = t_batch.elapsed();

        // Both paths see the same rows — the comparison is apples-to-apples.
        assert_eq!(row_rows, n);
        assert_eq!(batch_rows, n);

        let row_ns = row_elapsed.as_nanos() as f64 / reps as f64;
        let batch_ns = batch_elapsed.as_nanos() as f64 / reps as f64;
        eprintln!(
            "[#856 Phase 2 gate] columnar decode of {n} rows ({reps} reps): \
             row-path {row_ns:.0} ns/scan, batch-path {batch_ns:.0} ns/scan, \
             ratio {:.2}x (batch/row)",
            batch_ns / row_ns
        );
    }
}