1use alloc::collections::BTreeMap;
2use alloc::vec::Vec;
3
4use crate::packet::RawPacket;
5
6use super::types::InterfaceId;
7
/// Strategy used by `AnnounceVerifyQueue::enqueue` when a new entry does not fit
/// within the entry-count or byte limits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowPolicy {
    /// Reject the incoming entry; nothing already queued is evicted.
    DropNewest,
    /// Evict the queued entry with the smallest `queued_at` to make room.
    DropOldest,
    /// Evict the queued entry with the highest `best_hops` (ties resolved towards the
    /// larger `queued_at`), but only when the incoming entry has strictly fewer hops.
    DropWorst,
}
14
/// Key under which an announce is stored in the verification queue.
///
/// Two announces with equal keys occupy the same slot; `Ord` is derived so the key
/// can be used in a `BTreeMap`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct AnnounceVerifyKey {
    /// Hash of the announced destination (16 bytes).
    pub destination_hash: [u8; 16],
    /// 10-byte random blob taken from the announce.
    // NOTE(review): presumably the announce's random hash — confirm against the caller.
    pub random_blob: [u8; 10],
    /// 16-byte identifier of where the announce was received from.
    // NOTE(review): exact meaning (transport id vs. interface hash) is not visible here.
    pub received_from: [u8; 16],
}
21
/// An announce waiting in (or handed out from) the verification queue.
#[derive(Debug, Clone)]
pub struct PendingAnnounce {
    /// Raw bytes kept alongside the parsed packet; their length counts towards the
    /// queue's byte budget.
    // NOTE(review): presumably the announce exactly as received — confirm.
    pub original_raw: Vec<u8>,
    /// Parsed packet; the lengths of its `data` and `transport_id` count towards the
    /// queue's byte budget.
    pub packet: RawPacket,
    /// Interface associated with the announce (not read by the queue itself).
    pub interface: InterfaceId,
    /// 16-byte identifier of where the announce was received from (not read by the
    /// queue itself).
    pub received_from: [u8; 16],
    /// Timestamp at which the entry was queued; compared against
    /// `now - max_stale_secs` in `take_pending`, so it must use the same clock as `now`.
    pub queued_at: f64,
    /// Hop count used for ranking: a lower value replaces an existing entry with the
    /// same key and is preferred when evicting under `OverflowPolicy::DropWorst`.
    pub best_hops: u8,
    /// Emission timestamp carried with the announce (not read by the queue itself).
    pub emission_ts: u64,
    /// 10-byte random blob carried with the announce (not read by the queue itself).
    pub random_blob: [u8; 10],
}
33
/// State of a slot in the verification queue.
#[derive(Debug, Clone)]
pub enum QueueEntry {
    /// Queued and not yet returned by `take_pending`.
    Pending(PendingAnnounce),
    /// Already returned by `take_pending`; stays in the queue until it is completed,
    /// evicted, or expires.
    InFlight(PendingAnnounce),
}
39
/// Bounded queue of announces awaiting verification, keyed by [`AnnounceVerifyKey`].
///
/// The queue is limited both by entry count and by an approximate byte budget, and
/// drops entries that have been queued for longer than `max_stale_secs`.
#[derive(Debug, Clone)]
pub struct AnnounceVerifyQueue {
    /// All tracked entries, both pending and in flight.
    pending: BTreeMap<AnnounceVerifyKey, QueueEntry>,
    /// Maximum number of entries (at least 1).
    max_entries: usize,
    /// Maximum total of the per-entry byte estimates (at least 1).
    max_bytes: usize,
    /// Age after which an entry is discarded by `take_pending` (at least 0.001).
    max_stale_secs: f64,
    /// What to do when a new entry does not fit.
    overflow_policy: OverflowPolicy,
    /// Running sum of the byte estimates of all entries in `pending`.
    queued_bytes: usize,
}
49
50impl AnnounceVerifyQueue {
51 pub fn new(max_entries: usize) -> Self {
52 Self::with_limits(max_entries, 256 * 1024, 30.0, OverflowPolicy::DropWorst)
53 }
54
55 pub fn with_limits(
56 max_entries: usize,
57 max_bytes: usize,
58 max_stale_secs: f64,
59 overflow_policy: OverflowPolicy,
60 ) -> Self {
61 Self {
62 pending: BTreeMap::new(),
63 max_entries: max_entries.max(1),
64 max_bytes: max_bytes.max(1),
65 max_stale_secs: max_stale_secs.max(0.001),
66 overflow_policy,
67 queued_bytes: 0,
68 }
69 }
70
71 pub fn enqueue(&mut self, key: AnnounceVerifyKey, entry: PendingAnnounce) -> bool {
72 if let Some(existing) = self.pending.get_mut(&key) {
73 return match existing {
74 QueueEntry::Pending(current) | QueueEntry::InFlight(current) => {
75 if entry.best_hops < current.best_hops {
76 let current_bytes = pending_bytes(current);
77 let replacement_bytes = pending_bytes(&entry);
78 self.queued_bytes = self
79 .queued_bytes
80 .saturating_sub(current_bytes)
81 .saturating_add(replacement_bytes);
82 *current = entry;
83 true
84 } else {
85 false
86 }
87 }
88 };
89 }
90
91 let entry_bytes = pending_bytes(&entry);
92 if entry_bytes > self.max_bytes {
93 return false;
94 }
95
96 while self.pending.len() >= self.max_entries
97 || self.queued_bytes.saturating_add(entry_bytes) > self.max_bytes
98 {
99 let Some(evict_key) = self.select_eviction_candidate(&entry) else {
100 return false;
101 };
102 self.remove_entry(&evict_key);
103 }
104
105 self.queued_bytes = self.queued_bytes.saturating_add(entry_bytes);
106 self.pending.insert(key, QueueEntry::Pending(entry));
107 true
108 }
109
110 pub fn take_pending(&mut self, now: f64) -> Vec<(AnnounceVerifyKey, PendingAnnounce)> {
111 let stale_before = now - self.max_stale_secs;
112 let stale_keys: Vec<_> = self
113 .pending
114 .iter()
115 .filter_map(|(key, entry)| match entry {
116 QueueEntry::Pending(current) | QueueEntry::InFlight(current)
117 if current.queued_at < stale_before =>
118 {
119 Some(*key)
120 }
121 _ => None,
122 })
123 .collect();
124 for key in stale_keys {
125 self.remove_entry(&key);
126 }
127
128 let keys: Vec<_> = self
129 .pending
130 .iter()
131 .filter_map(|(key, entry)| match entry {
132 QueueEntry::Pending(_) => Some(*key),
133 QueueEntry::InFlight(_) => None,
134 })
135 .collect();
136
137 let mut drained = Vec::with_capacity(keys.len());
138 for key in keys {
139 if let Some(entry) = self.pending.get_mut(&key) {
140 if let QueueEntry::Pending(current) = entry {
141 let cloned = current.clone();
142 *entry = QueueEntry::InFlight(cloned.clone());
143 drained.push((key, cloned));
144 }
145 }
146 }
147
148 drained
149 }
150
151 pub fn complete_success(&mut self, key: &AnnounceVerifyKey) -> Option<PendingAnnounce> {
152 match self.remove_entry(key) {
153 Some(QueueEntry::InFlight(entry)) => Some(entry),
154 Some(QueueEntry::Pending(entry)) => Some(entry),
155 None => None,
156 }
157 }
158
159 pub fn complete_failure(&mut self, key: &AnnounceVerifyKey) -> bool {
160 self.remove_entry(key).is_some()
161 }
162
163 pub fn len(&self) -> usize {
164 self.pending.len()
165 }
166
167 pub fn is_empty(&self) -> bool {
168 self.pending.is_empty()
169 }
170
171 pub fn queued_bytes(&self) -> usize {
172 self.queued_bytes
173 }
174
175 pub fn clear(&mut self) {
176 self.pending.clear();
177 self.queued_bytes = 0;
178 }
179
180 fn select_eviction_candidate(
181 &self,
182 incoming_entry: &PendingAnnounce,
183 ) -> Option<AnnounceVerifyKey> {
184 match self.overflow_policy {
185 OverflowPolicy::DropNewest => None,
186 OverflowPolicy::DropOldest => self
187 .pending
188 .iter()
189 .min_by(|a, b| {
190 queued_at_of(a.1)
191 .partial_cmp(&queued_at_of(b.1))
192 .unwrap_or(core::cmp::Ordering::Equal)
193 })
194 .map(|(key, _)| *key),
195 OverflowPolicy::DropWorst => {
196 let candidate = self
197 .pending
198 .iter()
199 .map(|(existing_key, existing_entry)| {
200 (*existing_key, pending_of(existing_entry))
201 })
202 .max_by(|a, b| {
203 a.1.best_hops.cmp(&b.1.best_hops).then_with(|| {
204 a.1.queued_at
205 .partial_cmp(&b.1.queued_at)
206 .unwrap_or(core::cmp::Ordering::Equal)
207 })
208 })?;
209 if incoming_entry.best_hops >= candidate.1.best_hops {
210 None
211 } else {
212 Some(candidate.0)
213 }
214 }
215 }
216 }
217
218 fn remove_entry(&mut self, key: &AnnounceVerifyKey) -> Option<QueueEntry> {
219 let removed = self.pending.remove(key)?;
220 self.queued_bytes = self
221 .queued_bytes
222 .saturating_sub(pending_bytes(pending_of(&removed)));
223 Some(removed)
224 }
225}
226
227fn pending_of(entry: &QueueEntry) -> &PendingAnnounce {
228 match entry {
229 QueueEntry::Pending(current) | QueueEntry::InFlight(current) => current,
230 }
231}
232
233fn queued_at_of(entry: &QueueEntry) -> f64 {
234 pending_of(entry).queued_at
235}
236
237fn pending_bytes(entry: &PendingAnnounce) -> usize {
238 entry.original_raw.len()
239 + entry.packet.data.len()
240 + entry.packet.transport_id.as_ref().map_or(0, |id| id.len())
241}
242
/// Unit tests for [`AnnounceVerifyQueue`].
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constants;
    use crate::packet::{PacketFlags, RawPacket};

    /// Builds a broadcast announce packet for `dest` with an 8-byte payload of `fill`.
    fn make_packet(dest: [u8; 16], hops: u8, fill: u8) -> RawPacket {
        RawPacket::pack(
            PacketFlags {
                header_type: constants::HEADER_1,
                context_flag: constants::FLAG_UNSET,
                transport_type: constants::TRANSPORT_BROADCAST,
                destination_type: constants::DESTINATION_SINGLE,
                packet_type: constants::PACKET_TYPE_ANNOUNCE,
            },
            hops,
            &dest,
            None,
            constants::CONTEXT_NONE,
            &[fill; 8],
        )
        .unwrap()
    }

    /// Builds a matching key/entry pair queued at time 10.0 with `hops` as its
    /// `best_hops` and a one-byte `original_raw`.
    fn make_pending(
        dest: [u8; 16],
        random_blob: [u8; 10],
        received_from: [u8; 16],
        hops: u8,
    ) -> (AnnounceVerifyKey, PendingAnnounce) {
        (
            AnnounceVerifyKey {
                destination_hash: dest,
                random_blob,
                received_from,
            },
            PendingAnnounce {
                original_raw: vec![hops],
                packet: make_packet(dest, hops, hops),
                interface: InterfaceId(1),
                received_from,
                queued_at: 10.0,
                best_hops: hops,
                emission_ts: 42,
                random_blob,
            },
        )
    }

    /// A better (fewer hops) announce replaces the entry with the same key, while an
    /// announce differing only in `received_from` gets its own slot.
    #[test]
    fn enqueue_replaces_lower_hops_and_preserves_distinct_paths() {
        let mut queue = AnnounceVerifyQueue::new(8);
        let dest = [1; 16];
        let random = [2; 10];
        let rx_a = [3; 16];
        let rx_b = [4; 16];

        let (key_a, entry_a) = make_pending(dest, random, rx_a, 5);
        assert!(queue.enqueue(key_a, entry_a));

        // Same key, fewer hops: replaces in place.
        let (_, better_a) = make_pending(dest, random, rx_a, 3);
        assert!(queue.enqueue(key_a, better_a));
        assert_eq!(queue.len(), 1);

        // Different `received_from`: distinct key, second slot.
        let (key_b, entry_b) = make_pending(dest, random, rx_b, 4);
        assert!(queue.enqueue(key_b, entry_b));
        assert_eq!(queue.len(), 2);

        let taken = queue.take_pending(10.0);
        assert_eq!(taken.len(), 2);
        assert!(taken
            .iter()
            .any(|(key, entry)| *key == key_a && entry.best_hops == 3));
        assert!(taken
            .iter()
            .any(|(key, entry)| *key == key_b && entry.best_hops == 4));
    }

    /// An in-flight entry can still be upgraded by a better announce, and entries
    /// older than the staleness window are dropped by `take_pending`.
    #[test]
    fn enqueue_updates_inflight_and_cleans_stale_entries() {
        let mut queue = AnnounceVerifyQueue::new(2);
        let dest = [8; 16];
        let random = [9; 10];
        let recv = [10; 16];

        let (key, entry) = make_pending(dest, random, recv, 6);
        assert!(queue.enqueue(key, entry));
        // Moves the entry to the in-flight state.
        let _ = queue.take_pending(20.0);

        let (_, better) = make_pending(dest, random, recv, 2);
        assert!(queue.enqueue(key, better));
        let completed = queue.complete_success(&key).unwrap();
        assert_eq!(completed.best_hops, 2);

        // queued_at = 1.0 is older than 40.0 - 30.0, so the entry is purged.
        let (stale_key, mut stale) = make_pending([11; 16], [12; 10], [13; 16], 7);
        stale.queued_at = 1.0;
        assert!(queue.enqueue(stale_key, stale));
        assert!(queue.take_pending(40.0).is_empty());
        assert_eq!(queue.len(), 0);
    }

    /// With `DropWorst`, a full queue evicts its highest-hop entry for a better
    /// newcomer and rejects a newcomer that is worse than everything queued.
    #[test]
    fn enqueue_evicts_worst_entry_when_full() {
        let mut queue = AnnounceVerifyQueue::with_limits(2, 1024, 30.0, OverflowPolicy::DropWorst);
        let (k1, e1) = make_pending([1; 16], [1; 10], [1; 16], 8);
        let (k2, e2) = make_pending([2; 16], [2; 10], [2; 16], 5);
        let (k3, e3) = make_pending([3; 16], [3; 10], [3; 16], 4);
        let (_, e4) = make_pending([4; 16], [4; 10], [4; 16], 9);

        assert!(queue.enqueue(k1, e1));
        assert!(queue.enqueue(k2, e2));
        // Queue is full; k3 (4 hops) displaces k1 (8 hops).
        assert!(queue.enqueue(k3, e3));
        assert_eq!(queue.len(), 2);
        // 9 hops is worse than every queued entry, so it is rejected.
        assert!(!queue.enqueue(
            AnnounceVerifyKey {
                destination_hash: [4; 16],
                random_blob: [4; 10],
                received_from: [4; 16],
            },
            e4
        ));

        let taken = queue.take_pending(10.0);
        assert_eq!(taken.len(), 2);
        assert!(taken.iter().all(|(key, _)| *key != k1));
    }

    /// With `DropNewest`, a full queue rejects the newcomer even if it has fewer hops.
    #[test]
    fn drop_newest_policy_rejects_when_full() {
        let mut queue = AnnounceVerifyQueue::with_limits(1, 1024, 30.0, OverflowPolicy::DropNewest);
        let (k1, e1) = make_pending([1; 16], [1; 10], [1; 16], 4);
        let (k2, e2) = make_pending([2; 16], [2; 10], [2; 16], 1);
        assert!(queue.enqueue(k1, e1));
        assert!(!queue.enqueue(k2, e2));
        let taken = queue.take_pending(10.0);
        assert_eq!(taken.len(), 1);
        assert_eq!(taken[0].0, k1);
    }

    /// With `DropOldest`, exceeding the byte budget evicts the entry with the
    /// smallest `queued_at`.
    #[test]
    fn drop_oldest_policy_evicts_oldest_for_byte_cap() {
        let mut queue = AnnounceVerifyQueue::with_limits(4, 24, 30.0, OverflowPolicy::DropOldest);
        let (k1, mut e1) = make_pending([1; 16], [1; 10], [1; 16], 4);
        let (k2, mut e2) = make_pending([2; 16], [2; 10], [2; 16], 3);
        // 12 raw bytes each, plus the packet contribution.
        // NOTE(review): assumes the packed payload makes each entry exceed 12 bytes,
        // so two entries cannot share the 24-byte budget — confirm against `RawPacket`.
        e1.original_raw = vec![1; 12];
        e2.original_raw = vec![2; 12];
        e1.queued_at = 1.0;
        e2.queued_at = 2.0;
        assert!(queue.enqueue(k1, e1));
        assert!(queue.enqueue(k2, e2));
        assert_eq!(queue.len(), 1);
        let taken = queue.take_pending(10.0);
        assert_eq!(taken.len(), 1);
        assert_eq!(taken[0].0, k2);
    }

    /// `clear` drops entries in either state and zeroes the byte accounting.
    #[test]
    fn clear_removes_pending_and_inflight_entries_and_resets_bytes() {
        let mut queue = AnnounceVerifyQueue::new(4);
        let (pending_key, pending) = make_pending([1; 16], [1; 10], [1; 16], 4);
        let (inflight_key, inflight) = make_pending([2; 16], [2; 10], [2; 16], 3);
        assert!(queue.enqueue(pending_key, pending));
        assert!(queue.enqueue(inflight_key, inflight));
        // Marks both queued entries as in flight; they remain in the queue.
        let _ = queue.take_pending(10.0);

        assert_eq!(queue.len(), 2);
        assert!(queue.queued_bytes() > 0);

        queue.clear();

        assert!(queue.is_empty());
        assert_eq!(queue.queued_bytes(), 0);
        assert!(queue.take_pending(10.0).is_empty());
    }
}