Skip to main content

rns_core/transport/
announce_queue.rs

//! Per-interface announce bandwidth queuing.
//!
//! Announces with hops > 0 (propagated, not locally originated) are gated
//! by a per-interface bandwidth cap (default 2%). When that bandwidth is exhausted,
//! announces are queued and released once bandwidth becomes available again.
//!
//! Python reference: Transport.py:1085-1165, Interface.py:246-286
8
9use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
12use super::types::{InterfaceId, TransportAction};
13use crate::constants;
14
/// One announce that is being held back until its interface has announce
/// bandwidth available again.
#[derive(Debug, Clone)]
pub struct AnnounceQueueEntry {
    /// Hash of the destination this announce belongs to.
    pub destination_hash: [u8; 16],
    /// Timestamp at which the entry was placed in the queue.
    pub time: f64,
    /// Hop count carried by the announce.
    pub hops: u8,
    /// Timestamp at which the announce was originally emitted (taken from its random blob).
    pub emitted: f64,
    /// Complete announce bytes, ready to be handed to the interface.
    pub raw: Vec<u8>,
}
29
30/// Per-interface announce queue with bandwidth tracking.
31#[derive(Debug, Clone)]
32pub struct InterfaceAnnounceQueue {
33    /// Queued announce entries.
34    pub entries: Vec<AnnounceQueueEntry>,
35    /// Earliest time another announce is allowed on this interface.
36    pub announce_allowed_at: f64,
37}
38
39impl InterfaceAnnounceQueue {
40    pub fn new() -> Self {
41        InterfaceAnnounceQueue {
42            entries: Vec::new(),
43            announce_allowed_at: 0.0,
44        }
45    }
46
47    /// Insert an announce into the queue.
48    /// If an entry for the same destination already exists, update it if the new one
49    /// has fewer hops or is newer.
50    pub fn insert(&mut self, entry: AnnounceQueueEntry) {
51        // Check for existing entry with same destination
52        if let Some(pos) = self
53            .entries
54            .iter()
55            .position(|e| e.destination_hash == entry.destination_hash)
56        {
57            let existing = &self.entries[pos];
58            // Update if new entry has fewer hops, or same hops and newer
59            if entry.hops < existing.hops
60                || (entry.hops == existing.hops && entry.emitted > existing.emitted)
61            {
62                self.entries[pos] = entry;
63            }
64            // Otherwise discard the new entry
65        } else {
66            // Enforce max queue size
67            if self.entries.len() >= constants::MAX_QUEUED_ANNOUNCES {
68                // Drop oldest entry
69                self.entries.remove(0);
70            }
71            self.entries.push(entry);
72        }
73    }
74
75    /// Remove stale entries (older than QUEUED_ANNOUNCE_LIFE).
76    pub fn remove_stale(&mut self, now: f64) {
77        self.entries
78            .retain(|e| now - e.time < constants::QUEUED_ANNOUNCE_LIFE);
79    }
80
81    /// Select the next announce to send: minimum hops, then oldest (FIFO).
82    /// Returns the index of the selected entry, or None if empty.
83    pub fn select_next(&self) -> Option<usize> {
84        if self.entries.is_empty() {
85            return None;
86        }
87        let mut best_idx = 0;
88        let mut best_hops = self.entries[0].hops;
89        let mut best_time = self.entries[0].time;
90
91        for (i, entry) in self.entries.iter().enumerate().skip(1) {
92            if entry.hops < best_hops
93                || (entry.hops == best_hops && entry.time < best_time)
94            {
95                best_idx = i;
96                best_hops = entry.hops;
97                best_time = entry.time;
98            }
99        }
100        Some(best_idx)
101    }
102
103    /// Check if an announce is allowed now based on bandwidth.
104    pub fn is_allowed(&self, now: f64) -> bool {
105        now >= self.announce_allowed_at
106    }
107
108    /// Calculate the next allowed time after sending an announce.
109    /// `raw_len`: size of the announce in bytes
110    /// `bitrate`: interface bitrate in bits/second
111    /// `announce_cap`: fraction of bitrate reserved for announces
112    pub fn calculate_next_allowed(now: f64, raw_len: usize, bitrate: u64, announce_cap: f64) -> f64 {
113        if bitrate == 0 || announce_cap <= 0.0 {
114            return now; // no cap
115        }
116        let bits = (raw_len * 8) as f64;
117        let time_to_send = bits / (bitrate as f64);
118        let delay = time_to_send / announce_cap;
119        now + delay
120    }
121}
122
123/// Manage announce queues for all interfaces.
124#[derive(Debug, Clone)]
125pub struct AnnounceQueues {
126    queues: BTreeMap<InterfaceId, InterfaceAnnounceQueue>,
127}
128
129impl AnnounceQueues {
130    pub fn new() -> Self {
131        AnnounceQueues {
132            queues: BTreeMap::new(),
133        }
134    }
135
136    /// Try to send an announce on an interface. If bandwidth is available,
137    /// returns the action immediately. Otherwise, queues it.
138    ///
139    /// Returns Some(action) if the announce should be sent now, None if queued.
140    pub fn gate_announce(
141        &mut self,
142        interface: InterfaceId,
143        raw: Vec<u8>,
144        dest_hash: [u8; 16],
145        hops: u8,
146        emitted: f64,
147        now: f64,
148        bitrate: Option<u64>,
149        announce_cap: f64,
150    ) -> Option<TransportAction> {
151        let queue = self
152            .queues
153            .entry(interface)
154            .or_insert_with(InterfaceAnnounceQueue::new);
155
156        // If no bitrate, no cap applies — send immediately
157        let bitrate = match bitrate {
158            Some(br) if br > 0 => br,
159            _ => {
160                return Some(TransportAction::SendOnInterface {
161                    interface,
162                    raw,
163                });
164            }
165        };
166
167        if queue.is_allowed(now) {
168            // Bandwidth available — send now and update allowed_at
169            queue.announce_allowed_at =
170                InterfaceAnnounceQueue::calculate_next_allowed(now, raw.len(), bitrate, announce_cap);
171            Some(TransportAction::SendOnInterface { interface, raw })
172        } else {
173            // Queue the announce
174            queue.insert(AnnounceQueueEntry {
175                destination_hash: dest_hash,
176                time: now,
177                hops,
178                emitted,
179                raw,
180            });
181            None
182        }
183    }
184
185    /// Process all announce queues: dequeue and send when bandwidth is available.
186    /// Called from tick().
187    pub fn process_queues(
188        &mut self,
189        now: f64,
190        interfaces: &BTreeMap<InterfaceId, super::types::InterfaceInfo>,
191    ) -> Vec<TransportAction> {
192        let mut actions = Vec::new();
193
194        for (iface_id, queue) in self.queues.iter_mut() {
195            // Remove stale entries
196            queue.remove_stale(now);
197
198            // Process as many announces as bandwidth allows
199            while queue.is_allowed(now) {
200                if let Some(idx) = queue.select_next() {
201                    let entry = queue.entries.remove(idx);
202
203                    // Look up bitrate for this interface
204                    let (bitrate, announce_cap) =
205                        if let Some(info) = interfaces.get(iface_id) {
206                            (info.bitrate.unwrap_or(0), info.announce_cap)
207                        } else {
208                            (0, constants::ANNOUNCE_CAP)
209                        };
210
211                    if bitrate > 0 {
212                        queue.announce_allowed_at =
213                            InterfaceAnnounceQueue::calculate_next_allowed(
214                                now,
215                                entry.raw.len(),
216                                bitrate,
217                                announce_cap,
218                            );
219                    }
220
221                    actions.push(TransportAction::SendOnInterface {
222                        interface: *iface_id,
223                        raw: entry.raw,
224                    });
225                } else {
226                    break;
227                }
228            }
229        }
230
231        actions
232    }
233
234    /// Get the queue for a specific interface (for testing).
235    #[cfg(test)]
236    pub fn queue_for(&self, id: &InterfaceId) -> Option<&InterfaceAnnounceQueue> {
237        self.queues.get(id)
238    }
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::string::String;

    /// Announce cap passed to every `gate_announce` call in these tests.
    const TEST_CAP: f64 = 0.02;

    /// Build a queue entry whose destination hash is `dest` repeated 16 times,
    /// with `emitted` equal to `time` and a fixed 3-byte payload.
    fn make_entry(dest: u8, hops: u8, time: f64) -> AnnounceQueueEntry {
        AnnounceQueueEntry {
            destination_hash: [dest; 16],
            time,
            hops,
            emitted: time,
            raw: vec![0x01, 0x02, 0x03],
        }
    }

    /// Build interface info with the given bitrate and the default announce cap.
    fn make_interface_info(id: u64, bitrate: Option<u64>) -> super::super::types::InterfaceInfo {
        super::super::types::InterfaceInfo {
            id: InterfaceId(id),
            name: String::from("test"),
            mode: crate::constants::MODE_FULL,
            out_capable: true,
            in_capable: true,
            bitrate,
            announce_rate_target: None,
            announce_rate_grace: 0,
            announce_rate_penalty: 0.0,
            announce_cap: constants::ANNOUNCE_CAP,
            is_local_client: false,
            wants_tunnel: false,
            tunnel_id: None,
        }
    }

    /// Gate an announce on interface 1 with `emitted == now` and `TEST_CAP`.
    fn gate(
        queues: &mut AnnounceQueues,
        raw: Vec<u8>,
        dest: u8,
        hops: u8,
        now: f64,
        bitrate: Option<u64>,
    ) -> Option<TransportAction> {
        queues.gate_announce(InterfaceId(1), raw, [dest; 16], hops, now, now, bitrate, TEST_CAP)
    }

    // --- InterfaceAnnounceQueue tests ---

    #[test]
    fn test_queue_entry_creation() {
        let entry = make_entry(0xAA, 3, 1000.0);
        assert_eq!(entry.hops, 3);
        assert_eq!(entry.destination_hash, [0xAA; 16]);
    }

    #[test]
    fn test_queue_insert_and_select() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 3, 100.0));
        queue.insert(make_entry(0x02, 1, 200.0));
        queue.insert(make_entry(0x03, 2, 150.0));

        // Lowest hop count wins regardless of age (0x02 with hops=1).
        let idx = queue.select_next().unwrap();
        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_select_fifo_on_same_hops() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 2, 200.0)); // newer
        queue.insert(make_entry(0x02, 2, 100.0)); // older

        // Same hops: the entry queued earliest (0x02 at time 100) is picked.
        let idx = queue.select_next().unwrap();
        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_dedup_update() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 3, 100.0));
        assert_eq!(queue.entries.len(), 1);

        // Same destination with fewer hops replaces the queued entry.
        queue.insert(make_entry(0x01, 1, 200.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);

        // Same destination with more hops is discarded.
        queue.insert(make_entry(0x01, 5, 300.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);

        // Same hops and a later emission time replaces the queued entry.
        queue.insert(make_entry(0x01, 1, 400.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].emitted, 400.0);

        // Same hops and an earlier emission time is discarded.
        queue.insert(make_entry(0x01, 1, 350.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].emitted, 400.0);
    }

    #[test]
    fn test_queue_stale_removal() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 1, 100.0));
        queue.insert(make_entry(0x02, 2, 200.0));

        // One second past the lifetime of entry 0x01; entry 0x02 was queued
        // 100 seconds later and is therefore still within its lifetime.
        let now = 100.0 + constants::QUEUED_ANNOUNCE_LIFE + 1.0;
        queue.remove_stale(now);
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_max_size() {
        let mut queue = InterfaceAnnounceQueue::new();
        for i in 0..constants::MAX_QUEUED_ANNOUNCES {
            // Distinct destination per entry, encoded in the first two bytes.
            let mut dest = [0u8; 16];
            dest[0] = (i >> 8) as u8;
            dest[1] = i as u8;
            queue.insert(AnnounceQueueEntry {
                destination_hash: dest,
                time: i as f64,
                hops: 1,
                emitted: i as f64,
                raw: vec![0x01],
            });
        }
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);

        // One more entry: the queue stays at capacity, the oldest entry
        // (time 0, all-zero destination) is gone and the new one is present.
        queue.insert(make_entry(0xFF, 1, 99999.0));
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
        assert!(!queue
            .entries
            .iter()
            .any(|e| e.destination_hash == [0x00; 16]));
        assert!(queue
            .entries
            .iter()
            .any(|e| e.destination_hash == [0xFF; 16]));
    }

    #[test]
    fn test_queue_empty_select() {
        let queue = InterfaceAnnounceQueue::new();
        assert!(queue.select_next().is_none());
    }

    #[test]
    fn test_bandwidth_allowed() {
        let mut queue = InterfaceAnnounceQueue::new();
        assert!(queue.is_allowed(0.0));
        assert!(queue.is_allowed(100.0));

        queue.announce_allowed_at = 200.0;
        assert!(!queue.is_allowed(100.0));
        assert!(!queue.is_allowed(199.9));
        assert!(queue.is_allowed(200.0));
        assert!(queue.is_allowed(300.0));
    }

    #[test]
    fn test_calculate_next_allowed() {
        // 100 bytes = 800 bits, bitrate = 1000 bps, cap = 0.02
        // time_to_send = 800/1000 = 0.8s
        // delay = 0.8 / 0.02 = 40.0s
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, 0.02);
        assert!((next - 1040.0).abs() < 0.001);
    }

    #[test]
    fn test_calculate_next_allowed_zero_bitrate() {
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, 0.02);
        assert_eq!(next, 1000.0); // no cap
    }

    #[test]
    fn test_calculate_next_allowed_zero_cap() {
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, 0.0);
        assert_eq!(next, 1000.0); // no cap
    }

    // --- AnnounceQueues tests ---

    #[test]
    fn test_gate_announce_no_bitrate_immediate() {
        let mut queues = AnnounceQueues::new();
        let result = gate(&mut queues, vec![0x01, 0x02, 0x03], 0xAA, 2, 1000.0, None);
        assert!(matches!(
            result,
            Some(TransportAction::SendOnInterface { .. })
        ));
    }

    #[test]
    fn test_gate_announce_bandwidth_available() {
        let mut queues = AnnounceQueues::new();
        // 10 kbps interface: the first announce goes straight through.
        let result = gate(&mut queues, vec![0x01; 100], 0xBB, 2, 1000.0, Some(10000));
        assert!(result.is_some());

        // Sending must have pushed the gate into the future.
        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
        assert!(queue.announce_allowed_at > 1000.0);
    }

    #[test]
    fn test_gate_announce_bandwidth_exhausted_queues() {
        let mut queues = AnnounceQueues::new();

        // First announce goes through on a very slow (1 kbps) interface.
        let r1 = gate(&mut queues, vec![0x01; 100], 0xAA, 2, 1000.0, Some(1000));
        assert!(r1.is_some());

        // A second announce at the same instant has no bandwidth left.
        let r2 = gate(&mut queues, vec![0x02; 100], 0xBB, 3, 1000.0, Some(1000));
        assert!(r2.is_none()); // queued

        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
        assert_eq!(queue.entries.len(), 1);
    }

    #[test]
    fn test_process_queues_dequeues_when_allowed() {
        let mut queues = AnnounceQueues::new();

        // The first announce uses up the bandwidth, the second one is queued.
        let _ = gate(&mut queues, vec![0x01; 10], 0xAA, 2, 0.0, Some(1000));
        let _ = gate(&mut queues, vec![0x02; 10], 0xBB, 3, 0.0, Some(1000));
        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 1);

        let mut interfaces = BTreeMap::new();
        interfaces.insert(InterfaceId(1), make_interface_info(1, Some(1000)));

        // Process at a time after the gate has reopened.
        let allowed_at = queues.queue_for(&InterfaceId(1)).unwrap().announce_allowed_at;
        let actions = queues.process_queues(allowed_at + 1.0, &interfaces);

        assert_eq!(actions.len(), 1);
        assert!(matches!(
            &actions[0],
            TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(1)
        ));

        // The backlog has been drained.
        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 0);
    }

    #[test]
    fn test_local_announce_bypasses_cap() {
        // Locally-originated announces (hops == 0) are meant to bypass the
        // cap, but that bypass is the caller's job: gate_announce itself does
        // not look at the hop count. This test pins that contract by showing
        // that a hops == 0 announce is queued like any other.
        let mut queues = AnnounceQueues::new();

        // Exhaust bandwidth.
        let _ = gate(&mut queues, vec![0x01; 100], 0xAA, 2, 0.0, Some(1000));

        let r = gate(&mut queues, vec![0x02; 100], 0xBB, 0, 0.0, Some(1000));
        assert!(r.is_none()); // queued — caller must bypass for hops == 0
    }
}