reddb_server/storage/query/batch/
columnar_scan.rs1use std::sync::Arc;
29
30use super::column_batch::{ColumnBatch, ColumnKind, ColumnVector, Field, Schema};
31use crate::storage::schema::types::DataType;
32use crate::storage::unified::column_block::{read_column_block_projected, ColumnBlockError};
33
34#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum ColumnarScanError {
37 Block(ColumnBlockError),
39 MissingColumn(u32),
41 UnsupportedLogicalType(u8),
45 RaggedStream { column_id: u32, len: usize },
48}
49
50impl From<ColumnBlockError> for ColumnarScanError {
51 fn from(e: ColumnBlockError) -> Self {
52 ColumnarScanError::Block(e)
53 }
54}
55
56fn kind_for_logical_type(tag: u8) -> Option<ColumnKind> {
61 match DataType::from_byte(tag)? {
62 DataType::Integer
65 | DataType::UnsignedInteger
66 | DataType::Timestamp
67 | DataType::Duration => Some(ColumnKind::Int64),
68 DataType::Float => Some(ColumnKind::Float64),
69 DataType::Boolean => Some(ColumnKind::Bool),
70 DataType::Text => Some(ColumnKind::Text),
71 _ => None,
72 }
73}
74
75fn numeric_vector(
78 column_id: u32,
79 kind: &ColumnKind,
80 raw: &[u8],
81) -> Result<ColumnVector, ColumnarScanError> {
82 if !raw.len().is_multiple_of(8) {
83 return Err(ColumnarScanError::RaggedStream {
84 column_id,
85 len: raw.len(),
86 });
87 }
88 Ok(match kind {
89 ColumnKind::Int64 => ColumnVector::Int64 {
90 data: raw
91 .chunks_exact(8)
92 .map(|b| i64::from_le_bytes(b.try_into().unwrap()))
93 .collect(),
94 validity: None,
95 },
96 ColumnKind::Float64 => ColumnVector::Float64 {
97 data: raw
98 .chunks_exact(8)
99 .map(|b| f64::from_le_bytes(b.try_into().unwrap()))
100 .collect(),
101 validity: None,
102 },
103 other => {
106 return Err(ColumnarScanError::UnsupportedLogicalType(match other {
107 ColumnKind::Bool => DataType::Boolean.to_byte(),
108 ColumnKind::Text => DataType::Text.to_byte(),
109 _ => unreachable!(),
110 }))
111 }
112 })
113}
114
115pub fn column_batch_from_block(
132 bytes: &[u8],
133 projection: &[u32],
134) -> Result<ColumnBatch, ColumnarScanError> {
135 let block = read_column_block_projected(bytes, projection)?;
136
137 let mut fields = Vec::with_capacity(projection.len());
138 let mut columns = Vec::with_capacity(projection.len());
139
140 for &id in projection {
143 let col = block
144 .columns
145 .iter()
146 .find(|c| c.column_id == id)
147 .ok_or(ColumnarScanError::MissingColumn(id))?;
148 let kind = kind_for_logical_type(col.logical_type)
149 .ok_or(ColumnarScanError::UnsupportedLogicalType(col.logical_type))?;
150 let vector = match kind {
151 ColumnKind::Int64 | ColumnKind::Float64 => numeric_vector(id, &kind, &col.data)?,
152 ColumnKind::Bool | ColumnKind::Text => {
153 return Err(ColumnarScanError::UnsupportedLogicalType(col.logical_type))
154 }
155 };
156 fields.push(Field {
157 name: format!("col_{id}"),
158 kind,
159 nullable: false,
160 });
161 columns.push(vector);
162 }
163
164 let schema = Arc::new(Schema::new(fields));
165 Ok(ColumnBatch::new(schema, columns))
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::timeseries::chunk::{
        points_from_column_block, TimeSeriesChunk, COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID,
    };

    /// Builds and seals a columnar time-series chunk holding `n` synthetic points.
    ///
    /// Timestamps are spaced 1_000_000 apart. Values cycle through seven steps of
    /// 0.25 starting at 95.0.
    fn sealed_columnar_chunk(n: usize) -> Vec<u8> {
        let mut chunk = TimeSeriesChunk::with_max_points("cpu.idle", Default::default(), n.max(1));
        for i in 0..n {
            assert!(chunk.append(
                1_700_000_000_000 + i as u64 * 1_000_000,
                95.0 + (i % 7) as f64 * 0.25
            ));
        }
        chunk.seal_columnar(7, 1).expect("seal columnar chunk")
    }

    #[test]
    fn scan_produces_results_through_the_column_batch_path() {
        let block = sealed_columnar_chunk(300);
        let batch =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID])
                .expect("decode into ColumnBatch");
        assert_eq!(batch.len(), 300);
        assert_eq!(batch.schema.len(), 2);
        assert!(matches!(batch.columns[0], ColumnVector::Int64 { .. }));
        assert!(matches!(batch.columns[1], ColumnVector::Float64 { .. }));
    }

    #[test]
    fn batch_path_is_value_for_value_identical_to_the_row_path() {
        let block = sealed_columnar_chunk(257);
        let row_points = points_from_column_block(&block).expect("row path");
        let batch =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID])
                .expect("batch path");

        assert_eq!(batch.len(), row_points.len());

        // Pull the typed columns out once instead of re-matching on every row.
        let ColumnVector::Int64 { data: timestamps, .. } = &batch.columns[0] else {
            panic!("timestamp column should decode as Int64");
        };
        let ColumnVector::Float64 { data: values, .. } = &batch.columns[1] else {
            panic!("value column should decode as Float64");
        };
        assert_eq!(timestamps.len(), row_points.len());
        assert_eq!(values.len(), row_points.len());

        for (i, p) in row_points.iter().enumerate() {
            assert_eq!(timestamps[i] as u64, p.timestamp_ns, "timestamp parity at row {i}");
            assert_eq!(values[i], p.value, "value parity at row {i}");
        }
    }

    #[test]
    fn projection_decodes_only_referenced_columns() {
        let block = sealed_columnar_chunk(128);
        // Derive the expected field names from the column-id constants rather than
        // hard-coding their numeric values.
        let ts_name = format!("col_{COLUMNAR_TS_COLUMN_ID}");
        let value_name = format!("col_{COLUMNAR_VALUE_COLUMN_ID}");

        let ts_only =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID]).expect("ts-only projection");
        assert_eq!(ts_only.schema.len(), 1);
        assert_eq!(ts_only.columns.len(), 1);
        assert_eq!(ts_only.schema.index_of(&ts_name), Some(0));
        assert_eq!(ts_only.schema.index_of(&value_name), None);

        let val_only = column_batch_from_block(&block, &[COLUMNAR_VALUE_COLUMN_ID])
            .expect("value-only projection");
        assert_eq!(val_only.schema.len(), 1);
        assert_eq!(val_only.schema.index_of(&value_name), Some(0));
        assert!(matches!(val_only.columns[0], ColumnVector::Float64 { .. }));
    }

    #[test]
    fn missing_column_is_an_error() {
        let block = sealed_columnar_chunk(16);
        assert_eq!(
            column_batch_from_block(&block, &[42]).unwrap_err(),
            ColumnarScanError::MissingColumn(42)
        );
    }

    /// Times the row path against the batch path and reports the ratio on stderr.
    ///
    /// Nothing is asserted about timing; only the row counts are checked.
    #[test]
    fn measured_row_vs_batch_decode_comparison() {
        use std::hint::black_box;
        use std::time::Instant;

        let n = 50_000;
        let block = sealed_columnar_chunk(n);
        let projection = [COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID];

        // Run each path once before timing (warm-up).
        let _ = points_from_column_block(&block).unwrap();
        let _ = column_batch_from_block(&block, &projection).unwrap();

        let reps = 20;
        let t_row = Instant::now();
        let mut row_rows = 0usize;
        for _ in 0..reps {
            // black_box stops the optimizer from eliding or hoisting the measured decode.
            let points = points_from_column_block(black_box(&block)).unwrap();
            row_rows = black_box(points).len();
        }
        let row_elapsed = t_row.elapsed();

        let t_batch = Instant::now();
        let mut batch_rows = 0usize;
        for _ in 0..reps {
            let batch = column_batch_from_block(black_box(&block), black_box(&projection)).unwrap();
            batch_rows = black_box(batch).len();
        }
        let batch_elapsed = t_batch.elapsed();

        assert_eq!(row_rows, n);
        assert_eq!(batch_rows, n);

        let row_ns = row_elapsed.as_nanos() as f64 / reps as f64;
        let batch_ns = batch_elapsed.as_nanos() as f64 / reps as f64;
        eprintln!(
            "[#856 Phase 2 gate] columnar decode of {n} rows ({reps} reps): \
             row-path {row_ns:.0} ns/scan, batch-path {batch_ns:.0} ns/scan, \
             ratio {:.2}x (batch/row)",
            batch_ns / row_ns
        );
    }
}