Skip to main content

rns_core/transport/
announce_verify_queue.rs

1use alloc::collections::BTreeMap;
2use alloc::vec::Vec;
3
4use crate::packet::RawPacket;
5
6use super::types::InterfaceId;
7
/// Strategy [`AnnounceVerifyQueue::enqueue`] applies when a new entry does not
/// fit within the queue's entry-count or byte limits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowPolicy {
    /// Never evict: the incoming entry is rejected and the queue is left as is.
    DropNewest,
    /// Evict the queued entry with the smallest `queued_at` to make room.
    DropOldest,
    /// Evict the queued entry with the highest `best_hops` (ties go to the one
    /// with the larger `queued_at`), but only when the incoming entry has
    /// strictly fewer hops; otherwise the incoming entry is rejected.
    DropWorst,
}
14
/// Key under which an announce is stored in [`AnnounceVerifyQueue`].
///
/// The derived `Ord` compares fields in declaration order, and that order is
/// also the iteration order of the queue's `BTreeMap`; do not reorder fields
/// without accounting for that.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct AnnounceVerifyKey {
    /// 16-byte hash of the announced destination.
    pub destination_hash: [u8; 16],
    /// 10-byte random blob; presumably the one carried by the announce
    /// (TODO confirm against the caller that builds the key).
    pub random_blob: [u8; 10],
    /// 16-byte identifier of where the announce was received from; two
    /// announces that differ only here occupy separate queue slots.
    pub received_from: [u8; 16],
}
21
/// Payload stored in the queue for one announce.
#[derive(Debug, Clone)]
pub struct PendingAnnounce {
    /// Raw bytes kept alongside the parsed packet; their length counts toward
    /// the queue's byte budget.
    pub original_raw: Vec<u8>,
    /// Parsed packet; its `data` and optional `transport_id` lengths count
    /// toward the queue's byte budget.
    pub packet: RawPacket,
    /// Interface associated with this announce (not read by the queue itself).
    pub interface: InterfaceId,
    /// Same role as `AnnounceVerifyKey::received_from` (not read by the queue).
    pub received_from: [u8; 16],
    /// Time the entry was queued, in the same time base as the `now` argument
    /// of `take_pending`; used for staleness checks and eviction ordering.
    pub queued_at: f64,
    /// Hop count used to rank entries: a lower value replaces a queued entry
    /// with the same key and is preferred by `OverflowPolicy::DropWorst`.
    pub best_hops: u8,
    /// Not read by the queue; presumably the announce emission timestamp
    /// (TODO confirm units with the producer).
    pub emission_ts: u64,
    /// Same role as `AnnounceVerifyKey::random_blob` (not read by the queue).
    pub random_blob: [u8; 10],
}
33
/// State of one slot in [`AnnounceVerifyQueue`].
#[derive(Debug, Clone)]
pub enum QueueEntry {
    /// Queued and not yet handed out by `take_pending`.
    Pending(PendingAnnounce),
    /// Already handed out by `take_pending`; stays in the queue until it is
    /// completed, goes stale, or is evicted.
    InFlight(PendingAnnounce),
}
39
/// Bounded, keyed queue of announces, limited by entry count, total payload
/// bytes, and entry age.
#[derive(Debug, Clone)]
pub struct AnnounceVerifyQueue {
    /// All queued entries (pending and in-flight), ordered by key.
    pending: BTreeMap<AnnounceVerifyKey, QueueEntry>,
    /// Maximum number of entries held at once (at least 1).
    max_entries: usize,
    /// Maximum value of `queued_bytes` that a fresh insertion may produce
    /// (at least 1).
    max_bytes: usize,
    /// Age, in seconds, after which `take_pending` drops an entry
    /// (at least 0.001).
    max_stale_secs: f64,
    /// What to do when a new entry does not fit.
    overflow_policy: OverflowPolicy,
    /// Running total of `pending_bytes` over every entry in `pending`.
    queued_bytes: usize,
}
49
50impl AnnounceVerifyQueue {
51    pub fn new(max_entries: usize) -> Self {
52        Self::with_limits(max_entries, 256 * 1024, 30.0, OverflowPolicy::DropWorst)
53    }
54
55    pub fn with_limits(
56        max_entries: usize,
57        max_bytes: usize,
58        max_stale_secs: f64,
59        overflow_policy: OverflowPolicy,
60    ) -> Self {
61        Self {
62            pending: BTreeMap::new(),
63            max_entries: max_entries.max(1),
64            max_bytes: max_bytes.max(1),
65            max_stale_secs: max_stale_secs.max(0.001),
66            overflow_policy,
67            queued_bytes: 0,
68        }
69    }
70
71    pub fn enqueue(&mut self, key: AnnounceVerifyKey, entry: PendingAnnounce) -> bool {
72        if let Some(existing) = self.pending.get_mut(&key) {
73            return match existing {
74                QueueEntry::Pending(current) | QueueEntry::InFlight(current) => {
75                    if entry.best_hops < current.best_hops {
76                        let current_bytes = pending_bytes(current);
77                        let replacement_bytes = pending_bytes(&entry);
78                        self.queued_bytes = self
79                            .queued_bytes
80                            .saturating_sub(current_bytes)
81                            .saturating_add(replacement_bytes);
82                        *current = entry;
83                        true
84                    } else {
85                        false
86                    }
87                }
88            };
89        }
90
91        let entry_bytes = pending_bytes(&entry);
92        if entry_bytes > self.max_bytes {
93            return false;
94        }
95
96        while self.pending.len() >= self.max_entries
97            || self.queued_bytes.saturating_add(entry_bytes) > self.max_bytes
98        {
99            let Some(evict_key) = self.select_eviction_candidate(&entry) else {
100                return false;
101            };
102            self.remove_entry(&evict_key);
103        }
104
105        self.queued_bytes = self.queued_bytes.saturating_add(entry_bytes);
106        self.pending.insert(key, QueueEntry::Pending(entry));
107        true
108    }
109
110    pub fn take_pending(&mut self, now: f64) -> Vec<(AnnounceVerifyKey, PendingAnnounce)> {
111        let stale_before = now - self.max_stale_secs;
112        let stale_keys: Vec<_> = self
113            .pending
114            .iter()
115            .filter_map(|(key, entry)| match entry {
116                QueueEntry::Pending(current) | QueueEntry::InFlight(current)
117                    if current.queued_at < stale_before =>
118                {
119                    Some(*key)
120                }
121                _ => None,
122            })
123            .collect();
124        for key in stale_keys {
125            self.remove_entry(&key);
126        }
127
128        let keys: Vec<_> = self
129            .pending
130            .iter()
131            .filter_map(|(key, entry)| match entry {
132                QueueEntry::Pending(_) => Some(*key),
133                QueueEntry::InFlight(_) => None,
134            })
135            .collect();
136
137        let mut drained = Vec::with_capacity(keys.len());
138        for key in keys {
139            if let Some(entry) = self.pending.get_mut(&key) {
140                if let QueueEntry::Pending(current) = entry {
141                    let cloned = current.clone();
142                    *entry = QueueEntry::InFlight(cloned.clone());
143                    drained.push((key, cloned));
144                }
145            }
146        }
147
148        drained
149    }
150
151    pub fn complete_success(&mut self, key: &AnnounceVerifyKey) -> Option<PendingAnnounce> {
152        match self.remove_entry(key) {
153            Some(QueueEntry::InFlight(entry)) => Some(entry),
154            Some(QueueEntry::Pending(entry)) => Some(entry),
155            None => None,
156        }
157    }
158
159    pub fn complete_failure(&mut self, key: &AnnounceVerifyKey) -> bool {
160        self.remove_entry(key).is_some()
161    }
162
163    pub fn len(&self) -> usize {
164        self.pending.len()
165    }
166
167    pub fn is_empty(&self) -> bool {
168        self.pending.is_empty()
169    }
170
171    pub fn queued_bytes(&self) -> usize {
172        self.queued_bytes
173    }
174
175    pub fn clear(&mut self) {
176        self.pending.clear();
177        self.queued_bytes = 0;
178    }
179
180    fn select_eviction_candidate(
181        &self,
182        incoming_entry: &PendingAnnounce,
183    ) -> Option<AnnounceVerifyKey> {
184        match self.overflow_policy {
185            OverflowPolicy::DropNewest => None,
186            OverflowPolicy::DropOldest => self
187                .pending
188                .iter()
189                .min_by(|a, b| {
190                    queued_at_of(a.1)
191                        .partial_cmp(&queued_at_of(b.1))
192                        .unwrap_or(core::cmp::Ordering::Equal)
193                })
194                .map(|(key, _)| *key),
195            OverflowPolicy::DropWorst => {
196                let candidate = self
197                    .pending
198                    .iter()
199                    .map(|(existing_key, existing_entry)| {
200                        (*existing_key, pending_of(existing_entry))
201                    })
202                    .max_by(|a, b| {
203                        a.1.best_hops.cmp(&b.1.best_hops).then_with(|| {
204                            a.1.queued_at
205                                .partial_cmp(&b.1.queued_at)
206                                .unwrap_or(core::cmp::Ordering::Equal)
207                        })
208                    })?;
209                if incoming_entry.best_hops >= candidate.1.best_hops {
210                    None
211                } else {
212                    Some(candidate.0)
213                }
214            }
215        }
216    }
217
218    fn remove_entry(&mut self, key: &AnnounceVerifyKey) -> Option<QueueEntry> {
219        let removed = self.pending.remove(key)?;
220        self.queued_bytes = self
221            .queued_bytes
222            .saturating_sub(pending_bytes(pending_of(&removed)));
223        Some(removed)
224    }
225}
226
227fn pending_of(entry: &QueueEntry) -> &PendingAnnounce {
228    match entry {
229        QueueEntry::Pending(current) | QueueEntry::InFlight(current) => current,
230    }
231}
232
233fn queued_at_of(entry: &QueueEntry) -> f64 {
234    pending_of(entry).queued_at
235}
236
237fn pending_bytes(entry: &PendingAnnounce) -> usize {
238    entry.original_raw.len()
239        + entry.packet.data.len()
240        + entry.packet.transport_id.as_ref().map_or(0, |id| id.len())
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constants;
    use crate::packet::{PacketFlags, RawPacket};

    /// Packs a broadcast announce packet for `dest` carrying eight `fill` bytes.
    fn make_packet(dest: [u8; 16], hops: u8, fill: u8) -> RawPacket {
        let flags = PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        };
        let payload = [fill; 8];
        RawPacket::pack(
            flags,
            hops,
            &dest,
            None,
            constants::CONTEXT_NONE,
            &payload,
        )
        .unwrap()
    }

    /// Builds a matching key/payload pair queued at t = 10.0 with a one-byte
    /// raw buffer.
    fn make_pending(
        dest: [u8; 16],
        random_blob: [u8; 10],
        received_from: [u8; 16],
        hops: u8,
    ) -> (AnnounceVerifyKey, PendingAnnounce) {
        let key = AnnounceVerifyKey {
            destination_hash: dest,
            random_blob,
            received_from,
        };
        let announce = PendingAnnounce {
            original_raw: vec![hops],
            packet: make_packet(dest, hops, hops),
            interface: InterfaceId(1),
            received_from,
            queued_at: 10.0,
            best_hops: hops,
            emission_ts: 42,
            random_blob,
        };
        (key, announce)
    }

    #[test]
    fn enqueue_replaces_lower_hops_and_preserves_distinct_paths() {
        let dest = [1; 16];
        let random = [2; 10];
        let rx_a = [3; 16];
        let rx_b = [4; 16];
        let mut queue = AnnounceVerifyQueue::new(8);

        // Same key twice: the lower hop count replaces the first payload.
        let (key_a, first_a) = make_pending(dest, random, rx_a, 5);
        assert!(queue.enqueue(key_a, first_a));
        let (_, improved_a) = make_pending(dest, random, rx_a, 3);
        assert!(queue.enqueue(key_a, improved_a));
        assert_eq!(queue.len(), 1);

        // A different `received_from` is a separate slot.
        let (key_b, first_b) = make_pending(dest, random, rx_b, 4);
        assert!(queue.enqueue(key_b, first_b));
        assert_eq!(queue.len(), 2);

        let taken = queue.take_pending(10.0);
        assert_eq!(taken.len(), 2);
        let hops_for = |wanted: AnnounceVerifyKey| {
            taken
                .iter()
                .find(|(key, _)| *key == wanted)
                .map(|(_, announce)| announce.best_hops)
        };
        assert_eq!(hops_for(key_a), Some(3));
        assert_eq!(hops_for(key_b), Some(4));
    }

    #[test]
    fn enqueue_updates_inflight_and_cleans_stale_entries() {
        let dest = [8; 16];
        let random = [9; 10];
        let recv = [10; 16];
        let mut queue = AnnounceVerifyQueue::new(2);

        let (key, original) = make_pending(dest, random, recv, 6);
        assert!(queue.enqueue(key, original));
        // Marks the entry in-flight.
        let _ = queue.take_pending(20.0);

        let (_, improved) = make_pending(dest, random, recv, 2);
        assert!(queue.enqueue(key, improved));
        let completed = queue.complete_success(&key).unwrap();
        assert_eq!(completed.best_hops, 2);

        let (stale_key, mut stale) = make_pending([11; 16], [12; 10], [13; 16], 7);
        stale.queued_at = 1.0;
        assert!(queue.enqueue(stale_key, stale));
        let taken = queue.take_pending(40.0);
        assert!(taken.is_empty());
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn enqueue_evicts_worst_entry_when_full() {
        let mut queue = AnnounceVerifyQueue::with_limits(2, 1024, 30.0, OverflowPolicy::DropWorst);
        let (k1, e1) = make_pending([1; 16], [1; 10], [1; 16], 8);
        let (k2, e2) = make_pending([2; 16], [2; 10], [2; 16], 5);
        let (k3, e3) = make_pending([3; 16], [3; 10], [3; 16], 4);
        let (k4, e4) = make_pending([4; 16], [4; 10], [4; 16], 9);

        assert!(queue.enqueue(k1, e1));
        assert!(queue.enqueue(k2, e2));
        // Third entry is better than the worst queued one, so it displaces it.
        assert!(queue.enqueue(k3, e3));
        assert_eq!(queue.len(), 2);
        // Fourth entry is worse than everything queued, so it is rejected.
        assert!(!queue.enqueue(k4, e4));

        let taken = queue.take_pending(10.0);
        assert_eq!(taken.len(), 2);
        assert!(!taken.iter().any(|(key, _)| *key == k1));
    }

    #[test]
    fn drop_newest_policy_rejects_when_full() {
        let mut queue = AnnounceVerifyQueue::with_limits(1, 1024, 30.0, OverflowPolicy::DropNewest);
        let (k1, e1) = make_pending([1; 16], [1; 10], [1; 16], 4);
        let (k2, e2) = make_pending([2; 16], [2; 10], [2; 16], 1);

        assert!(queue.enqueue(k1, e1));
        assert!(!queue.enqueue(k2, e2));

        let taken = queue.take_pending(10.0);
        assert_eq!(taken.len(), 1);
        let (only_key, _) = &taken[0];
        assert_eq!(*only_key, k1);
    }

    #[test]
    fn drop_oldest_policy_evicts_oldest_for_byte_cap() {
        let mut queue = AnnounceVerifyQueue::with_limits(4, 24, 30.0, OverflowPolicy::DropOldest);

        let (k1, mut e1) = make_pending([1; 16], [1; 10], [1; 16], 4);
        e1.original_raw = vec![1; 12];
        e1.queued_at = 1.0;

        let (k2, mut e2) = make_pending([2; 16], [2; 10], [2; 16], 3);
        e2.original_raw = vec![2; 12];
        e2.queued_at = 2.0;

        assert!(queue.enqueue(k1, e1));
        assert!(queue.enqueue(k2, e2));
        assert_eq!(queue.len(), 1);

        let taken = queue.take_pending(10.0);
        assert_eq!(taken.len(), 1);
        let (survivor, _) = &taken[0];
        assert_eq!(*survivor, k2);
    }

    #[test]
    fn clear_removes_pending_and_inflight_entries_and_resets_bytes() {
        let mut queue = AnnounceVerifyQueue::new(4);
        let (pending_key, pending) = make_pending([1; 16], [1; 10], [1; 16], 4);
        let (inflight_key, inflight) = make_pending([2; 16], [2; 10], [2; 16], 3);
        assert!(queue.enqueue(pending_key, pending));
        assert!(queue.enqueue(inflight_key, inflight));
        let _ = queue.take_pending(10.0);

        assert_eq!(queue.len(), 2);
        assert!(queue.queued_bytes() > 0);

        queue.clear();

        assert!(queue.is_empty());
        assert_eq!(queue.queued_bytes(), 0);
        let after_clear = queue.take_pending(10.0);
        assert!(after_clear.is_empty());
    }
}
417}