Skip to main content

reddb_server/storage/query/batch/
columnar_scan.rs

1//! Columnar chunk → `ColumnBatch` bridge (#856, PRD #850 Phase 2).
2//!
3//! Connects the sealed columnar **chunk** decode (`RDCC` `ColumnBlock`,
4//! #852) to the vectorised [`ColumnBatch`](super::ColumnBatch) reader that
5//! has lived self-contained in `storage/query/batch/` since the batch
6//! sprint. An analytical scan over a columnar `Metrics`/`TimeSeries` chunk
7//! now decodes straight into one typed [`ColumnVector`] per referenced
8//! column — the same column-at-a-time layout the batch operators consume —
9//! instead of the row-at-a-time `Vec<TimeSeriesPoint>` accumulation
10//! [`points_from_column_block`](crate::storage::timeseries::chunk::points_from_column_block)
11//! produces.
12//!
13//! Two properties are load-bearing for the Phase 2 gate:
14//!
//! * **Parity.** The values materialised here are bit-for-bit the values the
//!   row path materialises — the batch path reinterprets the *same* decoded
//!   little-endian column payload the `RDCC` block reader hands back (as
//!   aligned `u64` words on the fast path, raw bytes otherwise). No second
//!   decoder, no divergent rounding.
//! * **Projection pushdown.** Only the columns named in `projection` are
//!   decoded; unreferenced column streams are never run through the codec
//!   chain (via [`read_column_block_projected_typed`]). A scan that touches
//!   one column out of N pays for one column.
23//!
24//! Scope: this is the read/decode wiring only. The live INSERT→seal runtime
25//! call-site is owned by #861; full operator vectorisation across the SQL
26//! executor is explicitly out of scope (PRD #850).
27
28use std::sync::Arc;
29
30use super::column_batch::{ColumnBatch, ColumnKind, ColumnVector, Field, Schema};
31use crate::storage::schema::types::DataType;
32use crate::storage::unified::column_block::{
33    read_column_block_projected_typed, ColumnBlockError, ProjectedColumnData,
34};
35
36/// Failures decoding a columnar chunk into a [`ColumnBatch`].
37#[derive(Debug, Clone, PartialEq, Eq)]
38pub enum ColumnarScanError {
39    /// The underlying `RDCC` block was malformed (bad magic, CRC, directory…).
40    Block(ColumnBlockError),
41    /// A requested column id was not present in the chunk's directory.
42    MissingColumn(u32),
43    /// The column's logical type has no `ColumnBatch` representation
44    /// (the batch layer carries only Int64/Float64/Bool/Text). Carries the
45    /// `DataType::to_byte()` tag that was rejected.
46    UnsupportedLogicalType(u8),
47    /// A fixed-width numeric stream's byte length was not a multiple of the
48    /// element width — the chunk is corrupt.
49    RaggedStream { column_id: u32, len: usize },
50}
51
52impl From<ColumnBlockError> for ColumnarScanError {
53    fn from(e: ColumnBlockError) -> Self {
54        ColumnarScanError::Block(e)
55    }
56}
57
58/// Map a stored `RDCC` logical-type tag to the batch layer's column kind.
59/// The batch executor reasons over four physical kinds; every fixed-width
60/// integer-family type collapses to `Int64` (8-byte LE reinterpret) and the
61/// float types to `Float64`. `None` for any type the batch layer can't hold.
62fn kind_for_logical_type(tag: u8) -> Option<ColumnKind> {
63    match DataType::from_byte(tag)? {
64        // 8-byte integer-family streams. Unsigned values are reinterpreted
65        // through the same little-endian bytes; the row path does the same.
66        DataType::Integer
67        | DataType::UnsignedInteger
68        | DataType::Timestamp
69        | DataType::Duration => Some(ColumnKind::Int64),
70        DataType::Float => Some(ColumnKind::Float64),
71        DataType::Boolean => Some(ColumnKind::Bool),
72        DataType::Text => Some(ColumnKind::Text),
73        _ => None,
74    }
75}
76
77/// Decode the fixed-width numeric raw bytes of one column into a typed
78/// [`ColumnVector`]. Caller guarantees `kind` is a numeric kind.
79fn numeric_vector(
80    column_id: u32,
81    kind: &ColumnKind,
82    raw: &[u8],
83) -> Result<ColumnVector, ColumnarScanError> {
84    if !raw.len().is_multiple_of(8) {
85        return Err(ColumnarScanError::RaggedStream {
86            column_id,
87            len: raw.len(),
88        });
89    }
90    let n = raw.len() / 8;
91    Ok(match kind {
92        ColumnKind::Int64 => ColumnVector::Int64 {
93            data: le_bytes_to_i64_vec(raw, n),
94            validity: None,
95        },
96        ColumnKind::Float64 => ColumnVector::Float64 {
97            data: le_bytes_to_f64_vec(raw, n),
98            validity: None,
99        },
100        // Bool/Text never reach here (the caller routes them away); keep the
101        // match exhaustive without inventing a decode.
102        other => {
103            return Err(ColumnarScanError::UnsupportedLogicalType(match other {
104                ColumnKind::Bool => DataType::Boolean.to_byte(),
105                ColumnKind::Text => DataType::Text.to_byte(),
106                _ => unreachable!(),
107            }))
108        }
109    })
110}
111
/// Convert a slice of little-endian 8-byte values to `Vec<i64>` (#962 fast path).
///
/// `n` must be exactly `raw.len() / 8`, with no trailing bytes. On LE targets a
/// single memcpy of `n * 8` bytes suffices; on BE each element is decoded with
/// `i64::from_le_bytes`.
///
/// # Panics
/// If `n * 8 != raw.len()` (including when `n * 8` overflows). This is a safe
/// function, so the unchecked copy below must not be steerable past the end of
/// `raw` by its caller.
fn le_bytes_to_i64_vec(raw: &[u8], n: usize) -> Vec<i64> {
    // One comparison per column: it makes the raw copy sound regardless of the
    // caller. It also keeps the LE and BE paths agreeing on the element count.
    assert_eq!(
        n.checked_mul(8),
        Some(raw.len()),
        "le_bytes_to_i64_vec: n ({n}) does not describe raw ({} bytes)",
        raw.len()
    );
    #[cfg(target_endian = "little")]
    {
        let mut v: Vec<i64> = Vec::with_capacity(n);
        // SAFETY: the assert above guarantees `raw` holds exactly `n * 8` valid
        // bytes, and `v` has capacity for `n` i64s (`n * 8` bytes). `v` is freshly
        // allocated, so source and destination never overlap. The destination is
        // written as bytes, so no alignment is required of `raw`. On LE platforms
        // the bit pattern of LE-encoded i64 bytes equals the native i64
        // representation, and every bit pattern is a valid i64, so all `n`
        // elements are initialised before `set_len`.
        unsafe {
            std::ptr::copy_nonoverlapping(raw.as_ptr(), v.as_mut_ptr().cast::<u8>(), n * 8);
            v.set_len(n);
        }
        v
    }
    #[cfg(not(target_endian = "little"))]
    {
        raw.chunks_exact(8)
            .map(|b| i64::from_le_bytes(b.try_into().expect("chunks_exact yields 8 bytes")))
            .collect()
    }
}
132
/// Convert a slice of little-endian 8-byte values to `Vec<f64>` (#962 fast path).
///
/// `n` must be exactly `raw.len() / 8`, with no trailing bytes. On LE targets a
/// single memcpy of `n * 8` bytes suffices; on BE each element is decoded with
/// `f64::from_le_bytes`. Bit patterns are preserved exactly, including NaN
/// payloads, ±inf and ±0.
///
/// # Panics
/// If `n * 8 != raw.len()` (including when `n * 8` overflows). This is a safe
/// function, so the unchecked copy below must not be steerable past the end of
/// `raw` by its caller.
fn le_bytes_to_f64_vec(raw: &[u8], n: usize) -> Vec<f64> {
    // One comparison per column: it makes the raw copy sound regardless of the
    // caller. It also keeps the LE and BE paths agreeing on the element count.
    assert_eq!(
        n.checked_mul(8),
        Some(raw.len()),
        "le_bytes_to_f64_vec: n ({n}) does not describe raw ({} bytes)",
        raw.len()
    );
    #[cfg(target_endian = "little")]
    {
        let mut v: Vec<f64> = Vec::with_capacity(n);
        // SAFETY: the assert above guarantees `raw` holds exactly `n * 8` valid
        // bytes, and `v` has capacity for `n` f64s. The buffers never overlap (`v`
        // is fresh), and the destination is written as bytes. Every 8-byte pattern
        // is a valid f64 bit pattern (including NaN / ±inf / ±0), so all `n`
        // elements are initialised with valid values before `set_len`.
        unsafe {
            std::ptr::copy_nonoverlapping(raw.as_ptr(), v.as_mut_ptr().cast::<u8>(), n * 8);
            v.set_len(n);
        }
        v
    }
    #[cfg(not(target_endian = "little"))]
    {
        raw.chunks_exact(8)
            .map(|b| f64::from_le_bytes(b.try_into().expect("chunks_exact yields 8 bytes")))
            .collect()
    }
}
152
/// Turn an owned `Vec<u64>` into a `Vec<i64>` by reusing its allocation (#962).
///
/// No element is copied or converted. Each word keeps its exact bits, which
/// is the same two's-complement reinterpretation `word as i64` performs.
/// Sound because `u64` and `i64` agree in both size (8) and alignment (8).
fn u64_words_to_i64(v: Vec<u64>) -> Vec<i64> {
    // Disarm the original `Vec` so its buffer is not freed when `v` goes away;
    // ownership of the allocation is handed to the new `Vec<i64>` below.
    let mut words = std::mem::ManuallyDrop::new(v);
    let (ptr, len, cap) = (words.as_mut_ptr().cast::<i64>(), words.len(), words.capacity());
    // SAFETY: `ptr`/`len`/`cap` describe a live allocation made for `u64`, whose
    // layout (size 8, align 8) is identical to `i64`'s. The source `Vec` is
    // never dropped (ManuallyDrop), so the buffer has exactly one owner.
    unsafe { Vec::from_raw_parts(ptr, len, cap) }
}
163
/// Turn an owned `Vec<u64>` into a `Vec<f64>` by reusing its allocation (#962).
///
/// Each word becomes the float with exactly those bits, the same result as
/// `f64::from_bits`. Every 64-bit pattern is a valid `f64` (NaN, ±inf and ±0
/// included), and the two types share size and alignment, so no copy is
/// needed.
fn u64_words_to_f64(v: Vec<u64>) -> Vec<f64> {
    // Keep the source `Vec` from freeing the buffer we are about to re-own.
    let mut words = std::mem::ManuallyDrop::new(v);
    let (ptr, len, cap) = (words.as_mut_ptr().cast::<f64>(), words.len(), words.capacity());
    // SAFETY: same layout (size 8, align 8) for `u64` and `f64`; the buffer came
    // from a live `Vec<u64>` that is never dropped, so ownership moves cleanly.
    unsafe { Vec::from_raw_parts(ptr, len, cap) }
}
172
173/// Build a typed [`ColumnVector`] from a decoded column payload. The numeric
174/// fast path (`Words`) reinterprets the aligned `u64` buffer in place; the
175/// `Bytes` fallback copies through [`numeric_vector`].
176fn vector_for(
177    column_id: u32,
178    kind: &ColumnKind,
179    data: ProjectedColumnData,
180) -> Result<ColumnVector, ColumnarScanError> {
181    match data {
182        ProjectedColumnData::Words(words) => Ok(match kind {
183            ColumnKind::Int64 => ColumnVector::Int64 {
184                data: u64_words_to_i64(words),
185                validity: None,
186            },
187            ColumnKind::Float64 => ColumnVector::Float64 {
188                data: u64_words_to_f64(words),
189                validity: None,
190            },
191            other => {
192                return Err(ColumnarScanError::UnsupportedLogicalType(match other {
193                    ColumnKind::Bool => DataType::Boolean.to_byte(),
194                    ColumnKind::Text => DataType::Text.to_byte(),
195                    _ => unreachable!(),
196                }))
197            }
198        }),
199        ProjectedColumnData::Bytes(raw) => numeric_vector(column_id, kind, &raw),
200    }
201}
202
203/// Decode a sealed columnar chunk (`RDCC` bytes) into a [`ColumnBatch`],
204/// materialising **only** the columns in `projection` (by stable column id,
205/// in the given order). This is the vectorised counterpart to the
206/// row-at-a-time `points_from_column_block`: identical values, column-major
207/// layout, and unreferenced columns are never decoded.
208///
209/// Field names are synthesised as `col_{id}` — the `RDCC` block keys columns
210/// by stable id, not by name; the batch operators address columns by index
211/// or by the schema's `index_of`, so the synthetic name is purely a handle.
212///
213/// Errors if a requested id is absent ([`ColumnarScanError::MissingColumn`])
214/// or carries a logical type the batch layer can't represent
215/// ([`ColumnarScanError::UnsupportedLogicalType`]). Only numeric chunks
216/// (Metrics/TimeSeries timestamp+value) are exercised today; Bool/Text map
217/// to a kind but their stream decode is out of this slice's scope and is
218/// rejected rather than silently mis-decoded.
219pub fn column_batch_from_block(
220    bytes: &[u8],
221    projection: &[u32],
222) -> Result<ColumnBatch, ColumnarScanError> {
223    let mut decoded = read_column_block_projected_typed(bytes, projection)?;
224
225    let mut fields = Vec::with_capacity(projection.len());
226    let mut columns = Vec::with_capacity(projection.len());
227
228    // Honour the caller's projection order (the projected reader returns
229    // columns in directory order, which may differ from the query order).
230    // `swap_remove` moves each column out so the typed `Vec<u64>` payload is
231    // reinterpreted in place rather than copied (#962).
232    for &id in projection {
233        let pos = decoded
234            .iter()
235            .position(|c| c.column_id == id)
236            .ok_or(ColumnarScanError::MissingColumn(id))?;
237        let col = decoded.swap_remove(pos);
238        let kind = kind_for_logical_type(col.logical_type)
239            .ok_or(ColumnarScanError::UnsupportedLogicalType(col.logical_type))?;
240        let vector = match kind {
241            ColumnKind::Int64 | ColumnKind::Float64 => vector_for(id, &kind, col.data)?,
242            ColumnKind::Bool | ColumnKind::Text => {
243                return Err(ColumnarScanError::UnsupportedLogicalType(col.logical_type))
244            }
245        };
246        fields.push(Field {
247            name: format!("col_{id}"),
248            kind,
249            nullable: false,
250        });
251        columns.push(vector);
252    }
253
254    let schema = Arc::new(Schema::new(fields));
255    Ok(ColumnBatch::new(schema, columns))
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::timeseries::chunk::{
        points_from_column_block, TimeSeriesChunk, COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID,
    };

    /// Seal a synthetic columnar chunk of `n` points and return its `RDCC`
    /// bytes — the same path Metrics/TimeSeries collections seal through.
    fn sealed_columnar_chunk(n: usize) -> Vec<u8> {
        // `with_max_points` so large measurement chunks aren't capped by the
        // default 1024-point auto-seal threshold.
        let mut chunk = TimeSeriesChunk::with_max_points("cpu.idle", Default::default(), n.max(1));
        for i in 0..n {
            assert!(chunk.append(
                1_700_000_000_000 + i as u64 * 1_000_000,
                95.0 + (i % 7) as f64 * 0.25
            ));
        }
        chunk.seal_columnar(7, 1).expect("seal columnar chunk")
    }

    #[test]
    fn scan_produces_results_through_the_column_batch_path() {
        // AC1: a scan over a columnar chunk yields results via ColumnBatch.
        let block = sealed_columnar_chunk(300);
        let batch =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID])
                .expect("decode into ColumnBatch");
        assert_eq!(batch.len(), 300);
        assert_eq!(batch.schema.len(), 2);
        // Timestamp column is Int64 (u64 reinterpret), value column is Float64.
        assert!(matches!(batch.columns[0], ColumnVector::Int64 { .. }));
        assert!(matches!(batch.columns[1], ColumnVector::Float64 { .. }));
    }

    #[test]
    fn batch_path_is_value_for_value_identical_to_the_row_path() {
        // AC2: behavioural parity with the row-at-a-time path.
        let block = sealed_columnar_chunk(257);
        let row_points = points_from_column_block(&block).expect("row path");
        let batch =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID])
                .expect("batch path");

        assert_eq!(batch.len(), row_points.len());
        for (i, p) in row_points.iter().enumerate() {
            // u64 timestamp survives the i64 reinterpret round-trip.
            let ts = match &batch.columns[0] {
                ColumnVector::Int64 { data, .. } => data[i] as u64,
                _ => unreachable!(),
            };
            let val = match &batch.columns[1] {
                ColumnVector::Float64 { data, .. } => data[i],
                _ => unreachable!(),
            };
            assert_eq!(ts, p.timestamp_ns, "timestamp parity at row {i}");
            assert_eq!(val, p.value, "value parity at row {i}");
        }
    }

    #[test]
    fn projection_decodes_only_referenced_columns() {
        // AC3: only the projected column is materialised.
        let block = sealed_columnar_chunk(128);
        let ts_only =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID]).expect("ts-only projection");
        assert_eq!(ts_only.schema.len(), 1);
        assert_eq!(ts_only.columns.len(), 1);
        assert_eq!(ts_only.schema.index_of("col_0"), Some(0));
        assert_eq!(ts_only.schema.index_of("col_1"), None);

        let val_only = column_batch_from_block(&block, &[COLUMNAR_VALUE_COLUMN_ID])
            .expect("value-only projection");
        assert_eq!(val_only.schema.len(), 1);
        assert!(matches!(val_only.columns[0], ColumnVector::Float64 { .. }));
    }

    #[test]
    fn projection_order_is_honoured() {
        // The projected reader returns columns in directory order; the batch
        // must follow the caller's projection order instead.
        let block = sealed_columnar_chunk(64);
        let batch =
            column_batch_from_block(&block, &[COLUMNAR_VALUE_COLUMN_ID, COLUMNAR_TS_COLUMN_ID])
                .expect("reversed projection");
        assert_eq!(batch.len(), 64);
        assert_eq!(batch.columns.len(), 2);
        assert!(matches!(batch.columns[0], ColumnVector::Float64 { .. }));
        assert!(matches!(batch.columns[1], ColumnVector::Int64 { .. }));
        let value_name = format!("col_{COLUMNAR_VALUE_COLUMN_ID}");
        let ts_name = format!("col_{COLUMNAR_TS_COLUMN_ID}");
        assert_eq!(batch.schema.index_of(&value_name), Some(0));
        assert_eq!(batch.schema.index_of(&ts_name), Some(1));
    }

    #[test]
    fn missing_column_is_an_error() {
        let block = sealed_columnar_chunk(16);
        // `ColumnBatch` isn't `PartialEq`, so match on the error arm directly
        // rather than comparing the whole `Result`.
        assert_eq!(
            column_batch_from_block(&block, &[42]).unwrap_err(),
            ColumnarScanError::MissingColumn(42)
        );
    }

    #[test]
    fn measured_row_vs_batch_decode_comparison() {
        // AC4 (Phase 2 gate): record a measured comparison of the columnar
        // batch decode vs the row-at-a-time decode over the same chunk.
        // This test never asserts on timing (wall-clock is machine- and
        // load-dependent and would be flaky); it asserts parity and prints
        // the measurement so the Phase 2 go/no-go has a number to read. The
        // figure is also captured in the issue envelope / commit message.
        use std::hint::black_box;
        use std::time::Instant;

        let n = 50_000;
        let block = sealed_columnar_chunk(n);
        let projection = [COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID];

        // Warm both paths once (codec setup, allocator) before timing.
        let _ = points_from_column_block(&block).unwrap();
        let _ = column_batch_from_block(&block, &projection).unwrap();

        // `black_box` on inputs and outputs stops the optimiser from hoisting
        // or eliding the repeated decodes, so the loop measures real work.
        let reps = 20;
        let t_row = Instant::now();
        let mut row_rows = 0usize;
        for _ in 0..reps {
            row_rows = black_box(points_from_column_block(black_box(&block)).unwrap()).len();
        }
        let row_elapsed = t_row.elapsed();

        let t_batch = Instant::now();
        let mut batch_rows = 0usize;
        for _ in 0..reps {
            batch_rows =
                black_box(column_batch_from_block(black_box(&block), &projection).unwrap()).len();
        }
        let batch_elapsed = t_batch.elapsed();

        // Both paths see the same rows — the comparison is apples-to-apples.
        assert_eq!(row_rows, n);
        assert_eq!(batch_rows, n);

        let row_ns = row_elapsed.as_nanos() as f64 / reps as f64;
        let batch_ns = batch_elapsed.as_nanos() as f64 / reps as f64;
        eprintln!(
            "[#856 Phase 2 gate] columnar decode of {n} rows ({reps} reps): \
             row-path {row_ns:.0} ns/scan, batch-path {batch_ns:.0} ns/scan, \
             ratio {:.2}x (batch/row)",
            batch_ns / row_ns
        );
    }
}