1use std::collections::{HashMap, HashSet};
4
5use super::BloomFilter;
6use crate::NodeAddr;
7
8#[derive(Clone, Debug)]
12pub struct BloomState {
13 own_node_addr: NodeAddr,
15 leaf_dependents: HashSet<NodeAddr>,
17 is_leaf_only: bool,
19 update_debounce_ms: u64,
21 last_update_sent: HashMap<NodeAddr, u64>,
23 pending_updates: HashSet<NodeAddr>,
25 sequence: u64,
27 last_sent_filters: HashMap<NodeAddr, BloomFilter>,
29}
30
31impl BloomState {
32 pub fn new(own_node_addr: NodeAddr) -> Self {
34 Self {
35 own_node_addr,
36 leaf_dependents: HashSet::new(),
37 is_leaf_only: false,
38 update_debounce_ms: 500,
39 last_update_sent: HashMap::new(),
40 pending_updates: HashSet::new(),
41 sequence: 0,
42 last_sent_filters: HashMap::new(),
43 }
44 }
45
46 pub fn leaf_only(own_node_addr: NodeAddr) -> Self {
48 let mut state = Self::new(own_node_addr);
49 state.is_leaf_only = true;
50 state
51 }
52
53 pub fn own_node_addr(&self) -> &NodeAddr {
55 &self.own_node_addr
56 }
57
58 pub fn is_leaf_only(&self) -> bool {
60 self.is_leaf_only
61 }
62
63 pub fn sequence(&self) -> u64 {
65 self.sequence
66 }
67
68 pub fn next_sequence(&mut self) -> u64 {
70 self.sequence += 1;
71 self.sequence
72 }
73
74 pub fn update_debounce_ms(&self) -> u64 {
76 self.update_debounce_ms
77 }
78
79 pub fn set_update_debounce_ms(&mut self, ms: u64) {
81 self.update_debounce_ms = ms;
82 }
83
84 pub fn add_leaf_dependent(&mut self, node_addr: NodeAddr) {
86 self.leaf_dependents.insert(node_addr);
87 }
88
89 pub fn remove_leaf_dependent(&mut self, node_addr: &NodeAddr) -> bool {
91 self.leaf_dependents.remove(node_addr)
92 }
93
94 pub fn leaf_dependents(&self) -> &HashSet<NodeAddr> {
96 &self.leaf_dependents
97 }
98
99 pub fn leaf_dependent_count(&self) -> usize {
101 self.leaf_dependents.len()
102 }
103
104 pub fn mark_update_needed(&mut self, peer_id: NodeAddr) {
106 self.pending_updates.insert(peer_id);
107 }
108
109 pub fn mark_all_updates_needed(&mut self, peer_ids: impl IntoIterator<Item = NodeAddr>) {
111 self.pending_updates.extend(peer_ids);
112 }
113
114 pub fn needs_update(&self, peer_id: &NodeAddr) -> bool {
116 self.pending_updates.contains(peer_id)
117 }
118
119 pub fn should_send_update(&self, peer_id: &NodeAddr, current_time_ms: u64) -> bool {
121 if !self.pending_updates.contains(peer_id) {
122 return false;
123 }
124
125 match self.last_update_sent.get(peer_id) {
126 Some(&last_time) => current_time_ms >= last_time + self.update_debounce_ms,
127 None => true,
128 }
129 }
130
131 pub fn record_update_sent(&mut self, peer_id: NodeAddr, current_time_ms: u64) {
133 self.last_update_sent.insert(peer_id, current_time_ms);
134 self.pending_updates.remove(&peer_id);
135 }
136
137 pub fn clear_pending_updates(&mut self) {
139 self.pending_updates.clear();
140 }
141
142 pub fn record_sent_filter(&mut self, peer_id: NodeAddr, filter: BloomFilter) {
144 self.last_sent_filters.insert(peer_id, filter);
145 }
146
147 pub fn remove_peer_state(&mut self, peer_id: &NodeAddr) {
149 self.last_sent_filters.remove(peer_id);
150 self.last_update_sent.remove(peer_id);
151 self.pending_updates.remove(peer_id);
152 }
153
154 pub fn mark_changed_peers(
160 &mut self,
161 exclude_from: &NodeAddr,
162 peer_addrs: &[NodeAddr],
163 peer_filters: &HashMap<NodeAddr, BloomFilter>,
164 ) {
165 for peer_addr in peer_addrs {
166 if peer_addr == exclude_from {
167 continue;
168 }
169 let new_filter = self.compute_outgoing_filter(peer_addr, peer_filters);
170 let changed = match self.last_sent_filters.get(peer_addr) {
171 Some(last) => *last != new_filter,
172 None => true, };
174 if changed {
175 self.pending_updates.insert(*peer_addr);
176 }
177 }
178 }
179
180 pub fn compute_outgoing_filter(
190 &self,
191 exclude_peer: &NodeAddr,
192 peer_filters: &HashMap<NodeAddr, BloomFilter>,
193 ) -> BloomFilter {
194 let mut filter = BloomFilter::new();
195
196 filter.insert(&self.own_node_addr);
198
199 for dep in &self.leaf_dependents {
201 filter.insert(dep);
202 }
203
204 for (peer_id, peer_filter) in peer_filters {
206 if peer_id != exclude_peer {
207 let _ = filter.merge(peer_filter);
209 }
210 }
211
212 filter
213 }
214
215 pub fn base_filter(&self) -> BloomFilter {
217 let mut filter = BloomFilter::new();
218 filter.insert(&self.own_node_addr);
219 for dep in &self.leaf_dependents {
220 filter.insert(dep);
221 }
222 filter
223 }
224}