use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::data_type::AsBytes;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::parquet_thrift::{
ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, ThriftCompactOutputProtocol,
ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
};
use crate::thrift_struct;
use bytes::Bytes;
use std::io::Write;
use twox_hash::XxHash64;
/// Per-word salt constants used by `Block::mask`.
///
/// For word `i` of a block, the 32-bit hash is multiplied (wrapping) by
/// `SALT[i]` and the top five bits of the product pick the bit to set.
const SALT: [u32; 8] = [
    0x47b6_137b,
    0x4497_4d91,
    0x8824_ad5b,
    0xa2b7_289d,
    0x7054_95c7,
    0x2df1_424b,
    0x9efc_4947,
    0x5c6b_fb31,
];
// Thrift-encoded header that precedes the bitset of a serialized bloom filter.
// `num_bytes` is the length of the bitset in bytes. The remaining fields name
// the algorithm, hash and compression; this module only produces and accepts
// BLOCK / XXHASH / UNCOMPRESSED (see `Sbbf::header` and
// `Sbbf::read_from_column_chunk`).
thrift_struct!(
pub struct BloomFilterHeader {
1: required i32 num_bytes;
2: required BloomFilterAlgorithm algorithm;
3: required BloomFilterHash hash;
4: required BloomFilterCompression compression;
}
);
/// One 256-bit block of a split-block bloom filter, stored as eight 32-bit words.
///
/// `#[repr(transparent)]` gives `Block` exactly the layout of `[u32; 8]`; the
/// little-endian `Sbbf::write_bitset` relies on this to view the block vector
/// as raw bytes.
#[derive(Debug, Copy, Clone)]
#[repr(transparent)]
struct Block([u32; 8]);
impl Block {
    /// A block with no bits set.
    const ZERO: Block = Block([0; 8]);

    /// Builds the mask for the 32-bit hash `x`.
    ///
    /// Word `i` of the result has exactly one bit set: the bit whose position is
    /// the top five bits of `x * SALT[i]` (wrapping multiplication).
    fn mask(x: u32) -> Self {
        let mut words = [0_u32; 8];
        for (word, salt) in words.iter_mut().zip(SALT.iter()) {
            let bit = x.wrapping_mul(*salt) >> 27;
            *word = 1 << bit;
        }
        Self(words)
    }

    /// Reinterprets the eight words as 32 bytes in native byte order.
    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn to_ne_bytes(self) -> [u8; 32] {
        // SAFETY: `[u32; 8]` and `[u8; 32]` are both 32 bytes, and every bit
        // pattern is a valid `[u8; 32]`.
        unsafe { std::mem::transmute::<[u32; 8], [u8; 32]>(self.0) }
    }

    /// Encodes the eight words as 32 little-endian bytes.
    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn to_le_bytes(self) -> [u8; 32] {
        let swapped = self.swap_bytes();
        swapped.to_ne_bytes()
    }

    /// Reverses the byte order of every word.
    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn swap_bytes(mut self) -> Self {
        for word in self.0.iter_mut() {
            *word = word.swap_bytes();
        }
        self
    }

    /// Sets every bit of `Self::mask(hash)` in this block.
    fn insert(&mut self, hash: u32) {
        let mask = Self::mask(hash);
        for (word, bits) in self.0.iter_mut().zip(mask.0) {
            *word |= bits;
        }
    }

    /// Returns `true` only if every bit of `Self::mask(hash)` is set in this
    /// block; stops at the first word whose mask bit is clear.
    fn check(&self, hash: u32) -> bool {
        let mask = Self::mask(hash);
        self.0
            .iter()
            .zip(mask.0)
            .all(|(word, bits)| word & bits != 0)
    }
}
impl std::ops::Index<usize> for Block {
    type Output = u32;

    /// Borrows word `index` of the block; panics if `index >= 8`.
    #[inline]
    fn index(&self, index: usize) -> &Self::Output {
        &self.0[index]
    }
}
impl std::ops::IndexMut<usize> for Block {
    /// Mutably borrows word `index` of the block; panics if `index >= 8`.
    #[inline]
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        &mut self.0[index]
    }
}
impl std::ops::BitOr for Block {
    type Output = Self;

    /// Returns the word-by-word union of the two blocks.
    #[inline]
    fn bitor(self, rhs: Self) -> Self {
        Self(std::array::from_fn(|i| self.0[i] | rhs.0[i]))
    }
}
impl std::ops::BitOrAssign for Block {
    /// Sets in `self` every bit that is set in `rhs`, word by word.
    #[inline]
    fn bitor_assign(&mut self, rhs: Self) {
        self.0
            .iter_mut()
            .zip(rhs.0)
            .for_each(|(dst, src)| *dst |= src);
    }
}
impl Block {
    /// Total number of set bits across all eight words (at most 256).
    #[inline]
    fn count_ones(self) -> u32 {
        self.0.iter().fold(0, |acc, word| acc + word.count_ones())
    }
}
/// Split block bloom filter (SBBF): a sequence of 256-bit blocks.
///
/// Values are hashed with XxHash64. The upper 32 bits of the hash select a
/// block, and the lower 32 bits select one bit in each of that block's eight
/// 32-bit words.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);
/// Number of bytes read speculatively to obtain the bloom filter header when the
/// column metadata does not record the bloom filter's length.
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
/// Parses a bloom filter header from `buffer`, which was read starting at the
/// file position `offset`.
///
/// Returns the header together with the absolute file offset at which the
/// bitset begins, i.e. `offset` plus the encoded header length.
pub(crate) fn chunk_read_bloom_filter_header_and_offset(
    offset: u64,
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    read_bloom_filter_header_and_length(buffer)
        .map(|(header, header_len)| (header, offset + header_len))
}
/// Parses a bloom filter header from an owned `Bytes` buffer.
///
/// Returns the header and the number of bytes its Thrift encoding occupied.
#[inline]
pub(crate) fn read_bloom_filter_header_and_length(
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let bytes: &[u8] = &buffer;
    read_bloom_filter_header_and_length_from_bytes(bytes)
}
/// Parses a Thrift compact-encoded bloom filter header from the start of
/// `buffer`.
///
/// Returns the header and how many bytes of `buffer` it consumed. A decode
/// failure is reported as `ParquetError::General`.
#[inline]
fn read_bloom_filter_header_and_length_from_bytes(
    buffer: &[u8],
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let mut prot = ThriftSliceInputProtocol::new(buffer);
    match BloomFilterHeader::read_thrift(&mut prot) {
        Ok(header) => {
            // The protocol's remaining slice tells us how far decoding advanced.
            let consumed = buffer.len() - prot.as_slice().len();
            Ok((header, consumed as u64))
        }
        Err(e) => Err(ParquetError::General(format!(
            "Could not read bloom filter header: {e}"
        ))),
    }
}
/// Smallest bitset, in bytes, that [`Sbbf::new_with_num_of_bytes`] allocates.
pub const BITSET_MIN_LENGTH: usize = 32;
/// Largest bitset, in bytes (128 MiB), that [`Sbbf::new_with_num_of_bytes`] allocates.
pub const BITSET_MAX_LENGTH: usize = 128 << 20;
/// Clamps a requested bitset size to `[BITSET_MIN_LENGTH, BITSET_MAX_LENGTH]`
/// and rounds the result up to the next power of two.
#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
    num_bytes
        .clamp(BITSET_MIN_LENGTH, BITSET_MAX_LENGTH)
        .next_power_of_two()
}
/// Number of bits needed to hold `ndv` distinct values at false positive
/// probability `fpp`, computed as `-8 * ndv / ln(1 - fpp^(1/8))`.
///
/// The float result is converted with `as usize`, which saturates: negative
/// values and NaN become 0, and values that are too large become `usize::MAX`.
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
    let eighth_root = fpp.powf(1.0 / 8.0);
    let log_term = (1.0 - eighth_root).ln();
    let bits = -8.0 * ndv as f64 / log_term;
    bits as usize
}
impl Sbbf {
    /// Creates an empty filter sized for `ndv` distinct values at false positive
    /// probability `fpp`.
    ///
    /// The bit count derived from `ndv` and `fpp` is converted to bytes, clamped
    /// to [`BITSET_MIN_LENGTH`]..=[`BITSET_MAX_LENGTH`] and rounded up to a power
    /// of two.
    ///
    /// # Errors
    ///
    /// Returns an error unless `0.0 < fpp < 1.0`. A zero `fpp` cannot be met by
    /// any finite filter; the sizing formula divides by `ln(1) == 0` and would
    /// otherwise yield the *smallest* filter.
    pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
        if fpp.is_nan() || fpp <= 0.0 || fpp >= 1.0 {
            return Err(ParquetError::General(format!(
                "False positive probability must be between 0.0 and 1.0, got {fpp}"
            )));
        }
        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
        Ok(Self::new_with_num_of_bytes(num_bits / 8))
    }

    /// Creates an empty filter with a bitset of approximately `num_bytes` bytes.
    ///
    /// The size is clamped to [`BITSET_MIN_LENGTH`]..=[`BITSET_MAX_LENGTH`] and
    /// rounded up to a power of two, so the block count is a power of two.
    pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
        let num_bytes = optimal_num_of_bytes(num_bytes);
        // Any power of two >= BITSET_MIN_LENGTH (32) is a whole number of blocks.
        assert_eq!(num_bytes % size_of::<Block>(), 0);
        let num_blocks = num_bytes / size_of::<Block>();
        Self(vec![Block::ZERO; num_blocks])
    }

    /// Builds a filter from a raw little-endian bitset.
    ///
    /// The bitset is consumed in 32-byte blocks; trailing bytes that do not form
    /// a whole block are ignored.
    pub fn new(bitset: &[u8]) -> Self {
        let data = bitset
            .chunks_exact(4 * 8)
            .map(|chunk| {
                let mut block = Block::ZERO;
                for (i, word) in chunk.chunks_exact(4).enumerate() {
                    block[i] = u32::from_le_bytes(
                        word.try_into().expect("chunks_exact(4) yields 4-byte slices"),
                    );
                }
                block
            })
            .collect::<Vec<Block>>();
        Self(data)
    }

    /// Writes the Thrift-encoded header followed by the bitset to `writer`.
    ///
    /// # Errors
    ///
    /// Returns an error if the header or the bitset cannot be written.
    pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        let mut protocol = ThriftCompactOutputProtocol::new(&mut writer);
        self.header().write_thrift(&mut protocol).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter header: {e}"))
        })?;
        self.write_bitset(&mut writer)?;
        Ok(())
    }

    /// Writes the bitset (without header) as little-endian 32-bit words.
    ///
    /// # Errors
    ///
    /// Returns an error if writing to `writer` fails.
    #[cfg(not(target_endian = "little"))]
    pub fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        for block in &self.0 {
            writer
                .write_all(block.to_le_bytes().as_slice())
                .map_err(|e| {
                    ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
                })?;
        }
        Ok(())
    }

    /// Writes the bitset (without header) as little-endian 32-bit words.
    ///
    /// # Errors
    ///
    /// Returns an error if writing to `writer` fails.
    #[cfg(target_endian = "little")]
    pub fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        // SAFETY: `Block` is `#[repr(transparent)]` over `[u32; 8]`, so the
        // vector's storage is `len * size_of::<Block>()` contiguous, initialized
        // bytes with no padding. `u8` has alignment 1, and the slice borrows
        // `self`, so it cannot outlive the data. On little-endian targets the
        // native bytes already are the little-endian on-disk encoding.
        let slice = unsafe {
            std::slice::from_raw_parts(
                self.0.as_ptr() as *const u8,
                self.0.len() * size_of::<Block>(),
            )
        };
        writer.write_all(slice).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
        })?;
        Ok(())
    }

    /// Builds the header describing this filter's bitset.
    fn header(&self) -> BloomFilterHeader {
        BloomFilterHeader {
            // NOTE: `i32` cannot represent bitsets of 2 GiB or more. Filters
            // created by `new_with_num_of_bytes` are capped at BITSET_MAX_LENGTH.
            num_bytes: self.0.len() as i32 * 4 * 8,
            algorithm: BloomFilterAlgorithm::BLOCK,
            hash: BloomFilterHash::XXHASH,
            compression: BloomFilterCompression::UNCOMPRESSED,
        }
    }

    /// Reads the bloom filter of a column chunk from `reader`.
    ///
    /// Returns `Ok(None)` when `column_metadata` records no bloom filter offset.
    /// When the metadata records the filter length, header and bitset are read
    /// in one request. Otherwise the header is read speculatively
    /// ([`SBBF_HEADER_SIZE_ESTIMATE`] bytes) and the bitset is fetched
    /// separately.
    ///
    /// # Errors
    ///
    /// Returns an error if the recorded offset or length is negative, if the
    /// header's `num_bytes` is negative, or if reading/decoding fails.
    pub fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: &R,
    ) -> Result<Option<Self>, ParquetError> {
        let Some(offset) = column_metadata.bloom_filter_offset() else {
            return Ok(None);
        };
        let offset: u64 = offset
            .try_into()
            .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?;

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => {
                // A negative length from a malformed file must not wrap around
                // to an enormous `usize` read request.
                let length = usize::try_from(length).map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(offset, length)?
            }
            None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE)?,
        };

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        // These exhaustive matches have empty arms on purpose: they name the only
        // supported algorithm / compression / hash, and stop compiling if any of
        // these enums gains a variant that would need handling here.
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {}
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {}
        }
        match header.hash {
            BloomFilterHash::XXHASH => {}
        }

        let bitset = match column_metadata.bloom_filter_length() {
            // The whole filter was read above; the bitset follows the header.
            Some(_) => buffer.slice((bitset_offset - offset) as usize..),
            // Only the header was read above; fetch the bitset now.
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Self::new(&bitset)))
    }

    /// Maps the upper 32 bits of `hash` to a block index in `0..num_blocks`
    /// using a multiply followed by a 32-bit right shift.
    #[inline]
    fn hash_to_block_index(&self, hash: u64) -> usize {
        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
    }

    /// Inserts `value` into the filter.
    ///
    /// # Panics
    ///
    /// Panics if the filter has no blocks (e.g. one built with `Sbbf::new(&[])`).
    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
    }

    /// Inserts a precomputed 64-bit hash into the filter.
    fn insert_hash(&mut self, hash: u64) {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].insert(hash as u32)
    }

    /// Returns `false` if `value` was definitely never inserted; `true` means it
    /// may have been inserted.
    ///
    /// # Panics
    ///
    /// Panics if the filter has no blocks (e.g. one built with `Sbbf::new(&[])`).
    pub fn check<T: AsBytes + ?Sized>(&self, value: &T) -> bool {
        self.check_hash(hash_as_bytes(value))
    }

    /// Checks a precomputed 64-bit hash against the filter.
    fn check_hash(&self, hash: u64) -> bool {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].check(hash as u32)
    }

    /// Approximate memory held by the block vector, in bytes (from its capacity).
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.0.capacity() * std::mem::size_of::<Block>()
    }

    /// Number of 32-byte blocks in the filter.
    pub fn num_blocks(&self) -> usize {
        self.0.len()
    }

    /// Shrinks the filter by folding it as many times as possible while its
    /// estimated false positive probability stays at or below `target_fpp`.
    ///
    /// Folding never removes a previously inserted value. A NaN `target_fpp`
    /// leaves the filter unchanged.
    pub fn fold_to_target_fpp(&mut self, target_fpp: f64) {
        let num_folds = self.num_folds_for_target_fpp(target_fpp);
        if num_folds > 0 {
            self.fold_n(num_folds);
        }
    }

    /// Returns how many halvings (see `fold_n`) keep the estimated false
    /// positive probability at or below `target_fpp`.
    ///
    /// Folding `k` times merges groups of `2^k` adjacent blocks, which requires
    /// `2^k` to divide the block count, so the result never exceeds
    /// `num_blocks().trailing_zeros()`.
    fn num_folds_for_target_fpp(&self, target_fpp: f64) -> u32 {
        let len = self.0.len();
        // NaN compares false against every estimate and would fold to one block.
        if len < 2 || target_fpp.is_nan() {
            return 0;
        }
        // Folding k times is exact whenever 2^k divides `len`:
        // floor(floor(h * len / 2^32) / 2^k) == floor(h * (len / 2^k) / 2^32),
        // so no power-of-two requirement is needed beyond that divisibility.
        let max_folds = len.trailing_zeros();

        let total_set_bits: u64 = self.0.iter().map(|b| u64::from(b.count_ones())).sum();
        let avg_fill = total_set_bits as f64 / (len as f64 * 256.0);
        if avg_fill == 0.0 {
            // Nothing inserted: the estimated FPP is zero at every size.
            return max_folds;
        }

        // Model bits as independently set with probability `avg_fill`. After k
        // folds each bit is the OR of 2^k bits, so it is clear with probability
        // (1 - avg_fill)^(2^k); a lookup probes one bit in each of 8 words.
        let mut one_minus_fk = 1.0 - avg_fill;
        let mut num_folds = 0u32;
        while num_folds < max_folds {
            one_minus_fk *= one_minus_fk;
            let estimated_fpp = (1.0 - one_minus_fk).powi(8);
            if estimated_fpp > target_fpp {
                break;
            }
            num_folds += 1;
        }
        num_folds
    }

    /// Halves the filter `num_folds` times by OR-ing each group of
    /// `2^num_folds` adjacent blocks into a single block.
    ///
    /// A value's block index in the folded filter is its old index divided by
    /// `2^num_folds`, and the merged block keeps every bit that was set, so no
    /// inserted value is lost.
    ///
    /// # Panics
    ///
    /// Panics if `num_folds` is 0, if the filter has fewer than `2^num_folds`
    /// blocks, or if the block count is not divisible by `2^num_folds`.
    fn fold_n(&mut self, num_folds: u32) {
        assert!(num_folds > 0, "num_folds must be at least 1");
        let len = self.0.len();
        let group_size = 1usize << num_folds;
        assert!(
            group_size <= len,
            "Cannot fold {num_folds} times: need at least {group_size} blocks, have {len}"
        );
        // A partial trailing group would otherwise be silently dropped, losing
        // inserted values.
        assert!(
            len % group_size == 0,
            "Cannot fold {num_folds} times: {len} blocks is not a multiple of {group_size}"
        );
        let new_len = len / group_size;
        for i in 0..new_len {
            let start = i * group_size;
            let merged = self.0[start..start + group_size]
                .iter()
                .fold(Block::ZERO, |acc, &block| acc | block);
            // `i <= start`, and later groups only read indices beyond `start`, so
            // writing in place never overwrites input that is still unread.
            self.0[i] = merged;
        }
        self.0.truncate(new_len);
        // Release the memory freed by folding; `estimated_memory_size` is based
        // on capacity and would otherwise still report the unfolded size.
        self.0.shrink_to_fit();
    }

    /// Deserializes a filter from `bytes`, which must contain exactly a
    /// Thrift-encoded header followed by its bitset.
    ///
    /// # Errors
    ///
    /// Returns an error if the header cannot be decoded, if `num_bytes` is
    /// negative, zero, or not a multiple of the 32-byte block size, or if
    /// `bytes` is shorter or longer than the header plus bitset.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, ParquetError> {
        let (header, header_len) = read_bloom_filter_header_and_length_from_bytes(bytes)?;
        let bitset_length: u64 = header
            .num_bytes
            .try_into()
            .map_err(|_| ParquetError::General("Bloom filter length is invalid".to_string()))?;

        // `Sbbf::new` keeps only whole 32-byte blocks, so a partial block would
        // be silently dropped; an empty bitset yields a filter whose lookups
        // panic. Reject both.
        let block_size = size_of::<Block>() as u64;
        if bitset_length == 0 || bitset_length % block_size != 0 {
            return Err(ParquetError::General(format!(
                "Bloom filter bitset length {bitset_length} is not a positive multiple of {block_size} bytes"
            )));
        }

        let expected_len = header_len + bitset_length;
        let actual_len = bytes.len() as u64;
        if expected_len > actual_len {
            return Err(ParquetError::General(format!(
                "Bloom filter data is truncated: expected {expected_len} total bytes, got {actual_len}"
            )));
        }
        if expected_len < actual_len {
            return Err(ParquetError::General(format!(
                "Bloom filter data contains extra bytes: expected {} total bytes, got {}",
                expected_len,
                bytes.len()
            )));
        }

        let start = header_len as usize;
        let end = expected_len as usize;
        let bitset = bytes
            .get(start..end)
            .ok_or_else(|| ParquetError::General("Bloom filter bitset is invalid".to_string()))?;
        Ok(Self::new(bitset))
    }
}
/// Seed passed to XxHash64 when hashing values for the bloom filter.
const SEED: u64 = 0;

/// Hashes the byte representation of `value` with XxHash64 seeded by `SEED`.
#[inline]
fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
    let bytes = value.as_bytes();
    XxHash64::oneshot(SEED, bytes)
}
// Unit tests for `Block`, `Sbbf`, and the free helper functions of this module.
#[cfg(test)]
mod tests {
use super::*;
// The XxHash64 digest (seed 0) of the empty string is pinned to a fixed value.
#[test]
fn test_hash_bytes() {
assert_eq!(hash_as_bytes(""), 17241709254077376921);
}
// Every word of every mask has exactly one bit set.
#[test]
fn test_mask_set_quick_check() {
for i in 0..1_000_000 {
let result = Block::mask(i);
assert!(result.0.iter().all(|&x| x.is_power_of_two()));
}
}
// A block always reports a hash that was just inserted into it.
#[test]
fn test_block_insert_and_check() {
for i in 0..1_000_000 {
let mut block = Block::ZERO;
block.insert(i);
assert!(block.check(i));
}
}
// Same property at the filter level, using a 1000-block (non power of two) filter.
#[test]
fn test_sbbf_insert_and_check() {
let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
for i in 0..1_000_000 {
sbbf.insert(&i);
assert!(sbbf.check(&i));
}
}
// A single-block (32-byte) fixture bitset for which "a0".."a9" must all check as present.
#[test]
fn test_with_fixture() {
let bitset: &[u8] = &[
200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
];
let sbbf = Sbbf::new(bitset);
for a in 0..10i64 {
let value = format!("a{a}");
assert!(sbbf.check(&value.as_str()));
}
}
// A header with num_bytes = 32 decodes from the first 15 bytes; the trailing byte is not consumed.
// Also pins SBBF_HEADER_SIZE_ESTIMATE at 20.
#[test]
fn test_bloom_filter_header_size_assumption() {
let buffer: &[u8; 16] = &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
let (
BloomFilterHeader {
algorithm,
compression,
hash,
num_bytes,
},
read_length,
) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
assert_eq!(read_length, 15);
assert_eq!(algorithm, BloomFilterAlgorithm::BLOCK);
assert_eq!(compression, BloomFilterCompression::UNCOMPRESSED);
assert_eq!(hash, BloomFilterHash::XXHASH);
assert_eq!(num_bytes, 32_i32);
assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
}
// Requested sizes are clamped to [BITSET_MIN_LENGTH, BITSET_MAX_LENGTH] and rounded up to a power of two.
#[test]
fn test_optimal_num_of_bytes() {
for (input, expected) in &[
(0, 32),
(9, 32),
(31, 32),
(32, 32),
(33, 64),
(99, 128),
(1024, 1024),
(999_000_000, 128 * 1024 * 1024),
] {
assert_eq!(*expected, optimal_num_of_bytes(*input));
}
}
// Expected bit counts for a grid of (fpp, ndv, num_bits) rows; the last row uses a tiny fpp and huge ndv.
#[test]
fn test_num_of_bits_from_ndv_fpp() {
for (fpp, ndv, num_bits) in &[
(0.1, 10, 57),
(0.01, 10, 96),
(0.001, 10, 146),
(0.1, 100, 577),
(0.01, 100, 968),
(0.001, 100, 1460),
(0.1, 1000, 5772),
(0.01, 1000, 9681),
(0.001, 1000, 14607),
(0.1, 10000, 57725),
(0.01, 10000, 96815),
(0.001, 10000, 146076),
(0.1, 100000, 577254),
(0.01, 100000, 968152),
(0.001, 100000, 1460769),
(0.1, 1000000, 5772541),
(0.01, 1000000, 9681526),
(0.001, 1000000, 14607697),
(1e-50, 1_000_000_000_000, 14226231280773240832),
] {
assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
}
}
// Each single fold halves the block count.
#[test]
fn test_fold_n_halves_block_count() {
let mut sbbf = Sbbf::new_with_num_of_bytes(1024); assert_eq!(sbbf.num_blocks(), 32); // 1024 bytes / 32 bytes per block
sbbf.fold_n(1);
assert_eq!(sbbf.num_blocks(), 16);
sbbf.fold_n(1);
assert_eq!(sbbf.num_blocks(), 8);
}
// Folding to a target FPP shrinks a sparsely filled 32 KiB filter and keeps every inserted value (no false negatives).
#[test]
fn test_fold_preserves_inserted_values() {
let mut sbbf = Sbbf::new_with_num_of_bytes(32 * 1024); let values: Vec<String> = (0..1000).map(|i| format!("value_{i}")).collect();
for v in &values {
sbbf.insert(v.as_str());
}
let original_blocks = sbbf.num_blocks();
sbbf.fold_to_target_fpp(0.05);
assert!(
sbbf.num_blocks() < original_blocks,
"should have folded at least once"
);
for v in &values {
assert!(
sbbf.check(v.as_str()),
"Value '{}' missing after folding (false negative!)",
v
);
}
}
// After folding, fill^8 (an FPP estimate from the actual fill ratio) stays within the target.
#[test]
fn test_fold_to_target_fpp_stops_before_exceeding_target() {
let mut sbbf = Sbbf::new_with_num_of_bytes(64 * 1024); for i in 0..5000 {
sbbf.insert(&i);
}
let target_fpp = 0.01;
sbbf.fold_to_target_fpp(target_fpp);
let total_bits = (sbbf.num_blocks() * 256) as f64;
let set_bits: u64 = sbbf
.0
.iter()
.flat_map(|b| b.0.iter())
.map(|w| w.count_ones() as u64)
.sum();
let fill = set_bits as f64 / total_bits;
let current_fpp = fill.powi(8);
assert!(
current_fpp <= target_fpp,
"FPP {current_fpp} exceeds target {target_fpp}"
);
}
// An empty 32-block filter folds all the way down to a single block.
#[test]
fn test_fold_empty_filter_folds_to_minimum() {
let mut sbbf = Sbbf::new_with_num_of_bytes(1024); sbbf.fold_to_target_fpp(0.01);
assert_eq!(sbbf.num_blocks(), 1);
}
// A one-block (32-byte) filter cannot be folded.
#[test]
#[should_panic(expected = "Cannot fold 1 times: need at least 2 blocks, have 1")]
fn test_fold_n_panics_at_minimum_size() {
let mut sbbf = Sbbf::new_with_num_of_bytes(32); sbbf.fold_n(1);
}
// write() emits a header describing the bitset; from_bytes() restores a filter that still contains every inserted value.
#[test]
fn test_sbbf_write_round_trip() {
let bitset_bytes = vec![0u8; 32];
let mut original = Sbbf::new(&bitset_bytes);
let test_values = ["hello", "world", "rust", "parquet", "bloom", "filter"];
for value in &test_values {
original.insert(value);
}
let mut output = Vec::new();
original.write(&mut output).unwrap();
let mut protocol = ThriftSliceInputProtocol::new(&output);
let header = BloomFilterHeader::read_thrift(&mut protocol).unwrap();
assert_eq!(header.num_bytes, bitset_bytes.len() as i32);
assert_eq!(header.algorithm, BloomFilterAlgorithm::BLOCK);
assert_eq!(header.hash, BloomFilterHash::XXHASH);
assert_eq!(header.compression, BloomFilterCompression::UNCOMPRESSED);
let reconstructed = Sbbf::from_bytes(&output).unwrap();
for value in &test_values {
assert!(
reconstructed.check(value),
"Value '{}' should be present after round-trip",
value
);
}
}
// Folding N blocks to N/2 yields exactly the filter obtained by inserting the same hashes into a fresh N/2-block filter.
#[test]
fn test_sbbf_folded_equals_fresh() {
let values = (0..5000).map(|i| format!("elem_{i}")).collect::<Vec<_>>();
let hashes = values
.iter()
.map(|v| hash_as_bytes(v.as_str()))
.collect::<Vec<_>>();
for num_blocks in [64, 256, 1024] {
let half = num_blocks / 2;
let mut original = Sbbf::new_with_num_of_bytes(num_blocks * 32);
assert_eq!(original.num_blocks(), num_blocks);
for &h in &hashes {
original.insert_hash(h);
}
for &h in hashes.iter() {
let mask = Block::mask(h as u32);
let orig_idx = original.hash_to_block_index(h);
assert!(orig_idx < num_blocks);
// Lemma 1: the block index in a half-size filter equals the original index / 2.
let fresh_idx = {
let tmp = Sbbf(vec![Block::ZERO; half]);
tmp.hash_to_block_index(h)
};
let folded_idx = orig_idx / 2;
assert_eq!(
fresh_idx, folded_idx,
"Lemma 1 failed: fresh index {fresh_idx} != folded index {folded_idx}"
);
// Lemma 2: every mask bit of `h` is set in its original block.
for w in 0..8 {
assert_ne!(
original.0[orig_idx].0[w] & mask.0[w],
0,
"Lemma 2 failed: mask bit not set in word {w} of block {orig_idx}"
);
}
}
let mut folded = original.clone();
folded.fold_n(1);
assert_eq!(folded.num_blocks(), half);
let mut fresh = Sbbf::new_with_num_of_bytes(half * 32);
for &h in &hashes {
fresh.insert_hash(h);
}
for j in 0..half {
assert_eq!(
folded.0[j].0, fresh.0[j].0,
"Block {j} differs after fold (N={num_blocks} → {half})"
);
}
}
}
// Repeated single folds keep matching a freshly built filter of the same size, down to one block.
#[test]
fn test_multi_step_fold() {
let values = (0..3000).map(|i| format!("x_{i}")).collect::<Vec<_>>();
let mut filter = Sbbf::new_with_num_of_bytes(512 * 32);
for v in &values {
filter.insert(v.as_str());
}
for expected_blocks in [256, 128, 64, 32, 16, 8, 4, 2, 1] {
filter.fold_n(1);
assert_eq!(filter.num_blocks(), expected_blocks);
let mut fresh = Sbbf::new_with_num_of_bytes(expected_blocks * 32);
for v in &values {
fresh.insert(v.as_str());
}
for (fb, rb) in filter.0.iter().zip(fresh.0.iter()) {
assert_eq!(fb.0, rb.0);
}
}
}
// Folding a 128 KiB filter to a target FPP ends at the same size new_with_ndv_fpp picks for that ndv/fpp.
// NOTE(review): exact equality of sizes is brittle if either sizing rule changes; confirm this is intended.
#[test]
fn test_fold_size_vs_optimal_fixed_size() {
for (ndv, target_fpp) in [
(1000, 0.05),
(1000, 0.01),
(5000, 0.05),
(5000, 0.01),
(10000, 0.05),
] {
let values = (0..ndv).map(|i| format!("d_{i}")).collect::<Vec<_>>();
let mut folded = Sbbf::new_with_num_of_bytes(128 * 1024); for v in &values {
folded.insert(v.as_str());
}
folded.fold_to_target_fpp(target_fpp);
let folded_bytes = folded.num_blocks() * 32;
let optimal = Sbbf::new_with_ndv_fpp(ndv as u64, target_fpp).unwrap();
let optimal_bytes = optimal.num_blocks() * 32;
let ratio = folded_bytes as f64 / optimal_bytes as f64;
assert_eq!(ratio, 1.0);
}
}
// Folded and fresh filters of equal size give identical false-positive counts on never-inserted probes.
#[test]
fn test_folded_fpp_matches_fresh_fpp() {
let ndv = 2000;
let num_probes = 50_000;
let inserted = (0..ndv)
.map(|i| format!("ins_{i}"))
.collect::<Vec<String>>();
let probes = (0..num_probes)
.map(|i| format!("probe_{i}"))
.collect::<Vec<String>>();
let mut folded = Sbbf::new_with_num_of_bytes(512 * 32); for v in &inserted {
folded.insert(v.as_str());
}
for expected_blocks in [256, 128, 64, 32, 16, 8, 4, 2, 1] {
folded.fold_n(1);
assert_eq!(folded.num_blocks(), expected_blocks);
let mut fresh = Sbbf::new_with_num_of_bytes(expected_blocks * 32);
for v in &inserted {
fresh.insert(v.as_str());
}
let mut folded_fp = 0u64;
let mut fresh_fp = 0u64;
for p in &probes {
if folded.check(p.as_str()) {
folded_fp += 1;
}
if fresh.check(p.as_str()) {
fresh_fp += 1;
}
}
assert_eq!(folded_fp, fresh_fp);
}
}
}