1use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
12use super::types::{AirtimeProfile, InterfaceId, PacketBytes, TransportAction};
13use crate::constants;
14
15#[derive(Debug, Clone)]
17pub struct AnnounceQueueEntry {
18 pub destination_hash: [u8; 16],
20 pub time: f64,
22 pub hops: u8,
24 pub emitted: f64,
26 pub raw: PacketBytes,
28}
29
30#[derive(Debug, Clone)]
32pub struct InterfaceAnnounceQueue {
33 pub entries: Vec<AnnounceQueueEntry>,
35 pub announce_allowed_at: f64,
37}
38
39impl InterfaceAnnounceQueue {
40 pub fn new() -> Self {
41 InterfaceAnnounceQueue {
42 entries: Vec::new(),
43 announce_allowed_at: 0.0,
44 }
45 }
46
47 pub fn insert(&mut self, entry: AnnounceQueueEntry) {
51 if let Some(pos) = self
53 .entries
54 .iter()
55 .position(|e| e.destination_hash == entry.destination_hash)
56 {
57 let existing = &self.entries[pos];
58 if entry.hops < existing.hops
60 || (entry.hops == existing.hops && entry.emitted > existing.emitted)
61 {
62 self.entries[pos] = entry;
63 }
64 } else {
66 if self.entries.len() >= constants::MAX_QUEUED_ANNOUNCES {
68 self.entries.remove(0);
70 }
71 self.entries.push(entry);
72 }
73 }
74
75 pub fn remove_stale(&mut self, now: f64) {
77 self.entries
78 .retain(|e| now - e.time < constants::QUEUED_ANNOUNCE_LIFE);
79 }
80
81 pub fn select_next(&self) -> Option<usize> {
84 if self.entries.is_empty() {
85 return None;
86 }
87 let mut best_idx = 0;
88 let mut best_hops = self.entries[0].hops;
89 let mut best_time = self.entries[0].time;
90
91 for (i, entry) in self.entries.iter().enumerate().skip(1) {
92 if entry.hops < best_hops || (entry.hops == best_hops && entry.time < best_time) {
93 best_idx = i;
94 best_hops = entry.hops;
95 best_time = entry.time;
96 }
97 }
98 Some(best_idx)
99 }
100
101 pub fn is_allowed(&self, now: f64) -> bool {
103 now >= self.announce_allowed_at
104 }
105
106 pub fn calculate_next_allowed(
111 now: f64,
112 raw_len: usize,
113 bitrate: u64,
114 airtime_profile: Option<AirtimeProfile>,
115 announce_cap: f64,
116 ) -> f64 {
117 if announce_cap <= 0.0 {
118 return now; }
120
121 let time_to_send = airtime_profile
122 .map(|profile| profile.transmit_time_secs(raw_len))
123 .unwrap_or_else(|| {
124 if bitrate == 0 {
125 0.0
126 } else {
127 let bits = (raw_len * 8) as f64;
128 bits / (bitrate as f64)
129 }
130 });
131 if time_to_send <= 0.0 {
132 return now;
133 }
134 let delay = time_to_send / announce_cap;
135 now + delay
136 }
137}
138
139impl Default for InterfaceAnnounceQueue {
140 fn default() -> Self {
141 Self::new()
142 }
143}
144
145#[derive(Debug, Clone)]
147pub struct AnnounceQueues {
148 queues: BTreeMap<InterfaceId, InterfaceAnnounceQueue>,
149 max_interfaces: usize,
150 interface_cap_drops: u64,
151}
152
153impl AnnounceQueues {
154 pub fn new(max_interfaces: usize) -> Self {
155 AnnounceQueues {
156 queues: BTreeMap::new(),
157 max_interfaces,
158 interface_cap_drops: 0,
159 }
160 }
161
162 #[allow(clippy::too_many_arguments)]
167 pub fn gate_announce(
168 &mut self,
169 interface: InterfaceId,
170 raw: PacketBytes,
171 dest_hash: [u8; 16],
172 hops: u8,
173 emitted: f64,
174 now: f64,
175 bitrate: Option<u64>,
176 airtime_profile: Option<AirtimeProfile>,
177 announce_cap: f64,
178 ) -> Option<TransportAction> {
179 let bitrate = match bitrate {
181 Some(br) if br > 0 => br,
182 _ if airtime_profile.is_none() => {
183 return Some(TransportAction::SendOnInterface { interface, raw });
184 }
185 _ => 0,
186 };
187
188 if !self.queues.contains_key(&interface) && self.queues.len() >= self.max_interfaces {
189 self.interface_cap_drops = self.interface_cap_drops.saturating_add(1);
190 return None;
191 }
192
193 let queue = self.queues.entry(interface).or_default();
194
195 if queue.is_allowed(now) {
196 queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
198 now,
199 raw.len(),
200 bitrate,
201 airtime_profile,
202 announce_cap,
203 );
204 Some(TransportAction::SendOnInterface { interface, raw })
205 } else {
206 queue.insert(AnnounceQueueEntry {
208 destination_hash: dest_hash,
209 time: now,
210 hops,
211 emitted,
212 raw,
213 });
214 None
215 }
216 }
217
218 pub fn process_queues(
221 &mut self,
222 now: f64,
223 interfaces: &BTreeMap<InterfaceId, super::types::InterfaceInfo>,
224 ) -> Vec<TransportAction> {
225 let mut actions = Vec::new();
226 let mut empty_queues = Vec::new();
227
228 for (iface_id, queue) in self.queues.iter_mut() {
229 queue.remove_stale(now);
231
232 while queue.is_allowed(now) {
234 if let Some(idx) = queue.select_next() {
235 let entry = queue.entries.remove(idx);
236
237 let (bitrate, airtime_profile, announce_cap) =
239 if let Some(info) = interfaces.get(iface_id) {
240 (
241 info.bitrate.unwrap_or(0),
242 info.airtime_profile,
243 info.announce_cap,
244 )
245 } else {
246 (0, None, constants::ANNOUNCE_CAP)
247 };
248
249 if bitrate > 0 || airtime_profile.is_some() {
250 queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
251 now,
252 entry.raw.len(),
253 bitrate,
254 airtime_profile,
255 announce_cap,
256 );
257 } else {
258 queue.announce_allowed_at = now;
259 }
260
261 actions.push(TransportAction::SendOnInterface {
262 interface: *iface_id,
263 raw: entry.raw,
264 });
265 } else {
266 break;
267 }
268 }
269
270 if queue.entries.is_empty() {
271 empty_queues.push(*iface_id);
272 }
273 }
274
275 for iface_id in empty_queues {
276 self.queues.remove(&iface_id);
277 }
278
279 actions
280 }
281
282 pub fn remove_interface(&mut self, interface: InterfaceId) -> bool {
284 self.queues.remove(&interface).is_some()
285 }
286
287 pub fn queue_count(&self) -> usize {
289 self.queues.len()
290 }
291
292 pub fn nonempty_queue_count(&self) -> usize {
294 self.queues
295 .values()
296 .filter(|queue| !queue.entries.is_empty())
297 .count()
298 }
299
300 pub fn total_queued_announces(&self) -> usize {
302 self.queues.values().map(|queue| queue.entries.len()).sum()
303 }
304
305 pub fn total_queued_bytes(&self) -> usize {
307 self.queues
308 .values()
309 .flat_map(|queue| queue.entries.iter())
310 .map(|entry| entry.raw.len())
311 .sum()
312 }
313
314 pub fn interface_cap_drop_count(&self) -> u64 {
316 self.interface_cap_drops
317 }
318
319 #[cfg(test)]
321 pub fn queue_for(&self, id: &InterfaceId) -> Option<&InterfaceAnnounceQueue> {
322 self.queues.get(id)
323 }
324}
325
326impl Default for AnnounceQueues {
327 fn default() -> Self {
328 Self::new(1024)
329 }
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::string::String;

    /// Announce cap used by every gating test.
    const CAP: f64 = 0.02;

    /// Builds a three-byte entry for destination `[dest; 16]` whose `time`
    /// and `emitted` are both `time`.
    fn make_entry(dest: u8, hops: u8, time: f64) -> AnnounceQueueEntry {
        AnnounceQueueEntry {
            destination_hash: [dest; 16],
            time,
            hops,
            emitted: time,
            raw: vec![0x01, 0x02, 0x03].into(),
        }
    }

    /// Builds an `InterfaceInfo` with the given bitrate, no airtime profile
    /// and the default announce cap.
    fn make_interface_info(id: u64, bitrate: Option<u64>) -> super::super::types::InterfaceInfo {
        super::super::types::InterfaceInfo {
            id: InterfaceId(id),
            name: String::from("test"),
            mode: crate::constants::MODE_FULL,
            out_capable: true,
            in_capable: true,
            bitrate,
            airtime_profile: None,
            announce_rate_target: None,
            announce_rate_grace: 0,
            announce_rate_penalty: 0.0,
            announce_cap: constants::ANNOUNCE_CAP,
            is_local_client: false,
            wants_tunnel: false,
            tunnel_id: None,
            mtu: constants::MTU as u32,
            ingress_control: crate::transport::types::IngressControlConfig::disabled(),
            ia_freq: 0.0,
            started: 0.0,
        }
    }

    /// The LoRa airtime profile shared by the airtime tests.
    fn lora_profile() -> AirtimeProfile {
        AirtimeProfile::Lora {
            bandwidth: 125_000,
            spreading_factor: 8,
            coding_rate: 5,
            preamble_symbols: 8,
            explicit_header: true,
            crc: true,
        }
    }

    /// Gates `raw` for destination `[dest; 16]` on interface `iface` with
    /// `emitted == now`, no airtime profile and the shared test cap.
    fn gate(
        queues: &mut AnnounceQueues,
        iface: u64,
        raw: PacketBytes,
        dest: u8,
        hops: u8,
        now: f64,
        bitrate: Option<u64>,
    ) -> Option<TransportAction> {
        queues.gate_announce(
            InterfaceId(iface),
            raw,
            [dest; 16],
            hops,
            now,
            now,
            bitrate,
            None,
            CAP,
        )
    }

    /// Interface table containing only interface 1 at the given bitrate.
    fn one_interface(
        bitrate: Option<u64>,
    ) -> BTreeMap<InterfaceId, super::super::types::InterfaceInfo> {
        let mut interfaces = BTreeMap::new();
        interfaces.insert(InterfaceId(1), make_interface_info(1, bitrate));
        interfaces
    }

    /// Next allowed announce time of interface `iface`; panics if it has no queue.
    fn allowed_at(queues: &AnnounceQueues, iface: u64) -> f64 {
        queues
            .queue_for(&InterfaceId(iface))
            .unwrap()
            .announce_allowed_at
    }

    /// Number of queued entries of interface `iface`; panics if it has no queue.
    fn queued_len(queues: &AnnounceQueues, iface: u64) -> usize {
        queues.queue_for(&InterfaceId(iface)).unwrap().entries.len()
    }

    #[test]
    fn test_queue_entry_creation() {
        let entry = make_entry(0xAA, 3, 1000.0);
        assert_eq!(entry.hops, 3);
        assert_eq!(entry.destination_hash, [0xAA; 16]);
    }

    #[test]
    fn test_queue_insert_and_select() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 3, 100.0));
        queue.insert(make_entry(0x02, 1, 200.0));
        queue.insert(make_entry(0x03, 2, 150.0));

        // Fewest hops wins regardless of queueing time.
        let idx = queue.select_next().unwrap();
        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_select_fifo_on_same_hops() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 2, 200.0));
        queue.insert(make_entry(0x02, 2, 100.0));

        // Same hop count: the entry with the smaller `time` goes first.
        let idx = queue.select_next().unwrap();
        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_dedup_update() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 3, 100.0));
        assert_eq!(queue.entries.len(), 1);

        // Fewer hops for the same destination replaces the queued entry.
        queue.insert(make_entry(0x01, 1, 200.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);

        // More hops for the same destination is ignored.
        queue.insert(make_entry(0x01, 5, 300.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);
    }

    #[test]
    fn test_queue_stale_removal() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 1, 100.0));
        queue.insert(make_entry(0x02, 2, 200.0));

        // Only the entry queued at t=100 has outlived the queue lifetime.
        queue.remove_stale(86501.0);
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_max_size() {
        let mut queue = InterfaceAnnounceQueue::new();
        for i in 0..constants::MAX_QUEUED_ANNOUNCES {
            queue.insert(AnnounceQueueEntry {
                destination_hash: {
                    let mut d = [0u8; 16];
                    d[0] = (i >> 8) as u8;
                    d[1] = i as u8;
                    d
                },
                time: i as f64,
                hops: 1,
                emitted: i as f64,
                raw: vec![0x01].into(),
            });
        }
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);

        queue.insert(make_entry(0xFF, 1, 99999.0));
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);

        // The first entry inserted (all-zero hash) was evicted and the
        // newcomer sits at the back of the queue.
        assert!(queue
            .entries
            .iter()
            .all(|entry| entry.destination_hash != [0u8; 16]));
        assert_eq!(
            queue.entries.last().map(|entry| entry.destination_hash),
            Some([0xFF; 16])
        );
    }

    #[test]
    fn test_queue_empty_select() {
        let queue = InterfaceAnnounceQueue::new();
        assert!(queue.select_next().is_none());
    }

    #[test]
    fn test_bandwidth_allowed() {
        let mut queue = InterfaceAnnounceQueue::new();
        assert!(queue.is_allowed(0.0));
        assert!(queue.is_allowed(100.0));

        queue.announce_allowed_at = 200.0;
        assert!(!queue.is_allowed(100.0));
        assert!(!queue.is_allowed(199.9));
        assert!(queue.is_allowed(200.0));
        assert!(queue.is_allowed(300.0));
    }

    #[test]
    fn test_calculate_next_allowed() {
        // 100 bytes = 800 bits at 1000 bps -> 0.8 s; 0.8 / 0.02 = 40 s of wait.
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, None, 0.02);
        assert!((next - 1040.0).abs() < 0.001);
    }

    #[test]
    fn test_calculate_next_allowed_zero_bitrate() {
        // No bitrate and no airtime profile: no waiting period.
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, None, 0.02);
        assert_eq!(next, 1000.0);
    }

    #[test]
    fn test_calculate_next_allowed_uses_lora_airtime() {
        let next = InterfaceAnnounceQueue::calculate_next_allowed(
            1000.0,
            100,
            0,
            Some(lora_profile()),
            0.02,
        );

        assert!((next - 1015.3856).abs() < 0.0001);
    }

    #[test]
    fn test_gate_announce_no_bitrate_immediate() {
        let mut queues = AnnounceQueues::new(1024);
        let result = gate(
            &mut queues,
            1,
            vec![0x01, 0x02, 0x03].into(),
            0xAA,
            2,
            1000.0,
            None,
        );
        assert!(result.is_some());
        assert!(matches!(
            result.unwrap(),
            TransportAction::SendOnInterface { .. }
        ));
    }

    #[test]
    fn test_gate_announce_uses_airtime_profile_without_bitrate() {
        let mut queues = AnnounceQueues::new(1024);
        let profile = lora_profile();

        let first = queues.gate_announce(
            InterfaceId(1),
            vec![0x01; 100].into(),
            [0xAA; 16],
            2,
            1000.0,
            1000.0,
            None,
            Some(profile),
            CAP,
        );
        assert!(first.is_some());
        assert!((allowed_at(&queues, 1) - 1015.3856).abs() < 0.0001);

        let second = queues.gate_announce(
            InterfaceId(1),
            vec![0x02; 100].into(),
            [0xBB; 16],
            2,
            1000.0,
            1000.0,
            None,
            Some(profile),
            CAP,
        );
        assert!(second.is_none());
        assert_eq!(queued_len(&queues, 1), 1);
    }

    #[test]
    fn test_gate_announce_bandwidth_available() {
        let mut queues = AnnounceQueues::new(1024);
        let result = gate(
            &mut queues,
            1,
            vec![0x01; 100].into(),
            0xBB,
            2,
            1000.0,
            Some(10000),
        );
        assert!(result.is_some());

        // Sending must have pushed the next allowed time into the future.
        assert!(allowed_at(&queues, 1) > 1000.0);
    }

    #[test]
    fn test_gate_announce_bandwidth_exhausted_queues() {
        let mut queues = AnnounceQueues::new(1024);

        let r1 = gate(
            &mut queues,
            1,
            vec![0x01; 100].into(),
            0xAA,
            2,
            1000.0,
            Some(1000),
        );
        assert!(r1.is_some());

        // Same instant, budget already used: the second announce is queued.
        let r2 = gate(
            &mut queues,
            1,
            vec![0x02; 100].into(),
            0xBB,
            3,
            1000.0,
            Some(1000),
        );
        assert!(r2.is_none());
        assert_eq!(queued_len(&queues, 1), 1);
    }

    #[test]
    fn test_process_queues_dequeues_when_allowed() {
        let mut queues = AnnounceQueues::new(1024);

        let _ = gate(&mut queues, 1, vec![0x01; 10].into(), 0xAA, 2, 0.0, Some(1000));
        let _ = gate(&mut queues, 1, vec![0x02; 10].into(), 0xBB, 3, 0.0, Some(1000));
        assert_eq!(queued_len(&queues, 1), 1);

        let interfaces = one_interface(Some(1000));
        let release_time = allowed_at(&queues, 1) + 1.0;
        let actions = queues.process_queues(release_time, &interfaces);

        assert_eq!(actions.len(), 1);
        assert!(matches!(
            &actions[0],
            TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(1)
        ));

        // The drained queue is pruned.
        assert!(queues.queue_for(&InterfaceId(1)).is_none());
    }

    #[test]
    fn test_local_announce_bypasses_cap() {
        let mut queues = AnnounceQueues::new(1024);

        let _ = gate(&mut queues, 1, vec![0x01; 100].into(), 0xAA, 2, 0.0, Some(1000));

        // `gate_announce` itself gives hops == 0 no special treatment, so the
        // announce is queued like any other.
        // NOTE(review): despite the test name, any bypass for locally
        // originated announces presumably lives in the caller — confirm.
        let r = gate(&mut queues, 1, vec![0x02; 100].into(), 0xBB, 0, 0.0, Some(1000));
        assert!(r.is_none());
    }

    #[test]
    fn test_remove_interface_queue() {
        let mut queues = AnnounceQueues::new(1024);
        let _ = gate(&mut queues, 1, vec![0x01; 100].into(), 0xAA, 2, 0.0, Some(1000));
        let _ = gate(&mut queues, 1, vec![0x02; 100].into(), 0xBB, 3, 0.0, Some(1000));

        assert!(queues.queue_for(&InterfaceId(1)).is_some());
        assert!(queues.remove_interface(InterfaceId(1)));
        assert!(queues.queue_for(&InterfaceId(1)).is_none());
        assert!(!queues.remove_interface(InterfaceId(1)));
    }

    #[test]
    fn test_process_queues_prunes_empty_queue() {
        let mut queues = AnnounceQueues::new(1024);

        let _ = gate(&mut queues, 1, vec![0x01; 10].into(), 0xAA, 2, 0.0, Some(1000));
        let _ = gate(&mut queues, 1, vec![0x02; 10].into(), 0xBB, 3, 0.0, Some(1000));

        let interfaces = one_interface(Some(1000));
        let release_time = allowed_at(&queues, 1) + 1.0;

        let actions = queues.process_queues(release_time, &interfaces);
        assert_eq!(actions.len(), 1);
        assert!(queues.queue_for(&InterfaceId(1)).is_none());
        assert_eq!(queues.queue_count(), 0);
    }

    #[test]
    fn test_process_queues_keeps_nonempty_queue() {
        let mut queues = AnnounceQueues::new(1024);
        let _ = gate(&mut queues, 1, vec![0x01; 100].into(), 0xAA, 2, 0.0, Some(1000));
        let _ = gate(&mut queues, 1, vec![0x02; 100].into(), 0xBB, 3, 0.0, Some(1000));
        let _ = gate(&mut queues, 1, vec![0x03; 100].into(), 0xCC, 4, 0.0, Some(1000));

        let interfaces = one_interface(Some(1000));
        let release_time = allowed_at(&queues, 1) + 1.0;

        // Only one of the two queued announces fits; the queue must survive.
        let actions = queues.process_queues(release_time, &interfaces);
        assert_eq!(actions.len(), 1);
        assert!(queues.queue_for(&InterfaceId(1)).is_some());
        assert_eq!(queued_len(&queues, 1), 1);
    }

    #[test]
    fn test_gate_announce_refuses_new_interface_when_at_capacity() {
        let mut queues = AnnounceQueues::new(1);

        let _ = gate(&mut queues, 1, vec![0x01; 100].into(), 0xAA, 2, 0.0, Some(1000));
        let second = gate(&mut queues, 1, vec![0x02; 100].into(), 0xBB, 3, 0.0, Some(1000));
        assert!(second.is_none());
        assert_eq!(queues.queue_count(), 1);

        let rejected = gate(&mut queues, 2, vec![0x03; 100].into(), 0xCC, 4, 0.0, Some(1000));
        assert!(rejected.is_none());
        assert_eq!(queues.queue_count(), 1);
        assert!(queues.queue_for(&InterfaceId(2)).is_none());
        assert_eq!(queues.interface_cap_drop_count(), 1);
    }

    #[test]
    fn test_gate_announce_allows_existing_queue_when_at_capacity() {
        let mut queues = AnnounceQueues::new(1);

        let _ = gate(&mut queues, 1, vec![0x01; 100].into(), 0xAA, 2, 0.0, Some(1000));
        let queued = gate(&mut queues, 1, vec![0x02; 100].into(), 0xBB, 3, 0.0, Some(1000));
        assert!(queued.is_none());
        assert_eq!(queues.queue_count(), 1);
        assert_eq!(queued_len(&queues, 1), 1);
        assert_eq!(queues.interface_cap_drop_count(), 0);
    }
}