use std::sync::Arc;
use super::column_batch::{ColumnBatch, ColumnKind, ColumnVector, Field, Schema};
use crate::storage::schema::types::DataType;
use crate::storage::unified::column_block::{read_column_block_projected, ColumnBlockError};
/// Errors produced while decoding a column block into a `ColumnBatch`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnarScanError {
/// The underlying column-block reader reported a failure.
Block(ColumnBlockError),
/// A projected column id was not found among the decoded columns.
MissingColumn(u32),
/// The logical-type tag has no column kind that this path can decode.
UnsupportedLogicalType(u8),
/// A numeric column's raw stream length (`len`, in bytes) is not a multiple of 8.
RaggedStream { column_id: u32, len: usize },
}
impl From<ColumnBlockError> for ColumnarScanError {
fn from(e: ColumnBlockError) -> Self {
ColumnarScanError::Block(e)
}
}
/// Maps a serialized logical-type tag to the column kind used to hold it.
///
/// Returns `None` when `tag` is not a recognised `DataType`, or when the
/// data type has no corresponding `ColumnKind`.
fn kind_for_logical_type(tag: u8) -> Option<ColumnKind> {
    let logical = DataType::from_byte(tag)?;
    let kind = match logical {
        DataType::Float => ColumnKind::Float64,
        DataType::Boolean => ColumnKind::Bool,
        DataType::Text => ColumnKind::Text,
        // All four of these logical types share the Int64 column kind.
        DataType::Integer
        | DataType::UnsignedInteger
        | DataType::Timestamp
        | DataType::Duration => ColumnKind::Int64,
        _ => return None,
    };
    Some(kind)
}
/// Yields each complete 8-byte word of `raw` as an owned array.
///
/// Trailing bytes that do not fill a whole word are skipped, so callers are
/// expected to validate the stream length beforehand.
fn le_words(raw: &[u8]) -> impl Iterator<Item = [u8; 8]> + '_ {
    raw.chunks_exact(8).map(|chunk| {
        let mut word = [0u8; 8];
        // `chunks_exact(8)` only yields 8-byte chunks, so the lengths match.
        word.copy_from_slice(chunk);
        word
    })
}

/// Decodes a raw stream of little-endian 8-byte words into a numeric vector.
///
/// * `column_id` - id reported in the error when the stream is malformed.
/// * `kind` - target column kind; only `Int64` and `Float64` are decodable.
/// * `raw` - the column's raw bytes.
///
/// The resulting vector carries no validity mask (`validity: None`).
///
/// # Errors
///
/// * [`ColumnarScanError::RaggedStream`] when `raw.len()` is not a multiple
///   of 8. This check runs first, regardless of `kind`.
/// * [`ColumnarScanError::UnsupportedLogicalType`] when `kind` is `Bool` or
///   `Text`, carrying the byte tag of the matching `DataType`.
fn numeric_vector(
    column_id: u32,
    kind: &ColumnKind,
    raw: &[u8],
) -> Result<ColumnVector, ColumnarScanError> {
    if !raw.len().is_multiple_of(8) {
        return Err(ColumnarScanError::RaggedStream {
            column_id,
            len: raw.len(),
        });
    }
    // Exhaustive on purpose: adding a `ColumnKind` variant must fail to
    // compile here instead of reaching a runtime `unreachable!()`.
    match kind {
        ColumnKind::Int64 => Ok(ColumnVector::Int64 {
            data: le_words(raw).map(i64::from_le_bytes).collect(),
            validity: None,
        }),
        ColumnKind::Float64 => Ok(ColumnVector::Float64 {
            data: le_words(raw).map(f64::from_le_bytes).collect(),
            validity: None,
        }),
        ColumnKind::Bool => Err(ColumnarScanError::UnsupportedLogicalType(
            DataType::Boolean.to_byte(),
        )),
        ColumnKind::Text => Err(ColumnarScanError::UnsupportedLogicalType(
            DataType::Text.to_byte(),
        )),
    }
}
/// Decodes the projected columns of a serialized column block into a
/// [`ColumnBatch`].
///
/// Columns appear in the batch in the same order as `projection`. Each field
/// is named `col_<id>` and is marked non-nullable.
///
/// # Errors
///
/// * [`ColumnarScanError::Block`] when the block reader fails.
/// * [`ColumnarScanError::MissingColumn`] when a projected id is not among
///   the decoded columns.
/// * [`ColumnarScanError::UnsupportedLogicalType`] when a column's logical
///   type has no column kind, or its kind is `Bool` or `Text`.
/// * [`ColumnarScanError::RaggedStream`] when a numeric column's byte length
///   is not a multiple of 8.
pub fn column_batch_from_block(
    bytes: &[u8],
    projection: &[u32],
) -> Result<ColumnBatch, ColumnarScanError> {
    let decoded = read_column_block_projected(bytes, projection)?;
    let width = projection.len();
    let mut fields = Vec::with_capacity(width);
    let mut columns = Vec::with_capacity(width);

    for &id in projection {
        let Some(stored) = decoded.columns.iter().find(|c| c.column_id == id) else {
            return Err(ColumnarScanError::MissingColumn(id));
        };
        let tag = stored.logical_type;
        let Some(kind) = kind_for_logical_type(tag) else {
            return Err(ColumnarScanError::UnsupportedLogicalType(tag));
        };

        // Only the fixed-width numeric kinds are decodable on this path.
        let vector = match kind {
            ColumnKind::Bool | ColumnKind::Text => {
                return Err(ColumnarScanError::UnsupportedLogicalType(tag));
            }
            ColumnKind::Int64 | ColumnKind::Float64 => numeric_vector(id, &kind, &stored.data)?,
        };

        columns.push(vector);
        fields.push(Field {
            name: format!("col_{id}"),
            kind,
            nullable: false,
        });
    }

    Ok(ColumnBatch::new(Arc::new(Schema::new(fields)), columns))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::timeseries::chunk::{
        points_from_column_block, TimeSeriesChunk, COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID,
    };

    /// Builds a `cpu.idle` chunk holding `n` synthetic points and returns its
    /// sealed columnar bytes.
    ///
    /// Timestamps start at `1_700_000_000_000` and advance by `1_000_000` per
    /// point; values cycle through `95.0 + k * 0.25` for `k` in `0..7`.
    fn sealed_columnar_chunk(n: usize) -> Vec<u8> {
        // `n.max(1)` keeps the capacity non-zero even when `n == 0`.
        let mut chunk = TimeSeriesChunk::with_max_points("cpu.idle", Default::default(), n.max(1));
        for i in 0..n {
            assert!(chunk.append(
                1_700_000_000_000 + i as u64 * 1_000_000,
                95.0 + (i % 7) as f64 * 0.25
            ));
        }
        chunk.seal_columnar(7, 1).expect("seal columnar chunk")
    }

    /// Decoding both columns yields one Int64 and one Float64 vector of the
    /// expected length.
    #[test]
    fn scan_produces_results_through_the_column_batch_path() {
        let block = sealed_columnar_chunk(300);
        let batch =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID])
                .expect("decode into ColumnBatch");
        assert_eq!(batch.len(), 300);
        assert_eq!(batch.schema.len(), 2);
        assert!(matches!(batch.columns[0], ColumnVector::Int64 { .. }));
        assert!(matches!(batch.columns[1], ColumnVector::Float64 { .. }));
    }

    /// Every row decoded through the batch path equals the row-path point.
    #[test]
    fn batch_path_is_value_for_value_identical_to_the_row_path() {
        let block = sealed_columnar_chunk(257);
        let row_points = points_from_column_block(&block).expect("row path");
        let batch =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID])
                .expect("batch path");
        assert_eq!(batch.len(), row_points.len());

        // The column variants do not change per row, so destructure them once
        // and fail with a message that names the offending column.
        let ColumnVector::Int64 {
            data: timestamps, ..
        } = &batch.columns[0]
        else {
            panic!("timestamp column should decode as Int64");
        };
        let ColumnVector::Float64 { data: values, .. } = &batch.columns[1] else {
            panic!("value column should decode as Float64");
        };

        for (i, p) in row_points.iter().enumerate() {
            // Timestamps are stored as i64 words; cast back for comparison.
            let ts = timestamps[i] as u64;
            let val = values[i];
            assert_eq!(ts, p.timestamp_ns, "timestamp parity at row {i}");
            assert_eq!(val, p.value, "value parity at row {i}");
        }
    }

    /// A single-column projection produces a single-column batch.
    #[test]
    fn projection_decodes_only_referenced_columns() {
        let block = sealed_columnar_chunk(128);
        let ts_only =
            column_batch_from_block(&block, &[COLUMNAR_TS_COLUMN_ID]).expect("ts-only projection");
        assert_eq!(ts_only.schema.len(), 1);
        assert_eq!(ts_only.columns.len(), 1);
        assert_eq!(ts_only.schema.index_of("col_0"), Some(0));
        assert_eq!(ts_only.schema.index_of("col_1"), None);
        let val_only = column_batch_from_block(&block, &[COLUMNAR_VALUE_COLUMN_ID])
            .expect("value-only projection");
        assert_eq!(val_only.schema.len(), 1);
        assert!(matches!(val_only.columns[0], ColumnVector::Float64 { .. }));
    }

    /// Projecting a column id the block does not contain is reported as
    /// `MissingColumn`.
    #[test]
    fn missing_column_is_an_error() {
        let block = sealed_columnar_chunk(16);
        assert_eq!(
            column_batch_from_block(&block, &[42]).unwrap_err(),
            ColumnarScanError::MissingColumn(42)
        );
    }

    /// Times the row path against the batch path and prints the ratio.
    ///
    /// Only the decoded row counts are asserted; the timings are reported on
    /// stderr for manual inspection.
    #[test]
    fn measured_row_vs_batch_decode_comparison() {
        use std::hint::black_box;
        use std::time::Instant;

        let n = 50_000;
        let block = sealed_columnar_chunk(n);
        let projection = [COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID];

        // Warm-up pass for both paths before timing.
        let _ = points_from_column_block(&block).unwrap();
        let _ = column_batch_from_block(&block, &projection).unwrap();

        let reps = 20;

        // `black_box` on the input and the decoded result keeps an optimized
        // build from hoisting or discarding the work being measured.
        let t_row = Instant::now();
        let mut row_rows = 0usize;
        for _ in 0..reps {
            row_rows = black_box(points_from_column_block(black_box(&block)).unwrap()).len();
        }
        let row_elapsed = t_row.elapsed();

        let t_batch = Instant::now();
        let mut batch_rows = 0usize;
        for _ in 0..reps {
            batch_rows =
                black_box(column_batch_from_block(black_box(&block), &projection).unwrap()).len();
        }
        let batch_elapsed = t_batch.elapsed();

        assert_eq!(row_rows, n);
        assert_eq!(batch_rows, n);

        let row_ns = row_elapsed.as_nanos() as f64 / reps as f64;
        let batch_ns = batch_elapsed.as_nanos() as f64 / reps as f64;
        eprintln!(
            "[#856 Phase 2 gate] columnar decode of {n} rows ({reps} reps): \
             row-path {row_ns:.0} ns/scan, batch-path {batch_ns:.0} ns/scan, \
             ratio {:.2}x (batch/row)",
            batch_ns / row_ns
        );
    }
}