1use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
12use super::types::{InterfaceId, TransportAction};
13use crate::constants;
14
/// A single announce that is being held back for later transmission on one
/// interface.
///
/// Timestamps are plain `f64` values on whatever clock the caller supplies as
/// `now` (presumably seconds — confirm against the caller).
#[derive(Debug, Clone)]
pub struct AnnounceQueueEntry {
    /// Hash of the announced destination. A queue keeps at most one entry per
    /// hash (see [`InterfaceAnnounceQueue::insert`]).
    pub destination_hash: [u8; 16],
    /// Time at which the entry was queued. Used for expiry and, among entries
    /// with equal hop counts, to send the oldest one first.
    pub time: f64,
    /// Hop count of the announce. Entries with fewer hops are sent first.
    pub hops: u8,
    /// Emission timestamp of the announce. For equal hop counts, an entry with
    /// a larger value replaces an already queued entry for the same hash.
    pub emitted: f64,
    /// Raw packet bytes handed to the interface once the entry is dequeued.
    pub raw: Vec<u8>,
}
29
30#[derive(Debug, Clone)]
32pub struct InterfaceAnnounceQueue {
33 pub entries: Vec<AnnounceQueueEntry>,
35 pub announce_allowed_at: f64,
37}
38
39impl InterfaceAnnounceQueue {
40 pub fn new() -> Self {
41 InterfaceAnnounceQueue {
42 entries: Vec::new(),
43 announce_allowed_at: 0.0,
44 }
45 }
46
47 pub fn insert(&mut self, entry: AnnounceQueueEntry) {
51 if let Some(pos) = self
53 .entries
54 .iter()
55 .position(|e| e.destination_hash == entry.destination_hash)
56 {
57 let existing = &self.entries[pos];
58 if entry.hops < existing.hops
60 || (entry.hops == existing.hops && entry.emitted > existing.emitted)
61 {
62 self.entries[pos] = entry;
63 }
64 } else {
66 if self.entries.len() >= constants::MAX_QUEUED_ANNOUNCES {
68 self.entries.remove(0);
70 }
71 self.entries.push(entry);
72 }
73 }
74
75 pub fn remove_stale(&mut self, now: f64) {
77 self.entries
78 .retain(|e| now - e.time < constants::QUEUED_ANNOUNCE_LIFE);
79 }
80
81 pub fn select_next(&self) -> Option<usize> {
84 if self.entries.is_empty() {
85 return None;
86 }
87 let mut best_idx = 0;
88 let mut best_hops = self.entries[0].hops;
89 let mut best_time = self.entries[0].time;
90
91 for (i, entry) in self.entries.iter().enumerate().skip(1) {
92 if entry.hops < best_hops || (entry.hops == best_hops && entry.time < best_time) {
93 best_idx = i;
94 best_hops = entry.hops;
95 best_time = entry.time;
96 }
97 }
98 Some(best_idx)
99 }
100
101 pub fn is_allowed(&self, now: f64) -> bool {
103 now >= self.announce_allowed_at
104 }
105
106 pub fn calculate_next_allowed(
111 now: f64,
112 raw_len: usize,
113 bitrate: u64,
114 announce_cap: f64,
115 ) -> f64 {
116 if bitrate == 0 || announce_cap <= 0.0 {
117 return now; }
119 let bits = (raw_len * 8) as f64;
120 let time_to_send = bits / (bitrate as f64);
121 let delay = time_to_send / announce_cap;
122 now + delay
123 }
124}
125
126impl Default for InterfaceAnnounceQueue {
127 fn default() -> Self {
128 Self::new()
129 }
130}
131
132#[derive(Debug, Clone)]
134pub struct AnnounceQueues {
135 queues: BTreeMap<InterfaceId, InterfaceAnnounceQueue>,
136 max_interfaces: usize,
137 interface_cap_drops: u64,
138}
139
140impl AnnounceQueues {
141 pub fn new(max_interfaces: usize) -> Self {
142 AnnounceQueues {
143 queues: BTreeMap::new(),
144 max_interfaces,
145 interface_cap_drops: 0,
146 }
147 }
148
149 #[allow(clippy::too_many_arguments)]
154 pub fn gate_announce(
155 &mut self,
156 interface: InterfaceId,
157 raw: Vec<u8>,
158 dest_hash: [u8; 16],
159 hops: u8,
160 emitted: f64,
161 now: f64,
162 bitrate: Option<u64>,
163 announce_cap: f64,
164 ) -> Option<TransportAction> {
165 let bitrate = match bitrate {
167 Some(br) if br > 0 => br,
168 _ => {
169 return Some(TransportAction::SendOnInterface { interface, raw });
170 }
171 };
172
173 if !self.queues.contains_key(&interface) && self.queues.len() >= self.max_interfaces {
174 self.interface_cap_drops = self.interface_cap_drops.saturating_add(1);
175 return None;
176 }
177
178 let queue = self.queues.entry(interface).or_default();
179
180 if queue.is_allowed(now) {
181 queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
183 now,
184 raw.len(),
185 bitrate,
186 announce_cap,
187 );
188 Some(TransportAction::SendOnInterface { interface, raw })
189 } else {
190 queue.insert(AnnounceQueueEntry {
192 destination_hash: dest_hash,
193 time: now,
194 hops,
195 emitted,
196 raw,
197 });
198 None
199 }
200 }
201
202 pub fn process_queues(
205 &mut self,
206 now: f64,
207 interfaces: &BTreeMap<InterfaceId, super::types::InterfaceInfo>,
208 ) -> Vec<TransportAction> {
209 let mut actions = Vec::new();
210 let mut empty_queues = Vec::new();
211
212 for (iface_id, queue) in self.queues.iter_mut() {
213 queue.remove_stale(now);
215
216 while queue.is_allowed(now) {
218 if let Some(idx) = queue.select_next() {
219 let entry = queue.entries.remove(idx);
220
221 let (bitrate, announce_cap) = if let Some(info) = interfaces.get(iface_id) {
223 (info.bitrate.unwrap_or(0), info.announce_cap)
224 } else {
225 (0, constants::ANNOUNCE_CAP)
226 };
227
228 if bitrate > 0 {
229 queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
230 now,
231 entry.raw.len(),
232 bitrate,
233 announce_cap,
234 );
235 }
236
237 actions.push(TransportAction::SendOnInterface {
238 interface: *iface_id,
239 raw: entry.raw,
240 });
241 } else {
242 break;
243 }
244 }
245
246 if queue.entries.is_empty() {
247 empty_queues.push(*iface_id);
248 }
249 }
250
251 for iface_id in empty_queues {
252 self.queues.remove(&iface_id);
253 }
254
255 actions
256 }
257
258 pub fn remove_interface(&mut self, interface: InterfaceId) -> bool {
260 self.queues.remove(&interface).is_some()
261 }
262
263 pub fn queue_count(&self) -> usize {
265 self.queues.len()
266 }
267
268 pub fn nonempty_queue_count(&self) -> usize {
270 self.queues
271 .values()
272 .filter(|queue| !queue.entries.is_empty())
273 .count()
274 }
275
276 pub fn total_queued_announces(&self) -> usize {
278 self.queues.values().map(|queue| queue.entries.len()).sum()
279 }
280
281 pub fn total_queued_bytes(&self) -> usize {
283 self.queues
284 .values()
285 .flat_map(|queue| queue.entries.iter())
286 .map(|entry| entry.raw.len())
287 .sum()
288 }
289
290 pub fn interface_cap_drop_count(&self) -> u64 {
292 self.interface_cap_drops
293 }
294
295 #[cfg(test)]
297 pub fn queue_for(&self, id: &InterfaceId) -> Option<&InterfaceAnnounceQueue> {
298 self.queues.get(id)
299 }
300}
301
302impl Default for AnnounceQueues {
303 fn default() -> Self {
304 Self::new(1024)
305 }
306}
307
#[cfg(test)]
mod tests {
    use super::super::types::InterfaceInfo;
    use super::*;
    use alloc::string::String;

    /// Builds a queue entry whose `emitted` equals `time` and whose payload
    /// is three fixed bytes.
    fn make_entry(dest: u8, hops: u8, time: f64) -> AnnounceQueueEntry {
        AnnounceQueueEntry {
            destination_hash: [dest; 16],
            time,
            hops,
            emitted: time,
            raw: vec![0x01, 0x02, 0x03],
        }
    }

    /// Builds a full-mode interface description with the default announce cap.
    fn make_interface_info(id: u64, bitrate: Option<u64>) -> InterfaceInfo {
        InterfaceInfo {
            id: InterfaceId(id),
            name: String::from("test"),
            mode: crate::constants::MODE_FULL,
            out_capable: true,
            in_capable: true,
            bitrate,
            announce_rate_target: None,
            announce_rate_grace: 0,
            announce_rate_penalty: 0.0,
            announce_cap: constants::ANNOUNCE_CAP,
            is_local_client: false,
            wants_tunnel: false,
            tunnel_id: None,
            mtu: constants::MTU as u32,
            ingress_control: false,
            ia_freq: 0.0,
            started: 0.0,
        }
    }

    /// Gates an announce with `emitted == now` and an announce cap of 0.02,
    /// which is what every gating test below uses.
    fn gate(
        queues: &mut AnnounceQueues,
        iface: u64,
        raw: Vec<u8>,
        dest: u8,
        hops: u8,
        now: f64,
        bitrate: Option<u64>,
    ) -> Option<TransportAction> {
        queues.gate_announce(
            InterfaceId(iface),
            raw,
            [dest; 16],
            hops,
            now,
            now,
            bitrate,
            0.02,
        )
    }

    /// Interface table containing a single interface.
    fn single_interface(id: u64, bitrate: Option<u64>) -> BTreeMap<InterfaceId, InterfaceInfo> {
        let mut interfaces = BTreeMap::new();
        interfaces.insert(InterfaceId(id), make_interface_info(id, bitrate));
        interfaces
    }

    /// Current `announce_allowed_at` of an interface that must have a queue.
    fn allowed_at(queues: &AnnounceQueues, id: u64) -> f64 {
        queues
            .queue_for(&InterfaceId(id))
            .unwrap()
            .announce_allowed_at
    }

    /// Number of entries queued for an interface that must have a queue.
    fn queued_len(queues: &AnnounceQueues, id: u64) -> usize {
        queues.queue_for(&InterfaceId(id)).unwrap().entries.len()
    }

    #[test]
    fn test_queue_entry_creation() {
        let entry = make_entry(0xAA, 3, 1000.0);
        assert_eq!(entry.hops, 3);
        assert_eq!(entry.destination_hash, [0xAA; 16]);
    }

    #[test]
    fn test_queue_insert_and_select() {
        let mut queue = InterfaceAnnounceQueue::new();
        for (dest, hops, time) in [(0x01, 3, 100.0), (0x02, 1, 200.0), (0x03, 2, 150.0)] {
            queue.insert(make_entry(dest, hops, time));
        }

        // The entry with the fewest hops is selected.
        let idx = queue.select_next().unwrap();
        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_select_fifo_on_same_hops() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 2, 200.0));
        queue.insert(make_entry(0x02, 2, 100.0));

        // Equal hops: the entry with the earlier time is selected.
        let idx = queue.select_next().unwrap();
        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_dedup_update() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 3, 100.0));
        assert_eq!(queue.entries.len(), 1);

        // Fewer hops for the same destination replaces the entry.
        queue.insert(make_entry(0x01, 1, 200.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);

        // More hops for the same destination is ignored.
        queue.insert(make_entry(0x01, 5, 300.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);
    }

    #[test]
    fn test_queue_stale_removal() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 1, 100.0));
        queue.insert(make_entry(0x02, 2, 200.0));

        queue.remove_stale(86501.0);

        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_max_size() {
        let mut queue = InterfaceAnnounceQueue::new();
        for i in 0..constants::MAX_QUEUED_ANNOUNCES {
            let mut destination_hash = [0u8; 16];
            destination_hash[0] = (i >> 8) as u8;
            destination_hash[1] = i as u8;
            let stamp = i as f64;
            queue.insert(AnnounceQueueEntry {
                destination_hash,
                time: stamp,
                hops: 1,
                emitted: stamp,
                raw: vec![0x01],
            });
        }
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);

        // One more distinct destination must not grow the queue.
        queue.insert(make_entry(0xFF, 1, 99999.0));
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
    }

    #[test]
    fn test_queue_empty_select() {
        let queue = InterfaceAnnounceQueue::new();
        assert!(queue.select_next().is_none());
    }

    #[test]
    fn test_bandwidth_allowed() {
        let mut queue = InterfaceAnnounceQueue::new();
        assert!(queue.is_allowed(0.0));
        assert!(queue.is_allowed(100.0));

        queue.announce_allowed_at = 200.0;
        assert!(!queue.is_allowed(100.0));
        assert!(!queue.is_allowed(199.9));
        assert!(queue.is_allowed(200.0));
        assert!(queue.is_allowed(300.0));
    }

    #[test]
    fn test_calculate_next_allowed() {
        // 100 bytes = 800 bits at 1000 -> 0.8; divided by a cap of 0.02 -> 40.
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, 0.02);
        assert!((next - 1040.0).abs() < 0.001);
    }

    #[test]
    fn test_calculate_next_allowed_zero_bitrate() {
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, 0.02);
        assert_eq!(next, 1000.0);
    }

    #[test]
    fn test_gate_announce_no_bitrate_immediate() {
        let mut queues = AnnounceQueues::new(1024);
        let result = gate(
            &mut queues,
            1,
            vec![0x01, 0x02, 0x03],
            0xAA,
            2,
            1000.0,
            None,
        );
        assert!(result.is_some());
        assert!(matches!(
            result.unwrap(),
            TransportAction::SendOnInterface { .. }
        ));
    }

    #[test]
    fn test_gate_announce_bandwidth_available() {
        let mut queues = AnnounceQueues::new(1024);
        let result = gate(
            &mut queues,
            1,
            vec![0x01; 100],
            0xBB,
            2,
            1000.0,
            Some(10000),
        );
        assert!(result.is_some());

        // Sending must have pushed the next allowed time into the future.
        assert!(allowed_at(&queues, 1) > 1000.0);
    }

    #[test]
    fn test_gate_announce_bandwidth_exhausted_queues() {
        let mut queues = AnnounceQueues::new(1024);

        let r1 = gate(&mut queues, 1, vec![0x01; 100], 0xAA, 2, 1000.0, Some(1000));
        assert!(r1.is_some());

        let r2 = gate(&mut queues, 1, vec![0x02; 100], 0xBB, 3, 1000.0, Some(1000));
        assert!(r2.is_none());
        assert_eq!(queued_len(&queues, 1), 1);
    }

    #[test]
    fn test_process_queues_dequeues_when_allowed() {
        let mut queues = AnnounceQueues::new(1024);

        let _ = gate(&mut queues, 1, vec![0x01; 10], 0xAA, 2, 0.0, Some(1000));
        let _ = gate(&mut queues, 1, vec![0x02; 10], 0xBB, 3, 0.0, Some(1000));
        assert_eq!(queued_len(&queues, 1), 1);

        let interfaces = single_interface(1, Some(1000));
        let release_time = allowed_at(&queues, 1) + 1.0;
        let actions = queues.process_queues(release_time, &interfaces);

        assert_eq!(actions.len(), 1);
        assert!(matches!(
            &actions[0],
            TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(1)
        ));

        assert!(queues.queue_for(&InterfaceId(1)).is_none());
    }

    #[test]
    fn test_local_announce_bypasses_cap() {
        let mut queues = AnnounceQueues::new(1024);

        let _ = gate(&mut queues, 1, vec![0x01; 100], 0xAA, 2, 0.0, Some(1000));

        // Despite the test name, `gate_announce` itself makes no exception
        // for a zero-hop announce: it is queued like any other. Presumably
        // the caller is expected to bypass the gate for local announces —
        // TODO confirm.
        let r = gate(&mut queues, 1, vec![0x02; 100], 0xBB, 0, 0.0, Some(1000));
        assert!(r.is_none());
    }

    #[test]
    fn test_remove_interface_queue() {
        let mut queues = AnnounceQueues::new(1024);
        let _ = gate(&mut queues, 1, vec![0x01; 100], 0xAA, 2, 0.0, Some(1000));
        let _ = gate(&mut queues, 1, vec![0x02; 100], 0xBB, 3, 0.0, Some(1000));

        assert!(queues.queue_for(&InterfaceId(1)).is_some());
        assert!(queues.remove_interface(InterfaceId(1)));
        assert!(queues.queue_for(&InterfaceId(1)).is_none());
        assert!(!queues.remove_interface(InterfaceId(1)));
    }

    #[test]
    fn test_process_queues_prunes_empty_queue() {
        let mut queues = AnnounceQueues::new(1024);

        let _ = gate(&mut queues, 1, vec![0x01; 10], 0xAA, 2, 0.0, Some(1000));
        let _ = gate(&mut queues, 1, vec![0x02; 10], 0xBB, 3, 0.0, Some(1000));

        let interfaces = single_interface(1, Some(1000));
        let release_time = allowed_at(&queues, 1) + 1.0;

        let actions = queues.process_queues(release_time, &interfaces);
        assert_eq!(actions.len(), 1);
        assert!(queues.queue_for(&InterfaceId(1)).is_none());
        assert_eq!(queues.queue_count(), 0);
    }

    #[test]
    fn test_process_queues_keeps_nonempty_queue() {
        let mut queues = AnnounceQueues::new(1024);
        let _ = gate(&mut queues, 1, vec![0x01; 100], 0xAA, 2, 0.0, Some(1000));
        let _ = gate(&mut queues, 1, vec![0x02; 100], 0xBB, 3, 0.0, Some(1000));
        let _ = gate(&mut queues, 1, vec![0x03; 100], 0xCC, 4, 0.0, Some(1000));

        let interfaces = single_interface(1, Some(1000));
        let release_time = allowed_at(&queues, 1) + 1.0;

        let actions = queues.process_queues(release_time, &interfaces);
        assert_eq!(actions.len(), 1);
        assert!(queues.queue_for(&InterfaceId(1)).is_some());
        assert_eq!(queued_len(&queues, 1), 1);
    }

    #[test]
    fn test_gate_announce_refuses_new_interface_when_at_capacity() {
        let mut queues = AnnounceQueues::new(1);

        let _ = gate(&mut queues, 1, vec![0x01; 100], 0xAA, 2, 0.0, Some(1000));
        let second = gate(&mut queues, 1, vec![0x02; 100], 0xBB, 3, 0.0, Some(1000));
        assert!(second.is_none());
        assert_eq!(queues.queue_count(), 1);

        let rejected = gate(&mut queues, 2, vec![0x03; 100], 0xCC, 4, 0.0, Some(1000));
        assert!(rejected.is_none());
        assert_eq!(queues.queue_count(), 1);
        assert!(queues.queue_for(&InterfaceId(2)).is_none());
        assert_eq!(queues.interface_cap_drop_count(), 1);
    }

    #[test]
    fn test_gate_announce_allows_existing_queue_when_at_capacity() {
        let mut queues = AnnounceQueues::new(1);

        let _ = gate(&mut queues, 1, vec![0x01; 100], 0xAA, 2, 0.0, Some(1000));
        let queued = gate(&mut queues, 1, vec![0x02; 100], 0xBB, 3, 0.0, Some(1000));
        assert!(queued.is_none());
        assert_eq!(queues.queue_count(), 1);
        assert_eq!(queued_len(&queues, 1), 1);
        assert_eq!(queues.interface_cap_drop_count(), 0);
    }
}