Skip to main content

reddb_server/storage/query/batch/
columnar_scan.rs

1//! Columnar chunk → `ColumnBatch` bridge (#856, PRD #850 Phase 2).
2//!
3//! Connects the sealed columnar **chunk** decode (`RDCC` `ColumnBlock`,
4//! #852) to the vectorised [`ColumnBatch`](super::ColumnBatch) reader that
5//! has lived self-contained in `storage/query/batch/` since the batch
6//! sprint. An analytical scan over a columnar `Metrics`/`TimeSeries` chunk
7//! now decodes straight into one typed [`ColumnVector`] per referenced
8//! column — the same column-at-a-time layout the batch operators consume —
9//! instead of the row-at-a-time `Vec<TimeSeriesPoint>` accumulation
10//! [`points_from_column_block`](crate::storage::timeseries::chunk::points_from_column_block)
11//! produces.
12//!
13//! Two properties are load-bearing for the Phase 2 gate:
14//!
//! * **Parity.** The values materialised here are bit-for-bit the values the
//!   row path materialises — the batch path reinterprets the *same* raw
//!   little-endian column bytes the `RDCC` column-block reader
//!   ([`read_column_block_projected`]) hands back. No second decoder, no
//!   divergent rounding.
19//! * **Projection pushdown.** Only the columns named in `projection` are
20//!   decoded; unreferenced column streams are never run through the codec
21//!   chain (via [`read_column_block_projected`]). A scan that touches one
22//!   column out of N pays for one column.
23//!
24//! Scope: this is the read/decode wiring only. The live INSERT→seal runtime
25//! call-site is owned by #861; full operator vectorisation across the SQL
26//! executor is explicitly out of scope (PRD #850).
27
28use std::sync::Arc;
29
30use super::column_batch::{ColumnBatch, ColumnKind, ColumnVector, Field, Schema};
31use crate::storage::schema::types::DataType;
32use crate::storage::unified::column_block::{read_column_block_projected, ColumnBlockError};
33
34/// Failures decoding a columnar chunk into a [`ColumnBatch`].
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum ColumnarScanError {
37    /// The underlying `RDCC` block was malformed (bad magic, CRC, directory…).
38    Block(ColumnBlockError),
39    /// A requested column id was not present in the chunk's directory.
40    MissingColumn(u32),
41    /// The column's logical type has no `ColumnBatch` representation
42    /// (the batch layer carries only Int64/Float64/Bool/Text). Carries the
43    /// `DataType::to_byte()` tag that was rejected.
44    UnsupportedLogicalType(u8),
45    /// A fixed-width numeric stream's byte length was not a multiple of the
46    /// element width — the chunk is corrupt.
47    RaggedStream { column_id: u32, len: usize },
48}
49
50impl From<ColumnBlockError> for ColumnarScanError {
51    fn from(e: ColumnBlockError) -> Self {
52        ColumnarScanError::Block(e)
53    }
54}
55
56/// Map a stored `RDCC` logical-type tag to the batch layer's column kind.
57/// The batch executor reasons over four physical kinds; every fixed-width
58/// integer-family type collapses to `Int64` (8-byte LE reinterpret) and the
59/// float types to `Float64`. `None` for any type the batch layer can't hold.
60fn kind_for_logical_type(tag: u8) -> Option<ColumnKind> {
61    match DataType::from_byte(tag)? {
62        // 8-byte integer-family streams. Unsigned values are reinterpreted
63        // through the same little-endian bytes; the row path does the same.
64        DataType::Integer
65        | DataType::UnsignedInteger
66        | DataType::Timestamp
67        | DataType::Duration => Some(ColumnKind::Int64),
68        DataType::Float => Some(ColumnKind::Float64),
69        DataType::Boolean => Some(ColumnKind::Bool),
70        DataType::Text => Some(ColumnKind::Text),
71        _ => None,
72    }
73}
74
75/// Decode the fixed-width numeric raw bytes of one column into a typed
76/// [`ColumnVector`]. Caller guarantees `kind` is a numeric kind.
77fn numeric_vector(
78    column_id: u32,
79    kind: &ColumnKind,
80    raw: &[u8],
81) -> Result<ColumnVector, ColumnarScanError> {
82    if !raw.len().is_multiple_of(8) {
83        return Err(ColumnarScanError::RaggedStream {
84            column_id,
85            len: raw.len(),
86        });
87    }
88    let n = raw.len() / 8;
89    Ok(match kind {
90        ColumnKind::Int64 => ColumnVector::Int64 {
91            data: le_bytes_to_i64_vec(raw, n),
92            validity: None,
93        },
94        ColumnKind::Float64 => ColumnVector::Float64 {
95            data: le_bytes_to_f64_vec(raw, n),
96            validity: None,
97        },
98        // Bool/Text never reach here (the caller routes them away); keep the
99        // match exhaustive without inventing a decode.
100        other => {
101            return Err(ColumnarScanError::UnsupportedLogicalType(match other {
102                ColumnKind::Bool => DataType::Boolean.to_byte(),
103                ColumnKind::Text => DataType::Text.to_byte(),
104                _ => unreachable!(),
105            }))
106        }
107    })
108}
109
/// Decode the first `n` little-endian 8-byte values of `raw` into a
/// `Vec<i64>` (#962 fast path).
///
/// On little-endian targets the LE encoding already equals the native
/// layout, so the whole run is copied with a single memcpy. On big-endian
/// targets each element is byte-swapped through `i64::from_le_bytes`. Both
/// branches decode exactly `n` elements; bytes past `n * 8` are ignored.
///
/// # Panics
///
/// Panics if `n * 8` overflows `usize` or exceeds `raw.len()`. This function
/// is safe to call, so it cannot trust `n`: without the check, the unsafe
/// copy below could read past the end of `raw`.
fn le_bytes_to_i64_vec(raw: &[u8], n: usize) -> Vec<i64> {
    let byte_len = n
        .checked_mul(8)
        .filter(|&len| len <= raw.len())
        .expect("le_bytes_to_i64_vec: `raw` holds fewer than `n * 8` bytes");
    // Pin both endian branches to exactly `n` elements so they agree.
    let raw = &raw[..byte_len];

    #[cfg(target_endian = "little")]
    {
        let mut v: Vec<i64> = Vec::with_capacity(n);
        // SAFETY: `raw.len() == byte_len == n * 8` (checked above), and `v`
        // has capacity for `n` i64s, i.e. `n * 8` bytes, so the copy is in
        // bounds on both sides. `v` is freshly allocated, so the regions
        // never overlap. The destination is written as bytes, so `u8`
        // alignment suffices. Every 8-byte pattern is a valid i64, and on LE
        // targets the LE encoding is the native representation, so all `n`
        // elements are initialised before `set_len`.
        unsafe {
            std::ptr::copy_nonoverlapping(raw.as_ptr(), v.as_mut_ptr().cast::<u8>(), byte_len);
            v.set_len(n);
        }
        v
    }
    #[cfg(not(target_endian = "little"))]
    {
        raw.chunks_exact(8)
            .map(|b| i64::from_le_bytes(b.try_into().expect("chunks_exact yields 8-byte chunks")))
            .collect()
    }
}
130
/// Decode the first `n` little-endian 8-byte values of `raw` into a
/// `Vec<f64>` (#962 fast path).
///
/// On little-endian targets the bytes are copied with a single memcpy. On
/// big-endian targets each element goes through `f64::from_le_bytes`. Both
/// branches decode exactly `n` elements; bytes past `n * 8` are ignored.
/// Bit patterns are preserved exactly, including NaN payloads, ±inf and ±0.
///
/// # Panics
///
/// Panics if `n * 8` overflows `usize` or exceeds `raw.len()`. This guard is
/// what keeps the unsafe copy below in bounds.
fn le_bytes_to_f64_vec(raw: &[u8], n: usize) -> Vec<f64> {
    let byte_len = n
        .checked_mul(8)
        .filter(|&len| len <= raw.len())
        .expect("le_bytes_to_f64_vec: `raw` holds fewer than `n * 8` bytes");
    // Pin both endian branches to exactly `n` elements so they agree.
    let raw = &raw[..byte_len];

    #[cfg(target_endian = "little")]
    {
        let mut v: Vec<f64> = Vec::with_capacity(n);
        // SAFETY: `raw.len() == byte_len == n * 8` (checked above), and `v`
        // has capacity for `n * 8` bytes, so the copy is in bounds. `v` is
        // freshly allocated, so the regions never overlap. The destination
        // is written as bytes (`u8` alignment). Every 8-byte pattern is a
        // valid f64 (including NaN / ±inf / ±0), so all `n` elements are
        // initialised with valid values before `set_len`.
        unsafe {
            std::ptr::copy_nonoverlapping(raw.as_ptr(), v.as_mut_ptr().cast::<u8>(), byte_len);
            v.set_len(n);
        }
        v
    }
    #[cfg(not(target_endian = "little"))]
    {
        raw.chunks_exact(8)
            .map(|b| f64::from_le_bytes(b.try_into().expect("chunks_exact yields 8-byte chunks")))
            .collect()
    }
}
150
151/// Decode a sealed columnar chunk (`RDCC` bytes) into a [`ColumnBatch`],
152/// materialising **only** the columns in `projection` (by stable column id,
153/// in the given order). This is the vectorised counterpart to the
154/// row-at-a-time `points_from_column_block`: identical values, column-major
155/// layout, and unreferenced columns are never decoded.
156///
157/// Field names are synthesised as `col_{id}` — the `RDCC` block keys columns
158/// by stable id, not by name; the batch operators address columns by index
159/// or by the schema's `index_of`, so the synthetic name is purely a handle.
160///
161/// Errors if a requested id is absent ([`ColumnarScanError::MissingColumn`])
162/// or carries a logical type the batch layer can't represent
163/// ([`ColumnarScanError::UnsupportedLogicalType`]). Only numeric chunks
164/// (Metrics/TimeSeries timestamp+value) are exercised today; Bool/Text map
165/// to a kind but their stream decode is out of this slice's scope and is
166/// rejected rather than silently mis-decoded.
167pub fn column_batch_from_block(
168    bytes: &[u8],
169    projection: &[u32],
170) -> Result<ColumnBatch, ColumnarScanError> {
171    let block = read_column_block_projected(bytes, projection)?;
172
173    let mut fields = Vec::with_capacity(projection.len());
174    let mut columns = Vec::with_capacity(projection.len());
175
176    // Honour the caller's projection order (the projected reader returns
177    // columns in directory order, which may differ from the query order).
178    for &id in projection {
179        let col = block
180            .columns
181            .iter()
182            .find(|c| c.column_id == id)
183            .ok_or(ColumnarScanError::MissingColumn(id))?;
184        let kind = kind_for_logical_type(col.logical_type)
185            .ok_or(ColumnarScanError::UnsupportedLogicalType(col.logical_type))?;
186        let vector = match kind {
187            ColumnKind::Int64 | ColumnKind::Float64 => numeric_vector(id, &kind, &col.data)?,
188            ColumnKind::Bool | ColumnKind::Text => {
189                return Err(ColumnarScanError::UnsupportedLogicalType(col.logical_type))
190            }
191        };
192        fields.push(Field {
193            name: format!("col_{id}"),
194            kind,
195            nullable: false,
196        });
197        columns.push(vector);
198    }
199
200    let schema = Arc::new(Schema::new(fields));
201    Ok(ColumnBatch::new(schema, columns))
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::timeseries::chunk::{
        points_from_column_block, TimeSeriesChunk, COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID,
    };

    /// Seal a synthetic columnar chunk of `n` points and return its `RDCC`
    /// bytes — the same path Metrics/TimeSeries collections seal through.
    fn sealed_columnar_chunk(n: usize) -> Vec<u8> {
        // `with_max_points` so large measurement chunks aren't capped by the
        // default 1024-point auto-seal threshold.
        let mut chunk = TimeSeriesChunk::with_max_points("cpu.idle", Default::default(), n.max(1));
        for i in 0..n {
            assert!(chunk.append(
                1_700_000_000_000 + i as u64 * 1_000_000,
                95.0 + (i % 7) as f64 * 0.25
            ));
        }
        chunk.seal_columnar(7, 1).expect("seal columnar chunk")
    }

    /// The synthetic field name `column_batch_from_block` gives column `id`.
    /// Deriving it from the id keeps tests correct if the id constants change.
    fn field_name(id: u32) -> String {
        format!("col_{id}")
    }

    #[test]
    fn scan_produces_results_through_the_column_batch_path() {
        // AC1: a scan over a columnar chunk yields results via ColumnBatch.
        let block = sealed_columnar_chunk(300);
        let batch =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID])
                .expect("decode into ColumnBatch");
        assert_eq!(batch.len(), 300);
        assert_eq!(batch.schema.len(), 2);
        // Timestamp column is Int64 (u64 reinterpret), value column is Float64.
        assert!(matches!(batch.columns[0], ColumnVector::Int64 { .. }));
        assert!(matches!(batch.columns[1], ColumnVector::Float64 { .. }));
    }

    #[test]
    fn batch_path_is_value_for_value_identical_to_the_row_path() {
        // AC2: behavioural parity with the row-at-a-time path.
        let block = sealed_columnar_chunk(257);
        let row_points = points_from_column_block(&block).expect("row path");
        let batch =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID])
                .expect("batch path");

        assert_eq!(batch.len(), row_points.len());
        let ColumnVector::Int64 { data: ts_data, .. } = &batch.columns[0] else {
            panic!("timestamp column must decode as Int64");
        };
        let ColumnVector::Float64 { data: val_data, .. } = &batch.columns[1] else {
            panic!("value column must decode as Float64");
        };
        for (i, p) in row_points.iter().enumerate() {
            // u64 timestamp survives the i64 reinterpret round-trip.
            assert_eq!(ts_data[i] as u64, p.timestamp_ns, "timestamp parity at row {i}");
            // Compare bit patterns: the module promises bit-for-bit parity,
            // which f64 `==` cannot confirm (-0.0 == 0.0, NaN != NaN).
            assert_eq!(
                val_data[i].to_bits(),
                p.value.to_bits(),
                "value parity at row {i}"
            );
        }
    }

    #[test]
    fn projection_decodes_only_referenced_columns() {
        // AC3: only the projected column is materialised.
        let block = sealed_columnar_chunk(128);
        let ts_only =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID]).expect("ts-only projection");
        assert_eq!(ts_only.schema.len(), 1);
        assert_eq!(ts_only.columns.len(), 1);
        assert_eq!(ts_only.schema.index_of(&field_name(COLUMNAR_TS_COLUMN_ID)), Some(0));
        assert_eq!(ts_only.schema.index_of(&field_name(COLUMNAR_VALUE_COLUMN_ID)), None);

        let val_only = column_batch_from_block(&block, &[COLUMNAR_VALUE_COLUMN_ID])
            .expect("value-only projection");
        assert_eq!(val_only.schema.len(), 1);
        assert!(matches!(val_only.columns[0], ColumnVector::Float64 { .. }));
    }

    #[test]
    fn missing_column_is_an_error() {
        let block = sealed_columnar_chunk(16);
        // `ColumnBatch` isn't `PartialEq`, so compare only the error (via
        // `unwrap_err`) rather than the whole `Result`.
        assert_eq!(
            column_batch_from_block(&block, &[42]).unwrap_err(),
            ColumnarScanError::MissingColumn(42)
        );
    }

    #[test]
    fn ragged_numeric_stream_is_rejected() {
        // 7 bytes is not a whole number of 8-byte elements.
        assert_eq!(
            numeric_vector(3, &ColumnKind::Int64, &[0u8; 7]).err(),
            Some(ColumnarScanError::RaggedStream {
                column_id: 3,
                len: 7
            })
        );
    }

    #[test]
    fn non_numeric_kind_is_not_decoded_as_numeric() {
        assert_eq!(
            numeric_vector(0, &ColumnKind::Bool, &[]).err(),
            Some(ColumnarScanError::UnsupportedLogicalType(
                DataType::Boolean.to_byte()
            ))
        );
    }

    #[test]
    fn le_decoders_round_trip_native_values() {
        let ints = [0i64, 1, -1, i64::MIN, i64::MAX];
        let raw: Vec<u8> = ints.iter().flat_map(|v| v.to_le_bytes()).collect();
        assert_eq!(le_bytes_to_i64_vec(&raw, ints.len()), ints.to_vec());

        let floats = [0.0f64, -0.0, 2.5, f64::NAN, f64::NEG_INFINITY];
        let raw: Vec<u8> = floats.iter().flat_map(|v| v.to_le_bytes()).collect();
        let decoded: Vec<u64> = le_bytes_to_f64_vec(&raw, floats.len())
            .iter()
            .map(|x| x.to_bits())
            .collect();
        let expected: Vec<u64> = floats.iter().map(|x| x.to_bits()).collect();
        assert_eq!(decoded, expected);
    }

    #[test]
    fn measured_row_vs_batch_decode_comparison() {
        // AC4 (Phase 2 gate): record a measured comparison of the columnar
        // batch decode vs the row-at-a-time decode over the same chunk.
        // This test never asserts on timing (wall-clock is machine- and
        // load-dependent and would be flaky); it asserts parity and prints
        // the measurement so the Phase 2 go/no-go has a number to read. The
        // figure is also captured in the issue envelope / commit message.
        use std::hint::black_box;
        use std::time::Instant;

        let n = 50_000;
        let block = sealed_columnar_chunk(n);
        let projection = [COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID];

        // Warm both paths once (codec setup, allocator) before timing.
        let _ = points_from_column_block(&block).unwrap();
        let _ = column_batch_from_block(&block, &projection).unwrap();

        // `black_box` on the inputs keeps the optimiser from hoisting or
        // merging the repeated, identical decodes out of the timed loops.
        let reps = 20;
        let t_row = Instant::now();
        let mut row_rows = 0usize;
        for _ in 0..reps {
            row_rows = points_from_column_block(black_box(&block)).unwrap().len();
        }
        let row_elapsed = t_row.elapsed();

        let t_batch = Instant::now();
        let mut batch_rows = 0usize;
        for _ in 0..reps {
            batch_rows = column_batch_from_block(black_box(&block), black_box(&projection))
                .unwrap()
                .len();
        }
        let batch_elapsed = t_batch.elapsed();

        // Both paths see the same rows — the comparison is apples-to-apples.
        assert_eq!(row_rows, n);
        assert_eq!(batch_rows, n);

        let row_ns = row_elapsed.as_nanos() as f64 / reps as f64;
        let batch_ns = batch_elapsed.as_nanos() as f64 / reps as f64;
        eprintln!(
            "[#856 Phase 2 gate] columnar decode of {n} rows ({reps} reps): \
             row-path {row_ns:.0} ns/scan, batch-path {batch_ns:.0} ns/scan, \
             ratio {:.2}x (batch/row)",
            batch_ns / row_ns
        );
    }
}