Skip to main content

rns_core/transport/
announce_queue.rs

1//! Per-interface announce bandwidth queuing.
2//!
3//! Announces with hops > 0 (propagation, not locally-originated) are gated
4//! by a per-interface bandwidth cap (default 2%). When bandwidth is exhausted,
5//! announces are queued and released when bandwidth becomes available.
6//!
7//! Python reference: Transport.py:1085-1165, Interface.py:246-286
8
9use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
12use super::types::{InterfaceId, TransportAction};
13use crate::constants;
14
15/// A queued announce entry waiting for bandwidth availability.
16#[derive(Debug, Clone)]
17pub struct AnnounceQueueEntry {
18    /// Destination hash of the announce.
19    pub destination_hash: [u8; 16],
20    /// Time the announce was queued.
21    pub time: f64,
22    /// Hops from the announce.
23    pub hops: u8,
24    /// Time the announce was originally emitted (from random blob).
25    pub emitted: f64,
26    /// Raw announce bytes (ready to send).
27    pub raw: Vec<u8>,
28}
29
30/// Per-interface announce queue with bandwidth tracking.
31#[derive(Debug, Clone)]
32pub struct InterfaceAnnounceQueue {
33    /// Queued announce entries.
34    pub entries: Vec<AnnounceQueueEntry>,
35    /// Earliest time another announce is allowed on this interface.
36    pub announce_allowed_at: f64,
37}
38
39impl InterfaceAnnounceQueue {
40    pub fn new() -> Self {
41        InterfaceAnnounceQueue {
42            entries: Vec::new(),
43            announce_allowed_at: 0.0,
44        }
45    }
46
47    /// Insert an announce into the queue.
48    /// If an entry for the same destination already exists, update it if the new one
49    /// has fewer hops or is newer.
50    pub fn insert(&mut self, entry: AnnounceQueueEntry) {
51        // Check for existing entry with same destination
52        if let Some(pos) = self
53            .entries
54            .iter()
55            .position(|e| e.destination_hash == entry.destination_hash)
56        {
57            let existing = &self.entries[pos];
58            // Update if new entry has fewer hops, or same hops and newer
59            if entry.hops < existing.hops
60                || (entry.hops == existing.hops && entry.emitted > existing.emitted)
61            {
62                self.entries[pos] = entry;
63            }
64            // Otherwise discard the new entry
65        } else {
66            // Enforce max queue size
67            if self.entries.len() >= constants::MAX_QUEUED_ANNOUNCES {
68                // Drop oldest entry
69                self.entries.remove(0);
70            }
71            self.entries.push(entry);
72        }
73    }
74
75    /// Remove stale entries (older than QUEUED_ANNOUNCE_LIFE).
76    pub fn remove_stale(&mut self, now: f64) {
77        self.entries
78            .retain(|e| now - e.time < constants::QUEUED_ANNOUNCE_LIFE);
79    }
80
81    /// Select the next announce to send: minimum hops, then oldest (FIFO).
82    /// Returns the index of the selected entry, or None if empty.
83    pub fn select_next(&self) -> Option<usize> {
84        if self.entries.is_empty() {
85            return None;
86        }
87        let mut best_idx = 0;
88        let mut best_hops = self.entries[0].hops;
89        let mut best_time = self.entries[0].time;
90
91        for (i, entry) in self.entries.iter().enumerate().skip(1) {
92            if entry.hops < best_hops || (entry.hops == best_hops && entry.time < best_time) {
93                best_idx = i;
94                best_hops = entry.hops;
95                best_time = entry.time;
96            }
97        }
98        Some(best_idx)
99    }
100
101    /// Check if an announce is allowed now based on bandwidth.
102    pub fn is_allowed(&self, now: f64) -> bool {
103        now >= self.announce_allowed_at
104    }
105
106    /// Calculate the next allowed time after sending an announce.
107    /// `raw_len`: size of the announce in bytes
108    /// `bitrate`: interface bitrate in bits/second
109    /// `announce_cap`: fraction of bitrate reserved for announces
110    pub fn calculate_next_allowed(
111        now: f64,
112        raw_len: usize,
113        bitrate: u64,
114        announce_cap: f64,
115    ) -> f64 {
116        if bitrate == 0 || announce_cap <= 0.0 {
117            return now; // no cap
118        }
119        let bits = (raw_len * 8) as f64;
120        let time_to_send = bits / (bitrate as f64);
121        let delay = time_to_send / announce_cap;
122        now + delay
123    }
124}
125
126/// Manage announce queues for all interfaces.
127#[derive(Debug, Clone)]
128pub struct AnnounceQueues {
129    queues: BTreeMap<InterfaceId, InterfaceAnnounceQueue>,
130}
131
132impl AnnounceQueues {
133    pub fn new() -> Self {
134        AnnounceQueues {
135            queues: BTreeMap::new(),
136        }
137    }
138
139    /// Try to send an announce on an interface. If bandwidth is available,
140    /// returns the action immediately. Otherwise, queues it.
141    ///
142    /// Returns Some(action) if the announce should be sent now, None if queued.
143    pub fn gate_announce(
144        &mut self,
145        interface: InterfaceId,
146        raw: Vec<u8>,
147        dest_hash: [u8; 16],
148        hops: u8,
149        emitted: f64,
150        now: f64,
151        bitrate: Option<u64>,
152        announce_cap: f64,
153    ) -> Option<TransportAction> {
154        let queue = self
155            .queues
156            .entry(interface)
157            .or_insert_with(InterfaceAnnounceQueue::new);
158
159        // If no bitrate, no cap applies — send immediately
160        let bitrate = match bitrate {
161            Some(br) if br > 0 => br,
162            _ => {
163                return Some(TransportAction::SendOnInterface { interface, raw });
164            }
165        };
166
167        if queue.is_allowed(now) {
168            // Bandwidth available — send now and update allowed_at
169            queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
170                now,
171                raw.len(),
172                bitrate,
173                announce_cap,
174            );
175            Some(TransportAction::SendOnInterface { interface, raw })
176        } else {
177            // Queue the announce
178            queue.insert(AnnounceQueueEntry {
179                destination_hash: dest_hash,
180                time: now,
181                hops,
182                emitted,
183                raw,
184            });
185            None
186        }
187    }
188
189    /// Process all announce queues: dequeue and send when bandwidth is available.
190    /// Called from tick().
191    pub fn process_queues(
192        &mut self,
193        now: f64,
194        interfaces: &BTreeMap<InterfaceId, super::types::InterfaceInfo>,
195    ) -> Vec<TransportAction> {
196        let mut actions = Vec::new();
197
198        for (iface_id, queue) in self.queues.iter_mut() {
199            // Remove stale entries
200            queue.remove_stale(now);
201
202            // Process as many announces as bandwidth allows
203            while queue.is_allowed(now) {
204                if let Some(idx) = queue.select_next() {
205                    let entry = queue.entries.remove(idx);
206
207                    // Look up bitrate for this interface
208                    let (bitrate, announce_cap) = if let Some(info) = interfaces.get(iface_id) {
209                        (info.bitrate.unwrap_or(0), info.announce_cap)
210                    } else {
211                        (0, constants::ANNOUNCE_CAP)
212                    };
213
214                    if bitrate > 0 {
215                        queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
216                            now,
217                            entry.raw.len(),
218                            bitrate,
219                            announce_cap,
220                        );
221                    }
222
223                    actions.push(TransportAction::SendOnInterface {
224                        interface: *iface_id,
225                        raw: entry.raw,
226                    });
227                } else {
228                    break;
229                }
230            }
231        }
232
233        actions
234    }
235
236    /// Get the queue for a specific interface (for testing).
237    #[cfg(test)]
238    pub fn queue_for(&self, id: &InterfaceId) -> Option<&InterfaceAnnounceQueue> {
239        self.queues.get(id)
240    }
241}
242
243#[cfg(test)]
244mod tests {
245    use super::*;
246    use alloc::string::String;
247
248    fn make_entry(dest: u8, hops: u8, time: f64) -> AnnounceQueueEntry {
249        AnnounceQueueEntry {
250            destination_hash: [dest; 16],
251            time,
252            hops,
253            emitted: time,
254            raw: vec![0x01, 0x02, 0x03],
255        }
256    }
257
258    fn make_interface_info(id: u64, bitrate: Option<u64>) -> super::super::types::InterfaceInfo {
259        super::super::types::InterfaceInfo {
260            id: InterfaceId(id),
261            name: String::from("test"),
262            mode: crate::constants::MODE_FULL,
263            out_capable: true,
264            in_capable: true,
265            bitrate,
266            announce_rate_target: None,
267            announce_rate_grace: 0,
268            announce_rate_penalty: 0.0,
269            announce_cap: constants::ANNOUNCE_CAP,
270            is_local_client: false,
271            wants_tunnel: false,
272            tunnel_id: None,
273            mtu: constants::MTU as u32,
274            ingress_control: false,
275            ia_freq: 0.0,
276            started: 0.0,
277        }
278    }
279
280    // --- InterfaceAnnounceQueue tests ---
281
282    #[test]
283    fn test_queue_entry_creation() {
284        let entry = make_entry(0xAA, 3, 1000.0);
285        assert_eq!(entry.hops, 3);
286        assert_eq!(entry.destination_hash, [0xAA; 16]);
287    }
288
289    #[test]
290    fn test_queue_insert_and_select() {
291        let mut queue = InterfaceAnnounceQueue::new();
292        queue.insert(make_entry(0x01, 3, 100.0));
293        queue.insert(make_entry(0x02, 1, 200.0));
294        queue.insert(make_entry(0x03, 2, 150.0));
295
296        // Should select min hops first (0x02 with hops=1)
297        let idx = queue.select_next().unwrap();
298        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
299    }
300
301    #[test]
302    fn test_queue_select_fifo_on_same_hops() {
303        let mut queue = InterfaceAnnounceQueue::new();
304        queue.insert(make_entry(0x01, 2, 200.0)); // newer
305        queue.insert(make_entry(0x02, 2, 100.0)); // older
306
307        // Same hops — should pick oldest (0x02 at time 100)
308        let idx = queue.select_next().unwrap();
309        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
310    }
311
312    #[test]
313    fn test_queue_dedup_update() {
314        let mut queue = InterfaceAnnounceQueue::new();
315        queue.insert(make_entry(0x01, 3, 100.0));
316        assert_eq!(queue.entries.len(), 1);
317
318        // Insert same dest with fewer hops — should update
319        queue.insert(make_entry(0x01, 1, 200.0));
320        assert_eq!(queue.entries.len(), 1);
321        assert_eq!(queue.entries[0].hops, 1);
322
323        // Insert same dest with more hops — should NOT update
324        queue.insert(make_entry(0x01, 5, 300.0));
325        assert_eq!(queue.entries.len(), 1);
326        assert_eq!(queue.entries[0].hops, 1);
327    }
328
329    #[test]
330    fn test_queue_stale_removal() {
331        let mut queue = InterfaceAnnounceQueue::new();
332        queue.insert(make_entry(0x01, 1, 100.0));
333        queue.insert(make_entry(0x02, 2, 200.0));
334
335        // At time 100 + 86400 + 1 = 86501, entry 0x01 should be stale
336        queue.remove_stale(86501.0);
337        assert_eq!(queue.entries.len(), 1);
338        assert_eq!(queue.entries[0].destination_hash, [0x02; 16]);
339    }
340
341    #[test]
342    fn test_queue_max_size() {
343        let mut queue = InterfaceAnnounceQueue::new();
344        for i in 0..constants::MAX_QUEUED_ANNOUNCES {
345            queue.insert(AnnounceQueueEntry {
346                destination_hash: {
347                    let mut d = [0u8; 16];
348                    d[0] = (i >> 8) as u8;
349                    d[1] = i as u8;
350                    d
351                },
352                time: i as f64,
353                hops: 1,
354                emitted: i as f64,
355                raw: vec![0x01],
356            });
357        }
358        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
359
360        // Add one more — oldest should be dropped
361        queue.insert(make_entry(0xFF, 1, 99999.0));
362        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
363    }
364
365    #[test]
366    fn test_queue_empty_select() {
367        let queue = InterfaceAnnounceQueue::new();
368        assert!(queue.select_next().is_none());
369    }
370
371    #[test]
372    fn test_bandwidth_allowed() {
373        let mut queue = InterfaceAnnounceQueue::new();
374        assert!(queue.is_allowed(0.0));
375        assert!(queue.is_allowed(100.0));
376
377        queue.announce_allowed_at = 200.0;
378        assert!(!queue.is_allowed(100.0));
379        assert!(!queue.is_allowed(199.9));
380        assert!(queue.is_allowed(200.0));
381        assert!(queue.is_allowed(300.0));
382    }
383
384    #[test]
385    fn test_calculate_next_allowed() {
386        // 100 bytes = 800 bits, bitrate = 1000 bps, cap = 0.02
387        // time_to_send = 800/1000 = 0.8s
388        // delay = 0.8 / 0.02 = 40.0s
389        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, 0.02);
390        assert!((next - 1040.0).abs() < 0.001);
391    }
392
393    #[test]
394    fn test_calculate_next_allowed_zero_bitrate() {
395        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, 0.02);
396        assert_eq!(next, 1000.0); // no cap
397    }
398
399    // --- AnnounceQueues tests ---
400
401    #[test]
402    fn test_gate_announce_no_bitrate_immediate() {
403        let mut queues = AnnounceQueues::new();
404        let result = queues.gate_announce(
405            InterfaceId(1),
406            vec![0x01, 0x02, 0x03],
407            [0xAA; 16],
408            2,
409            1000.0,
410            1000.0,
411            None, // no bitrate
412            0.02,
413        );
414        assert!(result.is_some());
415        assert!(matches!(
416            result.unwrap(),
417            TransportAction::SendOnInterface { .. }
418        ));
419    }
420
421    #[test]
422    fn test_gate_announce_bandwidth_available() {
423        let mut queues = AnnounceQueues::new();
424        let result = queues.gate_announce(
425            InterfaceId(1),
426            vec![0x01; 100],
427            [0xBB; 16],
428            2,
429            1000.0,
430            1000.0,
431            Some(10000), // 10 kbps
432            0.02,
433        );
434        // First announce should go through
435        assert!(result.is_some());
436
437        // Check that allowed_at was updated
438        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
439        assert!(queue.announce_allowed_at > 1000.0);
440    }
441
442    #[test]
443    fn test_gate_announce_bandwidth_exhausted_queues() {
444        let mut queues = AnnounceQueues::new();
445
446        // First announce goes through
447        let r1 = queues.gate_announce(
448            InterfaceId(1),
449            vec![0x01; 100],
450            [0xAA; 16],
451            2,
452            1000.0,
453            1000.0,
454            Some(1000), // 1 kbps — very slow
455            0.02,
456        );
457        assert!(r1.is_some());
458
459        // Second announce at same time should be queued
460        let r2 = queues.gate_announce(
461            InterfaceId(1),
462            vec![0x02; 100],
463            [0xBB; 16],
464            3,
465            1000.0,
466            1000.0,
467            Some(1000),
468            0.02,
469        );
470        assert!(r2.is_none()); // queued
471
472        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
473        assert_eq!(queue.entries.len(), 1);
474    }
475
476    #[test]
477    fn test_process_queues_dequeues_when_allowed() {
478        let mut queues = AnnounceQueues::new();
479
480        // Queue an announce by exhausting bandwidth first
481        let _ = queues.gate_announce(
482            InterfaceId(1),
483            vec![0x01; 10],
484            [0xAA; 16],
485            2,
486            0.0,
487            0.0,
488            Some(1000),
489            0.02,
490        );
491        let _ = queues.gate_announce(
492            InterfaceId(1),
493            vec![0x02; 10],
494            [0xBB; 16],
495            3,
496            0.0,
497            0.0,
498            Some(1000),
499            0.02,
500        );
501
502        // Queue should have one entry
503        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 1);
504
505        let mut interfaces = BTreeMap::new();
506        interfaces.insert(InterfaceId(1), make_interface_info(1, Some(1000)));
507
508        // Process at a future time when bandwidth is available
509        let allowed_at = queues
510            .queue_for(&InterfaceId(1))
511            .unwrap()
512            .announce_allowed_at;
513        let actions = queues.process_queues(allowed_at + 1.0, &interfaces);
514
515        assert_eq!(actions.len(), 1);
516        assert!(matches!(
517            &actions[0],
518            TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(1)
519        ));
520
521        // Queue should be empty now
522        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 0);
523    }
524
525    #[test]
526    fn test_local_announce_bypasses_cap() {
527        // hops == 0 means locally-originated, should not be queued
528        // The caller (TransportEngine) is responsible for only calling gate_announce
529        // for hops > 0. We verify the gate_announce works for hops=0 too.
530        let mut queues = AnnounceQueues::new();
531
532        // Exhaust bandwidth
533        let _ = queues.gate_announce(
534            InterfaceId(1),
535            vec![0x01; 100],
536            [0xAA; 16],
537            2,
538            0.0,
539            0.0,
540            Some(1000),
541            0.02,
542        );
543
544        // hops=0 should still be queued by gate_announce since hops filtering
545        // is the caller's responsibility. gate_announce is agnostic.
546        let r = queues.gate_announce(
547            InterfaceId(1),
548            vec![0x02; 100],
549            [0xBB; 16],
550            0,
551            0.0,
552            0.0,
553            Some(1000),
554            0.02,
555        );
556        assert!(r.is_none()); // queued — caller must bypass for hops==0
557    }
558}