// fips_core/bloom/state.rs
//! FIPS-specific Bloom filter announcement state management.

use std::collections::{HashMap, HashSet};

use super::BloomFilter;
use crate::NodeAddr;
7
/// State for managing Bloom filter announcements.
///
/// Tracks the local announcement set (own address plus leaf dependents)
/// and the per-peer bookkeeping needed to decide when an updated filter
/// must be sent: debounce timestamps, pending flags, and the last filter
/// actually sent (for change detection).
#[derive(Clone, Debug)]
pub struct BloomState {
    /// This node's NodeAddr (always included in outgoing filters).
    own_node_addr: NodeAddr,
    /// Leaf-only nodes we speak for (included in our filter).
    leaf_dependents: HashSet<NodeAddr>,
    /// Whether this node operates in leaf-only mode.
    is_leaf_only: bool,
    /// Rate limiting: minimum interval between outgoing updates to one
    /// peer, in milliseconds (default 500, see `new`).
    update_debounce_ms: u64,
    /// Timestamp of the last update sent, per peer, in milliseconds.
    last_update_sent: HashMap<NodeAddr, u64>,
    /// Peers flagged as needing a filter update.
    pending_updates: HashSet<NodeAddr>,
    /// Current sequence number for outgoing filters (monotonically
    /// incremented by `next_sequence`).
    sequence: u64,
    /// Last outgoing filter sent to each peer (compared against the
    /// freshly computed filter to suppress no-op updates).
    last_sent_filters: HashMap<NodeAddr, BloomFilter>,
}
30
31impl BloomState {
32    /// Create new Bloom state for a node.
33    pub fn new(own_node_addr: NodeAddr) -> Self {
34        Self {
35            own_node_addr,
36            leaf_dependents: HashSet::new(),
37            is_leaf_only: false,
38            update_debounce_ms: 500,
39            last_update_sent: HashMap::new(),
40            pending_updates: HashSet::new(),
41            sequence: 0,
42            last_sent_filters: HashMap::new(),
43        }
44    }
45
46    /// Create state for a leaf-only node.
47    pub fn leaf_only(own_node_addr: NodeAddr) -> Self {
48        let mut state = Self::new(own_node_addr);
49        state.is_leaf_only = true;
50        state
51    }
52
53    /// Get the node's own ID.
54    pub fn own_node_addr(&self) -> &NodeAddr {
55        &self.own_node_addr
56    }
57
58    /// Check if this is a leaf-only node.
59    pub fn is_leaf_only(&self) -> bool {
60        self.is_leaf_only
61    }
62
63    /// Get the current sequence number.
64    pub fn sequence(&self) -> u64 {
65        self.sequence
66    }
67
68    /// Increment and return the next sequence number.
69    pub fn next_sequence(&mut self) -> u64 {
70        self.sequence += 1;
71        self.sequence
72    }
73
74    /// Get the update debounce interval in milliseconds.
75    pub fn update_debounce_ms(&self) -> u64 {
76        self.update_debounce_ms
77    }
78
79    /// Set the update debounce interval.
80    pub fn set_update_debounce_ms(&mut self, ms: u64) {
81        self.update_debounce_ms = ms;
82    }
83
84    /// Add a leaf dependent that we'll include in our filter.
85    pub fn add_leaf_dependent(&mut self, node_addr: NodeAddr) {
86        self.leaf_dependents.insert(node_addr);
87    }
88
89    /// Remove a leaf dependent.
90    pub fn remove_leaf_dependent(&mut self, node_addr: &NodeAddr) -> bool {
91        self.leaf_dependents.remove(node_addr)
92    }
93
94    /// Get the set of leaf dependents.
95    pub fn leaf_dependents(&self) -> &HashSet<NodeAddr> {
96        &self.leaf_dependents
97    }
98
99    /// Number of leaf dependents.
100    pub fn leaf_dependent_count(&self) -> usize {
101        self.leaf_dependents.len()
102    }
103
104    /// Mark that a peer needs an update.
105    pub fn mark_update_needed(&mut self, peer_id: NodeAddr) {
106        self.pending_updates.insert(peer_id);
107    }
108
109    /// Mark all peers as needing updates.
110    pub fn mark_all_updates_needed(&mut self, peer_ids: impl IntoIterator<Item = NodeAddr>) {
111        self.pending_updates.extend(peer_ids);
112    }
113
114    /// Check if a peer needs an update.
115    pub fn needs_update(&self, peer_id: &NodeAddr) -> bool {
116        self.pending_updates.contains(peer_id)
117    }
118
119    /// Check if we should send an update to a peer (respecting debounce).
120    pub fn should_send_update(&self, peer_id: &NodeAddr, current_time_ms: u64) -> bool {
121        if !self.pending_updates.contains(peer_id) {
122            return false;
123        }
124
125        match self.last_update_sent.get(peer_id) {
126            Some(&last_time) => current_time_ms >= last_time + self.update_debounce_ms,
127            None => true,
128        }
129    }
130
131    /// Record that we sent an update to a peer.
132    pub fn record_update_sent(&mut self, peer_id: NodeAddr, current_time_ms: u64) {
133        self.last_update_sent.insert(peer_id, current_time_ms);
134        self.pending_updates.remove(&peer_id);
135    }
136
137    /// Clear all pending updates.
138    pub fn clear_pending_updates(&mut self) {
139        self.pending_updates.clear();
140    }
141
142    /// Record the outgoing filter that was sent to a peer.
143    pub fn record_sent_filter(&mut self, peer_id: NodeAddr, filter: BloomFilter) {
144        self.last_sent_filters.insert(peer_id, filter);
145    }
146
147    /// Remove stored filter state for a peer that was removed.
148    pub fn remove_peer_state(&mut self, peer_id: &NodeAddr) {
149        self.last_sent_filters.remove(peer_id);
150        self.last_update_sent.remove(peer_id);
151        self.pending_updates.remove(peer_id);
152    }
153
154    /// Mark only peers whose outgoing filter has actually changed.
155    ///
156    /// Computes the outgoing filter for each peer and compares it
157    /// against what was last sent. Only marks peers where the filter
158    /// differs. This prevents cascading update loops in steady state.
159    pub fn mark_changed_peers(
160        &mut self,
161        exclude_from: &NodeAddr,
162        peer_addrs: &[NodeAddr],
163        peer_filters: &HashMap<NodeAddr, BloomFilter>,
164    ) {
165        for peer_addr in peer_addrs {
166            if peer_addr == exclude_from {
167                continue;
168            }
169            let new_filter = self.compute_outgoing_filter(peer_addr, peer_filters);
170            let changed = match self.last_sent_filters.get(peer_addr) {
171                Some(last) => *last != new_filter,
172                None => true, // never sent → must send
173            };
174            if changed {
175                self.pending_updates.insert(*peer_addr);
176            }
177        }
178    }
179
180    /// Compute the outgoing filter for a specific peer.
181    ///
182    /// The filter includes:
183    /// - This node's own ID
184    /// - All leaf dependents
185    /// - Entries from other peers' inbound filters (excluding the destination peer)
186    ///
187    /// The `peer_filters` map contains inbound filters from each peer.
188    /// The filter for `exclude_peer` is excluded to prevent routing loops.
189    pub fn compute_outgoing_filter(
190        &self,
191        exclude_peer: &NodeAddr,
192        peer_filters: &HashMap<NodeAddr, BloomFilter>,
193    ) -> BloomFilter {
194        let mut filter = BloomFilter::new();
195
196        // Always include ourselves
197        filter.insert(&self.own_node_addr);
198
199        // Include leaf dependents
200        for dep in &self.leaf_dependents {
201            filter.insert(dep);
202        }
203
204        // Merge filters from other peers
205        for (peer_id, peer_filter) in peer_filters {
206            if peer_id != exclude_peer {
207                // Ignore merge errors (size mismatches) - just skip that filter
208                let _ = filter.merge(peer_filter);
209            }
210        }
211
212        filter
213    }
214
215    /// Create a base filter containing just this node and its dependents.
216    pub fn base_filter(&self) -> BloomFilter {
217        let mut filter = BloomFilter::new();
218        filter.insert(&self.own_node_addr);
219        for dep in &self.leaf_dependents {
220            filter.insert(dep);
221        }
222        filter
223    }
224}