use crate::storage::tracking_bloom_filter::TrackingBloomFilter;
use crate::storage::{buffer_cache::FBuf, file::BLOOM_FILTER_SEED};
use binrw::{BinRead, BinResult, BinWrite, Error as BinError, binrw, binwrite};
#[cfg(doc)]
use crc32c;
use fastbloom::BloomFilter;
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
use size_of::SizeOf;
/// Format version number.
///
/// NOTE(review): presumably the value stored in `FileTrailer::version`; the
/// place where it is written or checked is not in this section — confirm.
pub const VERSION_NUMBER: u32 = 5;
/// Magic bytes that `DataBlockHeader` requires in its `BlockHeader`.
pub const DATA_BLOCK_MAGIC: [u8; 4] = *b"LFDB";
/// Magic bytes that `IndexBlockHeader` requires in its `BlockHeader`.
pub const INDEX_BLOCK_MAGIC: [u8; 4] = *b"LFIB";
/// Magic bytes that `FileTrailer` requires in its `BlockHeader`.
pub const FILE_TRAILER_BLOCK_MAGIC: [u8; 4] = *b"LFFT";
/// Magic bytes that `FilterBlock` and `FilterBlockRef` require in their
/// `BlockHeader`.
pub const FILTER_BLOCK_MAGIC: [u8; 4] = *b"LFFB";
/// Common header that is the first field of every block layout in this file:
/// a 32-bit checksum followed by a 4-byte magic.
#[binrw]
#[derive(Copy, Clone, Debug)]
pub struct BlockHeader {
/// Checksum for the block.
///
/// NOTE(review): presumably a CRC-32C (the file has a `#[cfg(doc)] use
/// crc32c`), computed elsewhere; the algorithm and the bytes it covers are
/// not visible in this section — confirm.
pub checksum: u32,
/// Four bytes identifying the kind of block; each block layout asserts that
/// this equals its own `*_MAGIC` constant.
pub magic: [u8; 4],
}
impl BlockHeader {
    /// Builds a header carrying a copy of `magic` and a zero checksum.
    ///
    /// NOTE(review): the zero checksum is presumably a placeholder that the
    /// writer overwrites once the block contents are known — confirm.
    pub(crate) fn new(magic: &[u8; 4]) -> Self {
        let magic = *magic;
        Self { checksum: 0, magic }
    }
}
/// Per-batch metadata stored at the end of `FileTrailer`.
#[binrw]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct BatchMetadata {
/// NOTE(review): presumably the number of entries in the batch that have a
/// negative weight, and presumably only meaningful when
/// `COMPATIBLE_FEATURE_NEGATIVE_WEIGHT_COUNT` is set — confirm with the
/// writer.
pub negative_weight_count: u64,
}
/// File trailer block.
///
/// Describes the file as a whole: format version, compression, the root node
/// of each column, the location of the filter block, and feature flags.
#[binrw]
#[derive(Clone, Debug)]
pub struct FileTrailer {
    /// Block header; reading and writing both fail unless its magic is
    /// `FILE_TRAILER_BLOCK_MAGIC`.
    #[brw(assert(header.magic == FILE_TRAILER_BLOCK_MAGIC, "file trailer has bad magic"))]
    pub header: BlockHeader,

    /// Format version of the file.
    ///
    /// NOTE(review): presumably `VERSION_NUMBER` for newly written files —
    /// confirm with the writer.
    pub version: u32,

    /// Compression applied to the file, if any. Encoded as a single byte in
    /// which 0 means `None`.
    #[bw(write_with = Compression::write_opt)]
    #[br(parse_with = Compression::parse_opt)]
    pub compression: Option<Compression>,

    /// Number of entries in `columns`.
    ///
    /// Computed from `columns` when writing. The conversion is checked, so an
    /// (absurdly) oversized column list produces a write error instead of a
    /// silently truncated count.
    #[bw(try_calc(u32::try_from(columns.len())))]
    pub n_columns: u32,

    /// One entry per column, `n_columns` in total.
    #[br(count = n_columns)]
    pub columns: Vec<FileTrailerColumn>,

    /// Offset of the filter block.
    ///
    /// NOTE(review): presumably the location of a `FilterBlock` whose size
    /// fits in 32 bits, superseded by `filter_offset64` when
    /// `COMPATIBLE_FEATURE_FILTER64` is set — confirm.
    pub filter_offset: u64,

    /// Size of the filter block located by `filter_offset` (32-bit field).
    pub filter_size: u32,

    /// Bit set of `COMPATIBLE_FEATURE_*` flags.
    ///
    /// NOTE(review): presumably features a reader may safely ignore if it
    /// does not understand them — confirm.
    pub compatible_features: u64,

    /// Bit set of incompatible feature flags.
    ///
    /// NOTE(review): presumably features a reader must understand in order to
    /// read the file; no such flags are defined in this section — confirm.
    pub incompatible_features: u64,

    /// 64-bit counterpart of `filter_offset`.
    ///
    /// NOTE(review): presumably only meaningful when
    /// `COMPATIBLE_FEATURE_FILTER64` is set — confirm.
    pub filter_offset64: u64,

    /// 64-bit counterpart of `filter_size`.
    pub filter_size64: u64,

    /// Additional per-batch metadata.
    pub metadata: BatchMetadata,
}
impl FileTrailer {
    /// Returns the bits of `compatible_features` that are not among the
    /// compatible features known to this code, or `None` if every set bit is
    /// known.
    pub fn unsupported_compatible_features(&self) -> Option<u64> {
        let known = COMPATIBLE_FEATURE_FILTER64 | COMPATIBLE_FEATURE_NEGATIVE_WEIGHT_COUNT;
        match self.compatible_features & !known {
            0 => None,
            unknown => Some(unknown),
        }
    }

    /// Returns true if any bit of `feature` is set in `compatible_features`.
    pub fn has_compatible_feature(&self, feature: u64) -> bool {
        let overlap = self.compatible_features & feature;
        overlap != 0
    }

    /// Returns true if the `COMPATIBLE_FEATURE_FILTER64` bit is set.
    pub fn has_filter64(&self) -> bool {
        (self.compatible_features & COMPATIBLE_FEATURE_FILTER64) != 0
    }
}
/// Bit in `FileTrailer::compatible_features` reported by
/// `FileTrailer::has_filter64`.
///
/// NOTE(review): presumably indicates that `filter_offset64`/`filter_size64`
/// are in use — confirm.
pub const COMPATIBLE_FEATURE_FILTER64: u64 = 1 << 0;
/// Bit in `FileTrailer::compatible_features`.
///
/// NOTE(review): presumably indicates that
/// `BatchMetadata::negative_weight_count` is populated — confirm.
pub const COMPATIBLE_FEATURE_NEGATIVE_WEIGHT_COUNT: u64 = 1 << 1;
/// Per-column entry in `FileTrailer::columns`.
#[binrw]
#[derive(Debug, Copy, Clone)]
pub struct FileTrailerColumn {
/// NOTE(review): presumably the file offset of the column's top-level node —
/// confirm.
pub node_offset: u64,
/// NOTE(review): presumably the size of the node at `node_offset` — confirm.
pub node_size: u32,
/// Kind of node that `node_offset` refers to. Stored as one byte and then
/// padded so that the next field starts at a multiple of 4.
#[brw(align_after = 4)]
pub node_type: NodeType,
/// NOTE(review): presumably the number of rows in the column — confirm.
pub n_rows: u64,
}
/// Kind of node, serialized as a single byte.
#[derive(Copy, Clone, PartialEq, Eq, Debug, SizeOf)]
#[binrw]
#[brw(repr(u8))]
pub enum NodeType {
/// A data node (encoded as 0).
Data = 0,
/// An index node (encoded as 1).
Index = 1,
}
/// Implemented by types that have an associated constant length.
///
/// NOTE(review): presumably the serialized size in bytes; the two
/// implementations in this file both use 32 — confirm.
pub(crate) trait FixedLen {
/// The fixed length.
const LEN: usize;
}
/// Header of an index block.
///
/// NOTE(review): the `*_offset` fields presumably locate arrays inside the
/// block and the matching `*_varint` fields give the width of each array
/// element — confirm against the reader/writer.
#[binrw]
pub struct IndexBlockHeader {
/// Block header; reading and writing both fail unless its magic is
/// `INDEX_BLOCK_MAGIC`.
#[brw(assert(header.magic == INDEX_BLOCK_MAGIC, "index block has bad magic"))]
pub header: BlockHeader,
/// Offset of the bound map.
pub bound_map_offset: u32,
/// Offset of the row totals.
pub row_totals_offset: u32,
/// Offset of the child offsets.
pub child_offsets_offset: u32,
/// Offset of the child sizes.
pub child_sizes_offset: u32,
/// Number of children.
pub n_children: u16,
/// Kind of node that the children are.
pub child_type: NodeType,
/// Integer width used for the bound map.
pub bound_map_varint: Varint,
/// Integer width used for the row totals.
pub row_total_varint: Varint,
/// Integer width used for the child offsets.
pub child_offset_varint: Varint,
/// Integer width used for the child sizes. Followed by padding up to a
/// multiple of 16.
#[brw(align_after = 16)]
pub child_size_varint: Varint,
}
impl FixedLen for IndexBlockHeader {
// The fields add up to 31 bytes (8 + 4*4 + 2 + 1 + 4*1); `align_after = 16`
// on the last field pads that to 32, assuming the header starts at a
// 16-byte-aligned stream position.
const LEN: usize = 32;
}
/// Header of a data block.
///
/// NOTE(review): the `*_ofs` fields presumably locate arrays inside the block
/// and the matching `*_varint` fields give the width of each array element,
/// with `None` presumably meaning the array is absent — confirm against the
/// reader/writer.
#[binrw]
pub struct DataBlockHeader {
/// Block header; reading and writing both fail unless its magic is
/// `DATA_BLOCK_MAGIC`.
#[brw(assert(header.magic == DATA_BLOCK_MAGIC, "data block has bad magic"))]
pub header: BlockHeader,
/// Number of values in the block.
pub n_values: u32,
/// Offset of the value map.
pub value_map_ofs: u32,
/// Offset of the row groups.
pub row_groups_ofs: u32,
/// Integer width used for the value map. Encoded as one byte, with 0 for
/// `None`.
#[bw(write_with = Varint::write_opt)]
#[br(parse_with = Varint::parse_opt)]
pub value_map_varint: Option<Varint>,
/// Integer width used for the row groups. Encoded as one byte, with 0 for
/// `None`, followed by padding up to a multiple of 16.
#[bw(write_with = Varint::write_opt)]
#[br(parse_with = Varint::parse_opt)]
#[brw(align_after = 16)]
pub row_group_varint: Option<Varint>,
}
impl FixedLen for DataBlockHeader {
// The fields add up to 22 bytes (8 + 3*4 + 2*1); `align_after = 16` on the
// last field pads that to 32, assuming the header starts at a
// 16-byte-aligned stream position.
const LEN: usize = 32;
}
/// Width of a fixed-size little-endian unsigned integer.
///
/// Each variant's discriminant is its width in bytes, and the variant is
/// serialized as that single byte. Zero is not a valid `Varint`; it is used
/// by `Varint::parse_opt`/`Varint::write_opt` to encode `None`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, FromPrimitive)]
#[binrw]
#[brw(repr(u8))]
pub enum Varint {
/// 8-bit (1-byte) integer.
B8 = 1,
/// 16-bit (2-byte) integer.
B16 = 2,
/// 24-bit (3-byte) integer.
B24 = 3,
/// 32-bit (4-byte) integer.
B32 = 4,
/// 48-bit (6-byte) integer.
B48 = 6,
/// 64-bit (8-byte) integer.
B64 = 8,
}
impl Varint {
    /// Returns the narrowest width that can represent every value up to and
    /// including `max_value`.
    pub(crate) fn from_max_value(max_value: u64) -> Varint {
        // The arms overlap on purpose: they are tried in order, so the first
        // (narrowest) width whose maximum covers `max_value` is chosen.
        #[allow(clippy::unusual_byte_groupings, clippy::match_overlapping_arm)]
        match max_value {
            ..=0xff => Varint::B8,
            ..=0xffff => Varint::B16,
            ..=0xffff_ff => Varint::B24,
            ..=0xffff_ffff => Varint::B32,
            ..=0xffff_ffff_ffff => Varint::B48,
            _ => Varint::B64,
        }
    }

    /// Returns the narrowest width that can represent every value in
    /// `0..len`, i.e. values up to `len - 1`.
    ///
    /// A `len` of 0 has no values to represent and yields [`Varint::B8`].
    pub(crate) fn from_len(len: usize) -> Varint {
        // `saturating_sub` keeps `len == 0` from underflowing, which would
        // otherwise panic in overflow-checked builds and select `B64` in
        // unchecked ones.
        Self::from_max_value((len as u64).saturating_sub(1))
    }

    /// Returns the alignment, in bytes, used for values of this width.
    ///
    /// This is the width itself for the power-of-two widths. The 3-byte and
    /// 6-byte widths use 1 and 2, respectively, so the result is always a
    /// power of two.
    pub(crate) fn alignment(&self) -> usize {
        match self {
            Self::B24 => 1,
            Self::B48 => 2,
            Self::B8 | Self::B16 | Self::B32 | Self::B64 => self.len(),
        }
    }

    /// Rounds `offset` up to the next multiple of [`Self::alignment`].
    pub(crate) fn align(&self, offset: usize) -> usize {
        next_multiple_of_pow2(offset, self.alignment())
    }

    /// Returns the width in bytes.
    pub(crate) fn len(&self) -> usize {
        *self as usize
    }

    /// Appends the low-order [`Self::len`] bytes of `value` to `dst` in
    /// little-endian order.
    ///
    /// Bits of `value` beyond this width are silently discarded.
    pub(crate) fn put(&self, dst: &mut FBuf, value: u64) {
        #[allow(clippy::unnecessary_cast)]
        match *self {
            Self::B8 => dst.push(value as u8),
            Self::B16 => dst.extend_from_slice(&(value as u16).to_le_bytes()),
            Self::B24 => dst.extend_from_slice(&(value as u32).to_le_bytes()[..3]),
            Self::B32 => dst.extend_from_slice(&(value as u32).to_le_bytes()),
            Self::B48 => dst.extend_from_slice(&(value as u64).to_le_bytes()[..6]),
            Self::B64 => dst.extend_from_slice(&(value as u64).to_le_bytes()),
        }
    }

    /// Reads [`Self::len`] bytes from `src` starting at `offset` and returns
    /// them as a zero-extended little-endian integer.
    ///
    /// The range `offset..offset + self.len()` must lie within `src`.
    pub(crate) fn get(&self, src: &FBuf, offset: usize) -> u64 {
        let mut raw = [0u8; 8];
        raw[..self.len()].copy_from_slice(&src[offset..offset + self.len()]);
        u64::from_le_bytes(raw)
    }

    /// binrw parser for an optional `Varint` stored as one byte.
    ///
    /// A zero byte reads as `None`, a valid width reads as `Some`, and any
    /// other byte is reported as `NoVariantMatch` at that byte's position.
    #[binrw::parser(reader, endian)]
    pub(crate) fn parse_opt() -> BinResult<Option<Varint>> {
        let byte: u8 = <_>::read_options(reader, endian, ())?;
        match byte {
            0 => Ok(None),
            _ => match FromPrimitive::from_u8(byte) {
                Some(varint) => Ok(Some(varint)),
                None => Err(BinError::NoVariantMatch {
                    // The reader is now one byte past the offending byte.
                    pos: reader.stream_position()? - 1,
                }),
            },
        }
    }

    /// binrw writer for an optional `Varint`: writes the width as one byte,
    /// or 0 for `None`.
    #[binrw::writer(writer, endian)]
    pub(crate) fn write_opt(value: &Option<Varint>) -> BinResult<()> {
        value
            .map_or(0, |varint| varint as u8)
            .write_options(writer, endian, ())
    }
}
/// Rounds `offset` up to the nearest multiple of `alignment`.
///
/// The bit-mask arithmetic is only meaningful when `alignment` is a nonzero
/// power of two.
fn next_multiple_of_pow2(offset: usize, alignment: usize) -> usize {
    let low_bits = alignment - 1;
    let bumped = offset + low_bits;
    bumped & !low_bits
}
/// Compression algorithm, serialized as a single byte.
///
/// Zero is not a valid `Compression`; it is used by
/// `Compression::parse_opt`/`Compression::write_opt` to encode `None`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, FromPrimitive, SizeOf)]
#[binrw]
#[brw(repr(u8))]
pub enum Compression {
/// Snappy compression (encoded as 1).
Snappy = 1,
}
impl Compression {
    /// binrw parser for an optional `Compression` stored as one byte.
    ///
    /// A zero byte reads as `None`, a known algorithm reads as `Some`, and
    /// any other byte is reported as `NoVariantMatch` at that byte's
    /// position.
    #[binrw::parser(reader, endian)]
    pub(crate) fn parse_opt() -> BinResult<Option<Self>> {
        let byte: u8 = <_>::read_options(reader, endian, ())?;
        if byte == 0 {
            return Ok(None);
        }
        match FromPrimitive::from_u8(byte) {
            Some(value) => Ok(Some(value)),
            None => {
                // The reader is now one byte past the offending byte.
                let pos = reader.stream_position()? - 1;
                Err(BinError::NoVariantMatch { pos })
            }
        }
    }

    /// binrw writer for an optional `Compression`: writes the algorithm as
    /// one byte, or 0 for `None`.
    #[binrw::writer(writer, endian)]
    pub(crate) fn write_opt(value: &Option<Self>) -> BinResult<()> {
        let byte: u8 = match *value {
            Some(compression) => compression as u8,
            None => 0,
        };
        byte.write_options(writer, endian, ())
    }
}
/// A filter block with owned data, usable for both reading and writing.
///
/// Converts into a `TrackingBloomFilter` via `From`.
#[binrw]
pub struct FilterBlock {
/// Block header; reading and writing both fail unless its magic is
/// `FILTER_BLOCK_MAGIC`.
#[brw(assert(header.magic == FILTER_BLOCK_MAGIC, "filter block has bad magic"))]
pub header: BlockHeader,
/// Number of hashes, passed to the Bloom filter when it is reconstructed.
pub num_hashes: u32,
/// Number of `u64` elements in `data`. Computed from `data` when writing,
/// with a checked conversion.
#[bw(try_calc(u64::try_from(data.len())))]
pub len: u64,
/// The filter's contents, `len` elements long.
#[br(count = len)]
pub data: Vec<u64>,
}
impl From<FilterBlock> for TrackingBloomFilter {
fn from(block: FilterBlock) -> Self {
TrackingBloomFilter::new(
BloomFilter::from_vec(block.data)
.seed(&BLOOM_FILTER_SEED)
.hashes(block.num_hashes),
)
}
}
/// Write-only counterpart of `FilterBlock` that borrows its data instead of
/// owning it.
///
/// The fields are serialized in the same order and with the same types as
/// those of `FilterBlock`.
#[binwrite]
pub struct FilterBlockRef<'a> {
/// Block header; writing fails unless its magic is `FILTER_BLOCK_MAGIC`.
#[bw(assert(header.magic == FILTER_BLOCK_MAGIC, "filter block has bad magic"))]
pub header: BlockHeader,
/// Number of hashes used by the filter.
pub num_hashes: u32,
/// Number of `u64` elements in `data`. Computed from `data` when writing,
/// with a checked conversion.
#[bw(try_calc(u64::try_from(data.len())))]
pub len: u64,
/// The filter's contents.
pub data: &'a [u64],
}
impl<'a> From<&'a TrackingBloomFilter> for FilterBlockRef<'a> {
    /// Builds a writable view of `value`: a fresh filter-block header, the
    /// filter's hash count, and a borrow of its data.
    fn from(value: &'a TrackingBloomFilter) -> Self {
        let header = BlockHeader::new(&FILTER_BLOCK_MAGIC);
        let num_hashes = value.num_hashes();
        let data = value.as_slice();
        Self {
            header,
            num_hashes,
            data,
        }
    }
}