reddb_server/storage/query/batch/
columnar_scan.rs1use std::sync::Arc;
29
30use super::column_batch::{ColumnBatch, ColumnKind, ColumnVector, Field, Schema};
31use crate::storage::schema::types::DataType;
32use crate::storage::unified::column_block::{read_column_block_projected, ColumnBlockError};
33
34#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum ColumnarScanError {
37 Block(ColumnBlockError),
39 MissingColumn(u32),
41 UnsupportedLogicalType(u8),
45 RaggedStream { column_id: u32, len: usize },
48}
49
50impl From<ColumnBlockError> for ColumnarScanError {
51 fn from(e: ColumnBlockError) -> Self {
52 ColumnarScanError::Block(e)
53 }
54}
55
56fn kind_for_logical_type(tag: u8) -> Option<ColumnKind> {
61 match DataType::from_byte(tag)? {
62 DataType::Integer
65 | DataType::UnsignedInteger
66 | DataType::Timestamp
67 | DataType::Duration => Some(ColumnKind::Int64),
68 DataType::Float => Some(ColumnKind::Float64),
69 DataType::Boolean => Some(ColumnKind::Bool),
70 DataType::Text => Some(ColumnKind::Text),
71 _ => None,
72 }
73}
74
75fn numeric_vector(
78 column_id: u32,
79 kind: &ColumnKind,
80 raw: &[u8],
81) -> Result<ColumnVector, ColumnarScanError> {
82 if !raw.len().is_multiple_of(8) {
83 return Err(ColumnarScanError::RaggedStream {
84 column_id,
85 len: raw.len(),
86 });
87 }
88 let n = raw.len() / 8;
89 Ok(match kind {
90 ColumnKind::Int64 => ColumnVector::Int64 {
91 data: le_bytes_to_i64_vec(raw, n),
92 validity: None,
93 },
94 ColumnKind::Float64 => ColumnVector::Float64 {
95 data: le_bytes_to_f64_vec(raw, n),
96 validity: None,
97 },
98 other => {
101 return Err(ColumnarScanError::UnsupportedLogicalType(match other {
102 ColumnKind::Bool => DataType::Boolean.to_byte(),
103 ColumnKind::Text => DataType::Text.to_byte(),
104 _ => unreachable!(),
105 }))
106 }
107 })
108}
109
/// Decodes the first `n` little-endian 8-byte words of `raw` into `i64`s.
///
/// On little-endian targets the in-memory layout of `[i64]` matches the
/// stream, so the words are copied in one block; other targets convert word
/// by word.
///
/// # Panics
///
/// Panics if `n * 8` overflows `usize` or is larger than `raw.len()`. This
/// is a safe function, so the length is verified here rather than trusted:
/// an unchecked copy would read past the end of `raw` on a mismatched `n`.
fn le_bytes_to_i64_vec(raw: &[u8], n: usize) -> Vec<i64> {
    let byte_len = n
        .checked_mul(8)
        .expect("element count overflows the byte length");
    // Bounds-checked slice: panics instead of over-reading when `n` does not
    // fit in `raw`, and makes both target paths decode exactly `n` words.
    let src = &raw[..byte_len];

    #[cfg(target_endian = "little")]
    {
        let mut out: Vec<i64> = Vec::with_capacity(n);
        // SAFETY: `src` is exactly `byte_len == n * 8` readable bytes. `out`
        // is a fresh allocation with room for `n` `i64`s (`n * 8` writable
        // bytes), so it cannot overlap the borrowed input. Every bit pattern
        // is a valid `i64`, so all `n` elements are initialised before
        // `set_len(n)`.
        unsafe {
            std::ptr::copy_nonoverlapping(src.as_ptr(), out.as_mut_ptr().cast::<u8>(), byte_len);
            out.set_len(n);
        }
        out
    }
    #[cfg(not(target_endian = "little"))]
    {
        src.chunks_exact(8)
            .map(|word| i64::from_le_bytes(word.try_into().unwrap()))
            .collect()
    }
}
130
/// Decodes the first `n` little-endian 8-byte words of `raw` into `f64`s.
///
/// On little-endian targets the in-memory layout of `[f64]` matches the
/// stream, so the words are copied in one block; other targets convert word
/// by word.
///
/// # Panics
///
/// Panics if `n * 8` overflows `usize` or is larger than `raw.len()`. This
/// is a safe function, so the length is verified here rather than trusted:
/// an unchecked copy would read past the end of `raw` on a mismatched `n`.
fn le_bytes_to_f64_vec(raw: &[u8], n: usize) -> Vec<f64> {
    let byte_len = n
        .checked_mul(8)
        .expect("element count overflows the byte length");
    // Bounds-checked slice: panics instead of over-reading when `n` does not
    // fit in `raw`, and makes both target paths decode exactly `n` words.
    let src = &raw[..byte_len];

    #[cfg(target_endian = "little")]
    {
        let mut out: Vec<f64> = Vec::with_capacity(n);
        // SAFETY: `src` is exactly `byte_len == n * 8` readable bytes. `out`
        // is a fresh allocation with room for `n` `f64`s (`n * 8` writable
        // bytes), so it cannot overlap the borrowed input. Every bit pattern
        // is a valid `f64`, so all `n` elements are initialised before
        // `set_len(n)`.
        unsafe {
            std::ptr::copy_nonoverlapping(src.as_ptr(), out.as_mut_ptr().cast::<u8>(), byte_len);
            out.set_len(n);
        }
        out
    }
    #[cfg(not(target_endian = "little"))]
    {
        src.chunks_exact(8)
            .map(|word| f64::from_le_bytes(word.try_into().unwrap()))
            .collect()
    }
}
150
151pub fn column_batch_from_block(
168 bytes: &[u8],
169 projection: &[u32],
170) -> Result<ColumnBatch, ColumnarScanError> {
171 let block = read_column_block_projected(bytes, projection)?;
172
173 let mut fields = Vec::with_capacity(projection.len());
174 let mut columns = Vec::with_capacity(projection.len());
175
176 for &id in projection {
179 let col = block
180 .columns
181 .iter()
182 .find(|c| c.column_id == id)
183 .ok_or(ColumnarScanError::MissingColumn(id))?;
184 let kind = kind_for_logical_type(col.logical_type)
185 .ok_or(ColumnarScanError::UnsupportedLogicalType(col.logical_type))?;
186 let vector = match kind {
187 ColumnKind::Int64 | ColumnKind::Float64 => numeric_vector(id, &kind, &col.data)?,
188 ColumnKind::Bool | ColumnKind::Text => {
189 return Err(ColumnarScanError::UnsupportedLogicalType(col.logical_type))
190 }
191 };
192 fields.push(Field {
193 name: format!("col_{id}"),
194 kind,
195 nullable: false,
196 });
197 columns.push(vector);
198 }
199
200 let schema = Arc::new(Schema::new(fields));
201 Ok(ColumnBatch::new(schema, columns))
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::timeseries::chunk::{
        points_from_column_block, TimeSeriesChunk, COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID,
    };

    /// Builds a chunk of `n` synthetic points and returns the bytes produced
    /// by `seal_columnar`.
    fn sealed_columnar_chunk(n: usize) -> Vec<u8> {
        // `n.max(1)` keeps the point limit non-zero even when `n == 0`.
        let mut chunk = TimeSeriesChunk::with_max_points("cpu.idle", Default::default(), n.max(1));
        for i in 0..n {
            assert!(chunk.append(
                1_700_000_000_000 + i as u64 * 1_000_000,
                95.0 + (i % 7) as f64 * 0.25
            ));
        }
        chunk.seal_columnar(7, 1).expect("seal columnar chunk")
    }

    #[test]
    fn scan_produces_results_through_the_column_batch_path() {
        let block = sealed_columnar_chunk(300);
        let batch =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID])
                .expect("decode into ColumnBatch");
        assert_eq!(batch.len(), 300);
        assert_eq!(batch.schema.len(), 2);
        assert!(matches!(batch.columns[0], ColumnVector::Int64 { .. }));
        assert!(matches!(batch.columns[1], ColumnVector::Float64 { .. }));
    }

    #[test]
    fn batch_path_is_value_for_value_identical_to_the_row_path() {
        let block = sealed_columnar_chunk(257);
        let row_points = points_from_column_block(&block).expect("row path");
        let batch =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID])
                .expect("batch path");

        assert_eq!(batch.len(), row_points.len());

        // Resolve each column's variant once instead of once per row.
        let ColumnVector::Int64 { data: ts_data, .. } = &batch.columns[0] else {
            panic!("timestamp column should decode as Int64");
        };
        let ColumnVector::Float64 { data: val_data, .. } = &batch.columns[1] else {
            panic!("value column should decode as Float64");
        };
        assert_eq!(ts_data.len(), row_points.len());
        assert_eq!(val_data.len(), row_points.len());

        for (i, p) in row_points.iter().enumerate() {
            // Timestamps are held as i64 in the batch; reinterpret as u64 to
            // compare against the row path.
            assert_eq!(ts_data[i] as u64, p.timestamp_ns, "timestamp parity at row {i}");
            assert_eq!(val_data[i], p.value, "value parity at row {i}");
        }
    }

    #[test]
    fn projection_decodes_only_referenced_columns() {
        let block = sealed_columnar_chunk(128);
        let ts_only =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID]).expect("ts-only projection");
        assert_eq!(ts_only.schema.len(), 1);
        assert_eq!(ts_only.columns.len(), 1);
        assert_eq!(ts_only.schema.index_of("col_0"), Some(0));
        assert_eq!(ts_only.schema.index_of("col_1"), None);

        let val_only = column_batch_from_block(&block, &[COLUMNAR_VALUE_COLUMN_ID])
            .expect("value-only projection");
        assert_eq!(val_only.schema.len(), 1);
        assert!(matches!(val_only.columns[0], ColumnVector::Float64 { .. }));
    }

    #[test]
    fn missing_column_is_an_error() {
        let block = sealed_columnar_chunk(16);
        assert_eq!(
            column_batch_from_block(&block, &[42]).unwrap_err(),
            ColumnarScanError::MissingColumn(42)
        );
    }

    /// Informational timing of the row path versus the batch path. Only the
    /// row counts are asserted; the timings are printed to stderr.
    #[test]
    fn measured_row_vs_batch_decode_comparison() {
        use std::hint::black_box;
        use std::time::Instant;

        let n = 50_000;
        // `black_box` hides the input from the optimiser so repeated decodes
        // of the same bytes cannot be folded or hoisted in optimised builds.
        let block = black_box(sealed_columnar_chunk(n));
        let projection = [COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID];

        // Warm-up pass for both paths before timing.
        let _ = black_box(points_from_column_block(&block).unwrap());
        let _ = black_box(column_batch_from_block(&block, &projection).unwrap());

        let reps = 20;
        let t_row = Instant::now();
        let mut row_rows = 0usize;
        for _ in 0..reps {
            // Force the decoded value to be materialised on every repetition.
            row_rows = black_box(points_from_column_block(&block).unwrap()).len();
        }
        let row_elapsed = t_row.elapsed();

        let t_batch = Instant::now();
        let mut batch_rows = 0usize;
        for _ in 0..reps {
            batch_rows = black_box(column_batch_from_block(&block, &projection).unwrap()).len();
        }
        let batch_elapsed = t_batch.elapsed();

        assert_eq!(row_rows, n);
        assert_eq!(batch_rows, n);

        let row_ns = row_elapsed.as_nanos() as f64 / reps as f64;
        let batch_ns = batch_elapsed.as_nanos() as f64 / reps as f64;
        eprintln!(
            "[#856 Phase 2 gate] columnar decode of {n} rows ({reps} reps): \
             row-path {row_ns:.0} ns/scan, batch-path {batch_ns:.0} ns/scan, \
             ratio {:.2}x (batch/row)",
            batch_ns / row_ns
        );
    }
}