1use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
12use super::types::{InterfaceId, TransportAction};
13use crate::constants;
14
15#[derive(Debug, Clone)]
17pub struct AnnounceQueueEntry {
18 pub destination_hash: [u8; 16],
20 pub time: f64,
22 pub hops: u8,
24 pub emitted: f64,
26 pub raw: Vec<u8>,
28}
29
30#[derive(Debug, Clone)]
32pub struct InterfaceAnnounceQueue {
33 pub entries: Vec<AnnounceQueueEntry>,
35 pub announce_allowed_at: f64,
37}
38
39impl InterfaceAnnounceQueue {
40 pub fn new() -> Self {
41 InterfaceAnnounceQueue {
42 entries: Vec::new(),
43 announce_allowed_at: 0.0,
44 }
45 }
46
47 pub fn insert(&mut self, entry: AnnounceQueueEntry) {
51 if let Some(pos) = self
53 .entries
54 .iter()
55 .position(|e| e.destination_hash == entry.destination_hash)
56 {
57 let existing = &self.entries[pos];
58 if entry.hops < existing.hops
60 || (entry.hops == existing.hops && entry.emitted > existing.emitted)
61 {
62 self.entries[pos] = entry;
63 }
64 } else {
66 if self.entries.len() >= constants::MAX_QUEUED_ANNOUNCES {
68 self.entries.remove(0);
70 }
71 self.entries.push(entry);
72 }
73 }
74
75 pub fn remove_stale(&mut self, now: f64) {
77 self.entries
78 .retain(|e| now - e.time < constants::QUEUED_ANNOUNCE_LIFE);
79 }
80
81 pub fn select_next(&self) -> Option<usize> {
84 if self.entries.is_empty() {
85 return None;
86 }
87 let mut best_idx = 0;
88 let mut best_hops = self.entries[0].hops;
89 let mut best_time = self.entries[0].time;
90
91 for (i, entry) in self.entries.iter().enumerate().skip(1) {
92 if entry.hops < best_hops
93 || (entry.hops == best_hops && entry.time < best_time)
94 {
95 best_idx = i;
96 best_hops = entry.hops;
97 best_time = entry.time;
98 }
99 }
100 Some(best_idx)
101 }
102
103 pub fn is_allowed(&self, now: f64) -> bool {
105 now >= self.announce_allowed_at
106 }
107
108 pub fn calculate_next_allowed(now: f64, raw_len: usize, bitrate: u64, announce_cap: f64) -> f64 {
113 if bitrate == 0 || announce_cap <= 0.0 {
114 return now; }
116 let bits = (raw_len * 8) as f64;
117 let time_to_send = bits / (bitrate as f64);
118 let delay = time_to_send / announce_cap;
119 now + delay
120 }
121}
122
123#[derive(Debug, Clone)]
125pub struct AnnounceQueues {
126 queues: BTreeMap<InterfaceId, InterfaceAnnounceQueue>,
127}
128
129impl AnnounceQueues {
130 pub fn new() -> Self {
131 AnnounceQueues {
132 queues: BTreeMap::new(),
133 }
134 }
135
136 pub fn gate_announce(
141 &mut self,
142 interface: InterfaceId,
143 raw: Vec<u8>,
144 dest_hash: [u8; 16],
145 hops: u8,
146 emitted: f64,
147 now: f64,
148 bitrate: Option<u64>,
149 announce_cap: f64,
150 ) -> Option<TransportAction> {
151 let queue = self
152 .queues
153 .entry(interface)
154 .or_insert_with(InterfaceAnnounceQueue::new);
155
156 let bitrate = match bitrate {
158 Some(br) if br > 0 => br,
159 _ => {
160 return Some(TransportAction::SendOnInterface {
161 interface,
162 raw,
163 });
164 }
165 };
166
167 if queue.is_allowed(now) {
168 queue.announce_allowed_at =
170 InterfaceAnnounceQueue::calculate_next_allowed(now, raw.len(), bitrate, announce_cap);
171 Some(TransportAction::SendOnInterface { interface, raw })
172 } else {
173 queue.insert(AnnounceQueueEntry {
175 destination_hash: dest_hash,
176 time: now,
177 hops,
178 emitted,
179 raw,
180 });
181 None
182 }
183 }
184
185 pub fn process_queues(
188 &mut self,
189 now: f64,
190 interfaces: &BTreeMap<InterfaceId, super::types::InterfaceInfo>,
191 ) -> Vec<TransportAction> {
192 let mut actions = Vec::new();
193
194 for (iface_id, queue) in self.queues.iter_mut() {
195 queue.remove_stale(now);
197
198 while queue.is_allowed(now) {
200 if let Some(idx) = queue.select_next() {
201 let entry = queue.entries.remove(idx);
202
203 let (bitrate, announce_cap) =
205 if let Some(info) = interfaces.get(iface_id) {
206 (info.bitrate.unwrap_or(0), info.announce_cap)
207 } else {
208 (0, constants::ANNOUNCE_CAP)
209 };
210
211 if bitrate > 0 {
212 queue.announce_allowed_at =
213 InterfaceAnnounceQueue::calculate_next_allowed(
214 now,
215 entry.raw.len(),
216 bitrate,
217 announce_cap,
218 );
219 }
220
221 actions.push(TransportAction::SendOnInterface {
222 interface: *iface_id,
223 raw: entry.raw,
224 });
225 } else {
226 break;
227 }
228 }
229 }
230
231 actions
232 }
233
234 #[cfg(test)]
236 pub fn queue_for(&self, id: &InterfaceId) -> Option<&InterfaceAnnounceQueue> {
237 self.queues.get(id)
238 }
239}
240
241#[cfg(test)]
242mod tests {
243 use super::*;
244 use alloc::string::String;
245
246 fn make_entry(dest: u8, hops: u8, time: f64) -> AnnounceQueueEntry {
247 AnnounceQueueEntry {
248 destination_hash: [dest; 16],
249 time,
250 hops,
251 emitted: time,
252 raw: vec![0x01, 0x02, 0x03],
253 }
254 }
255
256 fn make_interface_info(id: u64, bitrate: Option<u64>) -> super::super::types::InterfaceInfo {
257 super::super::types::InterfaceInfo {
258 id: InterfaceId(id),
259 name: String::from("test"),
260 mode: crate::constants::MODE_FULL,
261 out_capable: true,
262 in_capable: true,
263 bitrate,
264 announce_rate_target: None,
265 announce_rate_grace: 0,
266 announce_rate_penalty: 0.0,
267 announce_cap: constants::ANNOUNCE_CAP,
268 is_local_client: false,
269 wants_tunnel: false,
270 tunnel_id: None,
271 mtu: constants::MTU as u32,
272 ingress_control: false,
273 ia_freq: 0.0,
274 started: 0.0,
275 }
276 }
277
278 #[test]
281 fn test_queue_entry_creation() {
282 let entry = make_entry(0xAA, 3, 1000.0);
283 assert_eq!(entry.hops, 3);
284 assert_eq!(entry.destination_hash, [0xAA; 16]);
285 }
286
287 #[test]
288 fn test_queue_insert_and_select() {
289 let mut queue = InterfaceAnnounceQueue::new();
290 queue.insert(make_entry(0x01, 3, 100.0));
291 queue.insert(make_entry(0x02, 1, 200.0));
292 queue.insert(make_entry(0x03, 2, 150.0));
293
294 let idx = queue.select_next().unwrap();
296 assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
297 }
298
299 #[test]
300 fn test_queue_select_fifo_on_same_hops() {
301 let mut queue = InterfaceAnnounceQueue::new();
302 queue.insert(make_entry(0x01, 2, 200.0)); queue.insert(make_entry(0x02, 2, 100.0)); let idx = queue.select_next().unwrap();
307 assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
308 }
309
310 #[test]
311 fn test_queue_dedup_update() {
312 let mut queue = InterfaceAnnounceQueue::new();
313 queue.insert(make_entry(0x01, 3, 100.0));
314 assert_eq!(queue.entries.len(), 1);
315
316 queue.insert(make_entry(0x01, 1, 200.0));
318 assert_eq!(queue.entries.len(), 1);
319 assert_eq!(queue.entries[0].hops, 1);
320
321 queue.insert(make_entry(0x01, 5, 300.0));
323 assert_eq!(queue.entries.len(), 1);
324 assert_eq!(queue.entries[0].hops, 1);
325 }
326
327 #[test]
328 fn test_queue_stale_removal() {
329 let mut queue = InterfaceAnnounceQueue::new();
330 queue.insert(make_entry(0x01, 1, 100.0));
331 queue.insert(make_entry(0x02, 2, 200.0));
332
333 queue.remove_stale(86501.0);
335 assert_eq!(queue.entries.len(), 1);
336 assert_eq!(queue.entries[0].destination_hash, [0x02; 16]);
337 }
338
339 #[test]
340 fn test_queue_max_size() {
341 let mut queue = InterfaceAnnounceQueue::new();
342 for i in 0..constants::MAX_QUEUED_ANNOUNCES {
343 queue.insert(AnnounceQueueEntry {
344 destination_hash: {
345 let mut d = [0u8; 16];
346 d[0] = (i >> 8) as u8;
347 d[1] = i as u8;
348 d
349 },
350 time: i as f64,
351 hops: 1,
352 emitted: i as f64,
353 raw: vec![0x01],
354 });
355 }
356 assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
357
358 queue.insert(make_entry(0xFF, 1, 99999.0));
360 assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
361 }
362
363 #[test]
364 fn test_queue_empty_select() {
365 let queue = InterfaceAnnounceQueue::new();
366 assert!(queue.select_next().is_none());
367 }
368
369 #[test]
370 fn test_bandwidth_allowed() {
371 let mut queue = InterfaceAnnounceQueue::new();
372 assert!(queue.is_allowed(0.0));
373 assert!(queue.is_allowed(100.0));
374
375 queue.announce_allowed_at = 200.0;
376 assert!(!queue.is_allowed(100.0));
377 assert!(!queue.is_allowed(199.9));
378 assert!(queue.is_allowed(200.0));
379 assert!(queue.is_allowed(300.0));
380 }
381
382 #[test]
383 fn test_calculate_next_allowed() {
384 let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, 0.02);
388 assert!((next - 1040.0).abs() < 0.001);
389 }
390
391 #[test]
392 fn test_calculate_next_allowed_zero_bitrate() {
393 let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, 0.02);
394 assert_eq!(next, 1000.0); }
396
397 #[test]
400 fn test_gate_announce_no_bitrate_immediate() {
401 let mut queues = AnnounceQueues::new();
402 let result = queues.gate_announce(
403 InterfaceId(1),
404 vec![0x01, 0x02, 0x03],
405 [0xAA; 16],
406 2,
407 1000.0,
408 1000.0,
409 None, 0.02,
411 );
412 assert!(result.is_some());
413 assert!(matches!(
414 result.unwrap(),
415 TransportAction::SendOnInterface { .. }
416 ));
417 }
418
419 #[test]
420 fn test_gate_announce_bandwidth_available() {
421 let mut queues = AnnounceQueues::new();
422 let result = queues.gate_announce(
423 InterfaceId(1),
424 vec![0x01; 100],
425 [0xBB; 16],
426 2,
427 1000.0,
428 1000.0,
429 Some(10000), 0.02,
431 );
432 assert!(result.is_some());
434
435 let queue = queues.queue_for(&InterfaceId(1)).unwrap();
437 assert!(queue.announce_allowed_at > 1000.0);
438 }
439
440 #[test]
441 fn test_gate_announce_bandwidth_exhausted_queues() {
442 let mut queues = AnnounceQueues::new();
443
444 let r1 = queues.gate_announce(
446 InterfaceId(1),
447 vec![0x01; 100],
448 [0xAA; 16],
449 2,
450 1000.0,
451 1000.0,
452 Some(1000), 0.02,
454 );
455 assert!(r1.is_some());
456
457 let r2 = queues.gate_announce(
459 InterfaceId(1),
460 vec![0x02; 100],
461 [0xBB; 16],
462 3,
463 1000.0,
464 1000.0,
465 Some(1000),
466 0.02,
467 );
468 assert!(r2.is_none()); let queue = queues.queue_for(&InterfaceId(1)).unwrap();
471 assert_eq!(queue.entries.len(), 1);
472 }
473
474 #[test]
475 fn test_process_queues_dequeues_when_allowed() {
476 let mut queues = AnnounceQueues::new();
477
478 let _ = queues.gate_announce(
480 InterfaceId(1),
481 vec![0x01; 10],
482 [0xAA; 16],
483 2,
484 0.0,
485 0.0,
486 Some(1000),
487 0.02,
488 );
489 let _ = queues.gate_announce(
490 InterfaceId(1),
491 vec![0x02; 10],
492 [0xBB; 16],
493 3,
494 0.0,
495 0.0,
496 Some(1000),
497 0.02,
498 );
499
500 assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 1);
502
503 let mut interfaces = BTreeMap::new();
504 interfaces.insert(InterfaceId(1), make_interface_info(1, Some(1000)));
505
506 let allowed_at = queues.queue_for(&InterfaceId(1)).unwrap().announce_allowed_at;
508 let actions = queues.process_queues(allowed_at + 1.0, &interfaces);
509
510 assert_eq!(actions.len(), 1);
511 assert!(matches!(
512 &actions[0],
513 TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(1)
514 ));
515
516 assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 0);
518 }
519
520 #[test]
521 fn test_local_announce_bypasses_cap() {
522 let mut queues = AnnounceQueues::new();
526
527 let _ = queues.gate_announce(
529 InterfaceId(1),
530 vec![0x01; 100],
531 [0xAA; 16],
532 2,
533 0.0,
534 0.0,
535 Some(1000),
536 0.02,
537 );
538
539 let r = queues.gate_announce(
542 InterfaceId(1),
543 vec![0x02; 100],
544 [0xBB; 16],
545 0,
546 0.0,
547 0.0,
548 Some(1000),
549 0.02,
550 );
551 assert!(r.is_none()); }
553}