Skip to main content

rns_core/transport/
announce_queue.rs

1//! Per-interface announce bandwidth queuing.
2//!
3//! Announces with hops > 0 (propagation, not locally-originated) are gated
4//! by a per-interface bandwidth cap (default 2%). When bandwidth is exhausted,
5//! announces are queued and released when bandwidth becomes available.
6//!
7//! Python reference: Transport.py:1085-1165, Interface.py:246-286
8
9use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
12use super::types::{InterfaceId, TransportAction};
13use crate::constants;
14
15/// A queued announce entry waiting for bandwidth availability.
16#[derive(Debug, Clone)]
17pub struct AnnounceQueueEntry {
18    /// Destination hash of the announce.
19    pub destination_hash: [u8; 16],
20    /// Time the announce was queued.
21    pub time: f64,
22    /// Hops from the announce.
23    pub hops: u8,
24    /// Time the announce was originally emitted (from random blob).
25    pub emitted: f64,
26    /// Raw announce bytes (ready to send).
27    pub raw: Vec<u8>,
28}
29
30/// Per-interface announce queue with bandwidth tracking.
31#[derive(Debug, Clone)]
32pub struct InterfaceAnnounceQueue {
33    /// Queued announce entries.
34    pub entries: Vec<AnnounceQueueEntry>,
35    /// Earliest time another announce is allowed on this interface.
36    pub announce_allowed_at: f64,
37}
38
39impl InterfaceAnnounceQueue {
40    pub fn new() -> Self {
41        InterfaceAnnounceQueue {
42            entries: Vec::new(),
43            announce_allowed_at: 0.0,
44        }
45    }
46
47    /// Insert an announce into the queue.
48    /// If an entry for the same destination already exists, update it if the new one
49    /// has fewer hops or is newer.
50    pub fn insert(&mut self, entry: AnnounceQueueEntry) {
51        // Check for existing entry with same destination
52        if let Some(pos) = self
53            .entries
54            .iter()
55            .position(|e| e.destination_hash == entry.destination_hash)
56        {
57            let existing = &self.entries[pos];
58            // Update if new entry has fewer hops, or same hops and newer
59            if entry.hops < existing.hops
60                || (entry.hops == existing.hops && entry.emitted > existing.emitted)
61            {
62                self.entries[pos] = entry;
63            }
64            // Otherwise discard the new entry
65        } else {
66            // Enforce max queue size
67            if self.entries.len() >= constants::MAX_QUEUED_ANNOUNCES {
68                // Drop oldest entry
69                self.entries.remove(0);
70            }
71            self.entries.push(entry);
72        }
73    }
74
75    /// Remove stale entries (older than QUEUED_ANNOUNCE_LIFE).
76    pub fn remove_stale(&mut self, now: f64) {
77        self.entries
78            .retain(|e| now - e.time < constants::QUEUED_ANNOUNCE_LIFE);
79    }
80
81    /// Select the next announce to send: minimum hops, then oldest (FIFO).
82    /// Returns the index of the selected entry, or None if empty.
83    pub fn select_next(&self) -> Option<usize> {
84        if self.entries.is_empty() {
85            return None;
86        }
87        let mut best_idx = 0;
88        let mut best_hops = self.entries[0].hops;
89        let mut best_time = self.entries[0].time;
90
91        for (i, entry) in self.entries.iter().enumerate().skip(1) {
92            if entry.hops < best_hops || (entry.hops == best_hops && entry.time < best_time) {
93                best_idx = i;
94                best_hops = entry.hops;
95                best_time = entry.time;
96            }
97        }
98        Some(best_idx)
99    }
100
101    /// Check if an announce is allowed now based on bandwidth.
102    pub fn is_allowed(&self, now: f64) -> bool {
103        now >= self.announce_allowed_at
104    }
105
106    /// Calculate the next allowed time after sending an announce.
107    /// `raw_len`: size of the announce in bytes
108    /// `bitrate`: interface bitrate in bits/second
109    /// `announce_cap`: fraction of bitrate reserved for announces
110    pub fn calculate_next_allowed(
111        now: f64,
112        raw_len: usize,
113        bitrate: u64,
114        announce_cap: f64,
115    ) -> f64 {
116        if bitrate == 0 || announce_cap <= 0.0 {
117            return now; // no cap
118        }
119        let bits = (raw_len * 8) as f64;
120        let time_to_send = bits / (bitrate as f64);
121        let delay = time_to_send / announce_cap;
122        now + delay
123    }
124}
125
126impl Default for InterfaceAnnounceQueue {
127    fn default() -> Self {
128        Self::new()
129    }
130}
131
132/// Manage announce queues for all interfaces.
133#[derive(Debug, Clone)]
134pub struct AnnounceQueues {
135    queues: BTreeMap<InterfaceId, InterfaceAnnounceQueue>,
136}
137
138impl AnnounceQueues {
139    pub fn new() -> Self {
140        AnnounceQueues {
141            queues: BTreeMap::new(),
142        }
143    }
144
145    /// Try to send an announce on an interface. If bandwidth is available,
146    /// returns the action immediately. Otherwise, queues it.
147    ///
148    /// Returns Some(action) if the announce should be sent now, None if queued.
149    #[allow(clippy::too_many_arguments)]
150    pub fn gate_announce(
151        &mut self,
152        interface: InterfaceId,
153        raw: Vec<u8>,
154        dest_hash: [u8; 16],
155        hops: u8,
156        emitted: f64,
157        now: f64,
158        bitrate: Option<u64>,
159        announce_cap: f64,
160    ) -> Option<TransportAction> {
161        let queue = self
162            .queues
163            .entry(interface)
164            .or_default();
165
166        // If no bitrate, no cap applies — send immediately
167        let bitrate = match bitrate {
168            Some(br) if br > 0 => br,
169            _ => {
170                return Some(TransportAction::SendOnInterface { interface, raw });
171            }
172        };
173
174        if queue.is_allowed(now) {
175            // Bandwidth available — send now and update allowed_at
176            queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
177                now,
178                raw.len(),
179                bitrate,
180                announce_cap,
181            );
182            Some(TransportAction::SendOnInterface { interface, raw })
183        } else {
184            // Queue the announce
185            queue.insert(AnnounceQueueEntry {
186                destination_hash: dest_hash,
187                time: now,
188                hops,
189                emitted,
190                raw,
191            });
192            None
193        }
194    }
195
196    /// Process all announce queues: dequeue and send when bandwidth is available.
197    /// Called from tick().
198    pub fn process_queues(
199        &mut self,
200        now: f64,
201        interfaces: &BTreeMap<InterfaceId, super::types::InterfaceInfo>,
202    ) -> Vec<TransportAction> {
203        let mut actions = Vec::new();
204
205        for (iface_id, queue) in self.queues.iter_mut() {
206            // Remove stale entries
207            queue.remove_stale(now);
208
209            // Process as many announces as bandwidth allows
210            while queue.is_allowed(now) {
211                if let Some(idx) = queue.select_next() {
212                    let entry = queue.entries.remove(idx);
213
214                    // Look up bitrate for this interface
215                    let (bitrate, announce_cap) = if let Some(info) = interfaces.get(iface_id) {
216                        (info.bitrate.unwrap_or(0), info.announce_cap)
217                    } else {
218                        (0, constants::ANNOUNCE_CAP)
219                    };
220
221                    if bitrate > 0 {
222                        queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
223                            now,
224                            entry.raw.len(),
225                            bitrate,
226                            announce_cap,
227                        );
228                    }
229
230                    actions.push(TransportAction::SendOnInterface {
231                        interface: *iface_id,
232                        raw: entry.raw,
233                    });
234                } else {
235                    break;
236                }
237            }
238        }
239
240        actions
241    }
242
243    /// Get the queue for a specific interface (for testing).
244    #[cfg(test)]
245    pub fn queue_for(&self, id: &InterfaceId) -> Option<&InterfaceAnnounceQueue> {
246        self.queues.get(id)
247    }
248}
249
250impl Default for AnnounceQueues {
251    fn default() -> Self {
252        Self::new()
253    }
254}
255
256#[cfg(test)]
257mod tests {
258    use super::*;
259    use alloc::string::String;
260
261    fn make_entry(dest: u8, hops: u8, time: f64) -> AnnounceQueueEntry {
262        AnnounceQueueEntry {
263            destination_hash: [dest; 16],
264            time,
265            hops,
266            emitted: time,
267            raw: vec![0x01, 0x02, 0x03],
268        }
269    }
270
271    fn make_interface_info(id: u64, bitrate: Option<u64>) -> super::super::types::InterfaceInfo {
272        super::super::types::InterfaceInfo {
273            id: InterfaceId(id),
274            name: String::from("test"),
275            mode: crate::constants::MODE_FULL,
276            out_capable: true,
277            in_capable: true,
278            bitrate,
279            announce_rate_target: None,
280            announce_rate_grace: 0,
281            announce_rate_penalty: 0.0,
282            announce_cap: constants::ANNOUNCE_CAP,
283            is_local_client: false,
284            wants_tunnel: false,
285            tunnel_id: None,
286            mtu: constants::MTU as u32,
287            ingress_control: false,
288            ia_freq: 0.0,
289            started: 0.0,
290        }
291    }
292
293    // --- InterfaceAnnounceQueue tests ---
294
295    #[test]
296    fn test_queue_entry_creation() {
297        let entry = make_entry(0xAA, 3, 1000.0);
298        assert_eq!(entry.hops, 3);
299        assert_eq!(entry.destination_hash, [0xAA; 16]);
300    }
301
302    #[test]
303    fn test_queue_insert_and_select() {
304        let mut queue = InterfaceAnnounceQueue::new();
305        queue.insert(make_entry(0x01, 3, 100.0));
306        queue.insert(make_entry(0x02, 1, 200.0));
307        queue.insert(make_entry(0x03, 2, 150.0));
308
309        // Should select min hops first (0x02 with hops=1)
310        let idx = queue.select_next().unwrap();
311        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
312    }
313
314    #[test]
315    fn test_queue_select_fifo_on_same_hops() {
316        let mut queue = InterfaceAnnounceQueue::new();
317        queue.insert(make_entry(0x01, 2, 200.0)); // newer
318        queue.insert(make_entry(0x02, 2, 100.0)); // older
319
320        // Same hops — should pick oldest (0x02 at time 100)
321        let idx = queue.select_next().unwrap();
322        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
323    }
324
325    #[test]
326    fn test_queue_dedup_update() {
327        let mut queue = InterfaceAnnounceQueue::new();
328        queue.insert(make_entry(0x01, 3, 100.0));
329        assert_eq!(queue.entries.len(), 1);
330
331        // Insert same dest with fewer hops — should update
332        queue.insert(make_entry(0x01, 1, 200.0));
333        assert_eq!(queue.entries.len(), 1);
334        assert_eq!(queue.entries[0].hops, 1);
335
336        // Insert same dest with more hops — should NOT update
337        queue.insert(make_entry(0x01, 5, 300.0));
338        assert_eq!(queue.entries.len(), 1);
339        assert_eq!(queue.entries[0].hops, 1);
340    }
341
342    #[test]
343    fn test_queue_stale_removal() {
344        let mut queue = InterfaceAnnounceQueue::new();
345        queue.insert(make_entry(0x01, 1, 100.0));
346        queue.insert(make_entry(0x02, 2, 200.0));
347
348        // At time 100 + 86400 + 1 = 86501, entry 0x01 should be stale
349        queue.remove_stale(86501.0);
350        assert_eq!(queue.entries.len(), 1);
351        assert_eq!(queue.entries[0].destination_hash, [0x02; 16]);
352    }
353
354    #[test]
355    fn test_queue_max_size() {
356        let mut queue = InterfaceAnnounceQueue::new();
357        for i in 0..constants::MAX_QUEUED_ANNOUNCES {
358            queue.insert(AnnounceQueueEntry {
359                destination_hash: {
360                    let mut d = [0u8; 16];
361                    d[0] = (i >> 8) as u8;
362                    d[1] = i as u8;
363                    d
364                },
365                time: i as f64,
366                hops: 1,
367                emitted: i as f64,
368                raw: vec![0x01],
369            });
370        }
371        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
372
373        // Add one more — oldest should be dropped
374        queue.insert(make_entry(0xFF, 1, 99999.0));
375        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
376    }
377
378    #[test]
379    fn test_queue_empty_select() {
380        let queue = InterfaceAnnounceQueue::new();
381        assert!(queue.select_next().is_none());
382    }
383
384    #[test]
385    fn test_bandwidth_allowed() {
386        let mut queue = InterfaceAnnounceQueue::new();
387        assert!(queue.is_allowed(0.0));
388        assert!(queue.is_allowed(100.0));
389
390        queue.announce_allowed_at = 200.0;
391        assert!(!queue.is_allowed(100.0));
392        assert!(!queue.is_allowed(199.9));
393        assert!(queue.is_allowed(200.0));
394        assert!(queue.is_allowed(300.0));
395    }
396
397    #[test]
398    fn test_calculate_next_allowed() {
399        // 100 bytes = 800 bits, bitrate = 1000 bps, cap = 0.02
400        // time_to_send = 800/1000 = 0.8s
401        // delay = 0.8 / 0.02 = 40.0s
402        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, 0.02);
403        assert!((next - 1040.0).abs() < 0.001);
404    }
405
406    #[test]
407    fn test_calculate_next_allowed_zero_bitrate() {
408        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, 0.02);
409        assert_eq!(next, 1000.0); // no cap
410    }
411
412    // --- AnnounceQueues tests ---
413
414    #[test]
415    fn test_gate_announce_no_bitrate_immediate() {
416        let mut queues = AnnounceQueues::new();
417        let result = queues.gate_announce(
418            InterfaceId(1),
419            vec![0x01, 0x02, 0x03],
420            [0xAA; 16],
421            2,
422            1000.0,
423            1000.0,
424            None, // no bitrate
425            0.02,
426        );
427        assert!(result.is_some());
428        assert!(matches!(
429            result.unwrap(),
430            TransportAction::SendOnInterface { .. }
431        ));
432    }
433
434    #[test]
435    fn test_gate_announce_bandwidth_available() {
436        let mut queues = AnnounceQueues::new();
437        let result = queues.gate_announce(
438            InterfaceId(1),
439            vec![0x01; 100],
440            [0xBB; 16],
441            2,
442            1000.0,
443            1000.0,
444            Some(10000), // 10 kbps
445            0.02,
446        );
447        // First announce should go through
448        assert!(result.is_some());
449
450        // Check that allowed_at was updated
451        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
452        assert!(queue.announce_allowed_at > 1000.0);
453    }
454
455    #[test]
456    fn test_gate_announce_bandwidth_exhausted_queues() {
457        let mut queues = AnnounceQueues::new();
458
459        // First announce goes through
460        let r1 = queues.gate_announce(
461            InterfaceId(1),
462            vec![0x01; 100],
463            [0xAA; 16],
464            2,
465            1000.0,
466            1000.0,
467            Some(1000), // 1 kbps — very slow
468            0.02,
469        );
470        assert!(r1.is_some());
471
472        // Second announce at same time should be queued
473        let r2 = queues.gate_announce(
474            InterfaceId(1),
475            vec![0x02; 100],
476            [0xBB; 16],
477            3,
478            1000.0,
479            1000.0,
480            Some(1000),
481            0.02,
482        );
483        assert!(r2.is_none()); // queued
484
485        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
486        assert_eq!(queue.entries.len(), 1);
487    }
488
489    #[test]
490    fn test_process_queues_dequeues_when_allowed() {
491        let mut queues = AnnounceQueues::new();
492
493        // Queue an announce by exhausting bandwidth first
494        let _ = queues.gate_announce(
495            InterfaceId(1),
496            vec![0x01; 10],
497            [0xAA; 16],
498            2,
499            0.0,
500            0.0,
501            Some(1000),
502            0.02,
503        );
504        let _ = queues.gate_announce(
505            InterfaceId(1),
506            vec![0x02; 10],
507            [0xBB; 16],
508            3,
509            0.0,
510            0.0,
511            Some(1000),
512            0.02,
513        );
514
515        // Queue should have one entry
516        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 1);
517
518        let mut interfaces = BTreeMap::new();
519        interfaces.insert(InterfaceId(1), make_interface_info(1, Some(1000)));
520
521        // Process at a future time when bandwidth is available
522        let allowed_at = queues
523            .queue_for(&InterfaceId(1))
524            .unwrap()
525            .announce_allowed_at;
526        let actions = queues.process_queues(allowed_at + 1.0, &interfaces);
527
528        assert_eq!(actions.len(), 1);
529        assert!(matches!(
530            &actions[0],
531            TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(1)
532        ));
533
534        // Queue should be empty now
535        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 0);
536    }
537
538    #[test]
539    fn test_local_announce_bypasses_cap() {
540        // hops == 0 means locally-originated, should not be queued
541        // The caller (TransportEngine) is responsible for only calling gate_announce
542        // for hops > 0. We verify the gate_announce works for hops=0 too.
543        let mut queues = AnnounceQueues::new();
544
545        // Exhaust bandwidth
546        let _ = queues.gate_announce(
547            InterfaceId(1),
548            vec![0x01; 100],
549            [0xAA; 16],
550            2,
551            0.0,
552            0.0,
553            Some(1000),
554            0.02,
555        );
556
557        // hops=0 should still be queued by gate_announce since hops filtering
558        // is the caller's responsibility. gate_announce is agnostic.
559        let r = queues.gate_announce(
560            InterfaceId(1),
561            vec![0x02; 100],
562            [0xBB; 16],
563            0,
564            0.0,
565            0.0,
566            Some(1000),
567            0.02,
568        );
569        assert!(r.is_none()); // queued — caller must bypass for hops==0
570    }
571}