use serde::{Deserialize, Serialize};
use zerompk::{FromMessagePack, ToMessagePack};
/// Four-byte magic (`"NDBS"`) stored in bytes 0..4 of the segment header;
/// `SegmentHeader::from_bytes` rejects any other value.
pub const MAGIC: [u8; 4] = *b"NDBS";
/// Major format version written by `SegmentHeader::current`.
/// `SegmentHeader::from_bytes` rejects headers whose major version is greater than this.
pub const VERSION_MAJOR: u8 = 1;
/// Minor format version written by `SegmentHeader::current`.
/// It is carried through parsing but never compared against the stored value.
pub const VERSION_MINOR: u8 = 0;
/// Endianness marker stored in the last header byte.
/// NOTE(review): the name implies "little-endian"; `SegmentHeader::from_bytes`
/// copies this byte without validating it — confirm whether that is intended.
pub const ENDIANNESS_LE: u8 = 0x01;
/// Block size.
/// NOTE(review): presumably the number of rows per block; nothing in this part
/// of the file reads it outside of tests — confirm against the writer/reader.
pub const BLOCK_SIZE: usize = 1024;
/// Serialized header length in bytes: 4 (magic) + 1 (major) + 1 (minor) + 1 (endianness).
pub const HEADER_SIZE: usize = 7;
/// Segment header: magic bytes, format version, and an endianness marker.
///
/// Serializes to exactly `HEADER_SIZE` bytes in field order.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SegmentHeader {
/// Format magic; equals `MAGIC` for any header accepted by `from_bytes`.
pub magic: [u8; 4],
/// Major format version of the segment.
pub version_major: u8,
/// Minor format version of the segment.
pub version_minor: u8,
/// Endianness marker byte (`ENDIANNESS_LE` for headers built by `current`).
pub endianness: u8,
}
impl SegmentHeader {
pub fn current() -> Self {
Self {
magic: MAGIC,
version_major: VERSION_MAJOR,
version_minor: VERSION_MINOR,
endianness: ENDIANNESS_LE,
}
}
pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
let mut buf = [0u8; HEADER_SIZE];
buf[0..4].copy_from_slice(&self.magic);
buf[4] = self.version_major;
buf[5] = self.version_minor;
buf[6] = self.endianness;
buf
}
pub fn from_bytes(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
if data.len() < HEADER_SIZE {
return Err(crate::error::ColumnarError::TruncatedSegment {
expected: HEADER_SIZE,
got: data.len(),
});
}
let mut magic = [0u8; 4];
magic.copy_from_slice(&data[0..4]);
if magic != MAGIC {
return Err(crate::error::ColumnarError::InvalidMagic(magic));
}
let version_major = data[4];
let version_minor = data[5];
if version_major > VERSION_MAJOR {
return Err(crate::error::ColumnarError::IncompatibleVersion {
reader_major: VERSION_MAJOR,
segment_major: version_major,
segment_minor: version_minor,
});
}
Ok(Self {
magic,
version_major,
version_minor,
endianness: data[6],
})
}
}
/// Per-block statistics for one column.
///
/// `min`/`max` hold numeric bounds; the non-numeric constructors fill them
/// with `f64::NAN`. The optional fields are skipped in serde output when
/// `None` and default to `None` when absent on deserialization.
#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
pub struct BlockStats {
/// Numeric lower bound of the block, or `NaN` when built by a non-numeric constructor.
pub min: f64,
/// Numeric upper bound of the block, or `NaN` when built by a non-numeric constructor.
pub max: f64,
/// Count of null values in the block.
pub null_count: u32,
/// Count of rows in the block.
pub row_count: u32,
/// Optional string lower bound (supplied via `string_block`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub str_min: Option<String>,
/// Optional string upper bound (supplied via `string_block`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub str_max: Option<String>,
/// Optional opaque bytes.
/// NOTE(review): presumably a serialized bloom filter over the block's values — confirm encoding with the writer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub bloom: Option<Vec<u8>>,
}
impl BlockStats {
pub fn numeric(min: f64, max: f64, null_count: u32, row_count: u32) -> Self {
Self {
min,
max,
null_count,
row_count,
str_min: None,
str_max: None,
bloom: None,
}
}
pub fn non_numeric(null_count: u32, row_count: u32) -> Self {
Self {
min: f64::NAN,
max: f64::NAN,
null_count,
row_count,
str_min: None,
str_max: None,
bloom: None,
}
}
pub fn string_block(
null_count: u32,
row_count: u32,
str_min: Option<String>,
str_max: Option<String>,
bloom: Option<Vec<u8>>,
) -> Self {
Self {
min: f64::NAN,
max: f64::NAN,
null_count,
row_count,
str_min,
str_max,
bloom,
}
}
}
/// Footer entry describing one column of a segment.
#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
pub struct ColumnMeta {
/// Column name.
pub name: String,
/// Location of the column's data.
/// NOTE(review): presumably a byte offset from the start of the segment — confirm with the writer.
pub offset: u64,
/// Size of the column's data.
/// NOTE(review): presumably a byte length — confirm with the writer.
pub length: u64,
/// Codec used for this column's data.
pub codec: nodedb_codec::ColumnCodec,
/// Number of blocks in the column.
pub block_count: u32,
/// Per-block statistics.
/// NOTE(review): presumably one entry per block (`block_count` entries) — not enforced here.
pub block_stats: Vec<BlockStats>,
/// Optional list of strings; skipped in serde output when `None`.
/// NOTE(review): presumably the value dictionary for dictionary-encoded columns — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub dictionary: Option<Vec<String>>,
}
/// Segment footer: segment-level metadata plus one `ColumnMeta` per column.
///
/// Serialized as MessagePack and located from the end of the segment via
/// `SegmentFooter::from_segment_tail`.
#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
pub struct SegmentFooter {
/// Hash identifying the schema the segment was written with.
pub schema_hash: u64,
/// Number of columns.
/// NOTE(review): presumably equal to `columns.len()` — not enforced here.
pub column_count: u32,
/// Total number of rows in the segment.
pub row_count: u64,
/// Profile tag byte.
/// NOTE(review): meaning of the values is defined outside this part of the file.
pub profile_tag: u8,
/// Metadata for each column.
pub columns: Vec<ColumnMeta>,
}
impl SegmentFooter {
pub fn to_bytes(&self) -> Result<Vec<u8>, crate::error::ColumnarError> {
let footer_msgpack = zerompk::to_msgpack_vec(self)
.map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))?;
let footer_len = footer_msgpack.len() as u32;
let footer_crc = crc32c::crc32c(&footer_msgpack);
let mut buf = Vec::with_capacity(footer_msgpack.len() + 8);
buf.extend_from_slice(&footer_msgpack);
buf.extend_from_slice(&footer_len.to_le_bytes());
buf.extend_from_slice(&footer_crc.to_le_bytes());
Ok(buf)
}
pub fn from_segment_tail(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
if data.len() < 8 {
return Err(crate::error::ColumnarError::TruncatedSegment {
expected: 8,
got: data.len(),
});
}
let tail = &data[data.len() - 8..];
let footer_len =
u32::from_le_bytes(tail[0..4].try_into().expect("4 bytes from slice")) as usize;
let stored_crc = u32::from_le_bytes(tail[4..8].try_into().expect("4 bytes from slice"));
let footer_start = data.len().checked_sub(8 + footer_len).ok_or(
crate::error::ColumnarError::TruncatedSegment {
expected: 8 + footer_len,
got: data.len(),
},
)?;
let footer_bytes = &data[footer_start..footer_start + footer_len];
let computed_crc = crc32c::crc32c(footer_bytes);
if computed_crc != stored_crc {
return Err(crate::error::ColumnarError::FooterCrcMismatch {
stored: stored_crc,
computed: computed_crc,
});
}
zerompk::from_msgpack(footer_bytes)
.map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialized form of the header the current writer emits.
    fn current_header_bytes() -> [u8; HEADER_SIZE] {
        SegmentHeader::current().to_bytes()
    }

    /// Builds a column entry with two blocks and no dictionary.
    fn two_block_column(
        name: &str,
        offset: u64,
        length: u64,
        codec: nodedb_codec::ColumnCodec,
        block_stats: Vec<BlockStats>,
    ) -> ColumnMeta {
        ColumnMeta {
            name: name.into(),
            offset,
            length,
            codec,
            block_count: 2,
            block_stats,
            dictionary: None,
        }
    }

    #[test]
    fn header_roundtrip() {
        let original = SegmentHeader::current();
        let encoded = original.to_bytes();
        let decoded = SegmentHeader::from_bytes(&encoded).expect("valid header");
        assert_eq!(decoded, original);
    }

    #[test]
    fn header_invalid_magic() {
        let mut raw = current_header_bytes();
        // Corrupt the first magic byte.
        raw[0] = b'X';
        let outcome = SegmentHeader::from_bytes(&raw);
        assert!(matches!(
            outcome,
            Err(crate::error::ColumnarError::InvalidMagic(_))
        ));
    }

    #[test]
    fn header_incompatible_major() {
        let mut raw = current_header_bytes();
        // A major version newer than the reader's must be refused.
        raw[4] = VERSION_MAJOR + 1;
        let outcome = SegmentHeader::from_bytes(&raw);
        assert!(matches!(
            outcome,
            Err(crate::error::ColumnarError::IncompatibleVersion { .. })
        ));
    }

    #[test]
    fn header_compatible_minor() {
        let mut raw = current_header_bytes();
        // A newer minor version alone must still parse.
        raw[5] = VERSION_MINOR + 5;
        let decoded = SegmentHeader::from_bytes(&raw).expect("compatible minor");
        assert_eq!(decoded.version_major, VERSION_MAJOR);
        assert_eq!(decoded.version_minor, VERSION_MINOR + 5);
    }

    #[test]
    fn footer_roundtrip() {
        let columns = vec![
            two_block_column(
                "id",
                7,
                512,
                nodedb_codec::ColumnCodec::DeltaFastLanesLz4,
                vec![
                    BlockStats::numeric(1.0, 1024.0, 0, 1024),
                    BlockStats::numeric(1025.0, 2048.0, 0, 1024),
                ],
            ),
            two_block_column(
                "name",
                519,
                256,
                nodedb_codec::ColumnCodec::FsstLz4,
                vec![
                    BlockStats::non_numeric(0, 1024),
                    BlockStats::non_numeric(5, 1024),
                ],
            ),
            two_block_column(
                "score",
                775,
                128,
                nodedb_codec::ColumnCodec::AlpFastLanesLz4,
                vec![
                    BlockStats::numeric(0.0, 100.0, 10, 1024),
                    BlockStats::numeric(0.5, 99.5, 3, 1024),
                ],
            ),
        ];
        let footer = SegmentFooter {
            schema_hash: 0xDEAD_BEEF_CAFE_1234,
            column_count: 3,
            row_count: 2048,
            profile_tag: 0,
            columns,
        };
        let encoded_footer = footer.to_bytes().expect("serialize");

        // Assemble header + 896 zero bytes standing in for column data + footer.
        let mut segment = current_header_bytes().to_vec();
        segment.resize(segment.len() + 896, 0u8);
        segment.extend_from_slice(&encoded_footer);

        let decoded = SegmentFooter::from_segment_tail(&segment).expect("parse footer");
        assert_eq!(decoded.schema_hash, footer.schema_hash);
        assert_eq!(decoded.column_count, 3);
        assert_eq!(decoded.row_count, 2048);
        assert_eq!(decoded.columns.len(), 3);
        let names: Vec<&str> = decoded.columns.iter().map(|c| c.name.as_str()).collect();
        assert_eq!(names, ["id", "name", "score"]);
    }

    #[test]
    fn footer_crc_mismatch() {
        let empty_footer = SegmentFooter {
            schema_hash: 0,
            column_count: 0,
            row_count: 0,
            profile_tag: 0,
            columns: Vec::new(),
        };
        let mut encoded = empty_footer.to_bytes().expect("serialize");
        // Invert every bit of the final byte, which belongs to the stored CRC.
        let last = encoded.len() - 1;
        encoded[last] = !encoded[last];
        let outcome = SegmentFooter::from_segment_tail(&encoded);
        assert!(matches!(
            outcome,
            Err(crate::error::ColumnarError::FooterCrcMismatch { .. })
        ));
    }

    #[test]
    fn block_stats_predicate_skip() {
        use crate::predicate::ScanPredicate;

        // Block whose values lie in [10, 50].
        let block = BlockStats::numeric(10.0, 50.0, 0, 1024);

        // `> 60` lies above the block's max; `> 40` overlaps the range.
        assert!(ScanPredicate::gt(0, 60.0).can_skip_block(&block));
        assert!(!ScanPredicate::gt(0, 40.0).can_skip_block(&block));
        // `< 5` lies below the block's min.
        assert!(ScanPredicate::lt(0, 5.0).can_skip_block(&block));
        // `== 100` is outside the range; `== 30` is inside it.
        assert!(ScanPredicate::eq(0, 100.0).can_skip_block(&block));
        assert!(!ScanPredicate::eq(0, 30.0).can_skip_block(&block));
    }
}