reddb_server/storage/query/batch/columnar_scan.rs
//! Columnar chunk → `ColumnBatch` bridge (#856, PRD #850 Phase 2).
//!
//! Connects the sealed columnar **chunk** decode (`RDCC` `ColumnBlock`,
//! #852) to the vectorised [`ColumnBatch`](super::ColumnBatch) reader that
//! has lived self-contained in `storage/query/batch/` since the batch
//! sprint. An analytical scan over a columnar `Metrics`/`TimeSeries` chunk
//! now decodes straight into one typed [`ColumnVector`] per referenced
//! column — the same column-at-a-time layout the batch operators consume —
//! instead of the row-at-a-time `Vec<TimeSeriesPoint>` accumulation
//! [`points_from_column_block`](crate::storage::timeseries::chunk::points_from_column_block)
//! produces.
//!
//! Two properties are load-bearing for the Phase 2 gate:
//!
//! * **Parity.** The values materialised here are bit-for-bit the values the
//!   row path materialises — the batch path reinterprets the *same* decoded
//!   little-endian column payload that `read_column_block` hands back. No
//!   second decoder, no divergent rounding.
//! * **Projection pushdown.** Only the columns named in `projection` are
//!   decoded; unreferenced column streams are never run through the codec
//!   chain (via [`read_column_block_projected_typed`]). A scan that touches
//!   one column out of N pays for one column.
//!
//! Scope: this is the read/decode wiring only. The live INSERT→seal runtime
//! call-site is owned by #861; full operator vectorisation across the SQL
//! executor is explicitly out of scope (PRD #850).
27
28use std::sync::Arc;
29
30use super::column_batch::{ColumnBatch, ColumnKind, ColumnVector, Field, Schema};
31use crate::storage::schema::types::DataType;
32use crate::storage::unified::column_block::{
33 read_column_block_projected_typed, ColumnBlockError, ProjectedColumnData,
34};
35
36/// Failures decoding a columnar chunk into a [`ColumnBatch`].
37#[derive(Debug, Clone, PartialEq, Eq)]
38pub enum ColumnarScanError {
39 /// The underlying `RDCC` block was malformed (bad magic, CRC, directory…).
40 Block(ColumnBlockError),
41 /// A requested column id was not present in the chunk's directory.
42 MissingColumn(u32),
43 /// The column's logical type has no `ColumnBatch` representation
44 /// (the batch layer carries only Int64/Float64/Bool/Text). Carries the
45 /// `DataType::to_byte()` tag that was rejected.
46 UnsupportedLogicalType(u8),
47 /// A fixed-width numeric stream's byte length was not a multiple of the
48 /// element width — the chunk is corrupt.
49 RaggedStream { column_id: u32, len: usize },
50}
51
52impl From<ColumnBlockError> for ColumnarScanError {
53 fn from(e: ColumnBlockError) -> Self {
54 ColumnarScanError::Block(e)
55 }
56}
57
58/// Map a stored `RDCC` logical-type tag to the batch layer's column kind.
59/// The batch executor reasons over four physical kinds; every fixed-width
60/// integer-family type collapses to `Int64` (8-byte LE reinterpret) and the
61/// float types to `Float64`. `None` for any type the batch layer can't hold.
62fn kind_for_logical_type(tag: u8) -> Option<ColumnKind> {
63 match DataType::from_byte(tag)? {
64 // 8-byte integer-family streams. Unsigned values are reinterpreted
65 // through the same little-endian bytes; the row path does the same.
66 DataType::Integer
67 | DataType::UnsignedInteger
68 | DataType::Timestamp
69 | DataType::Duration => Some(ColumnKind::Int64),
70 DataType::Float => Some(ColumnKind::Float64),
71 DataType::Boolean => Some(ColumnKind::Bool),
72 DataType::Text => Some(ColumnKind::Text),
73 _ => None,
74 }
75}
76
77/// Decode the fixed-width numeric raw bytes of one column into a typed
78/// [`ColumnVector`]. Caller guarantees `kind` is a numeric kind.
79fn numeric_vector(
80 column_id: u32,
81 kind: &ColumnKind,
82 raw: &[u8],
83) -> Result<ColumnVector, ColumnarScanError> {
84 if !raw.len().is_multiple_of(8) {
85 return Err(ColumnarScanError::RaggedStream {
86 column_id,
87 len: raw.len(),
88 });
89 }
90 let n = raw.len() / 8;
91 Ok(match kind {
92 ColumnKind::Int64 => ColumnVector::Int64 {
93 data: le_bytes_to_i64_vec(raw, n),
94 validity: None,
95 },
96 ColumnKind::Float64 => ColumnVector::Float64 {
97 data: le_bytes_to_f64_vec(raw, n),
98 validity: None,
99 },
100 // Bool/Text never reach here (the caller routes them away); keep the
101 // match exhaustive without inventing a decode.
102 other => {
103 return Err(ColumnarScanError::UnsupportedLogicalType(match other {
104 ColumnKind::Bool => DataType::Boolean.to_byte(),
105 ColumnKind::Text => DataType::Text.to_byte(),
106 _ => unreachable!(),
107 }))
108 }
109 })
110}
111
/// Convert the first `n` little-endian 8-byte values of `raw` to `Vec<i64>`
/// (#962 fast path).
///
/// On little-endian targets the bytes are copied in one block; on big-endian
/// targets each element is byte-swapped individually. Both paths decode
/// exactly `n` elements.
///
/// # Panics
///
/// Panics if `raw` holds fewer than `n * 8` bytes. The check is done in safe
/// code so that a wrong `n` can never turn into an out-of-bounds read inside
/// the `unsafe` copy below.
fn le_bytes_to_i64_vec(raw: &[u8], n: usize) -> Vec<i64> {
    // `n <= raw.len() / 8` is equivalent to `n * 8 <= raw.len()` but cannot
    // overflow.
    assert!(
        n <= raw.len() / 8,
        "le_bytes_to_i64_vec: {} elements requested but only {} bytes supplied",
        n,
        raw.len()
    );
    // Narrow to exactly the bytes being decoded so both target paths agree
    // on the element count.
    let raw = &raw[..n * 8];

    #[cfg(target_endian = "little")]
    {
        let mut v: Vec<i64> = Vec::with_capacity(n);
        // SAFETY: `raw` is exactly `n * 8` readable bytes (sliced above) and
        // `v` owns a fresh allocation of at least `n` elements, so the
        // destination is writable for `n * 8` bytes and cannot overlap the
        // source. Every 8-byte pattern is a valid `i64`, and on LE targets
        // the stored bytes already are the native representation, so all `n`
        // elements are initialised before `set_len`.
        unsafe {
            std::ptr::copy_nonoverlapping(raw.as_ptr(), v.as_mut_ptr().cast::<u8>(), raw.len());
            v.set_len(n);
        }
        v
    }
    #[cfg(not(target_endian = "little"))]
    {
        raw.chunks_exact(8)
            .map(|chunk| {
                let mut word = [0u8; 8];
                word.copy_from_slice(chunk);
                i64::from_le_bytes(word)
            })
            .collect()
    }
}
132
/// Convert the first `n` little-endian 8-byte values of `raw` to `Vec<f64>`
/// (#962 fast path).
///
/// On little-endian targets the bytes are copied in one block; on big-endian
/// targets each element is byte-swapped individually. Both paths decode
/// exactly `n` elements, and the bit patterns are preserved unchanged
/// (NaN payloads, ±inf and ±0 included).
///
/// # Panics
///
/// Panics if `raw` holds fewer than `n * 8` bytes. The check is done in safe
/// code so that a wrong `n` can never turn into an out-of-bounds read inside
/// the `unsafe` copy below.
fn le_bytes_to_f64_vec(raw: &[u8], n: usize) -> Vec<f64> {
    // `n <= raw.len() / 8` is equivalent to `n * 8 <= raw.len()` but cannot
    // overflow.
    assert!(
        n <= raw.len() / 8,
        "le_bytes_to_f64_vec: {} elements requested but only {} bytes supplied",
        n,
        raw.len()
    );
    // Narrow to exactly the bytes being decoded so both target paths agree
    // on the element count.
    let raw = &raw[..n * 8];

    #[cfg(target_endian = "little")]
    {
        let mut v: Vec<f64> = Vec::with_capacity(n);
        // SAFETY: `raw` is exactly `n * 8` readable bytes (sliced above) and
        // `v` owns a fresh allocation of at least `n` elements, so the
        // destination is writable for `n * 8` bytes and cannot overlap the
        // source. Every 8-byte pattern is a valid `f64` bit pattern
        // (including NaN / ±inf / ±0), so all `n` elements are initialised
        // before `set_len`.
        unsafe {
            std::ptr::copy_nonoverlapping(raw.as_ptr(), v.as_mut_ptr().cast::<u8>(), raw.len());
            v.set_len(n);
        }
        v
    }
    #[cfg(not(target_endian = "little"))]
    {
        raw.chunks_exact(8)
            .map(|chunk| {
                let mut word = [0u8; 8];
                word.copy_from_slice(chunk);
                f64::from_le_bytes(word)
            })
            .collect()
    }
}
152
/// Hand a `Vec<u64>`'s allocation over to a `Vec<i64>` without copying
/// (#962).
///
/// Each element keeps its bit pattern, so the result equals applying
/// `as i64` to every word. `u64` and `i64` have the same size (8) and
/// alignment (8), which is what makes reusing the allocation legal.
fn u64_words_to_i64(v: Vec<u64>) -> Vec<i64> {
    // Suppress the source vector's destructor: ownership of its buffer is
    // transferred to the vector built below.
    let mut words = std::mem::ManuallyDrop::new(v);
    let len = words.len();
    let cap = words.capacity();
    let ptr = words.as_mut_ptr().cast::<i64>();
    // SAFETY: `ptr`/`len`/`cap` describe a live `Vec<u64>` allocation that is
    // never dropped through `words`, so there is no double free. `i64` has
    // the same size and alignment as `u64`, so the allocation layout matches
    // what `Vec<i64>` will deallocate with, and every bit pattern is a valid
    // `i64`.
    unsafe { Vec::from_raw_parts(ptr, len, cap) }
}
163
/// Hand a `Vec<u64>`'s allocation over to a `Vec<f64>` without copying
/// (#962).
///
/// Each element keeps its bit pattern, exactly as `f64::from_bits` would
/// produce; every 64-bit pattern is a valid `f64` (NaN / ±inf / ±0
/// included). `u64` and `f64` share size and alignment.
fn u64_words_to_f64(v: Vec<u64>) -> Vec<f64> {
    // Suppress the source vector's destructor: ownership of its buffer is
    // transferred to the vector built below.
    let mut words = std::mem::ManuallyDrop::new(v);
    let len = words.len();
    let cap = words.capacity();
    let ptr = words.as_mut_ptr().cast::<f64>();
    // SAFETY: `ptr`/`len`/`cap` describe a live `Vec<u64>` allocation that is
    // never dropped through `words`, so there is no double free. `f64` has
    // the same size (8) and alignment (8) as `u64`, so the allocation layout
    // matches, and every bit pattern is a valid `f64`.
    unsafe { Vec::from_raw_parts(ptr, len, cap) }
}
172
173/// Build a typed [`ColumnVector`] from a decoded column payload. The numeric
174/// fast path (`Words`) reinterprets the aligned `u64` buffer in place; the
175/// `Bytes` fallback copies through [`numeric_vector`].
176fn vector_for(
177 column_id: u32,
178 kind: &ColumnKind,
179 data: ProjectedColumnData,
180) -> Result<ColumnVector, ColumnarScanError> {
181 match data {
182 ProjectedColumnData::Words(words) => Ok(match kind {
183 ColumnKind::Int64 => ColumnVector::Int64 {
184 data: u64_words_to_i64(words),
185 validity: None,
186 },
187 ColumnKind::Float64 => ColumnVector::Float64 {
188 data: u64_words_to_f64(words),
189 validity: None,
190 },
191 other => {
192 return Err(ColumnarScanError::UnsupportedLogicalType(match other {
193 ColumnKind::Bool => DataType::Boolean.to_byte(),
194 ColumnKind::Text => DataType::Text.to_byte(),
195 _ => unreachable!(),
196 }))
197 }
198 }),
199 ProjectedColumnData::Bytes(raw) => numeric_vector(column_id, kind, &raw),
200 }
201}
202
203/// Decode a sealed columnar chunk (`RDCC` bytes) into a [`ColumnBatch`],
204/// materialising **only** the columns in `projection` (by stable column id,
205/// in the given order). This is the vectorised counterpart to the
206/// row-at-a-time `points_from_column_block`: identical values, column-major
207/// layout, and unreferenced columns are never decoded.
208///
209/// Field names are synthesised as `col_{id}` — the `RDCC` block keys columns
210/// by stable id, not by name; the batch operators address columns by index
211/// or by the schema's `index_of`, so the synthetic name is purely a handle.
212///
213/// Errors if a requested id is absent ([`ColumnarScanError::MissingColumn`])
214/// or carries a logical type the batch layer can't represent
215/// ([`ColumnarScanError::UnsupportedLogicalType`]). Only numeric chunks
216/// (Metrics/TimeSeries timestamp+value) are exercised today; Bool/Text map
217/// to a kind but their stream decode is out of this slice's scope and is
218/// rejected rather than silently mis-decoded.
219pub fn column_batch_from_block(
220 bytes: &[u8],
221 projection: &[u32],
222) -> Result<ColumnBatch, ColumnarScanError> {
223 let mut decoded = read_column_block_projected_typed(bytes, projection)?;
224
225 let mut fields = Vec::with_capacity(projection.len());
226 let mut columns = Vec::with_capacity(projection.len());
227
228 // Honour the caller's projection order (the projected reader returns
229 // columns in directory order, which may differ from the query order).
230 // `swap_remove` moves each column out so the typed `Vec<u64>` payload is
231 // reinterpreted in place rather than copied (#962).
232 for &id in projection {
233 let pos = decoded
234 .iter()
235 .position(|c| c.column_id == id)
236 .ok_or(ColumnarScanError::MissingColumn(id))?;
237 let col = decoded.swap_remove(pos);
238 let kind = kind_for_logical_type(col.logical_type)
239 .ok_or(ColumnarScanError::UnsupportedLogicalType(col.logical_type))?;
240 let vector = match kind {
241 ColumnKind::Int64 | ColumnKind::Float64 => vector_for(id, &kind, col.data)?,
242 ColumnKind::Bool | ColumnKind::Text => {
243 return Err(ColumnarScanError::UnsupportedLogicalType(col.logical_type))
244 }
245 };
246 fields.push(Field {
247 name: format!("col_{id}"),
248 kind,
249 nullable: false,
250 });
251 columns.push(vector);
252 }
253
254 let schema = Arc::new(Schema::new(fields));
255 Ok(ColumnBatch::new(schema, columns))
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::timeseries::chunk::{
        points_from_column_block, TimeSeriesChunk, COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID,
    };

    /// Build a synthetic chunk of `n` points and seal it into `RDCC` bytes
    /// via `TimeSeriesChunk::seal_columnar`.
    fn sealed_columnar_chunk(n: usize) -> Vec<u8> {
        // Size the chunk explicitly via `with_max_points` so large
        // measurement chunks aren't capped by the default 1024-point
        // auto-seal threshold.
        let capacity = n.max(1);
        let mut chunk = TimeSeriesChunk::with_max_points("cpu.idle", Default::default(), capacity);
        for i in 0..n {
            let timestamp = 1_700_000_000_000 + i as u64 * 1_000_000;
            let value = 95.0 + (i % 7) as f64 * 0.25;
            assert!(chunk.append(timestamp, value));
        }
        chunk.seal_columnar(7, 1).expect("seal columnar chunk")
    }

    /// Run `scan` `reps` times; return the last row count and the total
    /// elapsed wall-clock time.
    fn time_scans(
        reps: u32,
        mut scan: impl FnMut() -> usize,
    ) -> (usize, std::time::Duration) {
        let started = std::time::Instant::now();
        let mut rows = 0usize;
        for _ in 0..reps {
            rows = scan();
        }
        (rows, started.elapsed())
    }

    #[test]
    fn scan_produces_results_through_the_column_batch_path() {
        // AC1: a scan over a columnar chunk yields results via ColumnBatch.
        let block = sealed_columnar_chunk(300);
        let projection = [COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID];
        let batch = column_batch_from_block(&block, &projection).expect("decode into ColumnBatch");

        assert_eq!(batch.len(), 300);
        assert_eq!(batch.schema.len(), 2);
        // The u64 timestamps arrive reinterpreted as Int64; values as Float64.
        assert!(matches!(&batch.columns[0], ColumnVector::Int64 { .. }));
        assert!(matches!(&batch.columns[1], ColumnVector::Float64 { .. }));
    }

    #[test]
    fn batch_path_is_value_for_value_identical_to_the_row_path() {
        // AC2: behavioural parity with the row-at-a-time path.
        let block = sealed_columnar_chunk(257);
        let row_points = points_from_column_block(&block).expect("row path");
        let projection = [COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID];
        let batch = column_batch_from_block(&block, &projection).expect("batch path");

        assert_eq!(batch.len(), row_points.len());

        let ColumnVector::Int64 { data: ts_column, .. } = &batch.columns[0] else {
            unreachable!()
        };
        let ColumnVector::Float64 { data: value_column, .. } = &batch.columns[1] else {
            unreachable!()
        };
        for (i, p) in row_points.iter().enumerate() {
            // Casting back to u64 undoes the i64 reinterpretation.
            assert_eq!(ts_column[i] as u64, p.timestamp_ns, "timestamp parity at row {i}");
            assert_eq!(value_column[i], p.value, "value parity at row {i}");
        }
    }

    #[test]
    fn projection_decodes_only_referenced_columns() {
        // AC3: only the projected column is materialised.
        let block = sealed_columnar_chunk(128);

        let ts_only =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID]).expect("ts-only projection");
        assert_eq!(ts_only.schema.len(), 1);
        assert_eq!(ts_only.columns.len(), 1);
        assert_eq!(ts_only.schema.index_of("col_0"), Some(0));
        assert_eq!(ts_only.schema.index_of("col_1"), None);

        let val_only = column_batch_from_block(&block, &[COLUMNAR_VALUE_COLUMN_ID])
            .expect("value-only projection");
        assert_eq!(val_only.schema.len(), 1);
        assert!(matches!(&val_only.columns[0], ColumnVector::Float64 { .. }));
    }

    #[test]
    fn missing_column_is_an_error() {
        let block = sealed_columnar_chunk(16);
        // `ColumnBatch` isn't `PartialEq`, so pull the error out first and
        // compare only that, not the whole `Result`.
        let err = column_batch_from_block(&block, &[42]).unwrap_err();
        assert_eq!(err, ColumnarScanError::MissingColumn(42));
    }

    #[test]
    fn measured_row_vs_batch_decode_comparison() {
        // AC4 (Phase 2 gate): record a measured comparison of the columnar
        // batch decode vs the row-at-a-time decode over the same chunk.
        // Timing is printed, never asserted: wall-clock depends on machine
        // and load and would make the test flaky. Only row-count parity is
        // asserted, so the Phase 2 go/no-go has a number to read.
        let n = 50_000;
        let block = sealed_columnar_chunk(n);
        let projection = [COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID];

        // Warm both paths once (codec setup, allocator) before timing.
        let _ = points_from_column_block(&block).unwrap();
        let _ = column_batch_from_block(&block, &projection).unwrap();

        let reps = 20;
        let (row_rows, row_elapsed) =
            time_scans(reps, || points_from_column_block(&block).unwrap().len());
        let (batch_rows, batch_elapsed) = time_scans(reps, || {
            column_batch_from_block(&block, &projection).unwrap().len()
        });

        // Both paths see the same rows — the comparison is apples-to-apples.
        assert_eq!(row_rows, n);
        assert_eq!(batch_rows, n);

        let row_ns = row_elapsed.as_nanos() as f64 / reps as f64;
        let batch_ns = batch_elapsed.as_nanos() as f64 / reps as f64;
        eprintln!(
            "[#856 Phase 2 gate] columnar decode of {n} rows ({reps} reps): \
             row-path {row_ns:.0} ns/scan, batch-path {batch_ns:.0} ns/scan, \
             ratio {:.2}x (batch/row)",
            batch_ns / row_ns
        );
    }
}