1use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
12use super::types::{InterfaceId, TransportAction};
13use crate::constants;
14
/// One announce that is waiting in an interface's announce queue.
#[derive(Clone, Debug)]
pub struct AnnounceQueueEntry {
    /// Hash of the announced destination.
    ///
    /// `InterfaceAnnounceQueue::insert` matches on this value so that a queue
    /// keeps at most one pending entry per destination.
    pub destination_hash: [u8; 16],
    /// Timestamp at which the entry was queued.
    ///
    /// Used for expiry (`remove_stale`) and, between entries with equal hop
    /// counts, for oldest-first selection (`select_next`).
    pub time: f64,
    /// Hop count of the announce; entries with fewer hops are preferred.
    pub hops: u8,
    /// Emission timestamp of the announce.
    ///
    /// When a queued entry and a new one have the same hop count, the one
    /// with the larger `emitted` value wins.
    pub emitted: f64,
    /// Raw packet bytes that are handed to the interface once dequeued.
    pub raw: Vec<u8>,
}
29
30#[derive(Debug, Clone)]
32pub struct InterfaceAnnounceQueue {
33 pub entries: Vec<AnnounceQueueEntry>,
35 pub announce_allowed_at: f64,
37}
38
39impl InterfaceAnnounceQueue {
40 pub fn new() -> Self {
41 InterfaceAnnounceQueue {
42 entries: Vec::new(),
43 announce_allowed_at: 0.0,
44 }
45 }
46
47 pub fn insert(&mut self, entry: AnnounceQueueEntry) {
51 if let Some(pos) = self
53 .entries
54 .iter()
55 .position(|e| e.destination_hash == entry.destination_hash)
56 {
57 let existing = &self.entries[pos];
58 if entry.hops < existing.hops
60 || (entry.hops == existing.hops && entry.emitted > existing.emitted)
61 {
62 self.entries[pos] = entry;
63 }
64 } else {
66 if self.entries.len() >= constants::MAX_QUEUED_ANNOUNCES {
68 self.entries.remove(0);
70 }
71 self.entries.push(entry);
72 }
73 }
74
75 pub fn remove_stale(&mut self, now: f64) {
77 self.entries
78 .retain(|e| now - e.time < constants::QUEUED_ANNOUNCE_LIFE);
79 }
80
81 pub fn select_next(&self) -> Option<usize> {
84 if self.entries.is_empty() {
85 return None;
86 }
87 let mut best_idx = 0;
88 let mut best_hops = self.entries[0].hops;
89 let mut best_time = self.entries[0].time;
90
91 for (i, entry) in self.entries.iter().enumerate().skip(1) {
92 if entry.hops < best_hops || (entry.hops == best_hops && entry.time < best_time) {
93 best_idx = i;
94 best_hops = entry.hops;
95 best_time = entry.time;
96 }
97 }
98 Some(best_idx)
99 }
100
101 pub fn is_allowed(&self, now: f64) -> bool {
103 now >= self.announce_allowed_at
104 }
105
106 pub fn calculate_next_allowed(
111 now: f64,
112 raw_len: usize,
113 bitrate: u64,
114 announce_cap: f64,
115 ) -> f64 {
116 if bitrate == 0 || announce_cap <= 0.0 {
117 return now; }
119 let bits = (raw_len * 8) as f64;
120 let time_to_send = bits / (bitrate as f64);
121 let delay = time_to_send / announce_cap;
122 now + delay
123 }
124}
125
126impl Default for InterfaceAnnounceQueue {
127 fn default() -> Self {
128 Self::new()
129 }
130}
131
132#[derive(Debug, Clone)]
134pub struct AnnounceQueues {
135 queues: BTreeMap<InterfaceId, InterfaceAnnounceQueue>,
136}
137
138impl AnnounceQueues {
139 pub fn new() -> Self {
140 AnnounceQueues {
141 queues: BTreeMap::new(),
142 }
143 }
144
145 #[allow(clippy::too_many_arguments)]
150 pub fn gate_announce(
151 &mut self,
152 interface: InterfaceId,
153 raw: Vec<u8>,
154 dest_hash: [u8; 16],
155 hops: u8,
156 emitted: f64,
157 now: f64,
158 bitrate: Option<u64>,
159 announce_cap: f64,
160 ) -> Option<TransportAction> {
161 let queue = self
162 .queues
163 .entry(interface)
164 .or_default();
165
166 let bitrate = match bitrate {
168 Some(br) if br > 0 => br,
169 _ => {
170 return Some(TransportAction::SendOnInterface { interface, raw });
171 }
172 };
173
174 if queue.is_allowed(now) {
175 queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
177 now,
178 raw.len(),
179 bitrate,
180 announce_cap,
181 );
182 Some(TransportAction::SendOnInterface { interface, raw })
183 } else {
184 queue.insert(AnnounceQueueEntry {
186 destination_hash: dest_hash,
187 time: now,
188 hops,
189 emitted,
190 raw,
191 });
192 None
193 }
194 }
195
196 pub fn process_queues(
199 &mut self,
200 now: f64,
201 interfaces: &BTreeMap<InterfaceId, super::types::InterfaceInfo>,
202 ) -> Vec<TransportAction> {
203 let mut actions = Vec::new();
204
205 for (iface_id, queue) in self.queues.iter_mut() {
206 queue.remove_stale(now);
208
209 while queue.is_allowed(now) {
211 if let Some(idx) = queue.select_next() {
212 let entry = queue.entries.remove(idx);
213
214 let (bitrate, announce_cap) = if let Some(info) = interfaces.get(iface_id) {
216 (info.bitrate.unwrap_or(0), info.announce_cap)
217 } else {
218 (0, constants::ANNOUNCE_CAP)
219 };
220
221 if bitrate > 0 {
222 queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
223 now,
224 entry.raw.len(),
225 bitrate,
226 announce_cap,
227 );
228 }
229
230 actions.push(TransportAction::SendOnInterface {
231 interface: *iface_id,
232 raw: entry.raw,
233 });
234 } else {
235 break;
236 }
237 }
238 }
239
240 actions
241 }
242
243 #[cfg(test)]
245 pub fn queue_for(&self, id: &InterfaceId) -> Option<&InterfaceAnnounceQueue> {
246 self.queues.get(id)
247 }
248}
249
250impl Default for AnnounceQueues {
251 fn default() -> Self {
252 Self::new()
253 }
254}
255
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::string::String;

    /// Builds an entry whose hash is `dest` repeated, with `emitted == time`
    /// and a fixed three-byte payload.
    fn make_entry(dest: u8, hops: u8, time: f64) -> AnnounceQueueEntry {
        AnnounceQueueEntry {
            destination_hash: [dest; 16],
            time,
            hops,
            emitted: time,
            raw: vec![0x01, 0x02, 0x03],
        }
    }

    /// Builds a full-mode interface description with the given bitrate and
    /// the default announce cap.
    fn make_interface_info(id: u64, bitrate: Option<u64>) -> super::super::types::InterfaceInfo {
        super::super::types::InterfaceInfo {
            id: InterfaceId(id),
            name: String::from("test"),
            mode: crate::constants::MODE_FULL,
            out_capable: true,
            in_capable: true,
            bitrate,
            announce_rate_target: None,
            announce_rate_grace: 0,
            announce_rate_penalty: 0.0,
            announce_cap: constants::ANNOUNCE_CAP,
            is_local_client: false,
            wants_tunnel: false,
            tunnel_id: None,
            mtu: constants::MTU as u32,
            ingress_control: false,
            ia_freq: 0.0,
            started: 0.0,
        }
    }

    /// Gates `raw` on interface 1 with a 2% cap, using `now` as both the
    /// emission time and the current time.
    fn gate(
        queues: &mut AnnounceQueues,
        raw: Vec<u8>,
        dest: u8,
        hops: u8,
        now: f64,
        bitrate: Option<u64>,
    ) -> Option<TransportAction> {
        queues.gate_announce(InterfaceId(1), raw, [dest; 16], hops, now, now, bitrate, 0.02)
    }

    /// Number of entries currently queued for interface 1.
    fn queued_len(queues: &AnnounceQueues) -> usize {
        queues.queue_for(&InterfaceId(1)).unwrap().entries.len()
    }

    #[test]
    fn test_queue_entry_creation() {
        let entry = make_entry(0xAA, 3, 1000.0);
        assert_eq!(entry.hops, 3);
        assert_eq!(entry.destination_hash, [0xAA; 16]);
    }

    #[test]
    fn test_queue_insert_and_select() {
        let mut queue = InterfaceAnnounceQueue::new();
        for &(dest, hops, time) in &[(0x01u8, 3u8, 100.0f64), (0x02, 1, 200.0), (0x03, 2, 150.0)] {
            queue.insert(make_entry(dest, hops, time));
        }

        // The entry with the fewest hops is chosen.
        let idx = queue.select_next().unwrap();
        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_select_fifo_on_same_hops() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 2, 200.0));
        queue.insert(make_entry(0x02, 2, 100.0));

        // With equal hops the entry with the earlier time is chosen.
        let idx = queue.select_next().unwrap();
        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_dedup_update() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 3, 100.0));
        assert_eq!(queue.entries.len(), 1);

        // Fewer hops for the same destination replaces the queued entry.
        queue.insert(make_entry(0x01, 1, 200.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);

        // More hops for the same destination is ignored.
        queue.insert(make_entry(0x01, 5, 300.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);
    }

    #[test]
    fn test_queue_stale_removal() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 1, 100.0));
        queue.insert(make_entry(0x02, 2, 200.0));

        // Only the entry queued at t=100 is expected to have expired here.
        queue.remove_stale(86501.0);
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_max_size() {
        let mut queue = InterfaceAnnounceQueue::new();
        for i in 0..constants::MAX_QUEUED_ANNOUNCES {
            // Encode the index into the first two hash bytes so every
            // destination is distinct.
            let mut destination_hash = [0u8; 16];
            destination_hash[0] = (i >> 8) as u8;
            destination_hash[1] = i as u8;
            let stamp = i as f64;
            queue.insert(AnnounceQueueEntry {
                destination_hash,
                time: stamp,
                hops: 1,
                emitted: stamp,
                raw: vec![0x01],
            });
        }
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);

        // One more distinct destination must not grow the queue.
        queue.insert(make_entry(0xFF, 1, 99999.0));
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
    }

    #[test]
    fn test_queue_empty_select() {
        let queue = InterfaceAnnounceQueue::new();
        assert!(queue.select_next().is_none());
    }

    #[test]
    fn test_bandwidth_allowed() {
        let mut queue = InterfaceAnnounceQueue::new();
        for &now in &[0.0, 100.0] {
            assert!(queue.is_allowed(now));
        }

        queue.announce_allowed_at = 200.0;
        for &now in &[100.0, 199.9] {
            assert!(!queue.is_allowed(now));
        }
        for &now in &[200.0, 300.0] {
            assert!(queue.is_allowed(now));
        }
    }

    #[test]
    fn test_calculate_next_allowed() {
        // 100 bytes = 800 bits at 1000 bit/s is 0.8 of transmit time;
        // 0.8 / 0.02 = 40 of hold-off.
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, 0.02);
        assert!((next - 1040.0).abs() < 0.001);
    }

    #[test]
    fn test_calculate_next_allowed_zero_bitrate() {
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, 0.02);
        assert_eq!(next, 1000.0);
    }

    #[test]
    fn test_gate_announce_no_bitrate_immediate() {
        let mut queues = AnnounceQueues::new();
        let result = gate(&mut queues, vec![0x01, 0x02, 0x03], 0xAA, 2, 1000.0, None);
        assert!(matches!(
            result,
            Some(TransportAction::SendOnInterface { .. })
        ));
    }

    #[test]
    fn test_gate_announce_bandwidth_available() {
        let mut queues = AnnounceQueues::new();
        let result = gate(&mut queues, vec![0x01; 100], 0xBB, 2, 1000.0, Some(10000));
        assert!(result.is_some());

        // Sending must have pushed the allowed time into the future.
        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
        assert!(queue.announce_allowed_at > 1000.0);
    }

    #[test]
    fn test_gate_announce_bandwidth_exhausted_queues() {
        let mut queues = AnnounceQueues::new();

        // The first announce goes out and consumes the budget.
        let first = gate(&mut queues, vec![0x01; 100], 0xAA, 2, 1000.0, Some(1000));
        assert!(first.is_some());

        // The second announce at the same instant is held back.
        let second = gate(&mut queues, vec![0x02; 100], 0xBB, 3, 1000.0, Some(1000));
        assert!(second.is_none());
        assert_eq!(queued_len(&queues), 1);
    }

    #[test]
    fn test_process_queues_dequeues_when_allowed() {
        let mut queues = AnnounceQueues::new();

        let _ = gate(&mut queues, vec![0x01; 10], 0xAA, 2, 0.0, Some(1000));
        let _ = gate(&mut queues, vec![0x02; 10], 0xBB, 3, 0.0, Some(1000));
        assert_eq!(queued_len(&queues), 1);

        let mut interfaces = BTreeMap::new();
        interfaces.insert(InterfaceId(1), make_interface_info(1, Some(1000)));

        // Process just after the hold-off expires.
        let allowed_at = queues
            .queue_for(&InterfaceId(1))
            .unwrap()
            .announce_allowed_at;
        let actions = queues.process_queues(allowed_at + 1.0, &interfaces);

        assert_eq!(actions.len(), 1);
        assert!(matches!(
            actions.first(),
            Some(TransportAction::SendOnInterface { interface, .. })
                if *interface == InterfaceId(1)
        ));
        assert_eq!(queued_len(&queues), 0);
    }

    #[test]
    fn test_local_announce_bypasses_cap() {
        let mut queues = AnnounceQueues::new();

        // Use up the budget first.
        let _ = gate(&mut queues, vec![0x01; 100], 0xAA, 2, 0.0, Some(1000));

        // NOTE(review): despite the test name, this asserts that
        // `gate_announce` itself queues a zero-hop announce; presumably the
        // bypass for local announces lives at the call site. Confirm.
        let held = gate(&mut queues, vec![0x02; 100], 0xBB, 0, 0.0, Some(1000));
        assert!(held.is_none());
    }
}