//! Bloom filter with a versioned, fixed-seed hash scheme.
//!
//! Filters are (de)serialisable via serde; the `hash_version` stored with each
//! filter records which hash scheme produced its bits, so filters built by an
//! older scheme can be detected instead of being queried incorrectly.
use serde::{Deserialize, Serialize};
use std::hash::Hasher;
/// Current hasher version. Persisted bloom filters whose `hash_version` differs
/// from this constant were built with a different hash function and MUST be
/// rebuilt before being trusted for queries. The engine does this
/// automatically on open (`Engine::open`).
///
/// History:
/// * `0` — legacy `ahash::AHasher::default()` (compile-time randomised seed,
///   unsafe across recompiles). Used by flowdb <= 0.1.6.
/// * `1` — `std::hash::DefaultHasher` (SipHash-1-3) with a fixed seed baked
///   into the binary. Stable across restarts AND recompiles.
///
/// NOTE(review): the standard-library documentation for `DefaultHasher` says
/// its internal algorithm is unspecified and may change between Rust releases,
/// so "stable across recompiles" presumably holds only while the toolchain's
/// hasher stays the same — confirm, and bump this constant if a toolchain
/// upgrade ever changes the hash output.
pub(crate) const CURRENT_HASH_VERSION: u8 = 1;
/// Serialisable bloom filter: a bit array packed into 64-bit words, probed
/// `num_hashes` times per key.
///
/// Because the type derives `Deserialize`, a loaded value is only as
/// well-formed as the data it was read from; the constructors are the only
/// place that guarantees a non-empty bit array and a sensible probe count.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct BloomFilter {
/// Bit array. Bit `p` lives in word `p / 64` at bit offset `p % 64`.
bits: Vec<u64>,
/// Number of bit positions set (on insert) or checked (on query) per key.
num_hashes: u8,
/// Hasher version this filter was built with. Old manifests serialise
/// without this field, so `#[serde(default)]` maps them to `0` (legacy).
/// `Engine::open` detects version mismatches and rebuilds the filter.
#[serde(default)]
hash_version: u8,
}
impl BloomFilter {
    /// Creates an empty filter sized for `expected_keys` keys at
    /// `bits_per_key` bits each, stamped with `CURRENT_HASH_VERSION`.
    ///
    /// The probe count is `ceil(bits_per_key * 0.69)` — an integer
    /// approximation of the optimal `k = bits_per_key * ln 2` — clamped to
    /// `2..=30`. For 10 bits/key this gives k = 7 (~0.8% FPR); the legacy code
    /// hard-coded k = 2, which yielded ~5% FPR and unnecessary block reads.
    ///
    /// The bit array is rounded up to whole 64-bit words and is never smaller
    /// than one word, so a filter built here always has at least 64 bits.
    pub fn with_bits_per_key(expected_keys: usize, bits_per_key: usize) -> Self {
        // The clamp keeps the value within u8 range, so the cast cannot truncate.
        let num_hashes = bits_per_key.saturating_mul(69).div_ceil(100).clamp(2, 30) as u8;
        let num_bits = (expected_keys * bits_per_key).max(64);
        let num_words = num_bits.div_ceil(64);
        Self {
            bits: vec![0u64; num_words],
            num_hashes,
            hash_version: CURRENT_HASH_VERSION,
        }
    }

    /// Builds a filter sized for `keys.len()` keys and inserts every key.
    pub fn from_keys_with_bits(keys: &[Vec<u8>], bits_per_key: usize) -> Self {
        let mut bf = Self::with_bits_per_key(keys.len(), bits_per_key);
        for key in keys {
            bf.insert(key);
        }
        bf
    }

    /// Hasher version this filter was built with. Filters whose version differs
    /// from `CURRENT_HASH_VERSION` must not be trusted for negative answers;
    /// `may_contain` answers `true` for them unconditionally.
    pub fn hash_version(&self) -> u8 {
        self.hash_version
    }

    /// Mark this filter as upgraded to the current hasher version in-place.
    /// Only call this AFTER rebuilding `bits` with the current hasher (e.g. in
    /// `Engine::open`'s migration path). The dedicated migration code path
    /// rebuilds filters wholesale via `from_keys_with_bits`, so this method
    /// is currently only exercised by unit tests — kept public for future
    /// in-place rebuild scenarios and explicitly allowed dead-code.
    #[allow(dead_code)]
    pub fn mark_current(&mut self) {
        self.hash_version = CURRENT_HASH_VERSION;
    }

    /// Derives the two base hashes used for double hashing.
    ///
    /// Both come from `std::hash::DefaultHasher::new()` with a fixed prefix
    /// written before the key: `h1` is prefixed with the two seed constants,
    /// `h2` with their bitwise complements followed by a `0xFF` byte. Unlike
    /// `ahash::AHasher::default()`, nothing here depends on build-time
    /// randomness.
    ///
    /// The exact sequence of writes defines the on-disk meaning of every
    /// persisted filter: any change here requires bumping
    /// `CURRENT_HASH_VERSION`.
    ///
    /// NOTE(review): `write_u64` is not overridden for the seed prefix, and
    /// the `Hasher` trait's default feeds native-endian bytes — presumably
    /// persisted filters never move between machines of different endianness;
    /// confirm.
    fn hash_pair(data: &[u8]) -> (u64, u64) {
        const SEED_K0: u64 = 0x5365_7244_424c_4f4f;
        const SEED_K1: u64 = 0x5744_425f_464c_4f57;

        let mut h1 = std::hash::DefaultHasher::new();
        h1.write_u64(SEED_K0);
        h1.write_u64(SEED_K1);
        h1.write(data);
        let hash1 = h1.finish();

        let mut h2 = std::hash::DefaultHasher::new();
        h2.write_u64(!SEED_K0);
        h2.write_u64(!SEED_K1);
        h2.write_u8(0xFF);
        h2.write(data);
        let hash2 = h2.finish();

        (hash1, hash2)
    }

    /// Total number of addressable bits (`bits.len() * 64`).
    fn num_bits(&self) -> u64 {
        self.bits.len() as u64 * 64
    }

    /// Returns `(word index, bit mask)` for the `i`-th probe of a key whose
    /// base hashes are `h1`/`h2`, using the `h1 + i * h2` double-hashing
    /// scheme reduced modulo `num_bits`.
    ///
    /// Shared by `insert` and `may_contain` so both always address the same
    /// bits. `num_bits` must be non-zero; callers check this first.
    fn probe(h1: u64, h2: u64, i: u8, num_bits: u64) -> (usize, u64) {
        let bit_pos = h1.wrapping_add(u64::from(i).wrapping_mul(h2)) % num_bits;
        // bit_pos < num_bits = bits.len() * 64, so the word index fits in usize.
        ((bit_pos / 64) as usize, 1u64 << (bit_pos % 64))
    }

    /// Sets the `num_hashes` probe bits for `key`.
    ///
    /// A filter with an empty bit array (only reachable through
    /// deserialisation, never through the constructors) has nowhere to record
    /// the key, so this is a no-op instead of a division-by-zero panic;
    /// `may_contain` compensates by answering `true` for such filters.
    pub fn insert(&mut self, key: &[u8]) {
        let num_bits = self.num_bits();
        if num_bits == 0 {
            return;
        }
        let (h1, h2) = Self::hash_pair(key);
        for i in 0..self.num_hashes {
            let (word, mask) = Self::probe(h1, h2, i, num_bits);
            self.bits[word] |= mask;
        }
    }

    /// Returns `false` only if `key` was definitely never inserted; `true`
    /// means "possibly present".
    ///
    /// Two cases answer `true` without consulting any bits, so the caller
    /// falls back to a real read rather than silently skipping data:
    /// * the filter was built with a different hasher version — its bit
    ///   positions are meaningless for the current `hash_pair`.
    ///   `Engine::open` rebuilds stale filters on startup, so this only
    ///   triggers transiently during the upgrade migration;
    /// * the bit array is empty (malformed or truncated persisted data) —
    ///   there is no information to base a negative answer on.
    pub fn may_contain(&self, key: &[u8]) -> bool {
        if self.hash_version != CURRENT_HASH_VERSION {
            return true;
        }
        let num_bits = self.num_bits();
        if num_bits == 0 {
            return true;
        }
        let (h1, h2) = Self::hash_pair(key);
        (0..self.num_hashes).all(|i| {
            let (word, mask) = Self::probe(h1, h2, i, num_bits);
            self.bits[word] & mask != 0
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every inserted key must be reported as possibly present.
    #[test]
    fn test_bloom_no_false_negatives() {
        let mut keys: Vec<Vec<u8>> = Vec::with_capacity(1000);
        for idx in 0..1000 {
            keys.push(format!("key_{:06}", idx).into_bytes());
        }
        let filter = BloomFilter::from_keys_with_bits(&keys, 10);
        assert!(keys.iter().all(|k| filter.may_contain(k)), "false negative");
    }

    /// A JSON round-trip must preserve membership answers for inserted keys.
    #[test]
    fn test_bloom_serialization() {
        let keys: Vec<Vec<u8>> = (0..100).map(|i| format!("k{}", i).into_bytes()).collect();
        let original = BloomFilter::from_keys_with_bits(&keys, 10);
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: BloomFilter = serde_json::from_str(&encoded).unwrap();
        assert!(keys.iter().all(|k| decoded.may_contain(k)));
    }

    /// Freshly built filters carry the current hasher version.
    #[test]
    fn test_bloom_version_is_current_after_build() {
        let single_key = [b"k1".to_vec()];
        let filter = BloomFilter::from_keys_with_bits(&single_key, 10);
        assert_eq!(filter.hash_version(), CURRENT_HASH_VERSION);
    }

    /// An old manifest entry has no `hash_version` field; `#[serde(default)]`
    /// must map it to 0 (legacy ahash).
    #[test]
    fn test_bloom_legacy_version_loads_as_zero() {
        let legacy_json = r#"{"bits":[18446744073709551615],"num_hashes":2}"#;
        let filter: BloomFilter = serde_json::from_str(legacy_json).unwrap();
        assert_eq!(filter.hash_version(), 0);
    }

    /// A filter with a stale version must never answer "definitely absent".
    /// The bits are all zero here, so a current-version filter would return
    /// false; the stale version has to force a conservative `true` so callers
    /// fall back to a real read.
    #[test]
    fn test_bloom_stale_version_returns_true_safely() {
        let mut filter = BloomFilter::with_bits_per_key(10, 10);
        filter.hash_version = 0;
        assert!(filter.may_contain(b"never-inserted"));
    }

    /// `mark_current` restores the current version after a (simulated)
    /// rebuild, and the filter answers queries normally again.
    #[test]
    fn test_bloom_mark_current_after_rebuild() {
        let keys: Vec<Vec<u8>> = (0..50).map(|i| format!("k{}", i).into_bytes()).collect();
        let mut filter = BloomFilter::from_keys_with_bits(&keys, 10);
        // Pretend this was a legacy filter; the bits were already produced by
        // the current hasher, so the "rebuild" itself is a no-op.
        filter.hash_version = 0;
        filter.mark_current();
        assert_eq!(filter.hash_version(), CURRENT_HASH_VERSION);
        assert!(keys.iter().all(|k| filter.may_contain(k)));
    }

    /// With k=7 hashes for 10 bits/key the theoretical FPR is ~0.8%, versus
    /// ~5% for the legacy k=2. Both checks below are upper bounds on the
    /// measured rate, guarding against a silent regression to the old value.
    #[test]
    fn test_bloom_false_positive_rate_is_bounded() {
        let keys: Vec<Vec<u8>> = (0..10000)
            .map(|i| format!("key_{:06}", i).into_bytes())
            .collect();
        let filter = BloomFilter::from_keys_with_bits(&keys, 10);

        let test_count = 10000;
        // None of the probe keys were inserted, so every hit is a false positive.
        let fp: usize = (0..test_count)
            .filter(|i| filter.may_contain(format!("other_{:06}", i).as_bytes()))
            .count();
        let fp_rate = fp as f64 / test_count as f64;

        // Never worse than the legacy 5%.
        assert!(fp_rate < 0.05, "false positive rate too high: {}", fp_rate);
        // Tighter bound: the higher hash count should have improved the FPR.
        // 3% leaves headroom over the theoretical value for variation in hash
        // distribution.
        assert!(
            fp_rate < 0.03,
            "expected improved FPR with optimal num_hashes, got {}",
            fp_rate
        );
    }
}