Skip to main content

rns_core/transport/
announce_queue.rs

1//! Per-interface announce bandwidth queuing.
2//!
3//! Announces with hops > 0 (propagation, not locally-originated) are gated
4//! by a per-interface bandwidth cap (default 2%). When bandwidth is exhausted,
5//! announces are queued and released when bandwidth becomes available.
6//!
7//! Python reference: Transport.py:1085-1165, Interface.py:246-286
8
9use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
12use super::types::{InterfaceId, TransportAction};
13use crate::constants;
14
15/// A queued announce entry waiting for bandwidth availability.
16#[derive(Debug, Clone)]
17pub struct AnnounceQueueEntry {
18    /// Destination hash of the announce.
19    pub destination_hash: [u8; 16],
20    /// Time the announce was queued.
21    pub time: f64,
22    /// Hops from the announce.
23    pub hops: u8,
24    /// Time the announce was originally emitted (from random blob).
25    pub emitted: f64,
26    /// Raw announce bytes (ready to send).
27    pub raw: Vec<u8>,
28}
29
30/// Per-interface announce queue with bandwidth tracking.
31#[derive(Debug, Clone)]
32pub struct InterfaceAnnounceQueue {
33    /// Queued announce entries.
34    pub entries: Vec<AnnounceQueueEntry>,
35    /// Earliest time another announce is allowed on this interface.
36    pub announce_allowed_at: f64,
37}
38
39impl InterfaceAnnounceQueue {
40    pub fn new() -> Self {
41        InterfaceAnnounceQueue {
42            entries: Vec::new(),
43            announce_allowed_at: 0.0,
44        }
45    }
46
47    /// Insert an announce into the queue.
48    /// If an entry for the same destination already exists, update it if the new one
49    /// has fewer hops or is newer.
50    pub fn insert(&mut self, entry: AnnounceQueueEntry) {
51        // Check for existing entry with same destination
52        if let Some(pos) = self
53            .entries
54            .iter()
55            .position(|e| e.destination_hash == entry.destination_hash)
56        {
57            let existing = &self.entries[pos];
58            // Update if new entry has fewer hops, or same hops and newer
59            if entry.hops < existing.hops
60                || (entry.hops == existing.hops && entry.emitted > existing.emitted)
61            {
62                self.entries[pos] = entry;
63            }
64            // Otherwise discard the new entry
65        } else {
66            // Enforce max queue size
67            if self.entries.len() >= constants::MAX_QUEUED_ANNOUNCES {
68                // Drop oldest entry
69                self.entries.remove(0);
70            }
71            self.entries.push(entry);
72        }
73    }
74
75    /// Remove stale entries (older than QUEUED_ANNOUNCE_LIFE).
76    pub fn remove_stale(&mut self, now: f64) {
77        self.entries
78            .retain(|e| now - e.time < constants::QUEUED_ANNOUNCE_LIFE);
79    }
80
81    /// Select the next announce to send: minimum hops, then oldest (FIFO).
82    /// Returns the index of the selected entry, or None if empty.
83    pub fn select_next(&self) -> Option<usize> {
84        if self.entries.is_empty() {
85            return None;
86        }
87        let mut best_idx = 0;
88        let mut best_hops = self.entries[0].hops;
89        let mut best_time = self.entries[0].time;
90
91        for (i, entry) in self.entries.iter().enumerate().skip(1) {
92            if entry.hops < best_hops
93                || (entry.hops == best_hops && entry.time < best_time)
94            {
95                best_idx = i;
96                best_hops = entry.hops;
97                best_time = entry.time;
98            }
99        }
100        Some(best_idx)
101    }
102
103    /// Check if an announce is allowed now based on bandwidth.
104    pub fn is_allowed(&self, now: f64) -> bool {
105        now >= self.announce_allowed_at
106    }
107
108    /// Calculate the next allowed time after sending an announce.
109    /// `raw_len`: size of the announce in bytes
110    /// `bitrate`: interface bitrate in bits/second
111    /// `announce_cap`: fraction of bitrate reserved for announces
112    pub fn calculate_next_allowed(now: f64, raw_len: usize, bitrate: u64, announce_cap: f64) -> f64 {
113        if bitrate == 0 || announce_cap <= 0.0 {
114            return now; // no cap
115        }
116        let bits = (raw_len * 8) as f64;
117        let time_to_send = bits / (bitrate as f64);
118        let delay = time_to_send / announce_cap;
119        now + delay
120    }
121}
122
123/// Manage announce queues for all interfaces.
124#[derive(Debug, Clone)]
125pub struct AnnounceQueues {
126    queues: BTreeMap<InterfaceId, InterfaceAnnounceQueue>,
127}
128
129impl AnnounceQueues {
130    pub fn new() -> Self {
131        AnnounceQueues {
132            queues: BTreeMap::new(),
133        }
134    }
135
136    /// Try to send an announce on an interface. If bandwidth is available,
137    /// returns the action immediately. Otherwise, queues it.
138    ///
139    /// Returns Some(action) if the announce should be sent now, None if queued.
140    pub fn gate_announce(
141        &mut self,
142        interface: InterfaceId,
143        raw: Vec<u8>,
144        dest_hash: [u8; 16],
145        hops: u8,
146        emitted: f64,
147        now: f64,
148        bitrate: Option<u64>,
149        announce_cap: f64,
150    ) -> Option<TransportAction> {
151        let queue = self
152            .queues
153            .entry(interface)
154            .or_insert_with(InterfaceAnnounceQueue::new);
155
156        // If no bitrate, no cap applies — send immediately
157        let bitrate = match bitrate {
158            Some(br) if br > 0 => br,
159            _ => {
160                return Some(TransportAction::SendOnInterface {
161                    interface,
162                    raw,
163                });
164            }
165        };
166
167        if queue.is_allowed(now) {
168            // Bandwidth available — send now and update allowed_at
169            queue.announce_allowed_at =
170                InterfaceAnnounceQueue::calculate_next_allowed(now, raw.len(), bitrate, announce_cap);
171            Some(TransportAction::SendOnInterface { interface, raw })
172        } else {
173            // Queue the announce
174            queue.insert(AnnounceQueueEntry {
175                destination_hash: dest_hash,
176                time: now,
177                hops,
178                emitted,
179                raw,
180            });
181            None
182        }
183    }
184
185    /// Process all announce queues: dequeue and send when bandwidth is available.
186    /// Called from tick().
187    pub fn process_queues(
188        &mut self,
189        now: f64,
190        interfaces: &BTreeMap<InterfaceId, super::types::InterfaceInfo>,
191    ) -> Vec<TransportAction> {
192        let mut actions = Vec::new();
193
194        for (iface_id, queue) in self.queues.iter_mut() {
195            // Remove stale entries
196            queue.remove_stale(now);
197
198            // Process as many announces as bandwidth allows
199            while queue.is_allowed(now) {
200                if let Some(idx) = queue.select_next() {
201                    let entry = queue.entries.remove(idx);
202
203                    // Look up bitrate for this interface
204                    let (bitrate, announce_cap) =
205                        if let Some(info) = interfaces.get(iface_id) {
206                            (info.bitrate.unwrap_or(0), info.announce_cap)
207                        } else {
208                            (0, constants::ANNOUNCE_CAP)
209                        };
210
211                    if bitrate > 0 {
212                        queue.announce_allowed_at =
213                            InterfaceAnnounceQueue::calculate_next_allowed(
214                                now,
215                                entry.raw.len(),
216                                bitrate,
217                                announce_cap,
218                            );
219                    }
220
221                    actions.push(TransportAction::SendOnInterface {
222                        interface: *iface_id,
223                        raw: entry.raw,
224                    });
225                } else {
226                    break;
227                }
228            }
229        }
230
231        actions
232    }
233
234    /// Get the queue for a specific interface (for testing).
235    #[cfg(test)]
236    pub fn queue_for(&self, id: &InterfaceId) -> Option<&InterfaceAnnounceQueue> {
237        self.queues.get(id)
238    }
239}
240
241#[cfg(test)]
242mod tests {
243    use super::*;
244    use alloc::string::String;
245
246    fn make_entry(dest: u8, hops: u8, time: f64) -> AnnounceQueueEntry {
247        AnnounceQueueEntry {
248            destination_hash: [dest; 16],
249            time,
250            hops,
251            emitted: time,
252            raw: vec![0x01, 0x02, 0x03],
253        }
254    }
255
256    fn make_interface_info(id: u64, bitrate: Option<u64>) -> super::super::types::InterfaceInfo {
257        super::super::types::InterfaceInfo {
258            id: InterfaceId(id),
259            name: String::from("test"),
260            mode: crate::constants::MODE_FULL,
261            out_capable: true,
262            in_capable: true,
263            bitrate,
264            announce_rate_target: None,
265            announce_rate_grace: 0,
266            announce_rate_penalty: 0.0,
267            announce_cap: constants::ANNOUNCE_CAP,
268            is_local_client: false,
269            wants_tunnel: false,
270            tunnel_id: None,
271            mtu: constants::MTU as u32,
272            ingress_control: false,
273            ia_freq: 0.0,
274            started: 0.0,
275        }
276    }
277
278    // --- InterfaceAnnounceQueue tests ---
279
280    #[test]
281    fn test_queue_entry_creation() {
282        let entry = make_entry(0xAA, 3, 1000.0);
283        assert_eq!(entry.hops, 3);
284        assert_eq!(entry.destination_hash, [0xAA; 16]);
285    }
286
287    #[test]
288    fn test_queue_insert_and_select() {
289        let mut queue = InterfaceAnnounceQueue::new();
290        queue.insert(make_entry(0x01, 3, 100.0));
291        queue.insert(make_entry(0x02, 1, 200.0));
292        queue.insert(make_entry(0x03, 2, 150.0));
293
294        // Should select min hops first (0x02 with hops=1)
295        let idx = queue.select_next().unwrap();
296        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
297    }
298
299    #[test]
300    fn test_queue_select_fifo_on_same_hops() {
301        let mut queue = InterfaceAnnounceQueue::new();
302        queue.insert(make_entry(0x01, 2, 200.0)); // newer
303        queue.insert(make_entry(0x02, 2, 100.0)); // older
304
305        // Same hops — should pick oldest (0x02 at time 100)
306        let idx = queue.select_next().unwrap();
307        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
308    }
309
310    #[test]
311    fn test_queue_dedup_update() {
312        let mut queue = InterfaceAnnounceQueue::new();
313        queue.insert(make_entry(0x01, 3, 100.0));
314        assert_eq!(queue.entries.len(), 1);
315
316        // Insert same dest with fewer hops — should update
317        queue.insert(make_entry(0x01, 1, 200.0));
318        assert_eq!(queue.entries.len(), 1);
319        assert_eq!(queue.entries[0].hops, 1);
320
321        // Insert same dest with more hops — should NOT update
322        queue.insert(make_entry(0x01, 5, 300.0));
323        assert_eq!(queue.entries.len(), 1);
324        assert_eq!(queue.entries[0].hops, 1);
325    }
326
327    #[test]
328    fn test_queue_stale_removal() {
329        let mut queue = InterfaceAnnounceQueue::new();
330        queue.insert(make_entry(0x01, 1, 100.0));
331        queue.insert(make_entry(0x02, 2, 200.0));
332
333        // At time 100 + 86400 + 1 = 86501, entry 0x01 should be stale
334        queue.remove_stale(86501.0);
335        assert_eq!(queue.entries.len(), 1);
336        assert_eq!(queue.entries[0].destination_hash, [0x02; 16]);
337    }
338
339    #[test]
340    fn test_queue_max_size() {
341        let mut queue = InterfaceAnnounceQueue::new();
342        for i in 0..constants::MAX_QUEUED_ANNOUNCES {
343            queue.insert(AnnounceQueueEntry {
344                destination_hash: {
345                    let mut d = [0u8; 16];
346                    d[0] = (i >> 8) as u8;
347                    d[1] = i as u8;
348                    d
349                },
350                time: i as f64,
351                hops: 1,
352                emitted: i as f64,
353                raw: vec![0x01],
354            });
355        }
356        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
357
358        // Add one more — oldest should be dropped
359        queue.insert(make_entry(0xFF, 1, 99999.0));
360        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
361    }
362
363    #[test]
364    fn test_queue_empty_select() {
365        let queue = InterfaceAnnounceQueue::new();
366        assert!(queue.select_next().is_none());
367    }
368
369    #[test]
370    fn test_bandwidth_allowed() {
371        let mut queue = InterfaceAnnounceQueue::new();
372        assert!(queue.is_allowed(0.0));
373        assert!(queue.is_allowed(100.0));
374
375        queue.announce_allowed_at = 200.0;
376        assert!(!queue.is_allowed(100.0));
377        assert!(!queue.is_allowed(199.9));
378        assert!(queue.is_allowed(200.0));
379        assert!(queue.is_allowed(300.0));
380    }
381
382    #[test]
383    fn test_calculate_next_allowed() {
384        // 100 bytes = 800 bits, bitrate = 1000 bps, cap = 0.02
385        // time_to_send = 800/1000 = 0.8s
386        // delay = 0.8 / 0.02 = 40.0s
387        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, 0.02);
388        assert!((next - 1040.0).abs() < 0.001);
389    }
390
391    #[test]
392    fn test_calculate_next_allowed_zero_bitrate() {
393        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, 0.02);
394        assert_eq!(next, 1000.0); // no cap
395    }
396
397    // --- AnnounceQueues tests ---
398
399    #[test]
400    fn test_gate_announce_no_bitrate_immediate() {
401        let mut queues = AnnounceQueues::new();
402        let result = queues.gate_announce(
403            InterfaceId(1),
404            vec![0x01, 0x02, 0x03],
405            [0xAA; 16],
406            2,
407            1000.0,
408            1000.0,
409            None, // no bitrate
410            0.02,
411        );
412        assert!(result.is_some());
413        assert!(matches!(
414            result.unwrap(),
415            TransportAction::SendOnInterface { .. }
416        ));
417    }
418
419    #[test]
420    fn test_gate_announce_bandwidth_available() {
421        let mut queues = AnnounceQueues::new();
422        let result = queues.gate_announce(
423            InterfaceId(1),
424            vec![0x01; 100],
425            [0xBB; 16],
426            2,
427            1000.0,
428            1000.0,
429            Some(10000), // 10 kbps
430            0.02,
431        );
432        // First announce should go through
433        assert!(result.is_some());
434
435        // Check that allowed_at was updated
436        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
437        assert!(queue.announce_allowed_at > 1000.0);
438    }
439
440    #[test]
441    fn test_gate_announce_bandwidth_exhausted_queues() {
442        let mut queues = AnnounceQueues::new();
443
444        // First announce goes through
445        let r1 = queues.gate_announce(
446            InterfaceId(1),
447            vec![0x01; 100],
448            [0xAA; 16],
449            2,
450            1000.0,
451            1000.0,
452            Some(1000), // 1 kbps — very slow
453            0.02,
454        );
455        assert!(r1.is_some());
456
457        // Second announce at same time should be queued
458        let r2 = queues.gate_announce(
459            InterfaceId(1),
460            vec![0x02; 100],
461            [0xBB; 16],
462            3,
463            1000.0,
464            1000.0,
465            Some(1000),
466            0.02,
467        );
468        assert!(r2.is_none()); // queued
469
470        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
471        assert_eq!(queue.entries.len(), 1);
472    }
473
474    #[test]
475    fn test_process_queues_dequeues_when_allowed() {
476        let mut queues = AnnounceQueues::new();
477
478        // Queue an announce by exhausting bandwidth first
479        let _ = queues.gate_announce(
480            InterfaceId(1),
481            vec![0x01; 10],
482            [0xAA; 16],
483            2,
484            0.0,
485            0.0,
486            Some(1000),
487            0.02,
488        );
489        let _ = queues.gate_announce(
490            InterfaceId(1),
491            vec![0x02; 10],
492            [0xBB; 16],
493            3,
494            0.0,
495            0.0,
496            Some(1000),
497            0.02,
498        );
499
500        // Queue should have one entry
501        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 1);
502
503        let mut interfaces = BTreeMap::new();
504        interfaces.insert(InterfaceId(1), make_interface_info(1, Some(1000)));
505
506        // Process at a future time when bandwidth is available
507        let allowed_at = queues.queue_for(&InterfaceId(1)).unwrap().announce_allowed_at;
508        let actions = queues.process_queues(allowed_at + 1.0, &interfaces);
509
510        assert_eq!(actions.len(), 1);
511        assert!(matches!(
512            &actions[0],
513            TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(1)
514        ));
515
516        // Queue should be empty now
517        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 0);
518    }
519
520    #[test]
521    fn test_local_announce_bypasses_cap() {
522        // hops == 0 means locally-originated, should not be queued
523        // The caller (TransportEngine) is responsible for only calling gate_announce
524        // for hops > 0. We verify the gate_announce works for hops=0 too.
525        let mut queues = AnnounceQueues::new();
526
527        // Exhaust bandwidth
528        let _ = queues.gate_announce(
529            InterfaceId(1),
530            vec![0x01; 100],
531            [0xAA; 16],
532            2,
533            0.0,
534            0.0,
535            Some(1000),
536            0.02,
537        );
538
539        // hops=0 should still be queued by gate_announce since hops filtering
540        // is the caller's responsibility. gate_announce is agnostic.
541        let r = queues.gate_announce(
542            InterfaceId(1),
543            vec![0x02; 100],
544            [0xBB; 16],
545            0,
546            0.0,
547            0.0,
548            Some(1000),
549            0.02,
550        );
551        assert!(r.is_none()); // queued — caller must bypass for hops==0
552    }
553}