Skip to main content

rns_core/transport/
announce_queue.rs

//! Per-interface announce bandwidth queuing.
//!
//! Announces with hops > 0 (propagated, not locally originated) are gated
//! by a per-interface bandwidth cap (default 2%). When the cap is exhausted,
//! announces are queued and released once bandwidth becomes available again.
//!
//! Python reference: Transport.py:1085-1165, Interface.py:246-286
8
9use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
12use super::types::{AirtimeProfile, InterfaceId, PacketBytes, TransportAction};
13use crate::constants;
14
15/// A queued announce entry waiting for bandwidth availability.
16#[derive(Debug, Clone)]
17pub struct AnnounceQueueEntry {
18    /// Destination hash of the announce.
19    pub destination_hash: [u8; 16],
20    /// Time the announce was queued.
21    pub time: f64,
22    /// Hops from the announce.
23    pub hops: u8,
24    /// Time the announce was originally emitted (from random blob).
25    pub emitted: f64,
26    /// Raw announce bytes (ready to send).
27    pub raw: PacketBytes,
28}
29
30/// Per-interface announce queue with bandwidth tracking.
31#[derive(Debug, Clone)]
32pub struct InterfaceAnnounceQueue {
33    /// Queued announce entries.
34    pub entries: Vec<AnnounceQueueEntry>,
35    /// Earliest time another announce is allowed on this interface.
36    pub announce_allowed_at: f64,
37}
38
39impl InterfaceAnnounceQueue {
40    pub fn new() -> Self {
41        InterfaceAnnounceQueue {
42            entries: Vec::new(),
43            announce_allowed_at: 0.0,
44        }
45    }
46
47    /// Insert an announce into the queue.
48    /// If an entry for the same destination already exists, update it if the new one
49    /// has fewer hops or is newer.
50    pub fn insert(&mut self, entry: AnnounceQueueEntry) {
51        // Check for existing entry with same destination
52        if let Some(pos) = self
53            .entries
54            .iter()
55            .position(|e| e.destination_hash == entry.destination_hash)
56        {
57            let existing = &self.entries[pos];
58            // Update if new entry has fewer hops, or same hops and newer
59            if entry.hops < existing.hops
60                || (entry.hops == existing.hops && entry.emitted > existing.emitted)
61            {
62                self.entries[pos] = entry;
63            }
64            // Otherwise discard the new entry
65        } else {
66            // Enforce max queue size
67            if self.entries.len() >= constants::MAX_QUEUED_ANNOUNCES {
68                // Drop oldest entry
69                self.entries.remove(0);
70            }
71            self.entries.push(entry);
72        }
73    }
74
75    /// Remove stale entries (older than QUEUED_ANNOUNCE_LIFE).
76    pub fn remove_stale(&mut self, now: f64) {
77        self.entries
78            .retain(|e| now - e.time < constants::QUEUED_ANNOUNCE_LIFE);
79    }
80
81    /// Select the next announce to send: minimum hops, then oldest (FIFO).
82    /// Returns the index of the selected entry, or None if empty.
83    pub fn select_next(&self) -> Option<usize> {
84        if self.entries.is_empty() {
85            return None;
86        }
87        let mut best_idx = 0;
88        let mut best_hops = self.entries[0].hops;
89        let mut best_time = self.entries[0].time;
90
91        for (i, entry) in self.entries.iter().enumerate().skip(1) {
92            if entry.hops < best_hops || (entry.hops == best_hops && entry.time < best_time) {
93                best_idx = i;
94                best_hops = entry.hops;
95                best_time = entry.time;
96            }
97        }
98        Some(best_idx)
99    }
100
101    /// Check if an announce is allowed now based on bandwidth.
102    pub fn is_allowed(&self, now: f64) -> bool {
103        now >= self.announce_allowed_at
104    }
105
106    /// Calculate the next allowed time after sending an announce.
107    /// `raw_len`: size of the announce in bytes
108    /// `bitrate`: interface bitrate in bits/second
109    /// `announce_cap`: fraction of bitrate reserved for announces
110    pub fn calculate_next_allowed(
111        now: f64,
112        raw_len: usize,
113        bitrate: u64,
114        airtime_profile: Option<AirtimeProfile>,
115        announce_cap: f64,
116    ) -> f64 {
117        if announce_cap <= 0.0 {
118            return now; // no cap
119        }
120
121        let time_to_send = airtime_profile
122            .map(|profile| profile.transmit_time_secs(raw_len))
123            .unwrap_or_else(|| {
124                if bitrate == 0 {
125                    0.0
126                } else {
127                    let bits = (raw_len * 8) as f64;
128                    bits / (bitrate as f64)
129                }
130            });
131        if time_to_send <= 0.0 {
132            return now;
133        }
134        let delay = time_to_send / announce_cap;
135        now + delay
136    }
137}
138
139impl Default for InterfaceAnnounceQueue {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
145/// Manage announce queues for all interfaces.
146#[derive(Debug, Clone)]
147pub struct AnnounceQueues {
148    queues: BTreeMap<InterfaceId, InterfaceAnnounceQueue>,
149    max_interfaces: usize,
150    interface_cap_drops: u64,
151}
152
153impl AnnounceQueues {
154    pub fn new(max_interfaces: usize) -> Self {
155        AnnounceQueues {
156            queues: BTreeMap::new(),
157            max_interfaces,
158            interface_cap_drops: 0,
159        }
160    }
161
162    /// Try to send an announce on an interface. If bandwidth is available,
163    /// returns the action immediately. Otherwise, queues it.
164    ///
165    /// Returns Some(action) if the announce should be sent now, None if queued.
166    #[allow(clippy::too_many_arguments)]
167    pub fn gate_announce(
168        &mut self,
169        interface: InterfaceId,
170        raw: PacketBytes,
171        dest_hash: [u8; 16],
172        hops: u8,
173        emitted: f64,
174        now: f64,
175        bitrate: Option<u64>,
176        airtime_profile: Option<AirtimeProfile>,
177        announce_cap: f64,
178    ) -> Option<TransportAction> {
179        // If no timing model is available, no cap applies — send immediately
180        let bitrate = match bitrate {
181            Some(br) if br > 0 => br,
182            _ if airtime_profile.is_none() => {
183                return Some(TransportAction::SendOnInterface { interface, raw });
184            }
185            _ => 0,
186        };
187
188        if !self.queues.contains_key(&interface) && self.queues.len() >= self.max_interfaces {
189            self.interface_cap_drops = self.interface_cap_drops.saturating_add(1);
190            return None;
191        }
192
193        let queue = self.queues.entry(interface).or_default();
194
195        if queue.is_allowed(now) {
196            // Bandwidth available — send now and update allowed_at
197            queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
198                now,
199                raw.len(),
200                bitrate,
201                airtime_profile,
202                announce_cap,
203            );
204            Some(TransportAction::SendOnInterface { interface, raw })
205        } else {
206            // Queue the announce
207            queue.insert(AnnounceQueueEntry {
208                destination_hash: dest_hash,
209                time: now,
210                hops,
211                emitted,
212                raw,
213            });
214            None
215        }
216    }
217
218    /// Process all announce queues: dequeue and send when bandwidth is available.
219    /// Called from tick().
220    pub fn process_queues(
221        &mut self,
222        now: f64,
223        interfaces: &BTreeMap<InterfaceId, super::types::InterfaceInfo>,
224    ) -> Vec<TransportAction> {
225        let mut actions = Vec::new();
226        let mut empty_queues = Vec::new();
227
228        for (iface_id, queue) in self.queues.iter_mut() {
229            // Remove stale entries
230            queue.remove_stale(now);
231
232            // Process as many announces as bandwidth allows
233            while queue.is_allowed(now) {
234                if let Some(idx) = queue.select_next() {
235                    let entry = queue.entries.remove(idx);
236
237                    // Look up bitrate for this interface
238                    let (bitrate, airtime_profile, announce_cap) =
239                        if let Some(info) = interfaces.get(iface_id) {
240                            (
241                                info.bitrate.unwrap_or(0),
242                                info.airtime_profile,
243                                info.announce_cap,
244                            )
245                        } else {
246                            (0, None, constants::ANNOUNCE_CAP)
247                        };
248
249                    if bitrate > 0 || airtime_profile.is_some() {
250                        queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
251                            now,
252                            entry.raw.len(),
253                            bitrate,
254                            airtime_profile,
255                            announce_cap,
256                        );
257                    } else {
258                        queue.announce_allowed_at = now;
259                    }
260
261                    actions.push(TransportAction::SendOnInterface {
262                        interface: *iface_id,
263                        raw: entry.raw,
264                    });
265                } else {
266                    break;
267                }
268            }
269
270            if queue.entries.is_empty() {
271                empty_queues.push(*iface_id);
272            }
273        }
274
275        for iface_id in empty_queues {
276            self.queues.remove(&iface_id);
277        }
278
279        actions
280    }
281
282    /// Return true when recursive path requests should wait for this interface.
283    pub fn blocks_recursive_path_request(&self, interface: InterfaceId, now: f64) -> bool {
284        self.queues
285            .get(&interface)
286            .is_some_and(|queue| !queue.entries.is_empty() || !queue.is_allowed(now))
287    }
288
289    /// Reserve announce-cap airtime after sending a recursive path request.
290    pub fn reserve_recursive_path_request(
291        &mut self,
292        interface: InterfaceId,
293        raw_len: usize,
294        now: f64,
295        bitrate: Option<u64>,
296        airtime_profile: Option<AirtimeProfile>,
297        announce_cap: f64,
298    ) {
299        let bitrate = match bitrate {
300            Some(br) if br > 0 => br,
301            _ if airtime_profile.is_none() => return,
302            _ => 0,
303        };
304
305        if !self.queues.contains_key(&interface) && self.queues.len() >= self.max_interfaces {
306            self.interface_cap_drops = self.interface_cap_drops.saturating_add(1);
307            return;
308        }
309
310        let queue = self.queues.entry(interface).or_default();
311        queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
312            now,
313            raw_len,
314            bitrate,
315            airtime_profile,
316            announce_cap,
317        );
318    }
319
320    /// Remove all announce queue state for an interface.
321    pub fn remove_interface(&mut self, interface: InterfaceId) -> bool {
322        self.queues.remove(&interface).is_some()
323    }
324
325    /// Number of interface queues currently tracked.
326    pub fn queue_count(&self) -> usize {
327        self.queues.len()
328    }
329
330    /// Number of interface queues that currently hold buffered announces.
331    pub fn nonempty_queue_count(&self) -> usize {
332        self.queues
333            .values()
334            .filter(|queue| !queue.entries.is_empty())
335            .count()
336    }
337
338    /// Total number of buffered announce entries across all interfaces.
339    pub fn total_queued_announces(&self) -> usize {
340        self.queues.values().map(|queue| queue.entries.len()).sum()
341    }
342
343    /// Total retained raw-byte payload across all buffered announces.
344    pub fn total_queued_bytes(&self) -> usize {
345        self.queues
346            .values()
347            .flat_map(|queue| queue.entries.iter())
348            .map(|entry| entry.raw.len())
349            .sum()
350    }
351
352    /// Number of announces dropped because the interface queue cap was reached.
353    pub fn interface_cap_drop_count(&self) -> u64 {
354        self.interface_cap_drops
355    }
356
357    /// Get the queue for a specific interface (for testing).
358    #[cfg(test)]
359    pub fn queue_for(&self, id: &InterfaceId) -> Option<&InterfaceAnnounceQueue> {
360        self.queues.get(id)
361    }
362}
363
364impl Default for AnnounceQueues {
365    fn default() -> Self {
366        Self::new(1024)
367    }
368}
369
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::string::String;

    /// Bitrate of the deliberately slow link most gating tests use (1 kbps).
    const SLOW_BPS: u64 = 1000;
    /// Announce cap used by every gating test (2%).
    const CAP: f64 = 0.02;

    /// Build a queue entry whose hash is `dest` repeated and whose `emitted` equals `time`.
    fn make_entry(dest: u8, hops: u8, time: f64) -> AnnounceQueueEntry {
        let raw = vec![0x01, 0x02, 0x03].into();
        AnnounceQueueEntry {
            destination_hash: [dest; 16],
            time,
            hops,
            emitted: time,
            raw,
        }
    }

    /// Build a full-mode interface description with the given id and bitrate.
    fn make_interface_info(id: u64, bitrate: Option<u64>) -> super::super::types::InterfaceInfo {
        super::super::types::InterfaceInfo {
            id: InterfaceId(id),
            name: String::from("test"),
            mode: crate::constants::MODE_FULL,
            out_capable: true,
            in_capable: true,
            bitrate,
            airtime_profile: None,
            announce_rate_target: None,
            announce_rate_grace: 0,
            announce_rate_penalty: 0.0,
            announce_cap: constants::ANNOUNCE_CAP,
            is_local_client: false,
            wants_tunnel: false,
            tunnel_id: None,
            mtu: constants::MTU as u32,
            ingress_control: crate::transport::types::IngressControlConfig::disabled(),
            ia_freq: 0.0,
            ip_freq: 0.0,
            op_freq: 0.0,
            op_samples: 0,
            started: 0.0,
        }
    }

    /// LoRa BW125 / SF8 / CR4-5 profile with explicit header and CRC.
    fn lora_profile() -> AirtimeProfile {
        AirtimeProfile::Lora {
            bandwidth: 125_000,
            spreading_factor: 8,
            coding_rate: 5,
            preamble_symbols: 8,
            explicit_header: true,
            crc: true,
        }
    }

    /// Gate a `len`-byte announce (every byte `fill`) on interface `iface` at time `at`,
    /// over the slow 1 kbps link, without an airtime profile.
    fn gate_slow(
        queues: &mut AnnounceQueues,
        iface: u64,
        fill: u8,
        len: usize,
        dest: u8,
        hops: u8,
        at: f64,
    ) -> Option<TransportAction> {
        queues.gate_announce(
            InterfaceId(iface),
            vec![fill; len].into(),
            [dest; 16],
            hops,
            at,
            at,
            Some(SLOW_BPS),
            None,
            CAP,
        )
    }

    /// Interface table containing only interface 1 on the slow link.
    fn slow_interface_table() -> BTreeMap<InterfaceId, super::super::types::InterfaceInfo> {
        let mut table = BTreeMap::new();
        table.insert(InterfaceId(1), make_interface_info(1, Some(SLOW_BPS)));
        table
    }

    /// Next allowed announce time currently recorded for interface 1.
    fn allowed_at(queues: &AnnounceQueues) -> f64 {
        queues
            .queue_for(&InterfaceId(1))
            .unwrap()
            .announce_allowed_at
    }

    /// Number of announces currently buffered for interface 1.
    fn queued_len(queues: &AnnounceQueues) -> usize {
        queues.queue_for(&InterfaceId(1)).unwrap().entries.len()
    }

    // --- InterfaceAnnounceQueue tests ---

    #[test]
    fn test_queue_entry_creation() {
        let entry = make_entry(0xAA, 3, 1000.0);
        assert_eq!(entry.hops, 3);
        assert_eq!(entry.destination_hash, [0xAA; 16]);
    }

    #[test]
    fn test_queue_insert_and_select() {
        let mut queue = InterfaceAnnounceQueue::new();
        for (dest, hops, time) in [(0x01, 3, 100.0), (0x02, 1, 200.0), (0x03, 2, 150.0)] {
            queue.insert(make_entry(dest, hops, time));
        }

        // The entry with the fewest hops (0x02, hops=1) must win.
        let idx = queue.select_next().unwrap();
        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_select_fifo_on_same_hops() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 2, 200.0)); // newer
        queue.insert(make_entry(0x02, 2, 100.0)); // older

        // Equal hops: the older entry (0x02 at time 100) goes first.
        let idx = queue.select_next().unwrap();
        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_dedup_update() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 3, 100.0));
        assert_eq!(queue.entries.len(), 1);

        // Same destination, fewer hops: the queued entry is replaced.
        queue.insert(make_entry(0x01, 1, 200.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);

        // Same destination, more hops: the queued entry is kept.
        queue.insert(make_entry(0x01, 5, 300.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);
    }

    #[test]
    fn test_queue_stale_removal() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 1, 100.0));
        queue.insert(make_entry(0x02, 2, 200.0));

        // At time 100 + 86400 + 1 = 86501 only entry 0x01 has expired.
        queue.remove_stale(86501.0);
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_max_size() {
        let mut queue = InterfaceAnnounceQueue::new();
        for i in 0..constants::MAX_QUEUED_ANNOUNCES {
            // Give every entry a distinct destination derived from its index.
            let mut destination_hash = [0u8; 16];
            destination_hash[0] = (i >> 8) as u8;
            destination_hash[1] = i as u8;

            queue.insert(AnnounceQueueEntry {
                destination_hash,
                time: i as f64,
                hops: 1,
                emitted: i as f64,
                raw: vec![0x01].into(),
            });
        }
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);

        // One more entry must not grow the queue beyond its limit.
        queue.insert(make_entry(0xFF, 1, 99999.0));
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
    }

    #[test]
    fn test_queue_empty_select() {
        let queue = InterfaceAnnounceQueue::new();
        assert!(queue.select_next().is_none());
    }

    #[test]
    fn test_bandwidth_allowed() {
        let mut queue = InterfaceAnnounceQueue::new();
        assert!(queue.is_allowed(0.0));
        assert!(queue.is_allowed(100.0));

        queue.announce_allowed_at = 200.0;
        assert!(!queue.is_allowed(100.0));
        assert!(!queue.is_allowed(199.9));
        assert!(queue.is_allowed(200.0));
        assert!(queue.is_allowed(300.0));
    }

    #[test]
    fn test_calculate_next_allowed() {
        // 100 bytes = 800 bits at 1000 bps -> 0.8 s on air;
        // with a 0.02 cap the delay is 0.8 / 0.02 = 40.0 s.
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, None, 0.02);
        assert!((next - 1040.0).abs() < 0.001);
    }

    #[test]
    fn test_calculate_next_allowed_zero_bitrate() {
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, None, 0.02);
        assert_eq!(next, 1000.0); // no cap
    }

    #[test]
    fn test_calculate_next_allowed_uses_lora_airtime() {
        let next = InterfaceAnnounceQueue::calculate_next_allowed(
            1000.0,
            100,
            0,
            Some(lora_profile()),
            0.02,
        );

        // 100-byte explicit-header LoRa packet at BW125/SF8/CR4/5:
        // (8 + ceil((800 - 32 + 28 + 16) / 32) * 5 + 12.25) symbols
        // * 2.048 ms/symbol = 307.712 ms airtime.
        assert!((next - 1015.3856).abs() < 0.0001);
    }

    // --- AnnounceQueues tests ---

    #[test]
    fn test_gate_announce_no_bitrate_immediate() {
        let mut queues = AnnounceQueues::new(1024);
        let result = queues.gate_announce(
            InterfaceId(1),
            vec![0x01, 0x02, 0x03].into(),
            [0xAA; 16],
            2,
            1000.0,
            1000.0,
            None, // no bitrate
            None,
            CAP,
        );
        assert!(result.is_some());
        assert!(matches!(
            result.unwrap(),
            TransportAction::SendOnInterface { .. }
        ));
    }

    #[test]
    fn test_gate_announce_uses_airtime_profile_without_bitrate() {
        let mut queues = AnnounceQueues::new(1024);
        let profile = lora_profile();

        let first = queues.gate_announce(
            InterfaceId(1),
            vec![0x01; 100].into(),
            [0xAA; 16],
            2,
            1000.0,
            1000.0,
            None,
            Some(profile),
            CAP,
        );
        assert!(first.is_some());
        assert!((allowed_at(&queues) - 1015.3856).abs() < 0.0001);

        let second = queues.gate_announce(
            InterfaceId(1),
            vec![0x02; 100].into(),
            [0xBB; 16],
            2,
            1000.0,
            1000.0,
            None,
            Some(profile),
            CAP,
        );
        assert!(second.is_none());
        assert_eq!(queued_len(&queues), 1);
    }

    #[test]
    fn test_gate_announce_bandwidth_available() {
        let mut queues = AnnounceQueues::new(1024);
        let result = queues.gate_announce(
            InterfaceId(1),
            vec![0x01; 100].into(),
            [0xBB; 16],
            2,
            1000.0,
            1000.0,
            Some(10000), // 10 kbps
            None,
            CAP,
        );
        // The first announce on a fresh interface goes straight out...
        assert!(result.is_some());

        // ...and pushes the next allowed time into the future.
        assert!(allowed_at(&queues) > 1000.0);
    }

    #[test]
    fn test_gate_announce_bandwidth_exhausted_queues() {
        let mut queues = AnnounceQueues::new(1024);

        // The first announce uses up the cap on the very slow link.
        let r1 = gate_slow(&mut queues, 1, 0x01, 100, 0xAA, 2, 1000.0);
        assert!(r1.is_some());

        // A second announce at the same instant has to wait.
        let r2 = gate_slow(&mut queues, 1, 0x02, 100, 0xBB, 3, 1000.0);
        assert!(r2.is_none()); // queued

        assert_eq!(queued_len(&queues), 1);
    }

    #[test]
    fn test_process_queues_dequeues_when_allowed() {
        let mut queues = AnnounceQueues::new(1024);

        // Exhaust the cap with one announce so that the next one is queued.
        let _ = gate_slow(&mut queues, 1, 0x01, 10, 0xAA, 2, 0.0);
        let _ = gate_slow(&mut queues, 1, 0x02, 10, 0xBB, 3, 0.0);
        assert_eq!(queued_len(&queues), 1);

        let interfaces = slow_interface_table();

        // Process just after the point where bandwidth is available again.
        let resume_at = allowed_at(&queues) + 1.0;
        let actions = queues.process_queues(resume_at, &interfaces);

        assert_eq!(actions.len(), 1);
        assert!(matches!(
            &actions[0],
            TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(1)
        ));

        // The drained queue is pruned from the registry.
        assert!(queues.queue_for(&InterfaceId(1)).is_none());
    }

    #[test]
    fn test_local_announce_bypasses_cap() {
        // hops == 0 marks a locally originated announce, which must not be queued.
        // Filtering those out is the caller's (TransportEngine's) job: it should only
        // call gate_announce for hops > 0. Here we pin down that gate_announce itself
        // does not special-case hops == 0.
        let mut queues = AnnounceQueues::new(1024);

        // Exhaust bandwidth
        let _ = gate_slow(&mut queues, 1, 0x01, 100, 0xAA, 2, 0.0);

        // gate_announce is agnostic to the hop count, so this one is queued too.
        let r = gate_slow(&mut queues, 1, 0x02, 100, 0xBB, 0, 0.0);
        assert!(r.is_none()); // queued — caller must bypass for hops==0
    }

    #[test]
    fn test_remove_interface_queue() {
        let mut queues = AnnounceQueues::new(1024);
        let _ = gate_slow(&mut queues, 1, 0x01, 100, 0xAA, 2, 0.0);
        let _ = gate_slow(&mut queues, 1, 0x02, 100, 0xBB, 3, 0.0);

        assert!(queues.queue_for(&InterfaceId(1)).is_some());
        assert!(queues.remove_interface(InterfaceId(1)));
        assert!(queues.queue_for(&InterfaceId(1)).is_none());
        assert!(!queues.remove_interface(InterfaceId(1)));
    }

    #[test]
    fn test_process_queues_prunes_empty_queue() {
        let mut queues = AnnounceQueues::new(1024);
        let _ = gate_slow(&mut queues, 1, 0x01, 10, 0xAA, 2, 0.0);
        let _ = gate_slow(&mut queues, 1, 0x02, 10, 0xBB, 3, 0.0);

        let interfaces = slow_interface_table();
        let resume_at = allowed_at(&queues) + 1.0;

        let actions = queues.process_queues(resume_at, &interfaces);
        assert_eq!(actions.len(), 1);
        assert!(queues.queue_for(&InterfaceId(1)).is_none());
        assert_eq!(queues.queue_count(), 0);
    }

    #[test]
    fn test_process_queues_keeps_nonempty_queue() {
        let mut queues = AnnounceQueues::new(1024);
        let _ = gate_slow(&mut queues, 1, 0x01, 100, 0xAA, 2, 0.0);
        let _ = gate_slow(&mut queues, 1, 0x02, 100, 0xBB, 3, 0.0);
        let _ = gate_slow(&mut queues, 1, 0x03, 100, 0xCC, 4, 0.0);

        let interfaces = slow_interface_table();
        let resume_at = allowed_at(&queues) + 1.0;

        let actions = queues.process_queues(resume_at, &interfaces);
        assert_eq!(actions.len(), 1);
        assert!(queues.queue_for(&InterfaceId(1)).is_some());
        assert_eq!(queued_len(&queues), 1);
    }

    #[test]
    fn test_gate_announce_refuses_new_interface_when_at_capacity() {
        let mut queues = AnnounceQueues::new(1);

        let _ = gate_slow(&mut queues, 1, 0x01, 100, 0xAA, 2, 0.0);
        let second = gate_slow(&mut queues, 1, 0x02, 100, 0xBB, 3, 0.0);
        assert!(second.is_none());
        assert_eq!(queues.queue_count(), 1);

        let rejected = gate_slow(&mut queues, 2, 0x03, 100, 0xCC, 4, 0.0);
        assert!(rejected.is_none());
        assert_eq!(queues.queue_count(), 1);
        assert!(queues.queue_for(&InterfaceId(2)).is_none());
        assert_eq!(queues.interface_cap_drop_count(), 1);
    }

    #[test]
    fn test_gate_announce_allows_existing_queue_when_at_capacity() {
        let mut queues = AnnounceQueues::new(1);

        let _ = gate_slow(&mut queues, 1, 0x01, 100, 0xAA, 2, 0.0);
        let queued = gate_slow(&mut queues, 1, 0x02, 100, 0xBB, 3, 0.0);
        assert!(queued.is_none());
        assert_eq!(queues.queue_count(), 1);
        assert_eq!(queued_len(&queues), 1);
        assert_eq!(queues.interface_cap_drop_count(), 0);
    }
}