use serde::{Deserialize, Serialize};
pub const DEFAULT_BLOCK_SIZE: usize = 1024;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MinMaxIndex {
pub column_name: String,
pub blocks: Vec<MinMaxEntry>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct MinMaxEntry {
pub min: f64,
pub max: f64,
}
impl MinMaxIndex {
pub fn build_f64(column_name: &str, values: &[f64], block_size: usize) -> Self {
Self::build_from_iter(
column_name,
values.chunks(block_size).map(|chunk| {
chunk
.iter()
.copied()
.fold((f64::INFINITY, f64::NEG_INFINITY), |(mn, mx), v| {
(mn.min(v), mx.max(v))
})
}),
)
}
pub fn build_i64(column_name: &str, values: &[i64], block_size: usize) -> Self {
Self::build_from_iter(
column_name,
values.chunks(block_size).map(|chunk| {
let (mn, mx) = chunk
.iter()
.copied()
.fold((i64::MAX, i64::MIN), |(mn, mx), v| (mn.min(v), mx.max(v)));
(mn as f64, mx as f64)
}),
)
}
fn build_from_iter(column_name: &str, blocks: impl Iterator<Item = (f64, f64)>) -> Self {
Self {
column_name: column_name.to_string(),
blocks: blocks.map(|(min, max)| MinMaxEntry { min, max }).collect(),
}
}
pub fn filter_range(&self, min_val: f64, max_val: f64) -> Vec<usize> {
self.blocks
.iter()
.enumerate()
.filter(|(_, e)| e.max >= min_val && e.min <= max_val)
.map(|(i, _)| i)
.collect()
}
pub fn filter_gt(&self, threshold: f64) -> Vec<usize> {
self.blocks
.iter()
.enumerate()
.filter(|(_, e)| e.max > threshold)
.map(|(i, _)| i)
.collect()
}
pub fn filter_lt(&self, threshold: f64) -> Vec<usize> {
self.blocks
.iter()
.enumerate()
.filter(|(_, e)| e.min < threshold)
.map(|(i, _)| i)
.collect()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SetIndex {
pub column_name: String,
pub blocks: Vec<Vec<u32>>, }
impl SetIndex {
pub fn build(column_name: &str, values: &[u32], block_size: usize) -> Self {
let blocks = values
.chunks(block_size)
.map(|chunk| {
let mut unique: Vec<u32> = chunk.to_vec();
unique.sort_unstable();
unique.dedup();
unique
})
.collect();
Self {
column_name: column_name.to_string(),
blocks,
}
}
pub fn filter_eq(&self, symbol_id: u32) -> Vec<usize> {
self.blocks
.iter()
.enumerate()
.filter(|(_, set)| set.binary_search(&symbol_id).is_ok())
.map(|(i, _)| i)
.collect()
}
pub fn filter_in(&self, symbol_ids: &[u32]) -> Vec<usize> {
self.blocks
.iter()
.enumerate()
.filter(|(_, set)| symbol_ids.iter().any(|id| set.binary_search(id).is_ok()))
.map(|(i, _)| i)
.collect()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BloomIndex {
pub column_name: String,
pub blocks: Vec<BloomFilter>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BloomFilter {
bits: Vec<u64>,
num_bits: usize,
num_hashes: u8,
}
impl BloomFilter {
pub fn new(num_bits: usize, num_hashes: u8) -> Self {
Self {
bits: vec![0u64; num_bits.div_ceil(64)],
num_bits,
num_hashes,
}
}
pub fn for_capacity(n: usize) -> Self {
let bits_per_elem = 10; let num_bits = (n * bits_per_elem).max(64);
let num_hashes = 7; Self::new(num_bits, num_hashes)
}
pub fn insert(&mut self, value: &[u8]) {
let (h1, h2) = double_hash(value);
for i in 0..self.num_hashes as u64 {
let bit = (h1.wrapping_add(i.wrapping_mul(h2))) as usize % self.num_bits;
self.bits[bit / 64] |= 1u64 << (bit % 64);
}
}
pub fn insert_u64(&mut self, value: u64) {
self.insert(&value.to_le_bytes());
}
pub fn might_contain(&self, value: &[u8]) -> bool {
let (h1, h2) = double_hash(value);
for i in 0..self.num_hashes as u64 {
let bit = (h1.wrapping_add(i.wrapping_mul(h2))) as usize % self.num_bits;
if (self.bits[bit / 64] >> (bit % 64)) & 1 == 0 {
return false;
}
}
true
}
pub fn might_contain_u64(&self, value: u64) -> bool {
self.might_contain(&value.to_le_bytes())
}
}
fn double_hash(value: &[u8]) -> (u64, u64) {
let mut h1: u64 = 0x9e3779b97f4a7c15;
let mut h2: u64 = 0x517cc1b727220a95;
for &b in value {
h1 = h1.wrapping_mul(0x100000001b3).wrapping_add(b as u64);
h2 = h2.wrapping_mul(0x6c62272e07bb0142).wrapping_add(b as u64);
}
(h1, h2)
}
impl BloomIndex {
pub fn build_u64(column_name: &str, values: &[u64], block_size: usize) -> Self {
let blocks = values
.chunks(block_size)
.map(|chunk| {
let mut bf = BloomFilter::for_capacity(chunk.len());
for &v in chunk {
bf.insert_u64(v);
}
bf
})
.collect();
Self {
column_name: column_name.to_string(),
blocks,
}
}
pub fn build_bytes(column_name: &str, values: &[&[u8]], block_size: usize) -> Self {
let blocks = values
.chunks(block_size)
.map(|chunk| {
let mut bf = BloomFilter::for_capacity(chunk.len());
for v in chunk {
bf.insert(v);
}
bf
})
.collect();
Self {
column_name: column_name.to_string(),
blocks,
}
}
pub fn filter_u64(&self, value: u64) -> Vec<usize> {
self.blocks
.iter()
.enumerate()
.filter(|(_, bf)| bf.might_contain_u64(value))
.map(|(i, _)| i)
.collect()
}
pub fn filter_bytes(&self, value: &[u8]) -> Vec<usize> {
self.blocks
.iter()
.enumerate()
.filter(|(_, bf)| bf.might_contain(value))
.map(|(i, _)| i)
.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn minmax_range_filter() {
let values: Vec<f64> = (0..2048).map(|i| i as f64).collect();
let idx = MinMaxIndex::build_f64("cpu", &values, 1024);
assert_eq!(idx.blocks.len(), 2);
let matching = idx.filter_gt(1500.0);
assert_eq!(matching, vec![1]);
let matching = idx.filter_lt(500.0);
assert_eq!(matching, vec![0]);
let matching = idx.filter_range(500.0, 1500.0);
assert_eq!(matching, vec![0, 1]); }
#[test]
fn set_equality_filter() {
let values: Vec<u32> = (0..2048).map(|i| (i / 1024 * 3 + i % 3) as u32).collect();
let idx = SetIndex::build("status", &values, 1024);
assert_eq!(idx.blocks.len(), 2);
let matching = idx.filter_eq(0);
assert_eq!(matching, vec![0]);
let matching = idx.filter_eq(5);
assert_eq!(matching, vec![1]); }
#[test]
fn set_in_filter() {
let values: Vec<u32> = (0..2048).map(|i| (i % 10) as u32).collect();
let idx = SetIndex::build("tag", &values, 1024);
let matching = idx.filter_in(&[0, 5]);
assert_eq!(matching, vec![0, 1]); }
#[test]
fn bloom_no_false_negatives() {
let mut bf = BloomFilter::for_capacity(100);
for i in 0..100u64 {
bf.insert_u64(i);
}
for i in 0..100u64 {
assert!(bf.might_contain_u64(i), "false negative for {i}");
}
}
#[test]
fn bloom_low_false_positive_rate() {
let mut bf = BloomFilter::for_capacity(1000);
for i in 0..1000u64 {
bf.insert_u64(i);
}
let false_positives = (10_000..11_000u64)
.filter(|&v| bf.might_contain_u64(v))
.count();
assert!(
false_positives < 50,
"too many false positives: {false_positives}/1000"
);
}
#[test]
fn bloom_index_block_skip() {
let values: Vec<u64> = (0..2048).collect();
let idx = BloomIndex::build_u64("trace_id", &values, 1024);
assert_eq!(idx.blocks.len(), 2);
let matching = idx.filter_u64(500);
assert!(matching.contains(&0));
let matching = idx.filter_u64(1500);
assert!(matching.contains(&1));
let matching = idx.filter_u64(999_999);
assert!(matching.len() <= 2);
}
}