Skip to main content

rns_core/transport/
announce_queue.rs

1//! Per-interface announce bandwidth queuing.
2//!
3//! Announces with hops > 0 (propagation, not locally-originated) are gated
4//! by a per-interface bandwidth cap (default 2%). When bandwidth is exhausted,
5//! announces are queued and released when bandwidth becomes available.
6//!
7//! Python reference: Transport.py:1085-1165, Interface.py:246-286
8
9use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
12use super::types::{AirtimeProfile, InterfaceId, PacketBytes, TransportAction};
13use crate::constants;
14
15/// A queued announce entry waiting for bandwidth availability.
16#[derive(Debug, Clone)]
17pub struct AnnounceQueueEntry {
18    /// Destination hash of the announce.
19    pub destination_hash: [u8; 16],
20    /// Time the announce was queued.
21    pub time: f64,
22    /// Hops from the announce.
23    pub hops: u8,
24    /// Time the announce was originally emitted (from random blob).
25    pub emitted: f64,
26    /// Raw announce bytes (ready to send).
27    pub raw: PacketBytes,
28}
29
30/// Per-interface announce queue with bandwidth tracking.
31#[derive(Debug, Clone)]
32pub struct InterfaceAnnounceQueue {
33    /// Queued announce entries.
34    pub entries: Vec<AnnounceQueueEntry>,
35    /// Earliest time another announce is allowed on this interface.
36    pub announce_allowed_at: f64,
37}
38
39impl InterfaceAnnounceQueue {
40    pub fn new() -> Self {
41        InterfaceAnnounceQueue {
42            entries: Vec::new(),
43            announce_allowed_at: 0.0,
44        }
45    }
46
47    /// Insert an announce into the queue.
48    /// If an entry for the same destination already exists, update it if the new one
49    /// has fewer hops or is newer.
50    pub fn insert(&mut self, entry: AnnounceQueueEntry) {
51        // Check for existing entry with same destination
52        if let Some(pos) = self
53            .entries
54            .iter()
55            .position(|e| e.destination_hash == entry.destination_hash)
56        {
57            let existing = &self.entries[pos];
58            // Update if new entry has fewer hops, or same hops and newer
59            if entry.hops < existing.hops
60                || (entry.hops == existing.hops && entry.emitted > existing.emitted)
61            {
62                self.entries[pos] = entry;
63            }
64            // Otherwise discard the new entry
65        } else {
66            // Enforce max queue size
67            if self.entries.len() >= constants::MAX_QUEUED_ANNOUNCES {
68                // Drop oldest entry
69                self.entries.remove(0);
70            }
71            self.entries.push(entry);
72        }
73    }
74
75    /// Remove stale entries (older than QUEUED_ANNOUNCE_LIFE).
76    pub fn remove_stale(&mut self, now: f64) {
77        self.entries
78            .retain(|e| now - e.time < constants::QUEUED_ANNOUNCE_LIFE);
79    }
80
81    /// Select the next announce to send: minimum hops, then oldest (FIFO).
82    /// Returns the index of the selected entry, or None if empty.
83    pub fn select_next(&self) -> Option<usize> {
84        if self.entries.is_empty() {
85            return None;
86        }
87        let mut best_idx = 0;
88        let mut best_hops = self.entries[0].hops;
89        let mut best_time = self.entries[0].time;
90
91        for (i, entry) in self.entries.iter().enumerate().skip(1) {
92            if entry.hops < best_hops || (entry.hops == best_hops && entry.time < best_time) {
93                best_idx = i;
94                best_hops = entry.hops;
95                best_time = entry.time;
96            }
97        }
98        Some(best_idx)
99    }
100
101    /// Check if an announce is allowed now based on bandwidth.
102    pub fn is_allowed(&self, now: f64) -> bool {
103        now >= self.announce_allowed_at
104    }
105
106    /// Calculate the next allowed time after sending an announce.
107    /// `raw_len`: size of the announce in bytes
108    /// `bitrate`: interface bitrate in bits/second
109    /// `announce_cap`: fraction of bitrate reserved for announces
110    pub fn calculate_next_allowed(
111        now: f64,
112        raw_len: usize,
113        bitrate: u64,
114        airtime_profile: Option<AirtimeProfile>,
115        announce_cap: f64,
116    ) -> f64 {
117        if announce_cap <= 0.0 {
118            return now; // no cap
119        }
120
121        let time_to_send = airtime_profile
122            .map(|profile| profile.transmit_time_secs(raw_len))
123            .unwrap_or_else(|| {
124                if bitrate == 0 {
125                    0.0
126                } else {
127                    let bits = (raw_len * 8) as f64;
128                    bits / (bitrate as f64)
129                }
130            });
131        if time_to_send <= 0.0 {
132            return now;
133        }
134        let delay = time_to_send / announce_cap;
135        now + delay
136    }
137}
138
139impl Default for InterfaceAnnounceQueue {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
145/// Manage announce queues for all interfaces.
146#[derive(Debug, Clone)]
147pub struct AnnounceQueues {
148    queues: BTreeMap<InterfaceId, InterfaceAnnounceQueue>,
149    max_interfaces: usize,
150    interface_cap_drops: u64,
151}
152
153impl AnnounceQueues {
154    pub fn new(max_interfaces: usize) -> Self {
155        AnnounceQueues {
156            queues: BTreeMap::new(),
157            max_interfaces,
158            interface_cap_drops: 0,
159        }
160    }
161
162    /// Try to send an announce on an interface. If bandwidth is available,
163    /// returns the action immediately. Otherwise, queues it.
164    ///
165    /// Returns Some(action) if the announce should be sent now, None if queued.
166    #[allow(clippy::too_many_arguments)]
167    pub fn gate_announce(
168        &mut self,
169        interface: InterfaceId,
170        raw: PacketBytes,
171        dest_hash: [u8; 16],
172        hops: u8,
173        emitted: f64,
174        now: f64,
175        bitrate: Option<u64>,
176        airtime_profile: Option<AirtimeProfile>,
177        announce_cap: f64,
178    ) -> Option<TransportAction> {
179        // If no timing model is available, no cap applies — send immediately
180        let bitrate = match bitrate {
181            Some(br) if br > 0 => br,
182            _ if airtime_profile.is_none() => {
183                return Some(TransportAction::SendOnInterface { interface, raw });
184            }
185            _ => 0,
186        };
187
188        if !self.queues.contains_key(&interface) && self.queues.len() >= self.max_interfaces {
189            self.interface_cap_drops = self.interface_cap_drops.saturating_add(1);
190            return None;
191        }
192
193        let queue = self.queues.entry(interface).or_default();
194
195        if queue.is_allowed(now) {
196            // Bandwidth available — send now and update allowed_at
197            queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
198                now,
199                raw.len(),
200                bitrate,
201                airtime_profile,
202                announce_cap,
203            );
204            Some(TransportAction::SendOnInterface { interface, raw })
205        } else {
206            // Queue the announce
207            queue.insert(AnnounceQueueEntry {
208                destination_hash: dest_hash,
209                time: now,
210                hops,
211                emitted,
212                raw,
213            });
214            None
215        }
216    }
217
218    /// Process all announce queues: dequeue and send when bandwidth is available.
219    /// Called from tick().
220    pub fn process_queues(
221        &mut self,
222        now: f64,
223        interfaces: &BTreeMap<InterfaceId, super::types::InterfaceInfo>,
224    ) -> Vec<TransportAction> {
225        let mut actions = Vec::new();
226        let mut empty_queues = Vec::new();
227
228        for (iface_id, queue) in self.queues.iter_mut() {
229            // Remove stale entries
230            queue.remove_stale(now);
231
232            // Process as many announces as bandwidth allows
233            while queue.is_allowed(now) {
234                if let Some(idx) = queue.select_next() {
235                    let entry = queue.entries.remove(idx);
236
237                    // Look up bitrate for this interface
238                    let (bitrate, airtime_profile, announce_cap) =
239                        if let Some(info) = interfaces.get(iface_id) {
240                            (
241                                info.bitrate.unwrap_or(0),
242                                info.airtime_profile,
243                                info.announce_cap,
244                            )
245                        } else {
246                            (0, None, constants::ANNOUNCE_CAP)
247                        };
248
249                    if bitrate > 0 || airtime_profile.is_some() {
250                        queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
251                            now,
252                            entry.raw.len(),
253                            bitrate,
254                            airtime_profile,
255                            announce_cap,
256                        );
257                    } else {
258                        queue.announce_allowed_at = now;
259                    }
260
261                    actions.push(TransportAction::SendOnInterface {
262                        interface: *iface_id,
263                        raw: entry.raw,
264                    });
265                } else {
266                    break;
267                }
268            }
269
270            if queue.entries.is_empty() {
271                empty_queues.push(*iface_id);
272            }
273        }
274
275        for iface_id in empty_queues {
276            self.queues.remove(&iface_id);
277        }
278
279        actions
280    }
281
282    /// Remove all announce queue state for an interface.
283    pub fn remove_interface(&mut self, interface: InterfaceId) -> bool {
284        self.queues.remove(&interface).is_some()
285    }
286
287    /// Number of interface queues currently tracked.
288    pub fn queue_count(&self) -> usize {
289        self.queues.len()
290    }
291
292    /// Number of interface queues that currently hold buffered announces.
293    pub fn nonempty_queue_count(&self) -> usize {
294        self.queues
295            .values()
296            .filter(|queue| !queue.entries.is_empty())
297            .count()
298    }
299
300    /// Total number of buffered announce entries across all interfaces.
301    pub fn total_queued_announces(&self) -> usize {
302        self.queues.values().map(|queue| queue.entries.len()).sum()
303    }
304
305    /// Total retained raw-byte payload across all buffered announces.
306    pub fn total_queued_bytes(&self) -> usize {
307        self.queues
308            .values()
309            .flat_map(|queue| queue.entries.iter())
310            .map(|entry| entry.raw.len())
311            .sum()
312    }
313
314    /// Number of announces dropped because the interface queue cap was reached.
315    pub fn interface_cap_drop_count(&self) -> u64 {
316        self.interface_cap_drops
317    }
318
319    /// Get the queue for a specific interface (for testing).
320    #[cfg(test)]
321    pub fn queue_for(&self, id: &InterfaceId) -> Option<&InterfaceAnnounceQueue> {
322        self.queues.get(id)
323    }
324}
325
326impl Default for AnnounceQueues {
327    fn default() -> Self {
328        Self::new(1024)
329    }
330}
331
332#[cfg(test)]
333mod tests {
334    use super::*;
335    use alloc::string::String;
336
337    fn make_entry(dest: u8, hops: u8, time: f64) -> AnnounceQueueEntry {
338        AnnounceQueueEntry {
339            destination_hash: [dest; 16],
340            time,
341            hops,
342            emitted: time,
343            raw: vec![0x01, 0x02, 0x03].into(),
344        }
345    }
346
347    fn make_interface_info(id: u64, bitrate: Option<u64>) -> super::super::types::InterfaceInfo {
348        super::super::types::InterfaceInfo {
349            id: InterfaceId(id),
350            name: String::from("test"),
351            mode: crate::constants::MODE_FULL,
352            out_capable: true,
353            in_capable: true,
354            bitrate,
355            airtime_profile: None,
356            announce_rate_target: None,
357            announce_rate_grace: 0,
358            announce_rate_penalty: 0.0,
359            announce_cap: constants::ANNOUNCE_CAP,
360            is_local_client: false,
361            wants_tunnel: false,
362            tunnel_id: None,
363            mtu: constants::MTU as u32,
364            ingress_control: crate::transport::types::IngressControlConfig::disabled(),
365            ia_freq: 0.0,
366            started: 0.0,
367        }
368    }
369
370    // --- InterfaceAnnounceQueue tests ---
371
372    #[test]
373    fn test_queue_entry_creation() {
374        let entry = make_entry(0xAA, 3, 1000.0);
375        assert_eq!(entry.hops, 3);
376        assert_eq!(entry.destination_hash, [0xAA; 16]);
377    }
378
379    #[test]
380    fn test_queue_insert_and_select() {
381        let mut queue = InterfaceAnnounceQueue::new();
382        queue.insert(make_entry(0x01, 3, 100.0));
383        queue.insert(make_entry(0x02, 1, 200.0));
384        queue.insert(make_entry(0x03, 2, 150.0));
385
386        // Should select min hops first (0x02 with hops=1)
387        let idx = queue.select_next().unwrap();
388        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
389    }
390
391    #[test]
392    fn test_queue_select_fifo_on_same_hops() {
393        let mut queue = InterfaceAnnounceQueue::new();
394        queue.insert(make_entry(0x01, 2, 200.0)); // newer
395        queue.insert(make_entry(0x02, 2, 100.0)); // older
396
397        // Same hops — should pick oldest (0x02 at time 100)
398        let idx = queue.select_next().unwrap();
399        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
400    }
401
402    #[test]
403    fn test_queue_dedup_update() {
404        let mut queue = InterfaceAnnounceQueue::new();
405        queue.insert(make_entry(0x01, 3, 100.0));
406        assert_eq!(queue.entries.len(), 1);
407
408        // Insert same dest with fewer hops — should update
409        queue.insert(make_entry(0x01, 1, 200.0));
410        assert_eq!(queue.entries.len(), 1);
411        assert_eq!(queue.entries[0].hops, 1);
412
413        // Insert same dest with more hops — should NOT update
414        queue.insert(make_entry(0x01, 5, 300.0));
415        assert_eq!(queue.entries.len(), 1);
416        assert_eq!(queue.entries[0].hops, 1);
417    }
418
419    #[test]
420    fn test_queue_stale_removal() {
421        let mut queue = InterfaceAnnounceQueue::new();
422        queue.insert(make_entry(0x01, 1, 100.0));
423        queue.insert(make_entry(0x02, 2, 200.0));
424
425        // At time 100 + 86400 + 1 = 86501, entry 0x01 should be stale
426        queue.remove_stale(86501.0);
427        assert_eq!(queue.entries.len(), 1);
428        assert_eq!(queue.entries[0].destination_hash, [0x02; 16]);
429    }
430
431    #[test]
432    fn test_queue_max_size() {
433        let mut queue = InterfaceAnnounceQueue::new();
434        for i in 0..constants::MAX_QUEUED_ANNOUNCES {
435            queue.insert(AnnounceQueueEntry {
436                destination_hash: {
437                    let mut d = [0u8; 16];
438                    d[0] = (i >> 8) as u8;
439                    d[1] = i as u8;
440                    d
441                },
442                time: i as f64,
443                hops: 1,
444                emitted: i as f64,
445                raw: vec![0x01].into(),
446            });
447        }
448        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
449
450        // Add one more — oldest should be dropped
451        queue.insert(make_entry(0xFF, 1, 99999.0));
452        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
453    }
454
455    #[test]
456    fn test_queue_empty_select() {
457        let queue = InterfaceAnnounceQueue::new();
458        assert!(queue.select_next().is_none());
459    }
460
461    #[test]
462    fn test_bandwidth_allowed() {
463        let mut queue = InterfaceAnnounceQueue::new();
464        assert!(queue.is_allowed(0.0));
465        assert!(queue.is_allowed(100.0));
466
467        queue.announce_allowed_at = 200.0;
468        assert!(!queue.is_allowed(100.0));
469        assert!(!queue.is_allowed(199.9));
470        assert!(queue.is_allowed(200.0));
471        assert!(queue.is_allowed(300.0));
472    }
473
474    #[test]
475    fn test_calculate_next_allowed() {
476        // 100 bytes = 800 bits, bitrate = 1000 bps, cap = 0.02
477        // time_to_send = 800/1000 = 0.8s
478        // delay = 0.8 / 0.02 = 40.0s
479        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, None, 0.02);
480        assert!((next - 1040.0).abs() < 0.001);
481    }
482
483    #[test]
484    fn test_calculate_next_allowed_zero_bitrate() {
485        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, None, 0.02);
486        assert_eq!(next, 1000.0); // no cap
487    }
488
489    #[test]
490    fn test_calculate_next_allowed_uses_lora_airtime() {
491        let profile = AirtimeProfile::Lora {
492            bandwidth: 125_000,
493            spreading_factor: 8,
494            coding_rate: 5,
495            preamble_symbols: 8,
496            explicit_header: true,
497            crc: true,
498        };
499
500        let next =
501            InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, Some(profile), 0.02);
502
503        // 100-byte explicit-header LoRa packet at BW125/SF8/CR4/5:
504        // (8 + ceil((800 - 32 + 28 + 16) / 32) * 5 + 12.25) symbols
505        // * 2.048 ms/symbol = 307.712 ms airtime.
506        assert!((next - 1015.3856).abs() < 0.0001);
507    }
508
509    // --- AnnounceQueues tests ---
510
511    #[test]
512    fn test_gate_announce_no_bitrate_immediate() {
513        let mut queues = AnnounceQueues::new(1024);
514        let result = queues.gate_announce(
515            InterfaceId(1),
516            vec![0x01, 0x02, 0x03].into(),
517            [0xAA; 16],
518            2,
519            1000.0,
520            1000.0,
521            None, // no bitrate
522            None,
523            0.02,
524        );
525        assert!(result.is_some());
526        assert!(matches!(
527            result.unwrap(),
528            TransportAction::SendOnInterface { .. }
529        ));
530    }
531
532    #[test]
533    fn test_gate_announce_uses_airtime_profile_without_bitrate() {
534        let mut queues = AnnounceQueues::new(1024);
535        let profile = AirtimeProfile::Lora {
536            bandwidth: 125_000,
537            spreading_factor: 8,
538            coding_rate: 5,
539            preamble_symbols: 8,
540            explicit_header: true,
541            crc: true,
542        };
543
544        let first = queues.gate_announce(
545            InterfaceId(1),
546            vec![0x01; 100].into(),
547            [0xAA; 16],
548            2,
549            1000.0,
550            1000.0,
551            None,
552            Some(profile),
553            0.02,
554        );
555        assert!(first.is_some());
556
557        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
558        assert!((queue.announce_allowed_at - 1015.3856).abs() < 0.0001);
559
560        let second = queues.gate_announce(
561            InterfaceId(1),
562            vec![0x02; 100].into(),
563            [0xBB; 16],
564            2,
565            1000.0,
566            1000.0,
567            None,
568            Some(profile),
569            0.02,
570        );
571        assert!(second.is_none());
572        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 1);
573    }
574
575    #[test]
576    fn test_gate_announce_bandwidth_available() {
577        let mut queues = AnnounceQueues::new(1024);
578        let result = queues.gate_announce(
579            InterfaceId(1),
580            vec![0x01; 100].into(),
581            [0xBB; 16],
582            2,
583            1000.0,
584            1000.0,
585            Some(10000), // 10 kbps
586            None,
587            0.02,
588        );
589        // First announce should go through
590        assert!(result.is_some());
591
592        // Check that allowed_at was updated
593        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
594        assert!(queue.announce_allowed_at > 1000.0);
595    }
596
597    #[test]
598    fn test_gate_announce_bandwidth_exhausted_queues() {
599        let mut queues = AnnounceQueues::new(1024);
600
601        // First announce goes through
602        let r1 = queues.gate_announce(
603            InterfaceId(1),
604            vec![0x01; 100].into(),
605            [0xAA; 16],
606            2,
607            1000.0,
608            1000.0,
609            Some(1000), // 1 kbps — very slow
610            None,
611            0.02,
612        );
613        assert!(r1.is_some());
614
615        // Second announce at same time should be queued
616        let r2 = queues.gate_announce(
617            InterfaceId(1),
618            vec![0x02; 100].into(),
619            [0xBB; 16],
620            3,
621            1000.0,
622            1000.0,
623            Some(1000),
624            None,
625            0.02,
626        );
627        assert!(r2.is_none()); // queued
628
629        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
630        assert_eq!(queue.entries.len(), 1);
631    }
632
633    #[test]
634    fn test_process_queues_dequeues_when_allowed() {
635        let mut queues = AnnounceQueues::new(1024);
636
637        // Queue an announce by exhausting bandwidth first
638        let _ = queues.gate_announce(
639            InterfaceId(1),
640            vec![0x01; 10].into(),
641            [0xAA; 16],
642            2,
643            0.0,
644            0.0,
645            Some(1000),
646            None,
647            0.02,
648        );
649        let _ = queues.gate_announce(
650            InterfaceId(1),
651            vec![0x02; 10].into(),
652            [0xBB; 16],
653            3,
654            0.0,
655            0.0,
656            Some(1000),
657            None,
658            0.02,
659        );
660
661        // Queue should have one entry
662        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 1);
663
664        let mut interfaces = BTreeMap::new();
665        interfaces.insert(InterfaceId(1), make_interface_info(1, Some(1000)));
666
667        // Process at a future time when bandwidth is available
668        let allowed_at = queues
669            .queue_for(&InterfaceId(1))
670            .unwrap()
671            .announce_allowed_at;
672        let actions = queues.process_queues(allowed_at + 1.0, &interfaces);
673
674        assert_eq!(actions.len(), 1);
675        assert!(matches!(
676            &actions[0],
677            TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(1)
678        ));
679
680        // Queue should be pruned now that it is empty
681        assert!(queues.queue_for(&InterfaceId(1)).is_none());
682    }
683
684    #[test]
685    fn test_local_announce_bypasses_cap() {
686        // hops == 0 means locally-originated, should not be queued
687        // The caller (TransportEngine) is responsible for only calling gate_announce
688        // for hops > 0. We verify the gate_announce works for hops=0 too.
689        let mut queues = AnnounceQueues::new(1024);
690
691        // Exhaust bandwidth
692        let _ = queues.gate_announce(
693            InterfaceId(1),
694            vec![0x01; 100].into(),
695            [0xAA; 16],
696            2,
697            0.0,
698            0.0,
699            Some(1000),
700            None,
701            0.02,
702        );
703
704        // hops=0 should still be queued by gate_announce since hops filtering
705        // is the caller's responsibility. gate_announce is agnostic.
706        let r = queues.gate_announce(
707            InterfaceId(1),
708            vec![0x02; 100].into(),
709            [0xBB; 16],
710            0,
711            0.0,
712            0.0,
713            Some(1000),
714            None,
715            0.02,
716        );
717        assert!(r.is_none()); // queued — caller must bypass for hops==0
718    }
719
720    #[test]
721    fn test_remove_interface_queue() {
722        let mut queues = AnnounceQueues::new(1024);
723        let _ = queues.gate_announce(
724            InterfaceId(1),
725            vec![0x01; 100].into(),
726            [0xAA; 16],
727            2,
728            0.0,
729            0.0,
730            Some(1000),
731            None,
732            0.02,
733        );
734        let _ = queues.gate_announce(
735            InterfaceId(1),
736            vec![0x02; 100].into(),
737            [0xBB; 16],
738            3,
739            0.0,
740            0.0,
741            Some(1000),
742            None,
743            0.02,
744        );
745
746        assert!(queues.queue_for(&InterfaceId(1)).is_some());
747        assert!(queues.remove_interface(InterfaceId(1)));
748        assert!(queues.queue_for(&InterfaceId(1)).is_none());
749        assert!(!queues.remove_interface(InterfaceId(1)));
750    }
751
752    #[test]
753    fn test_process_queues_prunes_empty_queue() {
754        let mut queues = AnnounceQueues::new(1024);
755
756        let _ = queues.gate_announce(
757            InterfaceId(1),
758            vec![0x01; 10].into(),
759            [0xAA; 16],
760            2,
761            0.0,
762            0.0,
763            Some(1000),
764            None,
765            0.02,
766        );
767        let _ = queues.gate_announce(
768            InterfaceId(1),
769            vec![0x02; 10].into(),
770            [0xBB; 16],
771            3,
772            0.0,
773            0.0,
774            Some(1000),
775            None,
776            0.02,
777        );
778
779        let mut interfaces = BTreeMap::new();
780        interfaces.insert(InterfaceId(1), make_interface_info(1, Some(1000)));
781        let allowed_at = queues
782            .queue_for(&InterfaceId(1))
783            .unwrap()
784            .announce_allowed_at;
785
786        let actions = queues.process_queues(allowed_at + 1.0, &interfaces);
787        assert_eq!(actions.len(), 1);
788        assert!(queues.queue_for(&InterfaceId(1)).is_none());
789        assert_eq!(queues.queue_count(), 0);
790    }
791
792    #[test]
793    fn test_process_queues_keeps_nonempty_queue() {
794        let mut queues = AnnounceQueues::new(1024);
795        let _ = queues.gate_announce(
796            InterfaceId(1),
797            vec![0x01; 100].into(),
798            [0xAA; 16],
799            2,
800            0.0,
801            0.0,
802            Some(1000),
803            None,
804            0.02,
805        );
806        let _ = queues.gate_announce(
807            InterfaceId(1),
808            vec![0x02; 100].into(),
809            [0xBB; 16],
810            3,
811            0.0,
812            0.0,
813            Some(1000),
814            None,
815            0.02,
816        );
817        let _ = queues.gate_announce(
818            InterfaceId(1),
819            vec![0x03; 100].into(),
820            [0xCC; 16],
821            4,
822            0.0,
823            0.0,
824            Some(1000),
825            None,
826            0.02,
827        );
828
829        let mut interfaces = BTreeMap::new();
830        interfaces.insert(InterfaceId(1), make_interface_info(1, Some(1000)));
831        let allowed_at = queues
832            .queue_for(&InterfaceId(1))
833            .unwrap()
834            .announce_allowed_at;
835
836        let actions = queues.process_queues(allowed_at + 1.0, &interfaces);
837        assert_eq!(actions.len(), 1);
838        assert!(queues.queue_for(&InterfaceId(1)).is_some());
839        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 1);
840    }
841
842    #[test]
843    fn test_gate_announce_refuses_new_interface_when_at_capacity() {
844        let mut queues = AnnounceQueues::new(1);
845
846        let _ = queues.gate_announce(
847            InterfaceId(1),
848            vec![0x01; 100].into(),
849            [0xAA; 16],
850            2,
851            0.0,
852            0.0,
853            Some(1000),
854            None,
855            0.02,
856        );
857        let second = queues.gate_announce(
858            InterfaceId(1),
859            vec![0x02; 100].into(),
860            [0xBB; 16],
861            3,
862            0.0,
863            0.0,
864            Some(1000),
865            None,
866            0.02,
867        );
868        assert!(second.is_none());
869        assert_eq!(queues.queue_count(), 1);
870
871        let rejected = queues.gate_announce(
872            InterfaceId(2),
873            vec![0x03; 100].into(),
874            [0xCC; 16],
875            4,
876            0.0,
877            0.0,
878            Some(1000),
879            None,
880            0.02,
881        );
882        assert!(rejected.is_none());
883        assert_eq!(queues.queue_count(), 1);
884        assert!(queues.queue_for(&InterfaceId(2)).is_none());
885        assert_eq!(queues.interface_cap_drop_count(), 1);
886    }
887
888    #[test]
889    fn test_gate_announce_allows_existing_queue_when_at_capacity() {
890        let mut queues = AnnounceQueues::new(1);
891
892        let _ = queues.gate_announce(
893            InterfaceId(1),
894            vec![0x01; 100].into(),
895            [0xAA; 16],
896            2,
897            0.0,
898            0.0,
899            Some(1000),
900            None,
901            0.02,
902        );
903        let queued = queues.gate_announce(
904            InterfaceId(1),
905            vec![0x02; 100].into(),
906            [0xBB; 16],
907            3,
908            0.0,
909            0.0,
910            Some(1000),
911            None,
912            0.02,
913        );
914        assert!(queued.is_none());
915        assert_eq!(queues.queue_count(), 1);
916        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 1);
917        assert_eq!(queues.interface_cap_drop_count(), 0);
918    }
919}