use std::collections::{HashMap, HashSet};
use super::BloomFilter;
use crate::NodeAddr;
#[derive(Clone, Debug)]
pub struct BloomState {
own_node_addr: NodeAddr,
leaf_dependents: HashSet<NodeAddr>,
is_leaf_only: bool,
update_debounce_ms: u64,
last_update_sent: HashMap<NodeAddr, u64>,
pending_updates: HashSet<NodeAddr>,
sequence: u64,
last_sent_filters: HashMap<NodeAddr, BloomFilter>,
}
impl BloomState {
pub fn new(own_node_addr: NodeAddr) -> Self {
Self {
own_node_addr,
leaf_dependents: HashSet::new(),
is_leaf_only: false,
update_debounce_ms: 500,
last_update_sent: HashMap::new(),
pending_updates: HashSet::new(),
sequence: 0,
last_sent_filters: HashMap::new(),
}
}
pub fn leaf_only(own_node_addr: NodeAddr) -> Self {
let mut state = Self::new(own_node_addr);
state.is_leaf_only = true;
state
}
pub fn own_node_addr(&self) -> &NodeAddr {
&self.own_node_addr
}
pub fn is_leaf_only(&self) -> bool {
self.is_leaf_only
}
pub fn sequence(&self) -> u64 {
self.sequence
}
pub fn next_sequence(&mut self) -> u64 {
self.sequence += 1;
self.sequence
}
pub fn update_debounce_ms(&self) -> u64 {
self.update_debounce_ms
}
pub fn set_update_debounce_ms(&mut self, ms: u64) {
self.update_debounce_ms = ms;
}
pub fn add_leaf_dependent(&mut self, node_addr: NodeAddr) {
self.leaf_dependents.insert(node_addr);
}
pub fn remove_leaf_dependent(&mut self, node_addr: &NodeAddr) -> bool {
self.leaf_dependents.remove(node_addr)
}
pub fn leaf_dependents(&self) -> &HashSet<NodeAddr> {
&self.leaf_dependents
}
pub fn leaf_dependent_count(&self) -> usize {
self.leaf_dependents.len()
}
pub fn mark_update_needed(&mut self, peer_id: NodeAddr) {
self.pending_updates.insert(peer_id);
}
pub fn mark_all_updates_needed(&mut self, peer_ids: impl IntoIterator<Item = NodeAddr>) {
self.pending_updates.extend(peer_ids);
}
pub fn needs_update(&self, peer_id: &NodeAddr) -> bool {
self.pending_updates.contains(peer_id)
}
pub fn should_send_update(&self, peer_id: &NodeAddr, current_time_ms: u64) -> bool {
if !self.pending_updates.contains(peer_id) {
return false;
}
match self.last_update_sent.get(peer_id) {
Some(&last_time) => current_time_ms >= last_time + self.update_debounce_ms,
None => true,
}
}
pub fn record_update_sent(&mut self, peer_id: NodeAddr, current_time_ms: u64) {
self.last_update_sent.insert(peer_id, current_time_ms);
self.pending_updates.remove(&peer_id);
}
pub fn clear_pending_updates(&mut self) {
self.pending_updates.clear();
}
pub fn record_sent_filter(&mut self, peer_id: NodeAddr, filter: BloomFilter) {
self.last_sent_filters.insert(peer_id, filter);
}
pub fn remove_peer_state(&mut self, peer_id: &NodeAddr) {
self.last_sent_filters.remove(peer_id);
self.last_update_sent.remove(peer_id);
self.pending_updates.remove(peer_id);
}
pub fn mark_changed_peers(
&mut self,
exclude_from: &NodeAddr,
peer_addrs: &[NodeAddr],
peer_filters: &HashMap<NodeAddr, BloomFilter>,
) {
for peer_addr in peer_addrs {
if peer_addr == exclude_from {
continue;
}
let new_filter = self.compute_outgoing_filter(peer_addr, peer_filters);
let changed = match self.last_sent_filters.get(peer_addr) {
Some(last) => *last != new_filter,
None => true, };
if changed {
self.pending_updates.insert(*peer_addr);
}
}
}
pub fn compute_outgoing_filter(
&self,
exclude_peer: &NodeAddr,
peer_filters: &HashMap<NodeAddr, BloomFilter>,
) -> BloomFilter {
let mut filter = BloomFilter::new();
filter.insert(&self.own_node_addr);
for dep in &self.leaf_dependents {
filter.insert(dep);
}
for (peer_id, peer_filter) in peer_filters {
if peer_id != exclude_peer {
let _ = filter.merge(peer_filter);
}
}
filter
}
pub fn base_filter(&self) -> BloomFilter {
let mut filter = BloomFilter::new();
filter.insert(&self.own_node_addr);
for dep in &self.leaf_dependents {
filter.insert(dep);
}
filter
}
}