use {
crate::{
cluster_info_metrics::GossipStats,
contact_info::ContactInfo,
crds::{Crds, GossipRoute, VersionedCrdsValue},
crds_gossip,
crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue,
protocol::{Ping, PingCache},
},
itertools::Itertools,
rand::{
distributions::{Distribution, WeightedIndex},
Rng,
},
rayon::{prelude::*, ThreadPool},
serde::{Deserialize, Serialize},
solana_bloom::bloom::{Bloom, ConcurrentBloom},
solana_hash::Hash,
solana_keypair::Keypair,
solana_native_token::LAMPORTS_PER_SOL,
solana_packet::PACKET_DATA_SIZE,
solana_pubkey::Pubkey,
solana_signer::Signer,
solana_streamer::socket::SocketAddrSpace,
std::{
collections::{HashMap, HashSet, VecDeque},
convert::TryInto,
iter::{repeat, repeat_with},
net::SocketAddr,
ops::Index,
sync::{
atomic::{AtomicI64, AtomicUsize, Ordering},
LazyLock, Mutex, RwLock,
},
time::Duration,
},
};
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;
pub const FALSE_RATE: f64 = 0.1f64;
pub const KEYS: f64 = 8f64;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct CrdsFilter {
pub filter: Bloom<Hash>,
mask: u64,
mask_bits: u32,
}
pub struct PullRequest {
pub pubkey: Pubkey, pub addr: SocketAddr, pub wallclock: u64, pub filter: CrdsFilter,
}
impl Default for CrdsFilter {
fn default() -> Self {
CrdsFilter {
filter: Bloom::default(),
mask: !0u64,
mask_bits: 0u32,
}
}
}
impl solana_sanitize::Sanitize for CrdsFilter {
fn sanitize(&self) -> std::result::Result<(), solana_sanitize::SanitizeError> {
self.filter.sanitize()?;
Ok(())
}
}
impl CrdsFilter {
#[cfg(test)]
pub(crate) fn new_rand(num_items: usize, max_bytes: usize) -> Self {
let max_bits = (max_bytes * 8) as f64;
let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS);
let mask_bits = Self::mask_bits(num_items as f64, max_items);
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
let seed: u64 = rand::thread_rng().gen_range(0..2u64.pow(mask_bits));
let mask = Self::compute_mask(seed, mask_bits);
CrdsFilter {
filter,
mask,
mask_bits,
}
}
fn compute_mask(seed: u64, mask_bits: u32) -> u64 {
assert!(seed <= 2u64.pow(mask_bits));
let seed: u64 = seed.checked_shl(64 - mask_bits).unwrap_or(0x0);
seed | (!0u64).checked_shr(mask_bits).unwrap_or(!0x0)
}
fn max_items(max_bits: f64, false_rate: f64, num_keys: f64) -> f64 {
let m = max_bits;
let p = false_rate;
let k = num_keys;
(m / (-k / (1f64 - (p.ln() / k).exp()).ln())).ceil()
}
fn mask_bits(num_items: f64, max_items: f64) -> u32 {
((num_items / max_items).log2().ceil()).max(0.0) as u32
}
pub fn hash_as_u64(item: &Hash) -> u64 {
let buf = item.as_ref()[..8].try_into().unwrap();
u64::from_le_bytes(buf)
}
fn test_mask(&self, item: &Hash) -> bool {
Self::hash_matches_mask_prefix(self.mask, self.mask_bits, Self::hash_as_u64(item))
}
#[cfg(test)]
fn add(&mut self, item: &Hash) {
if self.test_mask(item) {
self.filter.add(item);
}
}
#[cfg(test)]
fn contains(&self, item: &Hash) -> bool {
if !self.test_mask(item) {
return true;
}
self.filter.contains(item)
}
fn filter_contains(&self, item: &Hash) -> bool {
self.filter.contains(item)
}
#[inline]
fn lsb_mask(mask_bits: u32) -> u64 {
(!0u64).checked_shr(mask_bits).unwrap_or(0)
}
#[inline]
pub(crate) fn canonical_mask(mask: u64, mask_bits: u32) -> u64 {
mask | Self::lsb_mask(mask_bits)
}
#[inline]
pub(crate) fn hash_matches_mask_prefix(mask: u64, mask_bits: u32, hash_u64: u64) -> bool {
let lsb_mask = Self::lsb_mask(mask_bits);
(hash_u64 | lsb_mask) == Self::canonical_mask(mask, mask_bits)
}
}
struct CrdsFilterSet {
filters: Vec<Option<ConcurrentBloom<Hash>>>,
mask_bits: u32,
}
impl CrdsFilterSet {
fn new<R: Rng>(rng: &mut R, num_items: usize, max_bytes: usize) -> Self {
const SAMPLE_RATE: usize = 8;
const MAX_NUM_FILTERS: usize = 1024;
let max_bits = (max_bytes * 8) as f64;
let max_items = CrdsFilter::max_items(max_bits, FALSE_RATE, KEYS);
let mask_bits = CrdsFilter::mask_bits(num_items as f64, max_items);
let mut filters: Vec<_> = repeat_with(|| None).take(1usize << mask_bits).collect();
let mut indices: Vec<_> = (0..filters.len()).collect();
let size = filters.len().div_ceil(SAMPLE_RATE);
for _ in 0..MAX_NUM_FILTERS.min(size) {
let k = rng.gen_range(0..indices.len());
let k = indices.swap_remove(k);
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
filters[k] = Some(ConcurrentBloom::<Hash>::from(filter));
}
Self { filters, mask_bits }
}
fn add(&self, hash_value: Hash) {
let shift = u64::BITS.checked_sub(self.mask_bits).unwrap();
let index = usize::try_from(
CrdsFilter::hash_as_u64(&hash_value)
.checked_shr(shift)
.unwrap_or_default(),
)
.unwrap();
if let Some(filter) = &self.filters[index] {
filter.add(&hash_value);
}
}
}
impl From<CrdsFilterSet> for Vec<CrdsFilter> {
fn from(cfs: CrdsFilterSet) -> Self {
let mask_bits = cfs.mask_bits;
cfs.filters
.into_iter()
.enumerate()
.filter_map(|(seed, filter)| {
Some(CrdsFilter {
filter: Bloom::<Hash>::from(filter?),
mask: CrdsFilter::compute_mask(seed as u64, mask_bits),
mask_bits,
})
})
.collect()
}
}
#[derive(Default)]
pub struct ProcessPullStats {
pub success: usize,
pub failed_insert: usize,
pub failed_timeout: usize,
}
pub struct CrdsGossipPull {
failed_inserts: RwLock<VecDeque<(Hash, /*timestamp:*/ u64)>>,
pub crds_timeout: u64,
pub num_pulls: AtomicUsize,
}
impl Default for CrdsGossipPull {
fn default() -> Self {
Self {
failed_inserts: RwLock::default(),
crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
num_pulls: AtomicUsize::default(),
}
}
}
impl CrdsGossipPull {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new_pull_request(
&self,
thread_pool: &ThreadPool,
crds: &RwLock<Crds>,
self_keypair: &Keypair,
self_shred_version: u16,
now: u64,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
bloom_size: usize,
ping_cache: &Mutex<PingCache>,
pings: &mut Vec<(SocketAddr, Ping)>,
socket_addr_space: &SocketAddrSpace,
) -> Result<impl Iterator<Item = (SocketAddr, CrdsFilter)> + Clone, CrdsGossipError> {
let mut rng = rand::thread_rng();
let nodes = crds_gossip::get_gossip_nodes(
&mut rng,
now,
&self_keypair.pubkey(),
|shred_version| shred_version == self_shred_version,
crds,
gossip_validators,
stakes,
socket_addr_space,
);
let nodes = crds_gossip::maybe_ping_gossip_addresses(
&mut rng,
nodes,
self_keypair,
ping_cache,
pings,
);
let stake_cap = stakes
.get(&self_keypair.pubkey())
.copied()
.unwrap_or_default();
let (weights, nodes): (Vec<u64>, Vec<ContactInfo>) =
crds_gossip::dedup_gossip_addresses(nodes, stakes)
.into_values()
.map(|(stake, node)| {
let stake = stake.min(stake_cap) / LAMPORTS_PER_SOL;
let weight = u64::BITS - stake.leading_zeros();
let weight = u64::from(weight).saturating_add(1).saturating_pow(2);
(weight, node)
})
.unzip();
if nodes.is_empty() {
return Err(CrdsGossipError::NoPeers);
}
let filters = self.build_crds_filters(thread_pool, crds, bloom_size);
let dist = WeightedIndex::new(weights).unwrap();
Ok(filters.into_iter().filter_map(move |filter| {
let node = &nodes[dist.sample(&mut rng)];
Some((node.gossip()?, filter))
}))
}
pub(crate) fn generate_pull_responses(
thread_pool: &ThreadPool,
crds: &RwLock<Crds>,
requests: &[PullRequest],
output_size_limit: usize, now: u64,
should_retain_crds_value: impl Fn(&CrdsValue) -> bool + Sync,
self_shred_version: u16,
stats: &GossipStats,
) -> Vec<Vec<CrdsValue>> {
Self::filter_crds_values(
thread_pool,
crds,
requests,
output_size_limit,
now,
should_retain_crds_value,
self_shred_version,
stats,
)
}
pub(crate) fn filter_pull_responses(
&self,
crds: &RwLock<Crds>,
timeouts: &CrdsTimeouts,
responses: Vec<CrdsValue>,
now: u64,
stats: &mut ProcessPullStats,
) -> (Vec<CrdsValue>, Vec<CrdsValue>, Vec<Hash>) {
let mut active_values = vec![];
let mut expired_values = vec![];
let crds = crds.read().unwrap();
let upsert = |response: CrdsValue| {
let owner = response.label().pubkey();
let timeout = timeouts[&owner];
if !crds.upserts(&response) {
Some(response)
} else if now <= response.wallclock().saturating_add(timeout) {
active_values.push(response);
None
} else if crds.get::<&ContactInfo>(owner).is_some() {
expired_values.push(response);
None
} else {
stats.failed_timeout += 1;
Some(response)
}
};
let failed_inserts = responses
.into_iter()
.filter_map(upsert)
.map(|resp| *resp.hash())
.collect();
(active_values, expired_values, failed_inserts)
}
pub(crate) fn process_pull_responses(
&self,
crds: &RwLock<Crds>,
responses: Vec<CrdsValue>,
responses_expired_timeout: Vec<CrdsValue>,
failed_inserts: Vec<Hash>,
now: u64,
stats: &mut ProcessPullStats,
) {
let mut owners = HashSet::new();
let mut crds = crds.write().unwrap();
for response in responses_expired_timeout {
let _ = crds.insert(response, now, GossipRoute::PullResponse);
}
let mut num_inserts = 0;
for response in responses {
let owner = response.pubkey();
if let Ok(()) = crds.insert(response, now, GossipRoute::PullResponse) {
num_inserts += 1;
owners.insert(owner);
}
}
stats.success += num_inserts;
self.num_pulls.fetch_add(num_inserts, Ordering::Relaxed);
for owner in owners {
crds.update_record_timestamp(&owner, now);
}
drop(crds);
stats.failed_insert += failed_inserts.len();
self.purge_failed_inserts(now);
let failed_inserts = failed_inserts.into_iter().zip(repeat(now));
self.failed_inserts.write().unwrap().extend(failed_inserts);
}
pub(crate) fn purge_failed_inserts(&self, now: u64) {
if FAILED_INSERTS_RETENTION_MS < now {
let cutoff = now - FAILED_INSERTS_RETENTION_MS;
let mut failed_inserts = self.failed_inserts.write().unwrap();
let outdated = failed_inserts
.iter()
.take_while(|(_, ts)| *ts < cutoff)
.count();
failed_inserts.drain(..outdated);
}
}
pub(crate) fn failed_inserts_size(&self) -> usize {
self.failed_inserts.read().unwrap().len()
}
pub fn build_crds_filters(
&self,
thread_pool: &ThreadPool,
crds: &RwLock<Crds>,
bloom_size: usize,
) -> Vec<CrdsFilter> {
const PAR_MIN_LENGTH: usize = 512;
#[cfg(debug_assertions)]
const MIN_NUM_BLOOM_ITEMS: usize = 512;
#[cfg(not(debug_assertions))]
const MIN_NUM_BLOOM_ITEMS: usize = 65_536;
let failed_inserts = self.failed_inserts.read().unwrap();
let crds = crds.read().unwrap();
let num_items = crds.len() + crds.num_purged() + failed_inserts.len();
let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items);
let filters = CrdsFilterSet::new(&mut rand::thread_rng(), num_items, bloom_size);
thread_pool.install(|| {
crds.par_values()
.with_min_len(PAR_MIN_LENGTH)
.map(|v| *v.value.hash())
.chain(crds.purged().with_min_len(PAR_MIN_LENGTH))
.chain(
failed_inserts
.par_iter()
.with_min_len(PAR_MIN_LENGTH)
.map(|(v, _)| *v),
)
.for_each(|v| filters.add(v));
});
drop(crds);
drop(failed_inserts);
filters.into()
}
fn filter_crds_values(
thread_pool: &ThreadPool,
crds: &RwLock<Crds>,
requests: &[PullRequest],
output_size_limit: usize, now: u64,
should_retain_crds_value: impl Fn(&CrdsValue) -> bool + Sync,
self_shred_version: u16,
stats: &GossipStats,
) -> Vec<Vec<CrdsValue>> {
let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
let jitter = rand::thread_rng().gen_range(0..msg_timeout / 4);
let caller_wallclock_window =
now.saturating_sub(msg_timeout)..now.saturating_add(msg_timeout);
let dropped_requests = AtomicUsize::default();
let total_skipped = AtomicUsize::default();
let output_size_limit = output_size_limit.try_into().unwrap_or(i64::MAX);
let output_size_limit = AtomicI64::new(output_size_limit);
let crds = crds.read().unwrap();
let apply_filter = |request: &PullRequest| {
if output_size_limit.load(Ordering::Relaxed) <= 0 {
return Vec::default();
}
let filter = &request.filter;
let caller_wallclock = request.wallclock;
if !caller_wallclock_window.contains(&caller_wallclock) {
dropped_requests.fetch_add(1, Ordering::Relaxed);
return Vec::default();
}
let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
let pred = |entry: &&VersionedCrdsValue| {
debug_assert!(filter.test_mask(entry.value.hash()));
if entry.value.wallclock() > caller_wallclock {
total_skipped.fetch_add(1, Ordering::Relaxed);
false
} else {
!filter.filter_contains(entry.value.hash())
&& should_retain_crds_value(&entry.value)
}
};
let out: Vec<_> = crds
.filter_bitmask(filter.mask, filter.mask_bits, self_shred_version)
.filter(pred)
.map(|entry| entry.value.clone())
.take(output_size_limit.load(Ordering::Relaxed).max(0) as usize)
.collect();
output_size_limit.fetch_sub(out.len() as i64, Ordering::Relaxed);
out
};
let ret: Vec<_> = thread_pool.install(|| requests.par_iter().map(apply_filter).collect());
stats
.filter_crds_values_dropped_requests
.add_relaxed(dropped_requests.into_inner() as u64);
stats
.filter_crds_values_dropped_values
.add_relaxed(total_skipped.into_inner() as u64);
ret
}
pub(crate) fn make_timeouts<'a>(
&self,
self_pubkey: Pubkey,
stakes: &'a HashMap<Pubkey, u64>,
epoch_duration: Duration,
) -> CrdsTimeouts<'a> {
CrdsTimeouts::new(self_pubkey, self.crds_timeout, epoch_duration, stakes)
}
pub(crate) fn purge_active(
thread_pool: &ThreadPool,
crds: &RwLock<Crds>,
now: u64,
timeouts: &CrdsTimeouts,
) -> usize {
let mut crds = crds.write().unwrap();
let labels = crds.find_old_labels(thread_pool, now, timeouts);
for label in &labels {
crds.remove(label, now);
}
labels.len()
}
#[cfg(test)]
fn process_pull_response(
&self,
crds: &RwLock<Crds>,
timeouts: &CrdsTimeouts,
response: Vec<CrdsValue>,
now: u64,
) -> (usize, usize, usize) {
let mut stats = ProcessPullStats::default();
let (versioned, versioned_expired_timeout, failed_inserts) =
self.filter_pull_responses(crds, timeouts, response, now, &mut stats);
self.process_pull_responses(
crds,
versioned,
versioned_expired_timeout,
failed_inserts,
now,
&mut stats,
);
(
stats.failed_timeout + stats.failed_insert,
stats.failed_timeout,
stats.success,
)
}
}
pub struct CrdsTimeouts<'a> {
pubkey: Pubkey,
stakes: &'a HashMap<Pubkey, u64>,
default_timeout: u64,
extended_timeout: u64,
}
impl<'a> CrdsTimeouts<'a> {
pub fn new(
pubkey: Pubkey,
default_timeout: u64,
epoch_duration: Duration,
stakes: &'a HashMap<Pubkey, u64>,
) -> Self {
let extended_timeout = default_timeout.max(epoch_duration.as_millis() as u64);
let default_timeout = if stakes.values().all(|&stake| stake == 0u64) {
extended_timeout
} else {
default_timeout
};
Self {
pubkey,
stakes,
default_timeout,
extended_timeout,
}
}
}
impl Index<&Pubkey> for CrdsTimeouts<'_> {
type Output = u64;
fn index(&self, pubkey: &Pubkey) -> &Self::Output {
if pubkey == &self.pubkey {
&u64::MAX
} else if self.stakes.get(pubkey) > Some(&0u64) {
&self.extended_timeout
} else {
&self.default_timeout
}
}
}
pub(crate) fn get_max_bloom_filter_bytes(caller: &CrdsValue) -> usize {
static MAX_BYTES_CACHE: LazyLock<[u16; PACKET_DATA_SIZE + 1]> = LazyLock::new(|| {
let mut rng = rand::thread_rng();
let mut out = [0u16; PACKET_DATA_SIZE + 1];
for max_bytes in 1..=PACKET_DATA_SIZE {
let filters = CrdsFilterSet::new(&mut rng, 1, max_bytes);
let mut iter = Vec::<CrdsFilter>::from(filters)
.into_iter()
.map(|filter| {
bincode::serialized_size(&filter)
.map(usize::try_from)
.unwrap()
.unwrap()
})
.dedup();
let size_of_filter = iter.next().unwrap();
assert_eq!(iter.next(), None);
out[size_of_filter] = u16::try_from(max_bytes).unwrap();
}
let _ = out.iter_mut().fold(0, |state, entry| {
if *entry == 0 {
*entry = state;
}
*entry
});
out
});
let size_of_filter = PACKET_DATA_SIZE
.checked_sub(
4 + caller.bincode_serialized_size(),
)
.unwrap();
MAX_BYTES_CACHE
.get(size_of_filter)
.copied()
.map(usize::from)
.unwrap()
}
#[cfg(test)]
pub(crate) mod tests {
use {
super::*,
crate::{
crds_data::{CrdsData, Vote},
legacy_contact_info::LegacyContactInfo,
protocol::Protocol,
},
itertools::Itertools,
rand::{seq::SliceRandom, SeedableRng},
rand_chacha::ChaChaRng,
rayon::ThreadPoolBuilder,
solana_hash::HASH_BYTES,
solana_keypair::keypair_from_seed,
solana_packet::PACKET_DATA_SIZE,
solana_perf::test_tx::new_test_vote_tx,
solana_sha256_hasher::hash,
solana_time_utils::timestamp,
std::{
net::{IpAddr, Ipv6Addr},
time::Instant,
},
test_case::test_case,
};
#[cfg(debug_assertions)]
pub(crate) const MIN_NUM_BLOOM_FILTERS: usize = 1;
#[cfg(not(debug_assertions))]
pub(crate) const MIN_NUM_BLOOM_FILTERS: usize = 64;
impl CrdsGossipPull {
#[allow(clippy::too_many_arguments)]
fn old_pull_request(
&self,
thread_pool: &ThreadPool,
crds: &RwLock<Crds>,
self_keypair: &Keypair,
self_shred_version: u16,
now: u64,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
bloom_size: usize,
ping_cache: &Mutex<PingCache>,
pings: &mut Vec<(SocketAddr, Ping)>,
socket_addr_space: &SocketAddrSpace,
) -> Result<Vec<(ContactInfo, Vec<CrdsFilter>)>, CrdsGossipError> {
let out = self.new_pull_request(
thread_pool,
crds,
self_keypair,
self_shred_version,
now,
gossip_validators,
stakes,
bloom_size,
ping_cache,
pings,
socket_addr_space,
)?;
let nodes: HashMap<SocketAddr, ContactInfo> = crds
.read()
.unwrap()
.get_nodes_contact_info()
.map(|node| (node.gossip().unwrap(), node.clone()))
.collect();
Ok(out
.into_group_map()
.into_iter()
.map(|(addr, filters)| (nodes.get(&addr).cloned().unwrap(), filters))
.collect())
}
}
fn new_ping_cache() -> PingCache {
PingCache::new(
&mut rand::thread_rng(),
Instant::now(),
Duration::from_secs(20 * 60), Duration::from_secs(20 * 60) / 64, 128, )
}
#[test]
fn test_hash_as_u64() {
let arr: [u8; HASH_BYTES] = std::array::from_fn(|i| i as u8 + 1);
let hash = Hash::new_from_array(arr);
assert_eq!(CrdsFilter::hash_as_u64(&hash), 0x807060504030201);
}
#[test]
fn test_hash_as_u64_random() {
fn hash_as_u64_bitops(hash: &Hash) -> u64 {
let mut out = 0;
for (i, val) in hash.as_ref().iter().enumerate().take(8) {
out |= (u64::from(*val)) << (i * 8) as u64;
}
out
}
for _ in 0..100 {
let hash = Hash::new_unique();
assert_eq!(CrdsFilter::hash_as_u64(&hash), hash_as_u64_bitops(&hash));
}
}
#[test]
fn test_crds_filter_default() {
let filter = CrdsFilter::default();
let mask = CrdsFilter::compute_mask(0, filter.mask_bits);
assert_eq!(filter.mask, mask);
for _ in 0..10 {
let hash = Hash::new_unique();
assert!(filter.test_mask(&hash));
}
}
#[test]
fn test_crds_filter_set_add() {
let mut rng = rand::thread_rng();
let crds_filter_set = CrdsFilterSet::new(
&mut rng, 59672788, 8196,
);
let hash_values: Vec<_> = repeat_with(|| {
let buf: [u8; 32] = rng.gen();
solana_sha256_hasher::hashv(&[&buf])
})
.take(1024)
.collect();
assert_eq!(crds_filter_set.filters.len(), 8192);
for hash_value in &hash_values {
crds_filter_set.add(*hash_value);
}
let filters: Vec<CrdsFilter> = crds_filter_set.into();
let mut num_hits = 0;
assert_eq!(filters.len(), 1024);
for hash_value in hash_values {
let mut hit = false;
let mut false_positives = 0;
for filter in &filters {
if filter.test_mask(&hash_value) {
num_hits += 1;
assert!(!hit);
hit = true;
assert!(filter.contains(&hash_value));
assert!(filter.filter.contains(&hash_value));
} else if filter.filter.contains(&hash_value) {
false_positives += 1;
}
}
assert!(false_positives < 5);
}
assert!(num_hits > 96, "num_hits: {num_hits}");
}
#[test]
fn test_crds_filter_set_new() {
let filters = CrdsFilterSet::new(
&mut rand::thread_rng(),
55345017, 4098, );
assert_eq!(filters.filters.len(), 16384);
let filters = Vec::<CrdsFilter>::from(filters);
assert_eq!(filters.len(), 1024);
let mask_bits = filters[0].mask_bits;
let right_shift = 64 - mask_bits;
let ones = !0u64 >> mask_bits;
for filter in &filters {
assert_eq!(mask_bits, filter.mask_bits);
assert!((0..16384).contains(&(filter.mask >> right_shift)));
assert_eq!(ones, ones & filter.mask);
}
}
#[test]
fn test_build_crds_filter() {
const SEED: [u8; 32] = [0x55; 32];
let mut rng = ChaChaRng::from_seed(SEED);
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let crds_gossip_pull = CrdsGossipPull::default();
let mut crds = Crds::default();
let keypairs: Vec<_> = repeat_with(|| {
let mut seed = [0u8; Keypair::SECRET_KEY_LENGTH];
rng.fill(&mut seed[..]);
keypair_from_seed(&seed).unwrap()
})
.take(10_000)
.collect();
let mut num_inserts = 0;
for _ in 0..40_000 {
let keypair = keypairs.choose(&mut rng).unwrap();
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
if crds
.insert(value, rng.gen(), GossipRoute::LocalMessage)
.is_ok()
{
num_inserts += 1;
}
}
let crds = RwLock::new(crds);
assert!(num_inserts > 30_000, "num inserts: {num_inserts}");
let filters = crds_gossip_pull.build_crds_filters(
&thread_pool,
&crds,
992, );
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(4));
let crds = crds.read().unwrap();
let purged: Vec<_> = thread_pool.install(|| crds.purged().collect());
let hash_values: Vec<_> = crds
.values()
.map(|v| *v.value.hash())
.chain(purged)
.collect();
assert!(
hash_values.len() >= 40_000 - 5,
"hash_values.len(): {}",
hash_values.len()
);
let mut num_hits = 0;
let mut false_positives = 0;
for hash_value in hash_values {
let mut hit = false;
for filter in &filters {
if filter.test_mask(&hash_value) {
num_hits += 1;
assert!(!hit);
hit = true;
assert!(filter.contains(&hash_value));
assert!(filter.filter.contains(&hash_value));
} else if filter.filter.contains(&hash_value) {
false_positives += 1;
}
}
}
assert!(num_hits > 4000, "num_hits: {num_hits}");
assert!(false_positives < 20_000, "fp: {false_positives}");
}
#[test]
fn test_new_pull_request() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let crds = RwLock::<Crds>::default();
let node_keypair = Keypair::new();
let entry = CrdsValue::new_unsigned(CrdsData::from(ContactInfo::new_localhost(
&node_keypair.pubkey(),
0,
)));
let node = CrdsGossipPull::default();
let mut pings = Vec::new();
let ping_cache = Mutex::new(new_ping_cache());
assert_eq!(
node.old_pull_request(
&thread_pool,
&crds,
&node_keypair,
0,
0,
None,
&HashMap::new(),
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
&SocketAddrSpace::Unspecified,
),
Err(CrdsGossipError::NoPeers)
);
crds.write()
.unwrap()
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
assert_eq!(
node.old_pull_request(
&thread_pool,
&crds,
&node_keypair,
0,
0,
None,
&HashMap::new(),
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
&SocketAddrSpace::Unspecified,
),
Err(CrdsGossipError::NoPeers)
);
let now = 1625029781069;
let mut new = ContactInfo::new_localhost(&solana_pubkey::new_rand(), now);
new.set_gossip(([127, 0, 0, 1], 8020)).unwrap();
ping_cache
.lock()
.unwrap()
.mock_pong(*new.pubkey(), new.gossip().unwrap(), Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::from(new));
crds.write()
.unwrap()
.insert(new.clone(), now, GossipRoute::LocalMessage)
.unwrap();
let req = node.old_pull_request(
&thread_pool,
&crds,
&node_keypair,
0,
now,
None,
&HashMap::new(),
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
&SocketAddrSpace::Unspecified,
);
let peers: Vec<_> = req.unwrap().into_iter().map(|(node, _)| node).collect();
assert_eq!(peers, vec![new.contact_info().unwrap().clone()]);
let mut offline = ContactInfo::new_localhost(&solana_pubkey::new_rand(), now);
offline.set_gossip(([127, 0, 0, 1], 8021)).unwrap();
let offline = CrdsValue::new_unsigned(CrdsData::from(offline));
crds.write()
.unwrap()
.insert(offline, now, GossipRoute::LocalMessage)
.unwrap();
let req = node.old_pull_request(
&thread_pool,
&crds,
&node_keypair,
0,
now,
None,
&HashMap::new(),
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
&SocketAddrSpace::Unspecified,
);
let peers: Vec<_> = req.unwrap().into_iter().map(|(node, _)| node).collect();
assert_eq!(peers, vec![new.contact_info().unwrap().clone()]);
}
#[test]
fn test_new_mark_creation_time() {
let now: u64 = 1_605_127_770_789;
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut ping_cache = new_ping_cache();
let mut crds = Crds::default();
let node_keypair = Keypair::new();
let entry = CrdsValue::new_unsigned(CrdsData::from(ContactInfo::new_localhost(
&node_keypair.pubkey(),
0,
)));
let node = CrdsGossipPull::default();
crds.insert(entry, now, GossipRoute::LocalMessage).unwrap();
let mut old = ContactInfo::new_localhost(&solana_pubkey::new_rand(), 0);
old.set_gossip(([127, 0, 0, 1], 8020)).unwrap();
ping_cache.mock_pong(*old.pubkey(), old.gossip().unwrap(), Instant::now());
let old = CrdsValue::new_unsigned(CrdsData::from(old));
crds.insert(old.clone(), now, GossipRoute::LocalMessage)
.unwrap();
let mut new = ContactInfo::new_localhost(&solana_pubkey::new_rand(), 0);
new.set_gossip(([127, 0, 0, 1], 8021)).unwrap();
ping_cache.mock_pong(*new.pubkey(), new.gossip().unwrap(), Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::from(new));
crds.insert(new, now, GossipRoute::LocalMessage).unwrap();
let crds = RwLock::new(crds);
let now = now + 50_000;
let now = now + 1_000;
let mut pings = Vec::new();
let ping_cache = Mutex::new(ping_cache);
let old = old.contact_info().unwrap();
let count = repeat_with(|| {
let requests = node
.old_pull_request(
&thread_pool,
&crds,
&node_keypair,
0, now,
None, &HashMap::new(), PACKET_DATA_SIZE, &ping_cache,
&mut pings,
&SocketAddrSpace::Unspecified,
)
.unwrap();
requests.into_iter().map(|(node, _)| node)
})
.flatten()
.take(100)
.filter(|peer| peer != old)
.count();
assert!(count < 75, "count of peer != old: {count}");
}
#[test]
fn test_generate_pull_responses() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let node_keypair = Keypair::new();
let mut node_crds = Crds::default();
let mut ping_cache = new_ping_cache();
let now = timestamp();
let entry = CrdsValue::new_unsigned(CrdsData::from(ContactInfo::new_localhost(
&node_keypair.pubkey(),
now,
)));
let caller = entry.clone();
let node = CrdsGossipPull::default();
node_crds
.insert(entry, now, GossipRoute::LocalMessage)
.unwrap();
let new = ContactInfo::new_localhost(&solana_pubkey::new_rand(), now);
ping_cache.mock_pong(*new.pubkey(), new.gossip().unwrap(), Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::from(new));
node_crds
.insert(new, now, GossipRoute::LocalMessage)
.unwrap();
let node_crds = RwLock::new(node_crds);
let mut pings = Vec::new();
let req = node.old_pull_request(
&thread_pool,
&node_crds,
&node_keypair,
0, now,
None, &HashMap::new(), PACKET_DATA_SIZE, &Mutex::new(ping_cache),
&mut pings,
&SocketAddrSpace::Unspecified,
);
let dest_crds = RwLock::<Crds>::default();
let filters = req.unwrap().into_iter().flat_map(|(_, filters)| filters);
let mut requests: Vec<_> = filters
.map(|filter| PullRequest {
pubkey: caller.pubkey(),
addr: SocketAddr::from(([0; 4], 0)),
wallclock: caller.wallclock(),
filter,
})
.collect();
let rsp = CrdsGossipPull::generate_pull_responses(
&thread_pool,
&dest_crds,
&requests,
usize::MAX, now,
|_| true, 0, &GossipStats::default(),
);
assert_eq!(rsp[0].len(), 0);
let now = now + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
let new = CrdsValue::new_unsigned(CrdsData::from(ContactInfo::new_localhost(
&solana_pubkey::new_rand(),
now,
)));
dest_crds
.write()
.unwrap()
.insert(new, now, GossipRoute::LocalMessage)
.unwrap();
let rsp = CrdsGossipPull::generate_pull_responses(
&thread_pool,
&dest_crds,
&requests,
usize::MAX, now,
|_| true, 0, &GossipStats::default(),
);
assert_eq!(rsp[0].len(), 0);
assert_eq!(requests.len(), MIN_NUM_BLOOM_FILTERS);
requests.extend({
let now = now + 1;
let caller = ContactInfo::new_localhost(&Pubkey::new_unique(), now);
let caller = CrdsValue::new_unsigned(CrdsData::from(caller));
requests
.iter()
.map(|PullRequest { filter, .. }| PullRequest {
pubkey: caller.pubkey(),
addr: SocketAddr::from(([0; 4], 0)),
wallclock: caller.wallclock(),
filter: filter.clone(),
})
.collect::<Vec<_>>()
});
let rsp = CrdsGossipPull::generate_pull_responses(
&thread_pool,
&dest_crds,
&requests,
usize::MAX, now,
|_| true, 0, &GossipStats::default(),
);
assert_eq!(rsp.len(), 2 * MIN_NUM_BLOOM_FILTERS);
assert!(rsp.iter().take(MIN_NUM_BLOOM_FILTERS).all(|r| r.is_empty()));
assert_eq!(rsp.iter().filter(|r| r.is_empty()).count(), rsp.len() - 1);
assert_eq!(rsp.iter().find(|r| r.len() == 1).unwrap().len(), 1);
}
#[test]
fn test_process_pull_request_response() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let node_keypair = Keypair::new();
let mut node_crds = Crds::default();
let entry = CrdsValue::new_unsigned(CrdsData::from(ContactInfo::new_localhost(
&node_keypair.pubkey(),
1,
)));
let caller = entry.clone();
let node_pubkey = entry.label().pubkey();
let node = CrdsGossipPull::default();
node_crds
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
let mut ping_cache = new_ping_cache();
let new = ContactInfo::new_localhost(&solana_pubkey::new_rand(), 1);
ping_cache.mock_pong(*new.pubkey(), new.gossip().unwrap(), Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::from(new));
node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap();
let mut dest_crds = Crds::default();
let new_id = solana_pubkey::new_rand();
let same_key = ContactInfo::new_localhost(&new_id, 0);
let new = ContactInfo::new_localhost(&new_id, 1);
ping_cache.mock_pong(*new.pubkey(), new.gossip().unwrap(), Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::from(new));
dest_crds
.insert(new.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
let dest_crds = RwLock::new(dest_crds);
ping_cache.mock_pong(
*same_key.pubkey(),
same_key.gossip().unwrap(),
Instant::now(),
);
let same_key = CrdsValue::new_unsigned(CrdsData::from(same_key));
assert_eq!(same_key.label(), new.label());
assert!(same_key.wallclock() < new.wallclock());
node_crds
.insert(same_key.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
assert_eq!(0, {
let entry: &VersionedCrdsValue = node_crds.get(&same_key.label()).unwrap();
entry.local_timestamp
});
let node_crds = RwLock::new(node_crds);
let mut done = false;
let mut pings = Vec::new();
let ping_cache = Mutex::new(ping_cache);
for _ in 0..30 {
let req = node.old_pull_request(
&thread_pool,
&node_crds,
&node_keypair,
0,
0,
None,
&HashMap::new(),
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
&SocketAddrSpace::Unspecified,
);
let filters = req.unwrap().into_iter().flat_map(|(_, filters)| filters);
let requests: Vec<_> = filters
.map(|filter| PullRequest {
pubkey: caller.pubkey(),
addr: SocketAddr::from(([0; 4], 0)),
wallclock: caller.wallclock(),
filter,
})
.collect();
let rsp = CrdsGossipPull::generate_pull_responses(
&thread_pool,
&dest_crds,
&requests,
usize::MAX, 0, |_| true, 0, &GossipStats::default(),
);
if rsp.is_empty() {
continue;
}
if rsp.is_empty() {
continue;
}
assert_eq!(rsp.len(), MIN_NUM_BLOOM_FILTERS);
let failed = node
.process_pull_response(
&node_crds,
&node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default()),
rsp.into_iter().flatten().collect(),
1,
)
.0;
assert_eq!(failed, 0);
assert_eq!(1, {
let node_crds = node_crds.read().unwrap();
let entry: &VersionedCrdsValue = node_crds.get(&new.label()).unwrap();
entry.local_timestamp
});
assert_eq!(1, {
let node_crds = node_crds.read().unwrap();
let entry: &VersionedCrdsValue = node_crds.get(&same_key.label()).unwrap();
entry.local_timestamp
});
done = true;
break;
}
assert!(done);
}
#[test]
fn test_gossip_purge() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut node_crds = Crds::default();
let entry = CrdsValue::new_unsigned(CrdsData::from(ContactInfo::new_localhost(
&solana_pubkey::new_rand(),
0,
)));
let node_label = entry.label();
let node_pubkey = node_label.pubkey();
let node = CrdsGossipPull::default();
node_crds
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
let old = CrdsValue::new_unsigned(CrdsData::from(ContactInfo::new_localhost(
&solana_pubkey::new_rand(),
0,
)));
node_crds
.insert(old.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
let value_hash = *node_crds.get::<&CrdsValue>(&old.label()).unwrap().hash();
assert_eq!(
node_crds.get::<&CrdsValue>(&node_label).unwrap().label(),
node_label
);
let node_crds = RwLock::new(node_crds);
let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]);
let timeouts = node.make_timeouts(node_pubkey, &stakes, Duration::default());
CrdsGossipPull::purge_active(&thread_pool, &node_crds, node.crds_timeout, &timeouts);
assert_eq!(node_label, {
let node_crds = node_crds.read().unwrap();
node_crds.get::<&CrdsValue>(&node_label).unwrap().label()
});
assert_eq!(
node_crds.read().unwrap().get::<&CrdsValue>(&old.label()),
None
);
assert_eq!(node_crds.read().unwrap().num_purged(), 1);
for _ in 0..30 {
let filters = node.build_crds_filters(&thread_pool, &node_crds, PACKET_DATA_SIZE);
assert!(filters.iter().any(|filter| filter.contains(&value_hash)));
}
let mut node_crds = node_crds.write().unwrap();
node_crds.trim_purged(node.crds_timeout + 1);
assert_eq!(node_crds.num_purged(), 0);
}
#[test]
fn test_crds_filter_mask() {
let filter = CrdsFilter::new_rand(1, 128);
assert_eq!(filter.mask, !0x0);
assert_eq!(CrdsFilter::max_items(80f64, 0.01, 8f64), 9f64);
assert_eq!(CrdsFilter::mask_bits(1000f64, 9f64), 7u32);
let filter = CrdsFilter::new_rand(1000, 10);
assert_eq!(filter.mask & 0x00_ffff_ffff, 0x00_ffff_ffff);
}
#[test]
fn test_crds_filter_add_no_mask() {
let mut filter = CrdsFilter::new_rand(1, 128);
let h: Hash = hash(Hash::default().as_ref());
assert!(!filter.contains(&h));
filter.add(&h);
assert!(filter.contains(&h));
let h: Hash = hash(h.as_ref());
assert!(!filter.contains(&h));
}
#[test]
fn test_crds_filter_add_mask() {
let mut filter = CrdsFilter::new_rand(1000, 10);
let mut h: Hash = Hash::default();
while !filter.test_mask(&h) {
h = hash(h.as_ref());
}
assert!(filter.test_mask(&h));
assert!(!filter.contains(&h));
filter.add(&h);
assert!(filter.contains(&h));
}
#[test]
fn test_crds_filter_complete_set_add_mask() {
let mut filters =
Vec::<CrdsFilter>::from(CrdsFilterSet::new(&mut rand::thread_rng(), 1000, 10));
assert!(filters.iter().all(|f| f.mask_bits > 0));
let mut h: Hash = Hash::default();
while !filters.iter().rev().any(|f| f.test_mask(&h)) {
h = hash(h.as_ref());
}
let filter = filters.iter_mut().find(|f| f.test_mask(&h)).unwrap();
assert!(filter.test_mask(&h));
assert!(!filter.contains(&h));
filter.add(&h);
assert!(filter.contains(&h));
}
#[test]
fn test_crds_filter_contains_mask() {
let filter = CrdsFilter::new_rand(1000, 10);
assert!(filter.mask_bits > 0);
let mut h: Hash = Hash::default();
while filter.test_mask(&h) {
h = hash(h.as_ref());
}
assert!(!filter.test_mask(&h));
assert!(filter.contains(&h));
}
#[test]
fn test_mask() {
for i in 0..16 {
run_test_mask(i);
}
}
fn run_test_mask(mask_bits: u32) {
assert_eq!(
(0..2u64.pow(mask_bits))
.map(|seed| CrdsFilter::compute_mask(seed, mask_bits))
.dedup()
.count(),
2u64.pow(mask_bits) as usize
)
}
#[test]
fn test_process_pull_response() {
let mut rng = rand::thread_rng();
let node_crds = RwLock::<Crds>::default();
let node = CrdsGossipPull::default();
let peer_pubkey = solana_pubkey::new_rand();
let peer_entry =
CrdsValue::new_unsigned(CrdsData::from(ContactInfo::new_localhost(&peer_pubkey, 0)));
let stakes = HashMap::from([(peer_pubkey, 1u64)]);
let timeouts = CrdsTimeouts::new(
Pubkey::new_unique(),
node.crds_timeout, Duration::from_millis(node.crds_timeout + 1),
&stakes,
);
assert_eq!(
node.process_pull_response(&node_crds, &timeouts, vec![peer_entry.clone()], 1,)
.0,
0
);
let node_crds = RwLock::<Crds>::default();
let unstaked_peer_entry =
CrdsValue::new_unsigned(CrdsData::from(ContactInfo::new_localhost(&peer_pubkey, 0)));
assert_eq!(
node.process_pull_response(
&node_crds,
&timeouts,
vec![peer_entry.clone(), unstaked_peer_entry],
node.crds_timeout + 100,
)
.0,
4
);
let node_crds = RwLock::<Crds>::default();
assert_eq!(
node.process_pull_response(
&node_crds,
&timeouts,
vec![peer_entry],
node.crds_timeout + 1,
)
.0,
0
);
let peer_vote = Vote::new(peer_pubkey, new_test_vote_tx(&mut rng), 0).unwrap();
let peer_vote = CrdsValue::new_unsigned(CrdsData::Vote(0, peer_vote));
assert_eq!(
node.process_pull_response(
&node_crds,
&timeouts,
vec![peer_vote.clone()],
node.crds_timeout + 1,
)
.0,
0
);
let node_crds = RwLock::<Crds>::default();
assert_eq!(
node.process_pull_response(
&node_crds,
&timeouts,
vec![peer_vote],
node.crds_timeout + 2,
)
.0,
2
);
}
fn verify_get_max_bloom_filter_bytes<R: Rng>(
rng: &mut R,
caller: &CrdsValue,
num_items: usize,
) {
let packet_data_size_range = (PACKET_DATA_SIZE - 7)..=PACKET_DATA_SIZE;
let max_bytes = get_max_bloom_filter_bytes(caller);
let filters = CrdsFilterSet::new(rng, num_items, max_bytes);
let request_bytes = caller.bincode_serialized_size() as u64;
for filter in Vec::<CrdsFilter>::from(filters) {
let request_bytes = 4 + request_bytes + bincode::serialized_size(&filter).unwrap();
let request = Protocol::PullRequest(filter, caller.clone());
let request = bincode::serialize(&request).unwrap();
assert!(packet_data_size_range.contains(&request.len()));
assert_eq!(request.len() as u64, request_bytes);
}
}
#[test_case(1)]
#[test_case(7)]
#[test_case(81)]
#[test_case(329)]
#[test_case(5691)]
#[test_case(85689)]
#[test_case(645043)]
#[test_case(3873238)]
fn test_get_max_bloom_filter_bytes(num_items: usize) {
let mut rng = rand::thread_rng();
let keypair = Keypair::new();
let node = {
let mut node =
ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
node.set_shred_version(rng.gen());
node
};
{
let caller = CrdsValue::new(CrdsData::from(&node), &keypair);
assert_eq!(get_max_bloom_filter_bytes(&caller), 1175);
verify_get_max_bloom_filter_bytes(&mut rng, &caller, num_items);
}
let node = LegacyContactInfo::try_from(&node).unwrap();
{
let caller = CrdsValue::new(CrdsData::LegacyContactInfo(node), &keypair);
assert_eq!(get_max_bloom_filter_bytes(&caller), 1136);
verify_get_max_bloom_filter_bytes(&mut rng, &caller, num_items);
}
let node = {
let addr = Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888);
let socket = SocketAddr::new(IpAddr::from(addr), 8053);
let mut node = ContactInfo::new_with_socketaddr(&keypair.pubkey(), &socket);
node.set_shred_version(rng.gen());
node
};
{
let caller = CrdsValue::new(CrdsData::from(&node), &keypair);
assert_eq!(get_max_bloom_filter_bytes(&caller), 1155);
verify_get_max_bloom_filter_bytes(&mut rng, &caller, num_items);
}
let node = LegacyContactInfo::try_from(&node).unwrap();
{
let caller = CrdsValue::new(CrdsData::LegacyContactInfo(node), &keypair);
assert_eq!(get_max_bloom_filter_bytes(&caller), 992);
verify_get_max_bloom_filter_bytes(&mut rng, &caller, num_items);
}
}
#[test]
fn test_lsb_mask() {
assert_eq!(CrdsFilter::lsb_mask(0), !0u64);
assert_eq!(CrdsFilter::lsb_mask(1), !0u64 >> 1);
assert_eq!(CrdsFilter::lsb_mask(4), !0u64 >> 4);
assert_eq!(CrdsFilter::lsb_mask(64), 0);
assert_eq!(CrdsFilter::lsb_mask(65), 0);
}
#[test]
fn test_canonical_mask_normalizes_low_bits() {
let mask_bits = 8;
let lsb = CrdsFilter::lsb_mask(mask_bits);
let prefix: u64 = 0b1010_1100;
let high = prefix << (64 - mask_bits);
let garbage_low = 0x1234_5678_u64;
let raw_mask = high | garbage_low;
let canonical = CrdsFilter::canonical_mask(raw_mask, mask_bits);
assert_eq!(canonical >> (64 - mask_bits), prefix);
assert_eq!(canonical & lsb, lsb);
}
#[test]
fn test_hash_matches_mask_prefix_positive_and_negative() {
let mask_bits = 4;
let lsb = CrdsFilter::lsb_mask(mask_bits);
let prefix: u64 = 0b1010;
let high = prefix << (64 - mask_bits);
let canonical_mask = CrdsFilter::canonical_mask(high, mask_bits);
let hash_match: u64 = high | 0x1234;
assert!(CrdsFilter::hash_matches_mask_prefix(
canonical_mask,
mask_bits,
hash_match
));
let other_prefix: u64 = 0b1011;
let other_high = other_prefix << (64 - mask_bits);
let hash_nomatch: u64 = other_high | 0x1234;
assert!(!CrdsFilter::hash_matches_mask_prefix(
canonical_mask,
mask_bits,
hash_nomatch
));
let malformed_mask = canonical_mask & !lsb;
assert!(CrdsFilter::hash_matches_mask_prefix(
malformed_mask,
mask_bits,
hash_match
));
assert!(!CrdsFilter::hash_matches_mask_prefix(
malformed_mask,
mask_bits,
hash_nomatch
));
}
#[test]
fn test_mask_prefix_matching_with_malformed_mask() {
let mask_bits = 5;
let lsb = CrdsFilter::lsb_mask(mask_bits);
let prefix: u64 = 0b1_0011;
let high = prefix << (64 - mask_bits);
let canonical_mask = CrdsFilter::canonical_mask(high, mask_bits);
let mut filter = CrdsFilter {
mask_bits,
mask: canonical_mask,
..Default::default()
};
let h_u64 = high | 0x55u64; let mut arr = [0u8; HASH_BYTES];
arr[..8].copy_from_slice(&h_u64.to_le_bytes());
let hash = arr.into();
assert!(filter.test_mask(&hash));
let bad_u64 = h_u64 ^ (1u64 << (64 - mask_bits)); let mut bad_arr = [0u8; HASH_BYTES];
bad_arr[..8].copy_from_slice(&bad_u64.to_le_bytes());
let bad_hash = bad_arr.into();
assert!(!filter.test_mask(&bad_hash));
filter.mask = canonical_mask & !lsb;
assert!(filter.test_mask(&hash));
assert!(!filter.test_mask(&bad_hash));
}
}