use super::segment_codec::{
decode_bytes, encode_bytes, select_codecs, CodecError, ColumnCodec, ColumnSemantics,
};
use crate::storage::engine::crc32::crc32;
use crate::storage::primitives::split_block_bloom::{hash_bytes_u32, SplitBlockBloom};
/// Four-byte magic that both opens and closes every column block.
pub const COLUMN_BLOCK_MAGIC: [u8; 4] = [b'R', b'D', b'C', b'C'];
/// The only on-disk layout version this module writes and reads.
pub const COLUMN_BLOCK_VERSION_V1: u16 = 1;
/// magic(4) + version(2) + reserved(2) + chunk_id(8) + schema_ref(8) + row_count(8)
/// + column_count(4) + min_ts_ns(8) + max_ts_ns(8).
const HEADER_LEN: usize = 4 + 2 + 2 + 8 + 8 + 8 + 4 + 8 + 8;
/// column_id(4) + logical_type(1) + codec_tag(1) + three (offset u64, len u64)
/// pairs for the stream, granule index and granule bloom.
const DIR_ENTRY_LEN: usize = 4 + 1 + 1 + 3 * (8 + 8);
/// dir_off(8) + dir_len(8) + crc32(4) + tail magic(4).
const FOOTER_LEN: usize = 8 + 8 + 4 + 4;
/// One column handed to `write_column_block` for encoding.
#[derive(Clone, Debug)]
pub struct ColumnInput<'a> {
    /// Caller-chosen column id, stored verbatim in the directory entry.
    pub column_id: u32,
    /// Logical type tag; drives codec selection and whether pruning indexes are built.
    pub logical_type: u8,
    /// Semantics hint forwarded to codec selection.
    pub semantics: ColumnSemantics,
    /// Raw, unencoded column bytes. For numeric logical types the granule index
    /// and bloom builders read this as packed 8-byte little-endian values.
    pub data: &'a [u8],
}
/// A single column recovered from a column block.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DecodedColumn {
    /// Column id copied from the directory entry.
    pub column_id: u32,
    /// Logical type tag copied from the directory entry.
    pub logical_type: u8,
    /// Tag the writer recorded for the column (the first codec in its chain).
    pub codec_tag: u8,
    /// Decoded column bytes.
    pub data: Vec<u8>,
    /// Min/max pruning index, present only if the writer stored one.
    pub granule_index: Option<GranuleIndex>,
    /// Equality-pruning bloom filters, present only if the writer stored them.
    pub granule_bloom: Option<GranuleBloom>,
}
/// Header fields plus the (possibly projected) columns of a decoded block.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DecodedColumnBlock {
    /// Chunk id from the block header.
    pub chunk_id: u64,
    /// Schema reference from the block header.
    pub schema_ref: u64,
    /// Row count from the block header.
    pub row_count: u64,
    /// Minimum timestamp recorded in the header.
    pub min_ts_ns: u64,
    /// Maximum timestamp recorded in the header.
    pub max_ts_ns: u64,
    /// Decoded columns in directory order.
    pub columns: Vec<DecodedColumn>,
}
/// Everything that can go wrong while writing or reading a column block.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ColumnBlockError {
    /// The buffer is shorter than a header plus a footer.
    Truncated,
    /// The leading four bytes are not the block magic.
    BadMagic([u8; 4]),
    /// The trailing four bytes are not the block magic.
    BadTailMagic([u8; 4]),
    /// The header carries a version this reader does not understand.
    UnsupportedVersion(u16),
    /// A directory entry, region offset/length, or side-index blob is malformed.
    BadDirectory,
    /// The CRC32 stored in the footer disagrees with the recomputed one.
    ChecksumMismatch { expected: u32, actual: u32 },
    /// A column stream failed to encode or decode.
    Codec(CodecError),
}
impl std::fmt::Display for ColumnBlockError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use ColumnBlockError as E;
        match self {
            E::Truncated => f.write_str("column block truncated"),
            E::BadMagic(magic) => write!(f, "bad column block magic: {magic:?}"),
            E::BadTailMagic(magic) => write!(f, "bad column block tail magic: {magic:?}"),
            E::UnsupportedVersion(version) => {
                write!(f, "unsupported column block version: {version}")
            }
            E::BadDirectory => f.write_str("column directory entry out of range"),
            E::ChecksumMismatch { expected, actual } => write!(
                f,
                "column block checksum mismatch: expected 0x{expected:08X}, got 0x{actual:08X}"
            ),
            E::Codec(inner) => write!(f, "column stream codec error: {inner}"),
        }
    }
}
impl std::error::Error for ColumnBlockError {}
impl From<CodecError> for ColumnBlockError {
    /// Wraps a codec failure so `?` can lift it into a block error.
    fn from(err: CodecError) -> Self {
        ColumnBlockError::Codec(err)
    }
}
/// Raw minimum and maximum value bytes of one granule.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GranuleStats {
    /// Smallest value in the granule, as raw bytes.
    pub min: Vec<u8>,
    /// Largest value in the granule, as raw bytes.
    pub max: Vec<u8>,
}
/// Per-granule min/max statistics for one fixed-width column.
///
/// Rows are grouped into consecutive granules of `granule_size` rows. Granule `i`
/// covers rows `[i * granule_size, (i + 1) * granule_size)`, clipped to the
/// column's row count. Each [`GranuleStats`] holds raw `value_width`-byte values;
/// interpreting them is up to the caller.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GranuleIndex {
    pub granule_size: u32,
    pub value_width: u32,
    pub granules: Vec<GranuleStats>,
}
impl GranuleIndex {
    /// Number of granules in the index.
    pub fn granule_count(&self) -> usize {
        self.granules.len()
    }

    /// Half-open row range `[start, end)` covered by granule `i`, clipped to `row_count`.
    ///
    /// A `granule_size` of 0 is treated as 1. All arithmetic saturates, so any `i`
    /// with `i * granule_size >= row_count` (including `usize::MAX`) yields the
    /// empty range `(row_count, row_count)` instead of overflowing.
    pub fn row_range(&self, i: usize, row_count: usize) -> (usize, usize) {
        let g = (self.granule_size as usize).max(1);
        let start = i.saturating_mul(g).min(row_count);
        // `i + 1` would overflow for `i == usize::MAX`; saturate instead.
        let end = i.saturating_add(1).saturating_mul(g).min(row_count);
        (start, end)
    }

    /// Indices, in ascending order, of granules whose raw `(min, max)` bytes
    /// satisfy `overlaps`.
    pub fn surviving_granules<F>(&self, overlaps: F) -> Vec<usize>
    where
        F: Fn(&[u8], &[u8]) -> bool,
    {
        self.granules
            .iter()
            .enumerate()
            .filter_map(|(i, stats)| overlaps(&stats.min, &stats.max).then_some(i))
            .collect()
    }

    /// Serializes as `granule_size u32 | value_width u32 | count u32`, then each
    /// granule's `min` bytes followed by its `max` bytes. Integers are little-endian.
    fn serialize(&self) -> Vec<u8> {
        let w = self.value_width as usize;
        let mut out = Vec::with_capacity(12 + self.granules.len() * w * 2);
        out.extend_from_slice(&self.granule_size.to_le_bytes());
        out.extend_from_slice(&self.value_width.to_le_bytes());
        out.extend_from_slice(&(self.granules.len() as u32).to_le_bytes());
        for g in &self.granules {
            out.extend_from_slice(&g.min);
            out.extend_from_slice(&g.max);
        }
        out
    }

    /// Parses a blob in the layout written by `serialize`.
    ///
    /// Trailing bytes past the declared granules are ignored.
    ///
    /// # Errors
    /// Returns [`ColumnBlockError::BadDirectory`] if the blob is shorter than its
    /// 12-byte header, declares a zero `value_width`, or is too short for the
    /// declared granule count. All size arithmetic is checked, so a hostile header
    /// cannot overflow it.
    fn deserialize(bytes: &[u8]) -> Result<GranuleIndex, ColumnBlockError> {
        if bytes.len() < 12 {
            return Err(ColumnBlockError::BadDirectory);
        }
        let granule_size = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
        let value_width = u32::from_le_bytes(bytes[4..8].try_into().unwrap());
        let count = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as usize;
        let w = value_width as usize;
        if w == 0 {
            return Err(ColumnBlockError::BadDirectory);
        }
        // Bytes per granule (min + max). Checked because `w * 2` can overflow a 32-bit usize.
        let pair = w.checked_mul(2).ok_or(ColumnBlockError::BadDirectory)?;
        let need = count
            .checked_mul(pair)
            .and_then(|body| body.checked_add(12))
            .ok_or(ColumnBlockError::BadDirectory)?;
        if bytes.len() < need {
            return Err(ColumnBlockError::BadDirectory);
        }
        let granules = bytes[12..need]
            .chunks_exact(pair)
            .map(|pair_bytes| GranuleStats {
                min: pair_bytes[..w].to_vec(),
                max: pair_bytes[w..].to_vec(),
            })
            .collect();
        Ok(GranuleIndex {
            granule_size,
            value_width,
            granules,
        })
    }
}
/// Per-granule split-block bloom filters for one column, used for equality pruning.
///
/// Granule `i` covers rows `[i * granule_size, (i + 1) * granule_size)`, clipped
/// to the column's row count.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GranuleBloom {
    pub granule_size: u32,
    pub blooms: Vec<SplitBlockBloom>,
}
impl GranuleBloom {
    /// Number of granules (one bloom per granule).
    pub fn granule_count(&self) -> usize {
        self.blooms.len()
    }

    /// Half-open row range `[start, end)` covered by granule `i`, clipped to `row_count`.
    ///
    /// A `granule_size` of 0 is treated as 1. All arithmetic saturates, so any `i`
    /// with `i * granule_size >= row_count` (including `usize::MAX`) yields the
    /// empty range `(row_count, row_count)` instead of overflowing.
    pub fn row_range(&self, i: usize, row_count: usize) -> (usize, usize) {
        let g = (self.granule_size as usize).max(1);
        let start = i.saturating_mul(g).min(row_count);
        // `i + 1` would overflow for `i == usize::MAX`; saturate instead.
        let end = i.saturating_add(1).saturating_mul(g).min(row_count);
        (start, end)
    }

    /// Indices, in ascending order, of granules whose bloom may contain `key_bytes`.
    ///
    /// The key is hashed with `hash_bytes_u32`, the same hash the writer applies to
    /// each 8-byte value it inserts. Bloom filters may report false positives; they
    /// are presumably free of false negatives (the property tests rely on this).
    pub fn surviving_granules(&self, key_bytes: &[u8]) -> Vec<usize> {
        let key = hash_bytes_u32(key_bytes);
        self.blooms
            .iter()
            .enumerate()
            .filter_map(|(i, bloom)| bloom.probe(key).then_some(i))
            .collect()
    }

    /// Serializes as `granule_size u32 | count u32`, followed by each bloom's
    /// `to_bytes()` encoding. Integers are little-endian.
    fn serialize(&self) -> Vec<u8> {
        // Capacity hint assumes one 32-byte block plus its 4-byte count per bloom.
        let mut out = Vec::with_capacity(8 + self.blooms.len() * 36);
        out.extend_from_slice(&self.granule_size.to_le_bytes());
        out.extend_from_slice(&(self.blooms.len() as u32).to_le_bytes());
        for bloom in &self.blooms {
            out.extend_from_slice(&bloom.to_bytes());
        }
        out
    }

    /// Parses a blob in the layout written by `serialize`.
    ///
    /// Each bloom is read as a `num_blocks u32` prefix plus `num_blocks * 32` bytes.
    ///
    /// # Errors
    /// Returns [`ColumnBlockError::BadDirectory`] on any short, overflowing, or
    /// otherwise unparseable bloom, or when the declared count cannot fit in the blob.
    fn deserialize(bytes: &[u8]) -> Result<GranuleBloom, ColumnBlockError> {
        if bytes.len() < 8 {
            return Err(ColumnBlockError::BadDirectory);
        }
        let granule_size = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
        let count = u32::from_le_bytes(bytes[4..8].try_into().unwrap()) as usize;
        // Every serialized bloom starts with a 4-byte block count, so the blob can hold
        // at most this many. Rejecting larger counts up front stops a corrupt or crafted
        // header from driving a multi-gigabyte `with_capacity` below; such a blob would
        // fail in the loop anyway.
        if count > (bytes.len() - 8) / 4 {
            return Err(ColumnBlockError::BadDirectory);
        }
        let mut blooms = Vec::with_capacity(count);
        let mut cur = 8;
        for _ in 0..count {
            if cur + 4 > bytes.len() {
                return Err(ColumnBlockError::BadDirectory);
            }
            let num_blocks = u32::from_le_bytes(bytes[cur..cur + 4].try_into().unwrap()) as usize;
            let bloom_len = num_blocks
                .checked_mul(32)
                .and_then(|body| body.checked_add(4))
                .ok_or(ColumnBlockError::BadDirectory)?;
            let end = cur
                .checked_add(bloom_len)
                .ok_or(ColumnBlockError::BadDirectory)?;
            if end > bytes.len() {
                return Err(ColumnBlockError::BadDirectory);
            }
            let bloom = SplitBlockBloom::from_bytes(&bytes[cur..end])
                .ok_or(ColumnBlockError::BadDirectory)?;
            blooms.push(bloom);
            cur = end;
        }
        Ok(GranuleBloom {
            granule_size,
            blooms,
        })
    }
}
/// Builds one bloom filter per granule of `granule_size` rows over an 8-byte
/// numeric column.
///
/// Returns `None` in any of these cases:
/// - `granule_size` is 0;
/// - the logical type is not one of the numeric kinds;
/// - `data` is empty or not a whole number of 8-byte values.
fn build_granule_bloom(logical_type: u8, granule_size: u32, data: &[u8]) -> Option<GranuleBloom> {
    if granule_size == 0 || NumKind::from_logical(logical_type).is_none() {
        return None;
    }
    if data.is_empty() || !data.len().is_multiple_of(8) {
        return None;
    }
    // Saturating so a huge granule simply becomes "the whole column" in one chunk.
    let granule_bytes = (granule_size as usize).saturating_mul(8);
    let blooms = data
        .chunks(granule_bytes)
        .map(|granule| {
            let mut bloom = SplitBlockBloom::with_capacity(granule.len() / 8);
            for value in granule.chunks_exact(8) {
                bloom.insert(hash_bytes_u32(value));
            }
            bloom
        })
        .collect();
    Some(GranuleBloom {
        granule_size,
        blooms,
    })
}
/// How an 8-byte little-endian value is ordered when computing granule min/max.
#[derive(Debug, Clone, Copy)]
enum NumKind {
    I64,
    U64,
    F64,
}
impl NumKind {
    /// Maps a logical type tag to its numeric kind, or `None` for non-numeric
    /// (e.g. variable-width) columns.
    fn from_logical(logical_type: u8) -> Option<NumKind> {
        let kind = match logical_type {
            2 => NumKind::U64,
            3 => NumKind::F64,
            1 | 7 | 8 => NumKind::I64,
            _ => return None,
        };
        Some(kind)
    }
}
/// Builds per-granule min/max statistics for an 8-byte numeric column.
///
/// Returns `None` in any of these cases:
/// - `granule_size` is 0;
/// - the logical type is not numeric;
/// - `data` is empty or not a whole number of 8-byte values.
///
/// Values are compared according to the column's [`NumKind`].
fn build_granule_index(logical_type: u8, granule_size: u32, data: &[u8]) -> Option<GranuleIndex> {
    if granule_size == 0 {
        return None;
    }
    let kind = NumKind::from_logical(logical_type)?;
    if data.is_empty() || !data.len().is_multiple_of(8) {
        return None;
    }
    // Saturating so a huge granule simply becomes "the whole column" in one chunk;
    // every chunk is non-empty and a multiple of 8 bytes.
    let granule_bytes = (granule_size as usize).saturating_mul(8);
    let granules = data
        .chunks(granule_bytes)
        .map(|granule| {
            let (lo, hi) = granule_min_max(kind, granule);
            GranuleStats {
                min: lo.to_vec(),
                max: hi.to_vec(),
            }
        })
        .collect();
    Some(GranuleIndex {
        granule_size,
        value_width: 8,
        granules,
    })
}
/// Returns the smallest and largest 8-byte value in `slice` under `kind`'s ordering.
///
/// Panics if `slice` holds no complete 8-byte value; callers only pass non-empty
/// granules.
fn granule_min_max(kind: NumKind, slice: &[u8]) -> ([u8; 8], [u8; 8]) {
    let mut values = slice
        .chunks_exact(8)
        .map(|chunk| <[u8; 8]>::try_from(chunk).unwrap());
    let seed = values.next().unwrap();
    values.fold((seed, seed), |(lo, hi), v| {
        let lo = if num_lt(kind, &v, &lo) { v } else { lo };
        let hi = if num_lt(kind, &hi, &v) { v } else { hi };
        (lo, hi)
    })
}
/// Strict "less than" between two little-endian 8-byte values interpreted as `kind`.
///
/// Floats use `f64::total_cmp`, so NaNs and signed zeros get a consistent total order.
fn num_lt(kind: NumKind, a: &[u8; 8], b: &[u8; 8]) -> bool {
    let ordering = match kind {
        NumKind::I64 => i64::from_le_bytes(*a).cmp(&i64::from_le_bytes(*b)),
        NumKind::U64 => u64::from_le_bytes(*a).cmp(&u64::from_le_bytes(*b)),
        NumKind::F64 => f64::from_le_bytes(*a).total_cmp(&f64::from_le_bytes(*b)),
    };
    ordering.is_lt()
}
/// Encodes `columns` into a self-describing RDCC column block.
///
/// Layout, in order:
/// 1. header (`HEADER_LEN` bytes);
/// 2. one directory entry per column (`DIR_ENTRY_LEN` bytes each);
/// 3. every encoded stream;
/// 4. every serialized granule index;
/// 5. every serialized granule bloom;
/// 6. footer: dir offset, dir length, CRC32 of everything before the footer, and
///    the tail magic.
///
/// Columns without a granule index or bloom get a `(0, 0)` slot for that region.
///
/// # Errors
/// Propagates codec failures from `encode_bytes` as [`ColumnBlockError::Codec`].
pub fn write_column_block(
    chunk_id: u64,
    schema_ref: u64,
    row_count: u64,
    min_ts_ns: u64,
    max_ts_ns: u64,
    granule_size: u32,
    columns: &[ColumnInput<'_>],
) -> Result<Vec<u8>, ColumnBlockError> {
    /// Everything produced for one column before the layout is fixed.
    struct EncodedColumn {
        codec_tag: u8,
        stream: Vec<u8>,
        granule: Vec<u8>,
        bloom: Vec<u8>,
    }

    /// Reserves space for an optional side blob at `cursor`, advancing it.
    /// Empty blobs are recorded as `(0, 0)` and consume no space.
    fn place(cursor: &mut u64, len: usize) -> (u64, u64) {
        if len == 0 {
            return (0, 0);
        }
        let offset = *cursor;
        *cursor += len as u64;
        (offset, len as u64)
    }

    let dir_off = HEADER_LEN;
    let dir_len = columns.len() * DIR_ENTRY_LEN;
    let streams_off = dir_off + dir_len;

    // Pass 1: encode every column and build its optional pruning blobs.
    let mut encoded = Vec::with_capacity(columns.len());
    for col in columns {
        let codecs = select_codecs(col.logical_type, col.semantics);
        let codec_tag = codecs.first().unwrap_or(&ColumnCodec::None).tag();
        let stream = encode_bytes(&codecs, col.data)?;
        let granule = build_granule_index(col.logical_type, granule_size, col.data)
            .map(|gi| gi.serialize())
            .unwrap_or_default();
        let bloom = build_granule_bloom(col.logical_type, granule_size, col.data)
            .map(|gb| gb.serialize())
            .unwrap_or_default();
        encoded.push(EncodedColumn {
            codec_tag,
            stream,
            granule,
            bloom,
        });
    }

    let streams_len: u64 = encoded.iter().map(|e| e.stream.len() as u64).sum();
    let granules_len: u64 = encoded.iter().map(|e| e.granule.len() as u64).sum();
    let blooms_len: u64 = encoded.iter().map(|e| e.bloom.len() as u64).sum();
    let granule_region_off = streams_off as u64 + streams_len;
    let bloom_region_off = granule_region_off + granules_len;
    let mut out = Vec::with_capacity((bloom_region_off + blooms_len) as usize + FOOTER_LEN);

    // Header.
    out.extend_from_slice(&COLUMN_BLOCK_MAGIC);
    out.extend_from_slice(&COLUMN_BLOCK_VERSION_V1.to_le_bytes());
    out.extend_from_slice(&0u16.to_le_bytes()); // reserved
    for field in [chunk_id, schema_ref, row_count] {
        out.extend_from_slice(&field.to_le_bytes());
    }
    out.extend_from_slice(&(columns.len() as u32).to_le_bytes());
    for field in [min_ts_ns, max_ts_ns] {
        out.extend_from_slice(&field.to_le_bytes());
    }
    debug_assert_eq!(out.len(), HEADER_LEN);

    // Directory: id, type, codec, then (offset, len) for stream, granule index and bloom.
    let mut stream_cursor = streams_off as u64;
    let mut granule_cursor = granule_region_off;
    let mut bloom_cursor = bloom_region_off;
    for (col, enc) in columns.iter().zip(&encoded) {
        let stream_slot = (stream_cursor, enc.stream.len() as u64);
        stream_cursor += enc.stream.len() as u64;
        let granule_slot = place(&mut granule_cursor, enc.granule.len());
        let bloom_slot = place(&mut bloom_cursor, enc.bloom.len());
        out.extend_from_slice(&col.column_id.to_le_bytes());
        out.push(col.logical_type);
        out.push(enc.codec_tag);
        for (offset, len) in [stream_slot, granule_slot, bloom_slot] {
            out.extend_from_slice(&offset.to_le_bytes());
            out.extend_from_slice(&len.to_le_bytes());
        }
    }
    debug_assert_eq!(out.len(), streams_off);

    // Payload regions, each in column order.
    for enc in &encoded {
        out.extend_from_slice(&enc.stream);
    }
    debug_assert_eq!(out.len() as u64, granule_region_off);
    for enc in &encoded {
        out.extend_from_slice(&enc.granule);
    }
    debug_assert_eq!(out.len() as u64, bloom_region_off);
    for enc in &encoded {
        out.extend_from_slice(&enc.bloom);
    }

    // Footer: the checksum covers every byte written so far, not the footer itself.
    let crc = crc32(&out);
    out.extend_from_slice(&(dir_off as u64).to_le_bytes());
    out.extend_from_slice(&(dir_len as u64).to_le_bytes());
    out.extend_from_slice(&crc.to_le_bytes());
    out.extend_from_slice(&COLUMN_BLOCK_MAGIC);
    Ok(out)
}
/// Cheaply classifies a buffer as an RDCC column block and reports its version.
///
/// Returns `None` if the buffer is too short to hold a header and footer, or does
/// not start with the block magic. No further validation is done (no checksum or
/// tail-magic check), so a `Some` result does not mean the block is readable.
pub fn peek_column_block_version(bytes: &[u8]) -> Option<u16> {
    let plausible = bytes.len() >= HEADER_LEN + FOOTER_LEN && bytes.starts_with(&COLUMN_BLOCK_MAGIC);
    plausible.then(|| u16::from_le_bytes([bytes[4], bytes[5]]))
}
/// Validates and fully decodes a column block, returning every column.
///
/// # Errors
/// Any [`ColumnBlockError`] raised by header, checksum, directory, or codec validation.
pub fn read_column_block(bytes: &[u8]) -> Result<DecodedColumnBlock, ColumnBlockError> {
    let no_projection: Option<&[u32]> = None;
    read_column_block_filtered(bytes, no_projection)
}
/// Validates a column block and decodes only the columns whose id is in `want`.
///
/// The header, checksum, and every directory entry's stream span are still
/// validated. Columns come back in directory order, not in `want` order; ids in
/// `want` that are absent from the block are ignored.
///
/// # Errors
/// Any [`ColumnBlockError`] raised by header, checksum, directory, or codec validation.
pub fn read_column_block_projected(
    bytes: &[u8],
    want: &[u32],
) -> Result<DecodedColumnBlock, ColumnBlockError> {
    let projection = Some(want);
    read_column_block_filtered(bytes, projection)
}
/// Reads the little-endian `u64` at byte offset `at`.
///
/// Callers only pass offsets already proven to lie inside `bytes`.
fn read_le_u64(bytes: &[u8], at: usize) -> u64 {
    u64::from_le_bytes(bytes[at..at + 8].try_into().unwrap())
}

/// Converts an on-disk `u64` offset or length to `usize`.
///
/// Values this platform cannot address are rejected as
/// [`ColumnBlockError::BadDirectory`]. A plain `as` cast would silently truncate
/// them, which on 32-bit targets aliases a huge offset onto a small in-range one.
fn dir_usize(value: u64) -> Result<usize, ColumnBlockError> {
    usize::try_from(value).map_err(|_| ColumnBlockError::BadDirectory)
}

/// Resolves the `(offset u64, len u64)` pair stored at `entry_at` into a slice of
/// `bytes`, requiring it to lie within `[payload_start, footer_start]`.
fn dir_span(
    bytes: &[u8],
    entry_at: usize,
    payload_start: usize,
    footer_start: usize,
) -> Result<&[u8], ColumnBlockError> {
    let off = dir_usize(read_le_u64(bytes, entry_at))?;
    let len = dir_usize(read_le_u64(bytes, entry_at + 8))?;
    let end = off.checked_add(len).ok_or(ColumnBlockError::BadDirectory)?;
    if off < payload_start || end > footer_start {
        return Err(ColumnBlockError::BadDirectory);
    }
    Ok(&bytes[off..end])
}

/// Shared implementation of [`read_column_block`] and [`read_column_block_projected`].
///
/// Validation runs in this order: length, leading magic, version, tail magic, CRC32
/// (over everything before the footer), then directory geometry. Each directory
/// entry's stream span is bounds-checked even when `want` skips the column. Only
/// kept columns are decoded and have their granule index and bloom parsed.
///
/// # Errors
/// The first [`ColumnBlockError`] encountered. All offset and size arithmetic is
/// checked, so a malformed block yields an error rather than a panic, on 32-bit
/// targets too.
fn read_column_block_filtered(
    bytes: &[u8],
    want: Option<&[u32]>,
) -> Result<DecodedColumnBlock, ColumnBlockError> {
    if bytes.len() < HEADER_LEN + FOOTER_LEN {
        return Err(ColumnBlockError::Truncated);
    }
    let magic: [u8; 4] = bytes[0..4].try_into().unwrap();
    if magic != COLUMN_BLOCK_MAGIC {
        return Err(ColumnBlockError::BadMagic(magic));
    }
    let version = u16::from_le_bytes([bytes[4], bytes[5]]);
    if version != COLUMN_BLOCK_VERSION_V1 {
        return Err(ColumnBlockError::UnsupportedVersion(version));
    }
    let footer_start = bytes.len() - FOOTER_LEN;
    let tail_magic: [u8; 4] = bytes[bytes.len() - 4..].try_into().unwrap();
    if tail_magic != COLUMN_BLOCK_MAGIC {
        return Err(ColumnBlockError::BadTailMagic(tail_magic));
    }
    let raw_dir_off = read_le_u64(bytes, footer_start);
    let raw_dir_len = read_le_u64(bytes, footer_start + 8);
    let stored_crc = u32::from_le_bytes(
        bytes[footer_start + 16..footer_start + 20]
            .try_into()
            .unwrap(),
    );
    let actual_crc = crc32(&bytes[..footer_start]);
    if actual_crc != stored_crc {
        return Err(ColumnBlockError::ChecksumMismatch {
            expected: stored_crc,
            actual: actual_crc,
        });
    }
    let chunk_id = read_le_u64(bytes, 8);
    let schema_ref = read_le_u64(bytes, 16);
    let row_count = read_le_u64(bytes, 24);
    let column_count = u32::from_le_bytes(bytes[32..36].try_into().unwrap()) as usize;
    let min_ts_ns = read_le_u64(bytes, 36);
    let max_ts_ns = read_le_u64(bytes, 44);

    // The directory must sit right after the header, hold exactly one entry per
    // column, and end before the footer. Every step is checked so a hostile
    // column_count cannot wrap the size on 32-bit targets.
    let dir_off = dir_usize(raw_dir_off)?;
    let dir_len = dir_usize(raw_dir_len)?;
    let expected_dir_len = column_count
        .checked_mul(DIR_ENTRY_LEN)
        .ok_or(ColumnBlockError::BadDirectory)?;
    let payload_start = dir_off
        .checked_add(dir_len)
        .ok_or(ColumnBlockError::BadDirectory)?;
    if dir_off != HEADER_LEN || dir_len != expected_dir_len || payload_start > footer_start {
        return Err(ColumnBlockError::BadDirectory);
    }

    // Bounded: column_count * DIR_ENTRY_LEN fits inside the buffer.
    let mut columns = Vec::with_capacity(column_count);
    for i in 0..column_count {
        // In bounds: base + DIR_ENTRY_LEN <= payload_start <= footer_start.
        let base = dir_off + i * DIR_ENTRY_LEN;
        let column_id = u32::from_le_bytes(bytes[base..base + 4].try_into().unwrap());
        let logical_type = bytes[base + 4];
        let codec_tag = bytes[base + 5];
        // Checked before the projection filter, so a corrupt directory is reported
        // no matter which columns the caller wants.
        let stream = dir_span(bytes, base + 6, payload_start, footer_start)?;
        if want.is_some_and(|ids| !ids.contains(&column_id)) {
            continue;
        }
        let data = decode_bytes(stream)?;
        // A zero length means the writer stored no granule index for this column.
        let granule_index = if read_le_u64(bytes, base + 30) == 0 {
            None
        } else {
            let blob = dir_span(bytes, base + 22, payload_start, footer_start)?;
            Some(GranuleIndex::deserialize(blob)?)
        };
        // Likewise, a zero length means no granule bloom.
        let granule_bloom = if read_le_u64(bytes, base + 46) == 0 {
            None
        } else {
            let blob = dir_span(bytes, base + 38, payload_start, footer_start)?;
            Some(GranuleBloom::deserialize(blob)?)
        };
        columns.push(DecodedColumn {
            column_id,
            logical_type,
            codec_tag,
            data,
            granule_index,
            granule_bloom,
        });
    }
    Ok(DecodedColumnBlock {
        chunk_id,
        schema_ref,
        row_count,
        min_ts_ns,
        max_ts_ns,
        columns,
    })
}
// Unit and property tests for the RDCC column block: round trips, projection,
// header/footer validation, CRC corruption detection, and granule pruning.
#[cfg(test)]
mod tests {
use super::*;
use proptest::prelude::*;
// Packs u64 values as consecutive 8-byte little-endian words.
fn u64_stream(values: &[u64]) -> Vec<u8> {
values.iter().flat_map(|v| v.to_le_bytes()).collect()
}
// Packs f64 values as consecutive 8-byte little-endian words.
fn f64_stream(values: &[f64]) -> Vec<u8> {
values.iter().flat_map(|v| v.to_le_bytes()).collect()
}
// Timestamp (u64) + gauge (f64) columns survive a write/read cycle byte for byte,
// record the expected codecs, and carry a 4-granule index (500 rows / 128).
#[test]
fn round_trips_two_columns_value_for_value() {
let ts: Vec<u64> = (0..500)
.map(|i| 1_700_000_000_000 + i * 1_000_000)
.collect();
let vals: Vec<f64> = (0..500).map(|i| 95.0 + (i % 7) as f64 * 0.25).collect();
let ts_raw = u64_stream(&ts);
let val_raw = f64_stream(&vals);
let block = write_column_block(
42,
7,
ts.len() as u64,
*ts.first().unwrap(),
*ts.last().unwrap(),
128,
&[
ColumnInput {
column_id: 0,
logical_type: 2,
semantics: ColumnSemantics::Timestamp,
data: &ts_raw,
},
ColumnInput {
column_id: 1,
logical_type: 3,
semantics: ColumnSemantics::Gauge,
data: &val_raw,
},
],
)
.unwrap();
let decoded = read_column_block(&block).unwrap();
assert_eq!(decoded.chunk_id, 42);
assert_eq!(decoded.schema_ref, 7);
assert_eq!(decoded.row_count, 500);
assert_eq!(decoded.min_ts_ns, *ts.first().unwrap());
assert_eq!(decoded.max_ts_ns, *ts.last().unwrap());
assert_eq!(decoded.columns.len(), 2);
assert_eq!(decoded.columns[0].column_id, 0);
assert_eq!(decoded.columns[0].logical_type, 2);
assert_eq!(decoded.columns[0].codec_tag, ColumnCodec::DoubleDelta.tag());
assert_eq!(decoded.columns[1].codec_tag, ColumnCodec::Xor.tag());
assert_eq!(decoded.columns[0].data, ts_raw);
assert_eq!(decoded.columns[1].data, val_raw);
for col in &decoded.columns {
let gi = col
.granule_index
.as_ref()
.expect("numeric column must carry a granule index");
assert_eq!(gi.granule_size, 128);
assert_eq!(gi.value_width, 8);
assert_eq!(gi.granule_count(), 4);
}
// Monotonic timestamps: first granule's min is the first value, last granule's max the last.
let ts_gi = decoded.columns[0].granule_index.as_ref().unwrap();
assert_eq!(
u64::from_le_bytes(ts_gi.granules[0].min.clone().try_into().unwrap()),
*ts.first().unwrap()
);
assert_eq!(
u64::from_le_bytes(ts_gi.granules[3].max.clone().try_into().unwrap()),
*ts.last().unwrap()
);
}
// Projection returns only the requested ids; projecting every id equals a full
// read, and projecting an unknown id yields no columns.
#[test]
fn projected_read_decodes_only_wanted_columns() {
let ts: Vec<u64> = (0..500)
.map(|i| 1_700_000_000_000 + i * 1_000_000)
.collect();
let vals: Vec<f64> = (0..500).map(|i| 95.0 + (i % 7) as f64 * 0.25).collect();
let ts_raw = u64_stream(&ts);
let val_raw = f64_stream(&vals);
let block = write_column_block(
42,
7,
ts.len() as u64,
*ts.first().unwrap(),
*ts.last().unwrap(),
128,
&[
ColumnInput {
column_id: 0,
logical_type: 2,
semantics: ColumnSemantics::Timestamp,
data: &ts_raw,
},
ColumnInput {
column_id: 1,
logical_type: 3,
semantics: ColumnSemantics::Gauge,
data: &val_raw,
},
],
)
.unwrap();
let projected = read_column_block_projected(&block, &[1]).unwrap();
assert_eq!(projected.columns.len(), 1);
assert_eq!(projected.columns[0].column_id, 1);
assert_eq!(projected.columns[0].data, val_raw);
assert_eq!(projected.row_count, 500);
let full = read_column_block(&block).unwrap();
let both = read_column_block_projected(&block, &[0, 1]).unwrap();
assert_eq!(both, full);
let none = read_column_block_projected(&block, &[99]).unwrap();
assert!(none.columns.is_empty());
}
// Builds a string column payload: u32 item count, then per item a u16 length
// prefix followed by the UTF-8 bytes. Presumably the layout the Dict codec
// expects for logical_type 4 — TODO confirm against segment_codec.
fn str_stream(items: &[&str]) -> Vec<u8> {
let mut out = (items.len() as u32).to_le_bytes().to_vec();
for s in items {
out.extend_from_slice(&(s.len() as u16).to_le_bytes());
out.extend_from_slice(s.as_bytes());
}
out
}
// Counter semantics select Delta, low-cardinality strings select Dict; only the
// numeric column gets a granule index.
#[test]
fn records_counter_and_low_cardinality_codecs_and_round_trips() {
let counter = u64_stream(&(0..300).map(|i| (i * 5) as u64).collect::<Vec<_>>());
let labels: Vec<&str> = (0..300).map(|i| ["a", "b", "c"][i % 3]).collect();
let labels_raw = str_stream(&labels);
let block = write_column_block(
9,
1,
300,
0,
0,
64,
&[
ColumnInput {
column_id: 10,
logical_type: 2,
semantics: ColumnSemantics::Counter,
data: &counter,
},
ColumnInput {
column_id: 11,
logical_type: 4,
semantics: ColumnSemantics::LowCardinality,
data: &labels_raw,
},
],
)
.unwrap();
let decoded = read_column_block(&block).unwrap();
assert_eq!(decoded.columns[0].codec_tag, ColumnCodec::Delta.tag());
assert_eq!(decoded.columns[1].codec_tag, ColumnCodec::Dict.tag());
assert_eq!(decoded.columns[0].data, counter);
assert_eq!(decoded.columns[1].data, labels_raw);
assert!(decoded.columns[0].granule_index.is_some());
assert!(decoded.columns[1].granule_index.is_none());
}
// An empty block still has leading magic, version and tail magic, and reads back empty.
#[test]
fn header_carries_magic_and_version() {
let block = write_column_block(1, 0, 0, 0, 0, 0, &[]).unwrap();
assert_eq!(&block[0..4], &COLUMN_BLOCK_MAGIC);
assert_eq!(
u16::from_le_bytes([block[4], block[5]]),
COLUMN_BLOCK_VERSION_V1
);
assert_eq!(&block[block.len() - 4..], &COLUMN_BLOCK_MAGIC);
let decoded = read_column_block(&block).unwrap();
assert!(decoded.columns.is_empty());
}
// peek reports any version (even unsupported ones) for RDCC buffers and None otherwise.
#[test]
fn peek_version_classifies_rdcc_and_rejects_non_rdcc() {
let block = write_column_block(1, 0, 0, 0, 0, 0, &[]).unwrap();
assert_eq!(
peek_column_block_version(&block),
Some(COLUMN_BLOCK_VERSION_V1)
);
let mut future = block.clone();
future[4..6].copy_from_slice(&(COLUMN_BLOCK_VERSION_V1 + 7).to_le_bytes());
assert_eq!(
peek_column_block_version(&future),
Some(COLUMN_BLOCK_VERSION_V1 + 7)
);
let mut wrong_magic = block.clone();
wrong_magic[0] = b'X';
assert_eq!(peek_column_block_version(&wrong_magic), None);
assert_eq!(peek_column_block_version(b"row-stored bytes"), None);
assert_eq!(peek_column_block_version(&[]), None);
}
// A corrupted leading magic is reported as BadMagic.
#[test]
fn rejects_bad_leading_magic() {
let mut block = write_column_block(1, 0, 0, 0, 0, 0, &[]).unwrap();
block[0] = b'X';
assert!(matches!(
read_column_block(&block),
Err(ColumnBlockError::BadMagic(_))
));
}
// A version newer than V1 is rejected before the checksum is even examined.
#[test]
fn rejects_future_version() {
let raw = u64_stream(&[1, 2, 3]);
let mut block = write_column_block(
1,
0,
3,
1,
3,
0,
&[ColumnInput {
column_id: 0,
logical_type: 2,
semantics: ColumnSemantics::Generic,
data: &raw,
}],
)
.unwrap();
block[4..6].copy_from_slice(&(COLUMN_BLOCK_VERSION_V1 + 1).to_le_bytes());
assert!(matches!(
read_column_block(&block),
Err(ColumnBlockError::UnsupportedVersion(_))
));
}
// Flipping the first byte of the stream region (right after the single directory
// entry) must be caught by the CRC32 check.
#[test]
fn detects_payload_corruption_via_crc() {
let raw = f64_stream(&[1.5, 2.5, 3.5, 4.5]);
let mut block = write_column_block(
1,
0,
4,
0,
0,
0,
&[ColumnInput {
column_id: 0,
logical_type: 3,
semantics: ColumnSemantics::Gauge,
data: &raw,
}],
)
.unwrap();
block[HEADER_LEN + DIR_ENTRY_LEN] ^= 0xFF;
assert!(matches!(
read_column_block(&block),
Err(ColumnBlockError::ChecksumMismatch { .. })
));
}
// 250 rows in granules of 100 give 3 granules; a range query for [2100, 2200]
// keeps only granule 1 (rows 100..200).
#[test]
fn granule_index_round_trips_and_prunes_on_u64() {
let ts: Vec<u64> = (0..250).map(|i| 1_000 + i * 10).collect();
let raw = u64_stream(&ts);
let block = write_column_block(
1,
0,
ts.len() as u64,
*ts.first().unwrap(),
*ts.last().unwrap(),
100,
&[ColumnInput {
column_id: 0,
logical_type: 2,
semantics: ColumnSemantics::Timestamp,
data: &raw,
}],
)
.unwrap();
let decoded = read_column_block(&block).unwrap();
let gi = decoded.columns[0].granule_index.as_ref().unwrap();
assert_eq!(gi.granule_count(), 3);
let as_u64 = |b: &[u8]| u64::from_le_bytes(b.try_into().unwrap());
assert_eq!(as_u64(&gi.granules[0].min), 1_000);
assert_eq!(as_u64(&gi.granules[0].max), 1_990);
assert_eq!(as_u64(&gi.granules[2].min), 3_000);
assert_eq!(as_u64(&gi.granules[2].max), 3_490);
let survivors = gi.surviving_granules(|min, max| {
let (lo, hi) = (2_100u64, 2_200u64);
as_u64(min) <= hi && as_u64(max) >= lo
});
assert_eq!(survivors, vec![1]);
assert_eq!(gi.row_range(1, ts.len()), (100, 200));
}
// Blooms round-trip, and no key is ever pruned from the granule that holds it
// (no false negatives); false positives are tolerated.
#[test]
fn granule_bloom_round_trips_and_prunes_on_equality() {
let keys: Vec<u64> = (0..250).map(|i| (i as u64) * 7 + 3).collect();
let raw = u64_stream(&keys);
let block = write_column_block(
1,
0,
keys.len() as u64,
*keys.first().unwrap(),
*keys.last().unwrap(),
100,
&[ColumnInput {
column_id: 0,
logical_type: 2,
semantics: ColumnSemantics::Timestamp,
data: &raw,
}],
)
.unwrap();
let decoded = read_column_block(&block).unwrap();
let gb = decoded.columns[0]
.granule_bloom
.as_ref()
.expect("numeric column must carry a granule bloom");
assert_eq!(gb.granule_count(), 3);
assert_eq!(gb.granule_size, 100);
let target = keys[150];
let survivors = gb.surviving_granules(&target.to_le_bytes());
assert!(
survivors.contains(&1),
"granule holding the value was pruned: {survivors:?}"
);
assert_eq!(gb.row_range(1, keys.len()), (100, 200));
for (row, &k) in keys.iter().enumerate() {
let g = row / 100;
let survivors = gb.surviving_granules(&k.to_le_bytes());
assert!(
survivors.contains(&g),
"key {k} at row {row} pruned from its granule {g}: {survivors:?}"
);
}
}
// Non-numeric (string) columns get neither a granule index nor a bloom.
#[test]
fn variable_width_column_gets_no_granule_bloom() {
let labels: Vec<&str> = (0..120).map(|i| ["a", "b", "c"][i % 3]).collect();
let labels_raw = str_stream(&labels);
let block = write_column_block(
9,
1,
120,
0,
0,
64,
&[ColumnInput {
column_id: 11,
logical_type: 4,
semantics: ColumnSemantics::LowCardinality,
data: &labels_raw,
}],
)
.unwrap();
let decoded = read_column_block(&block).unwrap();
assert!(decoded.columns[0].granule_index.is_none());
assert!(decoded.columns[0].granule_bloom.is_none());
}
// Property: for random values, granule sizes and probes, every granule that
// actually contains the probe value survives bloom pruning.
proptest! {
#[test]
fn bloom_never_prunes_a_granule_holding_the_key(
values in proptest::collection::vec(0u64..5_000, 1..400usize),
granule_size in 1u32..128,
probe in 0u64..5_000,
) {
let raw = u64_stream(&values);
let block = write_column_block(
1, 0, values.len() as u64, 0, 0, granule_size,
&[ColumnInput {
column_id: 0,
logical_type: 2,
semantics: ColumnSemantics::Generic,
data: &raw,
}],
).unwrap();
let decoded = read_column_block(&block).unwrap();
let gb = decoded.columns[0].granule_bloom.as_ref().unwrap();
let survivors = gb.surviving_granules(&probe.to_le_bytes());
let g = granule_size as usize;
for (row, &v) in values.iter().enumerate() {
if v == probe {
let gi = row / g;
prop_assert!(
survivors.contains(&gi),
"granule {gi} holds {probe} at row {row} but was pruned"
);
}
}
}
}
// Buffers shorter than header + footer are rejected as Truncated.
#[test]
fn rejects_truncated_buffer() {
assert!(matches!(
read_column_block(&[0u8; 8]),
Err(ColumnBlockError::Truncated)
));
}
}