#![allow(
clippy::doc_markdown,
clippy::too_long_first_doc_paragraph,
clippy::cast_precision_loss,
clippy::missing_const_for_fn
)]
use bloomfilter::Bloom;
use mnem_core::id::Cid;
use mnem_core::store::Blockstore;
use crate::error::TransportError;
/// Fixed 32-byte seed passed to the Bloom filter constructor and echoed in
/// the serialized form under `seed_key`.
///
/// The leading bytes are the ASCII tag `mnem-have-set-bloom-1`; every byte
/// after the tag is zero.
pub const BLOOM_SEED_KEY: [u8; 32] = {
    const TAG: &[u8] = b"mnem-have-set-bloom-1";
    let mut key = [0u8; 32];
    let mut pos = 0;
    // A `while` loop is used because `for` loops are not available in const
    // initializers.
    while pos < TAG.len() {
        key[pos] = TAG[pos];
        pos += 1;
    }
    key
};
/// Target false-positive rate (0.01, i.e. 1%) that [`BloomHaveSet::new`]
/// forwards to [`BloomHaveSet::with_params`].
pub const DEFAULT_FPR: f64 = 0.01;
/// A set of CIDs that answers membership queries and can be encoded to bytes.
///
/// The trait itself does not say whether membership is exact. The
/// implementation in this file, [`BloomHaveSet`], is backed by a Bloom
/// filter.
pub trait HaveSet {
/// Returns `true` if the set reports `cid` as a member.
fn contains(&self, cid: &Cid) -> bool;
/// Adds every CID produced by `cids` to the set.
fn extend<I: IntoIterator<Item = Cid>>(&mut self, cids: I);
/// Number of items the set reports holding.
fn len(&self) -> usize;
/// Returns `true` when [`len`](Self::len) is zero.
fn is_empty(&self) -> bool {
self.len() == 0
}
/// Encodes the set as a byte vector.
fn serialize(&self) -> Vec<u8>;
}
/// [`HaveSet`] implementation backed by a Bloom filter over CID bytes.
pub struct BloomHaveSet {
/// Bloom filter keyed by the byte form of each CID (`Cid::to_bytes`).
inner: Bloom<[u8]>,
/// Running insertion count: bumped by `extend` for each CID the filter did
/// not already report, and by `build_have_set` for every block it visits.
item_count: usize,
}
impl BloomHaveSet {
    /// Creates an empty set sized for about `items_hint` entries at
    /// [`DEFAULT_FPR`].
    ///
    /// A hint of `0` is raised to `1`, so this constructor never trips the
    /// `items_hint` assertion in [`Self::with_params`].
    #[must_use]
    pub fn new(items_hint: usize) -> Self {
        let capacity = items_hint.max(1);
        Self::with_params(capacity, DEFAULT_FPR)
    }

    /// Creates an empty set whose filter is built for `items_hint` entries
    /// and a target false-positive rate of `fpr`, seeded with
    /// [`BLOOM_SEED_KEY`].
    ///
    /// # Panics
    ///
    /// Panics if `items_hint` is zero, if `fpr` is not strictly between 0
    /// and 1 (a NaN fails both comparisons and is rejected too), or if the
    /// underlying filter constructor reports an error.
    #[must_use]
    pub fn with_params(items_hint: usize, fpr: f64) -> Self {
        assert!(items_hint > 0, "items_hint must be positive");
        let fpr_in_open_unit_interval = 0.0 < fpr && fpr < 1.0;
        assert!(
            fpr_in_open_unit_interval,
            "fpr must be in (0, 1) but got {fpr}",
        );

        let filter = Bloom::new_for_fp_rate_with_seed(items_hint, fpr, &BLOOM_SEED_KEY)
            .expect("valid bloom params (items_hint > 0, 0 < fpr < 1)");
        Self {
            inner: filter,
            item_count: 0,
        }
    }
}
impl HaveSet for BloomHaveSet {
    /// Probes the filter with the byte form of `cid`.
    fn contains(&self, cid: &Cid) -> bool {
        let key = cid.to_bytes();
        self.inner.check(key.as_slice())
    }

    /// Inserts each CID into the filter.
    ///
    /// The counter only advances for CIDs the filter did not already report
    /// as present, so re-inserting the same CIDs leaves `len` unchanged.
    fn extend<I: IntoIterator<Item = Cid>>(&mut self, cids: I) {
        for cid in cids {
            let key = cid.to_bytes();
            if self.inner.check_and_set(key.as_slice()) {
                // Already reported as present: nothing new to count.
                continue;
            }
            self.item_count += 1;
        }
    }

    /// Returns the running insertion counter.
    fn len(&self) -> usize {
        self.item_count
    }

    /// Encodes the filter through `serde_ipld_dagcbor` as a map with the keys
    /// `_kind`, `k`, `bitmap`, `m_bits`, `seed_key` and `item_hint`.
    ///
    /// `item_hint` carries the insertion counter, saturated to `u32::MAX`
    /// when it does not fit in 32 bits.
    ///
    /// NOTE(review): `bitmap` and `seed_key` are plain `&[u8]` fields with no
    /// bytes adapter (e.g. `serde_bytes`); serde's derive normally emits such
    /// fields as a sequence of integers rather than a byte string. Confirm
    /// that this is the intended wire shape.
    fn serialize(&self) -> Vec<u8> {
        #[derive(serde::Serialize)]
        struct Wire<'a> {
            _kind: &'static str,
            k: u32,
            bitmap: &'a [u8],
            m_bits: u64,
            seed_key: &'a [u8],
            item_hint: u32,
        }

        let item_hint = u32::try_from(self.item_count).unwrap_or(u32::MAX);
        let wire = Wire {
            _kind: "have-set-bloom",
            k: self.inner.number_of_hash_functions(),
            bitmap: self.inner.as_slice(),
            m_bits: self.inner.len(),
            seed_key: &BLOOM_SEED_KEY,
            item_hint,
        };
        serde_ipld_dagcbor::to_vec(&wire).expect("infallible dag-cbor of fixed shape")
    }
}
impl std::fmt::Debug for BloomHaveSet {
    /// Prints the insertion counter plus the filter's bit length and hash
    /// function count; the bitmap itself is left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("BloomHaveSet");
        out.field("items_inserted", &self.item_count);
        out.field("m_bits", &self.inner.len());
        out.field("k", &self.inner.number_of_hash_functions());
        out.finish()
    }
}
/// Builds a [`BloomHaveSet`] holding the CID of every block yielded by
/// `bs.iter_from_root(root)`.
///
/// The CIDs are buffered first so the filter can be sized from the number of
/// blocks actually visited. A filter built for a fixed 10_000 items keeps
/// accepting inserts past that point, but its false-positive rate then climbs
/// well above [`DEFAULT_FPR`]. The capacity never drops below 10_000, so walks
/// of at most that many blocks produce the same filter parameters as a fixed
/// 10_000-item sizing.
///
/// The returned set's `len` is the number of items the iterator yielded; no
/// de-duplication is applied here.
///
/// # Errors
///
/// Returns the first error produced by the iterator, converted into
/// [`TransportError`]. No partially filled set is returned in that case.
pub fn build_have_set<B>(bs: &B, root: &Cid) -> Result<BloomHaveSet, TransportError>
where
    B: Blockstore + ?Sized,
{
    // Floor for the filter capacity; keeps small walks on the historical sizing.
    const MIN_ITEMS_HINT: usize = 10_000;

    // First pass: collect CIDs (block bytes are dropped immediately) so the
    // final count is known before the filter is allocated.
    let mut cids = Vec::new();
    for item in bs.iter_from_root(root) {
        let (cid, _bytes) = item?;
        cids.push(cid);
    }

    let mut hs = BloomHaveSet::new(cids.len().max(MIN_ITEMS_HINT));
    for cid in &cids {
        hs.inner.set(cid.to_bytes().as_slice());
    }
    hs.item_count = cids.len();
    Ok(hs)
}
#[cfg(test)]
mod tests {
    use super::*;
    use mnem_core::id::{CODEC_RAW, Multihash};

    /// Deterministic raw-codec CID derived from the big-endian bytes of `n`.
    fn raw(n: u64) -> Cid {
        let preimage = n.to_be_bytes();
        Cid::new(CODEC_RAW, Multihash::sha2_256(&preimage))
    }

    /// A freshly built set is empty and does not report an arbitrary CID.
    #[test]
    fn empty_have_set_contains_nothing_inserted() {
        let fresh = BloomHaveSet::new(100);
        assert!(fresh.is_empty());
        assert_eq!(fresh.len(), 0);
        assert!(!fresh.contains(&raw(1)));
    }

    /// Every inserted CID must be reported as present (no false negatives).
    #[test]
    fn inserted_cids_are_always_contained() {
        let cids: Vec<Cid> = (0..500).map(raw).collect();
        let mut set = BloomHaveSet::new(1_000);
        set.extend(cids.clone());
        for c in &cids {
            assert!(set.contains(c), "false negative on {c}");
        }
        assert_eq!(set.len(), 500);
    }

    /// At the design capacity, the observed false-positive rate over 10_000
    /// never-inserted probes must stay under three times the 1% target.
    #[test]
    fn false_positive_rate_stays_sane_at_10k_items() {
        let mut set = BloomHaveSet::with_params(10_000, 0.01);
        set.extend((0..10_000).map(raw));

        let probes: Vec<Cid> = (100_000..110_000).map(raw).collect();
        let hits = probes.iter().filter(|probe| set.contains(probe)).count();
        let observed = hits as f64 / probes.len() as f64;
        assert!(
            observed < 0.03,
            "observed FPR {observed} > 3x target (0.03); bloom sizing is off",
        );
    }

    /// Inserting the same CIDs twice keeps them present and leaves `len`
    /// where it was after the first insertion.
    #[test]
    fn extend_is_idempotent_for_duplicate_cids() {
        let cids: Vec<Cid> = (0..10).map(raw).collect();
        let mut set = BloomHaveSet::new(100);
        set.extend(cids.clone());
        let len_after_first = set.len();

        set.extend(cids.clone());
        for c in &cids {
            assert!(set.contains(c));
        }
        assert_eq!(set.len(), len_after_first);
    }

    /// Two serializations of the same set are byte-identical and decode as a
    /// DAG-CBOR map carrying all six expected keys.
    #[test]
    fn serialize_is_stable_and_parseable_as_dagcbor() {
        let mut set = BloomHaveSet::with_params(100, 0.01);
        set.extend((0..50).map(raw));

        let first = set.serialize();
        let second = set.serialize();
        assert_eq!(first, second, "serialize must be deterministic");

        let ipld: ipld_core::ipld::Ipld = serde_ipld_dagcbor::from_slice(&first).unwrap();
        match &ipld {
            ipld_core::ipld::Ipld::Map(m) => {
                assert!(m.contains_key("_kind"));
                assert!(m.contains_key("bitmap"));
                assert!(m.contains_key("m_bits"));
                assert!(m.contains_key("k"));
                assert!(m.contains_key("seed_key"));
                assert!(m.contains_key("item_hint"));
            }
            _ => panic!("expected map, got {ipld:?}"),
        }
    }

    /// The seed is the ASCII tag followed only by zero bytes.
    #[test]
    fn seed_key_is_frozen_and_ascii_prefix() {
        let (tag, padding) = BLOOM_SEED_KEY.split_at(21);
        assert_eq!(tag, b"mnem-have-set-bloom-1");
        assert!(padding.iter().all(|b| *b == 0));
    }
}