use crate::NodeAddr;
use crate::bloom::BloomFilter;
use crate::protocol::FilterAnnounce;
use super::{Node, NodeError};
use std::collections::HashMap;
use tracing::{debug, warn};
impl Node {
pub(super) fn peer_inbound_filters(&self) -> HashMap<NodeAddr, BloomFilter> {
let mut filters = HashMap::new();
for (addr, peer) in &self.peers {
if self.is_tree_peer(addr)
&& let Some(filter) = peer.inbound_filter()
{
filters.insert(*addr, filter.clone());
}
}
filters
}
fn build_filter_announce(&mut self, exclude_peer: &NodeAddr) -> FilterAnnounce {
let peer_filters = self.peer_inbound_filters();
let filter = self
.bloom_state
.compute_outgoing_filter(exclude_peer, &peer_filters);
let sequence = self.bloom_state.next_sequence();
FilterAnnounce::new(filter, sequence)
}
pub(super) async fn send_filter_announce_to_peer(
&mut self,
peer_addr: &NodeAddr,
) -> Result<(), NodeError> {
let now_ms = Self::now_ms();
if !self.bloom_state.should_send_update(peer_addr, now_ms) {
self.stats_mut().bloom.debounce_suppressed += 1;
return Ok(());
}
let announce = self.build_filter_announce(peer_addr);
let sent_filter = announce.filter.clone();
let encoded = announce.encode().map_err(|e| NodeError::SendFailed {
node_addr: *peer_addr,
reason: format!("FilterAnnounce encode failed: {}", e),
})?;
if let Err(e) = self.send_encrypted_link_message(peer_addr, &encoded).await {
self.stats_mut().bloom.send_failed += 1;
return Err(e);
}
self.stats_mut().bloom.sent += 1;
let max_fpr = self.config.node.bloom.max_inbound_fpr;
let out_fill = sent_filter.fill_ratio();
let out_fpr = out_fill.powi(sent_filter.hash_count() as i32);
if out_fpr > max_fpr {
let now = std::time::Instant::now();
let should_warn = self
.last_self_warn
.map(|t| now.duration_since(t) >= std::time::Duration::from_secs(60))
.unwrap_or(true);
if should_warn {
self.last_self_warn = Some(now);
warn!(
to = %self.peer_display_name(peer_addr),
fill = format_args!("{:.3}", out_fill),
fpr = format_args!("{:.4}", out_fpr),
cap = format_args!("{:.4}", max_fpr),
"Outgoing filter above FPR cap — aggregation drift or missed ingress?"
);
}
}
debug!(
peer = %self.peer_display_name(peer_addr),
seq = announce.sequence,
est_entries = match sent_filter.estimated_count(max_fpr) {
Some(n) => format!("{:.0}", n),
None => "—".to_string(),
},
set_bits = sent_filter.count_ones(),
fill = format_args!("{:.1}%", sent_filter.fill_ratio() * 100.0),
tree_peer = self.is_tree_peer(peer_addr),
"Sent FilterAnnounce"
);
self.bloom_state.record_update_sent(*peer_addr, now_ms);
self.bloom_state.record_sent_filter(*peer_addr, sent_filter);
if let Some(peer) = self.peers.get_mut(peer_addr) {
peer.clear_filter_update_needed();
}
Ok(())
}
pub(super) async fn send_pending_filter_announces(&mut self) {
let now_ms = Self::now_ms();
let ready: Vec<NodeAddr> = self
.peers
.keys()
.filter(|addr| self.bloom_state.should_send_update(addr, now_ms))
.copied()
.collect();
for peer_addr in ready {
if let Err(e) = self.send_filter_announce_to_peer(&peer_addr).await {
debug!(
peer = %self.peer_display_name(&peer_addr),
error = %e,
"Failed to send pending FilterAnnounce"
);
}
}
}
pub(super) async fn handle_filter_announce(&mut self, from: &NodeAddr, payload: &[u8]) {
self.stats_mut().bloom.received += 1;
let announce = match FilterAnnounce::decode(payload) {
Ok(a) => a,
Err(e) => {
self.stats_mut().bloom.decode_error += 1;
debug!(from = %self.peer_display_name(from), error = %e, "Malformed FilterAnnounce");
return;
}
};
if !announce.is_valid() {
self.stats_mut().bloom.invalid += 1;
debug!(from = %self.peer_display_name(from), "FilterAnnounce filter/size_class mismatch");
return;
}
if !announce.is_v1_compliant() {
self.stats_mut().bloom.non_v1 += 1;
debug!(from = %self.peer_display_name(from), size_class = announce.size_class, "Non-v1 FilterAnnounce rejected");
return;
}
let current_seq = match self.peers.get(from) {
Some(peer) => peer.filter_sequence(),
None => {
self.stats_mut().bloom.unknown_peer += 1;
debug!(from = %self.peer_display_name(from), "FilterAnnounce from unknown peer");
return;
}
};
if announce.sequence <= current_seq {
self.stats_mut().bloom.stale += 1;
debug!(
from = %self.peer_display_name(from),
received_seq = announce.sequence,
current_seq = current_seq,
"Stale FilterAnnounce rejected"
);
return;
}
let max_fpr = self.config.node.bloom.max_inbound_fpr;
let fill = announce.filter.fill_ratio();
let fpr = fill.powi(announce.filter.hash_count() as i32);
if fpr > max_fpr {
self.stats_mut().bloom.fill_exceeded += 1;
warn!(
from = %self.peer_display_name(from),
seq = announce.sequence,
fill = format_args!("{:.3}", fill),
fpr = format_args!("{:.4}", fpr),
cap = format_args!("{:.4}", max_fpr),
"FilterAnnounce above FPR cap — rejected"
);
return;
}
self.stats_mut().bloom.accepted += 1;
let now_ms = Self::now_ms();
debug!(
from = %self.peer_display_name(from),
seq = announce.sequence,
est_entries = match announce.filter.estimated_count(max_fpr) {
Some(n) => format!("{:.0}", n),
None => "—".to_string(),
},
set_bits = announce.filter.count_ones(),
fill = format_args!("{:.1}%", announce.filter.fill_ratio() * 100.0),
tree_peer = self.is_tree_peer(from),
"Received FilterAnnounce"
);
if let Some(peer) = self.peers.get_mut(from) {
peer.update_filter(announce.filter, announce.sequence, now_ms);
}
let peer_addrs: Vec<NodeAddr> = self.peers.keys().copied().collect();
let peer_filters = self.peer_inbound_filters();
self.bloom_state
.mark_changed_peers(from, &peer_addrs, &peer_filters);
}
pub(super) async fn check_bloom_state(&mut self) {
self.send_pending_filter_announces().await;
}
}