use super::segment_codec::{
decode_bytes, encode_bytes, select_codecs, CodecError, ColumnCodec, ColumnSemantics,
};
use crate::storage::primitives::split_block_bloom::{hash_bytes_u32, SplitBlockBloom};
pub use reddb_file::{COLUMN_BLOCK_MAGIC, COLUMN_BLOCK_VERSION_V1};
// Short local aliases for the frame-layout sizes defined by `reddb_file`.
// NOTE(review): `FOOTER_LEN` is not referenced anywhere in the visible part of this file.
const HEADER_LEN: usize = reddb_file::COLUMN_BLOCK_HEADER_LEN;
const DIR_ENTRY_LEN: usize = reddb_file::COLUMN_BLOCK_DIR_ENTRY_LEN;
const FOOTER_LEN: usize = reddb_file::COLUMN_BLOCK_FOOTER_LEN;
/// One column handed to [`write_column_block`]: identity, type information and raw bytes.
#[derive(Debug, Clone)]
pub struct ColumnInput<'a> {
/// Identifier stored with the column in the block and returned again on read.
pub column_id: u32,
/// Logical type tag. Passed to `select_codecs`; tags 1, 2, 3, 7 and 8 are the ones
/// that make the column eligible for a granule index and granule bloom.
pub logical_type: u8,
/// Semantic hint passed to `select_codecs` together with `logical_type`.
pub semantics: ColumnSemantics,
/// Raw, unencoded column bytes. Granule metadata is only built when the length is a
/// non-zero multiple of 8.
pub data: &'a [u8],
}
/// One column as returned by the block readers, with its stream already decoded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecodedColumn {
/// Column identifier read from the block.
pub column_id: u32,
/// Logical type tag read from the block.
pub logical_type: u8,
/// Codec tag recorded for this column in the block.
pub codec_tag: u8,
/// Column bytes produced by `decode_bytes` from the stored stream.
pub data: Vec<u8>,
/// Per-granule min/max index, when the block carries one for this column.
pub granule_index: Option<GranuleIndex>,
/// Per-granule bloom filters, when the block carries them for this column.
pub granule_bloom: Option<GranuleBloom>,
}
/// A decoded column block: the frame-level header fields plus the decoded columns.
///
/// The scalar fields are copied verbatim from the decoded frame; they are the values
/// that were passed to [`write_column_block`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecodedColumnBlock {
pub chunk_id: u64,
pub schema_ref: u64,
pub row_count: u64,
pub min_ts_ns: u64,
pub max_ts_ns: u64,
/// Decoded columns; a projected read keeps only the requested column ids.
pub columns: Vec<DecodedColumn>,
}
/// Errors surfaced while writing or reading a column block.
///
/// Most variants mirror `reddb_file::ColumnBlockFrameError` one-to-one; `Codec` wraps
/// failures from the column stream codecs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnBlockError {
/// The block is truncated.
Truncated,
/// Bad leading magic. NOTE(review): presumably carries the four bytes that were read —
/// confirm against `reddb_file`.
BadMagic([u8; 4]),
/// Bad tail magic; same payload convention as `BadMagic`.
BadTailMagic([u8; 4]),
/// The block declares a version this reader does not support.
UnsupportedVersion(u16),
/// A column directory entry is out of range. Also returned when a stored granule
/// bloom cannot be rebuilt from its bytes.
BadDirectory,
/// The block checksum did not match.
ChecksumMismatch { expected: u32, actual: u32 },
/// A column stream codec failed.
Codec(CodecError),
}
impl std::fmt::Display for ColumnBlockError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Truncated => write!(f, "column block truncated"),
Self::BadMagic(m) => write!(f, "bad column block magic: {m:?}"),
Self::BadTailMagic(m) => write!(f, "bad column block tail magic: {m:?}"),
Self::UnsupportedVersion(v) => write!(f, "unsupported column block version: {v}"),
Self::BadDirectory => write!(f, "column directory entry out of range"),
Self::ChecksumMismatch { expected, actual } => write!(
f,
"column block checksum mismatch: expected 0x{expected:08X}, got 0x{actual:08X}"
),
Self::Codec(e) => write!(f, "column stream codec error: {e}"),
}
}
}
// Marker impl: relies on the `Debug` derive and the `Display` impl; no `source()` override.
impl std::error::Error for ColumnBlockError {}
impl From<CodecError> for ColumnBlockError {
fn from(e: CodecError) -> Self {
Self::Codec(e)
}
}
impl From<reddb_file::ColumnBlockFrameError> for ColumnBlockError {
fn from(e: reddb_file::ColumnBlockFrameError) -> Self {
match e {
reddb_file::ColumnBlockFrameError::Truncated => Self::Truncated,
reddb_file::ColumnBlockFrameError::BadMagic(magic) => Self::BadMagic(magic),
reddb_file::ColumnBlockFrameError::BadTailMagic(magic) => Self::BadTailMagic(magic),
reddb_file::ColumnBlockFrameError::UnsupportedVersion(version) => {
Self::UnsupportedVersion(version)
}
reddb_file::ColumnBlockFrameError::BadDirectory => Self::BadDirectory,
reddb_file::ColumnBlockFrameError::ChecksumMismatch { expected, actual } => {
Self::ChecksumMismatch { expected, actual }
}
}
}
}
/// Minimum and maximum value of one granule, kept as raw value bytes.
///
/// When built by `build_granule_index` each side is an 8-byte little-endian value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GranuleStats {
pub min: Vec<u8>,
pub max: Vec<u8>,
}
/// Per-column min/max index: one [`GranuleStats`] entry per run of `granule_size` rows.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GranuleIndex {
/// Number of rows per granule (the last granule may be shorter).
pub granule_size: u32,
/// Width in bytes of each indexed value; `build_granule_index` always writes 8.
pub value_width: u32,
/// Stats in granule order.
pub granules: Vec<GranuleStats>,
}
impl GranuleIndex {
    /// Number of granules recorded in this index.
    pub fn granule_count(&self) -> usize {
        self.granules.len()
    }

    /// Half-open row range `(start, end)` covered by granule `i`, clamped to `row_count`.
    ///
    /// A `granule_size` of 0 is treated as 1. All arithmetic saturates, so any `i` past
    /// the data (including `usize::MAX`) yields the empty range `(row_count, row_count)`.
    pub fn row_range(&self, i: usize, row_count: usize) -> (usize, usize) {
        let g = (self.granule_size as usize).max(1);
        let start = i.saturating_mul(g).min(row_count);
        // `i + 1` would overflow for `i == usize::MAX` (panic in debug builds, an inverted
        // range in release builds), so the increment saturates as well.
        let end = i.saturating_add(1).saturating_mul(g).min(row_count);
        (start, end)
    }

    /// Indices of the granules for which `overlaps(min, max)` returns `true`, in
    /// ascending order.
    ///
    /// The predicate receives the raw min/max bytes of each granule and decides whether
    /// the granule may contain matching rows.
    pub fn surviving_granules<F>(&self, overlaps: F) -> Vec<usize>
    where
        F: Fn(&[u8], &[u8]) -> bool,
    {
        self.granules
            .iter()
            .enumerate()
            .filter(|(_, stats)| overlaps(&stats.min, &stats.max))
            .map(|(i, _)| i)
            .collect()
    }

    /// Encodes the index into the blob format owned by `reddb_file`.
    fn serialize(&self) -> Vec<u8> {
        let granules = self
            .granules
            .iter()
            .map(|g| reddb_file::ColumnBlockGranuleStats {
                min: g.min.clone(),
                max: g.max.clone(),
            })
            .collect();
        reddb_file::encode_column_block_granule_index_blob(&reddb_file::ColumnBlockGranuleIndex {
            granule_size: self.granule_size,
            value_width: self.value_width,
            granules,
        })
    }

    /// Decodes a blob written by [`GranuleIndex::serialize`].
    ///
    /// # Errors
    /// Propagates the decoder's error, converted into [`ColumnBlockError`].
    fn deserialize(bytes: &[u8]) -> Result<GranuleIndex, ColumnBlockError> {
        let decoded = reddb_file::decode_column_block_granule_index_blob(bytes)?;
        let granules = decoded
            .granules
            .into_iter()
            .map(|g| GranuleStats {
                min: g.min,
                max: g.max,
            })
            .collect();
        Ok(GranuleIndex {
            granule_size: decoded.granule_size,
            value_width: decoded.value_width,
            granules,
        })
    }
}
/// Per-column bloom filters: one [`SplitBlockBloom`] per run of `granule_size` rows.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GranuleBloom {
/// Number of rows per granule (the last granule may be shorter).
pub granule_size: u32,
/// Bloom filters in granule order.
pub blooms: Vec<SplitBlockBloom>,
}
impl GranuleBloom {
    /// Number of granules (one bloom filter each).
    pub fn granule_count(&self) -> usize {
        self.blooms.len()
    }

    /// Half-open row range `(start, end)` covered by granule `i`, clamped to `row_count`.
    ///
    /// A `granule_size` of 0 is treated as 1. All arithmetic saturates, so any `i` past
    /// the data (including `usize::MAX`) yields the empty range `(row_count, row_count)`.
    pub fn row_range(&self, i: usize, row_count: usize) -> (usize, usize) {
        let g = (self.granule_size as usize).max(1);
        let start = i.saturating_mul(g).min(row_count);
        // `i + 1` would overflow for `i == usize::MAX` (panic in debug builds, an inverted
        // range in release builds), so the increment saturates as well.
        let end = i.saturating_add(1).saturating_mul(g).min(row_count);
        (start, end)
    }

    /// Indices of the granules whose bloom filter reports a possible match for
    /// `key_bytes`, in ascending order.
    ///
    /// The key is hashed once with `hash_bytes_u32`, the same hash used when the filters
    /// are built, and each filter is probed with that hash.
    pub fn surviving_granules(&self, key_bytes: &[u8]) -> Vec<usize> {
        let key = hash_bytes_u32(key_bytes);
        self.blooms
            .iter()
            .enumerate()
            .filter(|(_, bloom)| bloom.probe(key))
            .map(|(i, _)| i)
            .collect()
    }

    /// Encodes the filters into the blob format owned by `reddb_file`.
    fn serialize(&self) -> Vec<u8> {
        // Own the per-filter bytes first so the slices handed to the encoder stay valid.
        let bloom_bytes: Vec<Vec<u8>> = self.blooms.iter().map(SplitBlockBloom::to_bytes).collect();
        let bloom_refs: Vec<&[u8]> = bloom_bytes.iter().map(Vec::as_slice).collect();
        reddb_file::encode_column_block_granule_bloom_blob(self.granule_size, &bloom_refs)
    }

    /// Decodes a blob written by [`GranuleBloom::serialize`].
    ///
    /// # Errors
    /// Propagates the blob decoder's error, and returns
    /// [`ColumnBlockError::BadDirectory`] when a filter cannot be rebuilt from its bytes.
    fn deserialize(bytes: &[u8]) -> Result<GranuleBloom, ColumnBlockError> {
        let decoded = reddb_file::decode_column_block_granule_bloom_blob(bytes)?;
        let mut blooms = Vec::with_capacity(decoded.blooms.len());
        for bloom_bytes in decoded.blooms {
            let bloom =
                SplitBlockBloom::from_bytes(bloom_bytes).ok_or(ColumnBlockError::BadDirectory)?;
            blooms.push(bloom);
        }
        Ok(GranuleBloom {
            granule_size: decoded.granule_size,
            blooms,
        })
    }
}
fn build_granule_bloom(logical_type: u8, granule_size: u32, data: &[u8]) -> Option<GranuleBloom> {
if granule_size == 0 {
return None;
}
NumKind::from_logical(logical_type)?;
if data.is_empty() || !data.len().is_multiple_of(8) {
return None;
}
let n = data.len() / 8;
let g = granule_size as usize;
let mut blooms = Vec::with_capacity(n.div_ceil(g));
let mut start = 0usize;
while start < n {
let end = (start + g).min(n);
let mut bloom = SplitBlockBloom::with_capacity(end - start);
for v in data[start * 8..end * 8].chunks_exact(8) {
bloom.insert(hash_bytes_u32(v));
}
blooms.push(bloom);
start = end;
}
Some(GranuleBloom {
granule_size,
blooms,
})
}
/// How the 8 bytes of a numeric column value are interpreted when comparing values.
#[derive(Debug, Clone, Copy)]
enum NumKind {
    I64,
    U64,
    F64,
}

impl NumKind {
    /// Maps a logical type tag to its numeric kind: 1, 7 and 8 are signed, 2 is unsigned,
    /// 3 is floating point. Every other tag yields `None`.
    fn from_logical(logical_type: u8) -> Option<NumKind> {
        let kind = match logical_type {
            2 => Self::U64,
            3 => Self::F64,
            1 | 7 | 8 => Self::I64,
            _ => return None,
        };
        Some(kind)
    }
}
fn build_granule_index(logical_type: u8, granule_size: u32, data: &[u8]) -> Option<GranuleIndex> {
if granule_size == 0 {
return None;
}
let kind = NumKind::from_logical(logical_type)?;
if data.is_empty() || !data.len().is_multiple_of(8) {
return None;
}
let n = data.len() / 8;
let g = granule_size as usize;
let mut granules = Vec::with_capacity(n.div_ceil(g));
let mut start = 0usize;
while start < n {
let end = (start + g).min(n);
let granule = &data[start * 8..end * 8];
let (min, max) = granule_min_max(kind, granule);
granules.push(GranuleStats {
min: min.to_vec(),
max: max.to_vec(),
});
start = end;
}
Some(GranuleIndex {
granule_size,
value_width: 8,
granules,
})
}
/// Returns the smallest and largest 8-byte value in `slice`, ordered according to `kind`.
///
/// # Panics
/// Panics if `slice` holds no complete 8-byte value; callers only pass non-empty granules.
fn granule_min_max(kind: NumKind, slice: &[u8]) -> ([u8; 8], [u8; 8]) {
    let mut values = slice.chunks_exact(8).map(|chunk| {
        let value: [u8; 8] = chunk.try_into().unwrap();
        value
    });
    // The first value seeds both extremes; the rest are folded in one pass.
    let seed = values.next().unwrap();
    values.fold((seed, seed), |(lo, hi), value| {
        let lo = if num_lt(kind, &value, &lo) { value } else { lo };
        let hi = if num_lt(kind, &hi, &value) { value } else { hi };
        (lo, hi)
    })
}
/// Strict "less than" on two little-endian 8-byte values, interpreted according to `kind`.
///
/// Floats are ordered with `f64::total_cmp`, so NaNs and signed zeros have a defined order.
fn num_lt(kind: NumKind, a: &[u8; 8], b: &[u8; 8]) -> bool {
    use std::cmp::Ordering;

    let ordering = match kind {
        NumKind::I64 => i64::from_le_bytes(*a).cmp(&i64::from_le_bytes(*b)),
        NumKind::U64 => u64::from_le_bytes(*a).cmp(&u64::from_le_bytes(*b)),
        NumKind::F64 => f64::from_le_bytes(*a).total_cmp(&f64::from_le_bytes(*b)),
    };
    ordering == Ordering::Less
}
/// Encodes `columns` into a single column-block frame.
///
/// For every column the codec chain is chosen with `select_codecs`, the raw bytes are
/// encoded, and — for eligible numeric columns with a non-zero `granule_size` — a granule
/// min/max index and granule bloom filters are built. Columns that are not eligible get
/// empty metadata blobs. The header fields (`chunk_id`, `schema_ref`, `row_count`,
/// `min_ts_ns`, `max_ts_ns`) are passed to the frame encoder unchanged.
///
/// # Errors
/// Returns [`ColumnBlockError::Codec`] when encoding a column stream fails.
pub fn write_column_block(
    chunk_id: u64,
    schema_ref: u64,
    row_count: u64,
    min_ts_ns: u64,
    max_ts_ns: u64,
    granule_size: u32,
    columns: &[ColumnInput<'_>],
) -> Result<Vec<u8>, ColumnBlockError> {
    let column_count = columns.len();
    // NOTE(review): this offset (header + directory) is derived but never read here; the
    // frame bytes come from `reddb_file::encode_column_block_frame`.
    let _streams_off = HEADER_LEN + column_count * DIR_ENTRY_LEN;

    // Per column: (codec tag, encoded stream, granule index blob, granule bloom blob).
    let mut encoded: Vec<(u8, Vec<u8>, Vec<u8>, Vec<u8>)> = Vec::with_capacity(column_count);
    for input in columns {
        let codecs = select_codecs(input.logical_type, input.semantics);
        // The recorded tag is that of the first codec in the chain, or `None` if empty.
        let codec_tag = codecs.first().unwrap_or(&ColumnCodec::None).tag();
        let stream = encode_bytes(&codecs, input.data)?;
        let granule_blob = match build_granule_index(input.logical_type, granule_size, input.data)
        {
            Some(index) => index.serialize(),
            None => Vec::new(),
        };
        let bloom_blob = match build_granule_bloom(input.logical_type, granule_size, input.data) {
            Some(bloom) => bloom.serialize(),
            None => Vec::new(),
        };
        encoded.push((codec_tag, stream, granule_blob, bloom_blob));
    }

    // The frame encoder borrows the buffers, so they must outlive `parts`.
    let parts: Vec<reddb_file::ColumnBlockPart<'_>> = columns
        .iter()
        .zip(encoded.iter())
        .map(
            |(input, (codec_tag, stream, granule_blob, bloom_blob))| reddb_file::ColumnBlockPart {
                column_id: input.column_id,
                logical_type: input.logical_type,
                codec_tag: *codec_tag,
                stream,
                granule_index: granule_blob,
                granule_bloom: bloom_blob,
            },
        )
        .collect();

    let frame = reddb_file::encode_column_block_frame(
        chunk_id, schema_ref, row_count, min_ts_ns, max_ts_ns, &parts,
    );
    Ok(frame)
}
/// Returns the column-block version declared in `bytes`, or `None` when `bytes` does not
/// look like a column block.
///
/// Thin wrapper over `reddb_file::peek_column_block_version`. Per the tests in this file,
/// a block with the right leading magic yields `Some(version)` even for versions the
/// reader does not support, while a wrong magic or empty input yields `None`.
pub fn peek_column_block_version(bytes: &[u8]) -> Option<u16> {
reddb_file::peek_column_block_version(bytes)
}
/// Decodes a column block and all of its columns.
///
/// # Errors
/// Returns a [`ColumnBlockError`] when the frame is malformed or a column stream or
/// granule blob fails to decode.
pub fn read_column_block(bytes: &[u8]) -> Result<DecodedColumnBlock, ColumnBlockError> {
read_column_block_filtered(bytes, None)
}
/// Decodes a column block but keeps only the columns whose id appears in `want`.
///
/// Columns that are not requested are skipped before their stream is decoded. Ids in
/// `want` that do not exist in the block are ignored, so the result may have no columns.
///
/// # Errors
/// Same as [`read_column_block`], restricted to the frame and the requested columns.
pub fn read_column_block_projected(
bytes: &[u8],
want: &[u32],
) -> Result<DecodedColumnBlock, ColumnBlockError> {
read_column_block_filtered(bytes, Some(want))
}
/// Shared reader behind [`read_column_block`] and [`read_column_block_projected`].
///
/// Decodes the frame, then decodes each column in frame order. With `want == Some(ids)`
/// only columns whose id is in `ids` are decoded; `None` keeps every column.
///
/// # Errors
/// Frame errors are converted from `reddb_file`; stream and granule-blob failures are
/// returned for the first column that fails.
fn read_column_block_filtered(
    bytes: &[u8],
    want: Option<&[u32]>,
) -> Result<DecodedColumnBlock, ColumnBlockError> {
    let frame = reddb_file::decode_column_block_frame(bytes)?;

    let is_wanted = |column_id: u32| match want {
        Some(ids) => ids.contains(&column_id),
        None => true,
    };

    let mut columns = Vec::with_capacity(frame.columns.len());
    for raw in frame.columns {
        // Skip unrequested columns before paying for stream decoding.
        if !is_wanted(raw.column_id) {
            continue;
        }
        let data = decode_bytes(raw.stream)?;
        let granule_index = match raw.granule_index {
            Some(blob) => Some(GranuleIndex::deserialize(blob)?),
            None => None,
        };
        let granule_bloom = match raw.granule_bloom {
            Some(blob) => Some(GranuleBloom::deserialize(blob)?),
            None => None,
        };
        columns.push(DecodedColumn {
            column_id: raw.column_id,
            logical_type: raw.logical_type,
            codec_tag: raw.codec_tag,
            data,
            granule_index,
            granule_bloom,
        });
    }

    Ok(DecodedColumnBlock {
        chunk_id: frame.chunk_id,
        schema_ref: frame.schema_ref,
        row_count: frame.row_count,
        min_ts_ns: frame.min_ts_ns,
        max_ts_ns: frame.max_ts_ns,
        columns,
    })
}
#[cfg(test)]
mod tests {
use super::*;
use proptest::prelude::*;
// Test helper: packs `values` as consecutive little-endian 8-byte words.
fn u64_stream(values: &[u64]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(values.len() * 8);
    for value in values {
        bytes.extend_from_slice(&value.to_le_bytes());
    }
    bytes
}
// Test helper: packs `values` as consecutive little-endian IEEE-754 doubles.
fn f64_stream(values: &[f64]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(values.len() * 8);
    for value in values {
        bytes.extend_from_slice(&value.to_le_bytes());
    }
    bytes
}
// Writes a timestamp column and a gauge column (500 rows, granule size 128) and checks
// that header fields, codec tags, raw bytes and granule indexes all survive a round trip.
#[test]
fn round_trips_two_columns_value_for_value() {
let ts: Vec<u64> = (0..500)
.map(|i| 1_700_000_000_000 + i * 1_000_000)
.collect();
let vals: Vec<f64> = (0..500).map(|i| 95.0 + (i % 7) as f64 * 0.25).collect();
let ts_raw = u64_stream(&ts);
let val_raw = f64_stream(&vals);
let block = write_column_block(
42,
7,
ts.len() as u64,
*ts.first().unwrap(),
*ts.last().unwrap(),
128,
&[
ColumnInput {
column_id: 0,
logical_type: 2,
semantics: ColumnSemantics::Timestamp,
data: &ts_raw,
},
ColumnInput {
column_id: 1,
logical_type: 3,
semantics: ColumnSemantics::Gauge,
data: &val_raw,
},
],
)
.unwrap();
let decoded = read_column_block(&block).unwrap();
assert_eq!(decoded.chunk_id, 42);
assert_eq!(decoded.schema_ref, 7);
assert_eq!(decoded.row_count, 500);
assert_eq!(decoded.min_ts_ns, *ts.first().unwrap());
assert_eq!(decoded.max_ts_ns, *ts.last().unwrap());
assert_eq!(decoded.columns.len(), 2);
assert_eq!(decoded.columns[0].column_id, 0);
assert_eq!(decoded.columns[0].logical_type, 2);
assert_eq!(decoded.columns[0].codec_tag, ColumnCodec::DoubleDelta.tag());
assert_eq!(decoded.columns[1].codec_tag, ColumnCodec::Xor.tag());
assert_eq!(decoded.columns[0].data, ts_raw);
assert_eq!(decoded.columns[1].data, val_raw);
// 500 rows at 128 rows per granule -> 4 granules (the last one partial).
for col in &decoded.columns {
let gi = col
.granule_index
.as_ref()
.expect("numeric column must carry a granule index");
assert_eq!(gi.granule_size, 128);
assert_eq!(gi.value_width, 8);
assert_eq!(gi.granule_count(), 4);
}
// The timestamps are ascending, so the first granule's min and the last granule's max
// are the first and last values of the column.
let ts_gi = decoded.columns[0].granule_index.as_ref().unwrap();
assert_eq!(
u64::from_le_bytes(ts_gi.granules[0].min.clone().try_into().unwrap()),
*ts.first().unwrap()
);
assert_eq!(
u64::from_le_bytes(ts_gi.granules[3].max.clone().try_into().unwrap()),
*ts.last().unwrap()
);
}
// A projected read returns only the requested columns, matches a full read when every
// column is requested, and returns no columns for an unknown id.
#[test]
fn projected_read_decodes_only_wanted_columns() {
let ts: Vec<u64> = (0..500)
.map(|i| 1_700_000_000_000 + i * 1_000_000)
.collect();
let vals: Vec<f64> = (0..500).map(|i| 95.0 + (i % 7) as f64 * 0.25).collect();
let ts_raw = u64_stream(&ts);
let val_raw = f64_stream(&vals);
let block = write_column_block(
42,
7,
ts.len() as u64,
*ts.first().unwrap(),
*ts.last().unwrap(),
128,
&[
ColumnInput {
column_id: 0,
logical_type: 2,
semantics: ColumnSemantics::Timestamp,
data: &ts_raw,
},
ColumnInput {
column_id: 1,
logical_type: 3,
semantics: ColumnSemantics::Gauge,
data: &val_raw,
},
],
)
.unwrap();
// Only column 1 requested; block-level fields are still populated.
let projected = read_column_block_projected(&block, &[1]).unwrap();
assert_eq!(projected.columns.len(), 1);
assert_eq!(projected.columns[0].column_id, 1);
assert_eq!(projected.columns[0].data, val_raw);
assert_eq!(projected.row_count, 500);
// Requesting every column is equivalent to an unprojected read.
let full = read_column_block(&block).unwrap();
let both = read_column_block_projected(&block, &[0, 1]).unwrap();
assert_eq!(both, full);
// An id that is not in the block is not an error.
let none = read_column_block_projected(&block, &[99]).unwrap();
assert!(none.columns.is_empty());
}
// Test helper: encodes strings as a u32 LE count followed by, for each item, a u16 LE
// byte length and the UTF-8 bytes.
fn str_stream(items: &[&str]) -> Vec<u8> {
    let count = items.len() as u32;
    let mut encoded = Vec::new();
    encoded.extend_from_slice(&count.to_le_bytes());
    for item in items {
        let len = item.len() as u16;
        encoded.extend_from_slice(&len.to_le_bytes());
        encoded.extend_from_slice(item.as_bytes());
    }
    encoded
}
// Counter and low-cardinality columns record the Delta and Dict codec tags and round-trip
// byte-for-byte; only the numeric column gets a granule index.
#[test]
fn records_counter_and_low_cardinality_codecs_and_round_trips() {
let counter = u64_stream(&(0..300).map(|i| (i * 5) as u64).collect::<Vec<_>>());
let labels: Vec<&str> = (0..300).map(|i| ["a", "b", "c"][i % 3]).collect();
let labels_raw = str_stream(&labels);
let block = write_column_block(
9,
1,
300,
0,
0,
64,
&[
ColumnInput {
column_id: 10,
logical_type: 2,
semantics: ColumnSemantics::Counter,
data: &counter,
},
ColumnInput {
column_id: 11,
logical_type: 4,
semantics: ColumnSemantics::LowCardinality,
data: &labels_raw,
},
],
)
.unwrap();
let decoded = read_column_block(&block).unwrap();
assert_eq!(decoded.columns[0].codec_tag, ColumnCodec::Delta.tag());
assert_eq!(decoded.columns[1].codec_tag, ColumnCodec::Dict.tag());
assert_eq!(decoded.columns[0].data, counter);
assert_eq!(decoded.columns[1].data, labels_raw);
// Logical type 4 has no numeric kind, so no granule index is built for it.
assert!(decoded.columns[0].granule_index.is_some());
assert!(decoded.columns[1].granule_index.is_none());
}
// An empty block starts with the magic, has the V1 version in bytes 4..6, ends with the
// magic, and decodes to zero columns.
#[test]
fn header_carries_magic_and_version() {
let block = write_column_block(1, 0, 0, 0, 0, 0, &[]).unwrap();
assert_eq!(&block[0..4], &COLUMN_BLOCK_MAGIC);
assert_eq!(
u16::from_le_bytes([block[4], block[5]]),
COLUMN_BLOCK_VERSION_V1
);
assert_eq!(&block[block.len() - 4..], &COLUMN_BLOCK_MAGIC);
let decoded = read_column_block(&block).unwrap();
assert!(decoded.columns.is_empty());
}
// `peek_column_block_version` reports whatever version follows a valid leading magic
// (even a future one) and returns `None` for a wrong magic, unrelated bytes or empty input.
#[test]
fn peek_version_classifies_rdcc_and_rejects_non_rdcc() {
let block = write_column_block(1, 0, 0, 0, 0, 0, &[]).unwrap();
assert_eq!(
peek_column_block_version(&block),
Some(COLUMN_BLOCK_VERSION_V1)
);
// Patch the version field to a value newer than V1.
let mut future = block.clone();
future[4..6].copy_from_slice(&(COLUMN_BLOCK_VERSION_V1 + 7).to_le_bytes());
assert_eq!(
peek_column_block_version(&future),
Some(COLUMN_BLOCK_VERSION_V1 + 7)
);
let mut wrong_magic = block.clone();
wrong_magic[0] = b'X';
assert_eq!(peek_column_block_version(&wrong_magic), None);
assert_eq!(peek_column_block_version(b"row-stored bytes"), None);
assert_eq!(peek_column_block_version(&[]), None);
}
// Corrupting the first magic byte makes the reader fail with `BadMagic`.
#[test]
fn rejects_bad_leading_magic() {
let mut block = write_column_block(1, 0, 0, 0, 0, 0, &[]).unwrap();
block[0] = b'X';
assert!(matches!(
read_column_block(&block),
Err(ColumnBlockError::BadMagic(_))
));
}
// A block whose version field is bumped past V1 is rejected with `UnsupportedVersion`.
#[test]
fn rejects_future_version() {
let raw = u64_stream(&[1, 2, 3]);
let mut block = write_column_block(
1,
0,
3,
1,
3,
0,
&[ColumnInput {
column_id: 0,
logical_type: 2,
semantics: ColumnSemantics::Generic,
data: &raw,
}],
)
.unwrap();
// Bytes 4..6 hold the little-endian version.
block[4..6].copy_from_slice(&(COLUMN_BLOCK_VERSION_V1 + 1).to_le_bytes());
assert!(matches!(
read_column_block(&block),
Err(ColumnBlockError::UnsupportedVersion(_))
));
}
// Flipping a byte just past the header and the single directory entry is caught as a
// checksum mismatch.
#[test]
fn detects_payload_corruption_via_crc() {
let raw = f64_stream(&[1.5, 2.5, 3.5, 4.5]);
let mut block = write_column_block(
1,
0,
4,
0,
0,
0,
&[ColumnInput {
column_id: 0,
logical_type: 3,
semantics: ColumnSemantics::Gauge,
data: &raw,
}],
)
.unwrap();
// With one column, this offset is the first byte after header + directory.
block[HEADER_LEN + DIR_ENTRY_LEN] ^= 0xFF;
assert!(matches!(
read_column_block(&block),
Err(ColumnBlockError::ChecksumMismatch { .. })
));
}
// 250 ascending u64 values at granule size 100 produce 3 granules with the expected
// min/max, and a range predicate keeps only the granule that overlaps it.
#[test]
fn granule_index_round_trips_and_prunes_on_u64() {
let ts: Vec<u64> = (0..250).map(|i| 1_000 + i * 10).collect();
let raw = u64_stream(&ts);
let block = write_column_block(
1,
0,
ts.len() as u64,
*ts.first().unwrap(),
*ts.last().unwrap(),
100,
&[ColumnInput {
column_id: 0,
logical_type: 2,
semantics: ColumnSemantics::Timestamp,
data: &raw,
}],
)
.unwrap();
let decoded = read_column_block(&block).unwrap();
let gi = decoded.columns[0].granule_index.as_ref().unwrap();
assert_eq!(gi.granule_count(), 3);
let as_u64 = |b: &[u8]| u64::from_le_bytes(b.try_into().unwrap());
// Granule 0 holds rows 0..100, granule 2 holds the 50 trailing rows 200..250.
assert_eq!(as_u64(&gi.granules[0].min), 1_000);
assert_eq!(as_u64(&gi.granules[0].max), 1_990);
assert_eq!(as_u64(&gi.granules[2].min), 3_000);
assert_eq!(as_u64(&gi.granules[2].max), 3_490);
// Query range [2100, 2200] falls entirely inside granule 1 (values 2000..=2990).
let survivors = gi.surviving_granules(|min, max| {
let (lo, hi) = (2_100u64, 2_200u64);
as_u64(min) <= hi && as_u64(max) >= lo
});
assert_eq!(survivors, vec![1]);
assert_eq!(gi.row_range(1, ts.len()), (100, 200));
}
// Granule blooms survive a round trip and never prune the granule that actually holds a
// key (no false negatives); false positives in other granules are allowed.
#[test]
fn granule_bloom_round_trips_and_prunes_on_equality() {
let keys: Vec<u64> = (0..250).map(|i| (i as u64) * 7 + 3).collect();
let raw = u64_stream(&keys);
let block = write_column_block(
1,
0,
keys.len() as u64,
*keys.first().unwrap(),
*keys.last().unwrap(),
100,
&[ColumnInput {
column_id: 0,
logical_type: 2,
semantics: ColumnSemantics::Timestamp,
data: &raw,
}],
)
.unwrap();
let decoded = read_column_block(&block).unwrap();
let gb = decoded.columns[0]
.granule_bloom
.as_ref()
.expect("numeric column must carry a granule bloom");
assert_eq!(gb.granule_count(), 3);
assert_eq!(gb.granule_size, 100);
// Row 150 lives in granule 1.
let target = keys[150];
let survivors = gb.surviving_granules(&target.to_le_bytes());
assert!(
survivors.contains(&1),
"granule holding the value was pruned: {survivors:?}"
);
assert_eq!(gb.row_range(1, keys.len()), (100, 200));
// Exhaustive check: every key must survive in its own granule.
for (row, &k) in keys.iter().enumerate() {
let g = row / 100;
let survivors = gb.surviving_granules(&k.to_le_bytes());
assert!(
survivors.contains(&g),
"key {k} at row {row} pruned from its granule {g}: {survivors:?}"
);
}
}
// A string column (logical type 4) carries neither a granule index nor a granule bloom,
// even when a non-zero granule size is requested.
#[test]
fn variable_width_column_gets_no_granule_bloom() {
let labels: Vec<&str> = (0..120).map(|i| ["a", "b", "c"][i % 3]).collect();
let labels_raw = str_stream(&labels);
let block = write_column_block(
9,
1,
120,
0,
0,
64,
&[ColumnInput {
column_id: 11,
logical_type: 4,
semantics: ColumnSemantics::LowCardinality,
data: &labels_raw,
}],
)
.unwrap();
let decoded = read_column_block(&block).unwrap();
assert!(decoded.columns[0].granule_index.is_none());
assert!(decoded.columns[0].granule_bloom.is_none());
}
// Property: for arbitrary values, granule sizes and probe keys, every granule that
// contains the probe value is among the bloom survivors (no false negatives).
proptest! {
#[test]
fn bloom_never_prunes_a_granule_holding_the_key(
values in proptest::collection::vec(0u64..5_000, 1..400usize),
granule_size in 1u32..128,
probe in 0u64..5_000,
) {
let raw = u64_stream(&values);
let block = write_column_block(
1, 0, values.len() as u64, 0, 0, granule_size,
&[ColumnInput {
column_id: 0,
logical_type: 2,
semantics: ColumnSemantics::Generic,
data: &raw,
}],
).unwrap();
let decoded = read_column_block(&block).unwrap();
let gb = decoded.columns[0].granule_bloom.as_ref().unwrap();
let survivors = gb.surviving_granules(&probe.to_le_bytes());
let g = granule_size as usize;
// Check each row that equals the probe against the granule it belongs to.
for (row, &v) in values.iter().enumerate() {
if v == probe {
let gi = row / g;
prop_assert!(
survivors.contains(&gi),
"granule {gi} holds {probe} at row {row} but was pruned"
);
}
}
}
}
// An 8-byte all-zero buffer is reported as `Truncated`.
#[test]
fn rejects_truncated_buffer() {
assert!(matches!(
read_column_block(&[0u8; 8]),
Err(ColumnBlockError::Truncated)
));
}
}