1use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
12use super::types::{InterfaceId, TransportAction};
13use crate::constants;
14
/// One announce waiting for bandwidth on a single interface.
///
/// Timestamps share the clock of the `now` arguments used throughout this
/// module (presumably seconds — TODO confirm against the caller).
#[derive(Debug, Clone)]
pub struct AnnounceQueueEntry {
    /// Hash of the announced destination. The queue keeps at most one entry
    /// per hash.
    pub destination_hash: [u8; 16],
    /// Time at which the entry was queued. Drives expiry and is the
    /// oldest-first tie-breaker between entries with equal `hops`.
    pub time: f64,
    /// Hop count of the announce; entries with fewer hops are released first.
    pub hops: u8,
    /// Emission timestamp of the announce (presumably when the origin sent
    /// it). On equal `hops`, a larger value replaces the queued entry.
    pub emitted: f64,
    /// Raw bytes that are sent on the interface when the entry is released.
    pub raw: Vec<u8>,
}
29
30#[derive(Debug, Clone)]
32pub struct InterfaceAnnounceQueue {
33 pub entries: Vec<AnnounceQueueEntry>,
35 pub announce_allowed_at: f64,
37}
38
39impl InterfaceAnnounceQueue {
40 pub fn new() -> Self {
41 InterfaceAnnounceQueue {
42 entries: Vec::new(),
43 announce_allowed_at: 0.0,
44 }
45 }
46
47 pub fn insert(&mut self, entry: AnnounceQueueEntry) {
51 if let Some(pos) = self
53 .entries
54 .iter()
55 .position(|e| e.destination_hash == entry.destination_hash)
56 {
57 let existing = &self.entries[pos];
58 if entry.hops < existing.hops
60 || (entry.hops == existing.hops && entry.emitted > existing.emitted)
61 {
62 self.entries[pos] = entry;
63 }
64 } else {
66 if self.entries.len() >= constants::MAX_QUEUED_ANNOUNCES {
68 self.entries.remove(0);
70 }
71 self.entries.push(entry);
72 }
73 }
74
75 pub fn remove_stale(&mut self, now: f64) {
77 self.entries
78 .retain(|e| now - e.time < constants::QUEUED_ANNOUNCE_LIFE);
79 }
80
81 pub fn select_next(&self) -> Option<usize> {
84 if self.entries.is_empty() {
85 return None;
86 }
87 let mut best_idx = 0;
88 let mut best_hops = self.entries[0].hops;
89 let mut best_time = self.entries[0].time;
90
91 for (i, entry) in self.entries.iter().enumerate().skip(1) {
92 if entry.hops < best_hops || (entry.hops == best_hops && entry.time < best_time) {
93 best_idx = i;
94 best_hops = entry.hops;
95 best_time = entry.time;
96 }
97 }
98 Some(best_idx)
99 }
100
101 pub fn is_allowed(&self, now: f64) -> bool {
103 now >= self.announce_allowed_at
104 }
105
106 pub fn calculate_next_allowed(
111 now: f64,
112 raw_len: usize,
113 bitrate: u64,
114 announce_cap: f64,
115 ) -> f64 {
116 if bitrate == 0 || announce_cap <= 0.0 {
117 return now; }
119 let bits = (raw_len * 8) as f64;
120 let time_to_send = bits / (bitrate as f64);
121 let delay = time_to_send / announce_cap;
122 now + delay
123 }
124}
125
126#[derive(Debug, Clone)]
128pub struct AnnounceQueues {
129 queues: BTreeMap<InterfaceId, InterfaceAnnounceQueue>,
130}
131
132impl AnnounceQueues {
133 pub fn new() -> Self {
134 AnnounceQueues {
135 queues: BTreeMap::new(),
136 }
137 }
138
139 pub fn gate_announce(
144 &mut self,
145 interface: InterfaceId,
146 raw: Vec<u8>,
147 dest_hash: [u8; 16],
148 hops: u8,
149 emitted: f64,
150 now: f64,
151 bitrate: Option<u64>,
152 announce_cap: f64,
153 ) -> Option<TransportAction> {
154 let queue = self
155 .queues
156 .entry(interface)
157 .or_insert_with(InterfaceAnnounceQueue::new);
158
159 let bitrate = match bitrate {
161 Some(br) if br > 0 => br,
162 _ => {
163 return Some(TransportAction::SendOnInterface { interface, raw });
164 }
165 };
166
167 if queue.is_allowed(now) {
168 queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
170 now,
171 raw.len(),
172 bitrate,
173 announce_cap,
174 );
175 Some(TransportAction::SendOnInterface { interface, raw })
176 } else {
177 queue.insert(AnnounceQueueEntry {
179 destination_hash: dest_hash,
180 time: now,
181 hops,
182 emitted,
183 raw,
184 });
185 None
186 }
187 }
188
189 pub fn process_queues(
192 &mut self,
193 now: f64,
194 interfaces: &BTreeMap<InterfaceId, super::types::InterfaceInfo>,
195 ) -> Vec<TransportAction> {
196 let mut actions = Vec::new();
197
198 for (iface_id, queue) in self.queues.iter_mut() {
199 queue.remove_stale(now);
201
202 while queue.is_allowed(now) {
204 if let Some(idx) = queue.select_next() {
205 let entry = queue.entries.remove(idx);
206
207 let (bitrate, announce_cap) = if let Some(info) = interfaces.get(iface_id) {
209 (info.bitrate.unwrap_or(0), info.announce_cap)
210 } else {
211 (0, constants::ANNOUNCE_CAP)
212 };
213
214 if bitrate > 0 {
215 queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
216 now,
217 entry.raw.len(),
218 bitrate,
219 announce_cap,
220 );
221 }
222
223 actions.push(TransportAction::SendOnInterface {
224 interface: *iface_id,
225 raw: entry.raw,
226 });
227 } else {
228 break;
229 }
230 }
231 }
232
233 actions
234 }
235
236 #[cfg(test)]
238 pub fn queue_for(&self, id: &InterfaceId) -> Option<&InterfaceAnnounceQueue> {
239 self.queues.get(id)
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::string::String;

    /// Builds a three-byte entry for destination `[dest; 16]` whose `emitted`
    /// timestamp equals its queue `time`.
    fn make_entry(dest: u8, hops: u8, time: f64) -> AnnounceQueueEntry {
        AnnounceQueueEntry {
            destination_hash: [dest; 16],
            time,
            hops,
            emitted: time,
            raw: vec![0x01, 0x02, 0x03],
        }
    }

    /// Builds a full-mode interface description with the given bitrate and
    /// the default announce cap.
    fn make_interface_info(id: u64, bitrate: Option<u64>) -> super::super::types::InterfaceInfo {
        super::super::types::InterfaceInfo {
            id: InterfaceId(id),
            name: String::from("test"),
            mode: crate::constants::MODE_FULL,
            out_capable: true,
            in_capable: true,
            bitrate,
            announce_rate_target: None,
            announce_rate_grace: 0,
            announce_rate_penalty: 0.0,
            announce_cap: constants::ANNOUNCE_CAP,
            is_local_client: false,
            wants_tunnel: false,
            tunnel_id: None,
            mtu: constants::MTU as u32,
            ingress_control: false,
            ia_freq: 0.0,
            started: 0.0,
        }
    }

    /// Gates an announce on interface 1 with a 2% cap, using `now` as the
    /// emission time as well.
    fn gate(
        queues: &mut AnnounceQueues,
        raw: Vec<u8>,
        dest: u8,
        hops: u8,
        now: f64,
        bitrate: Option<u64>,
    ) -> Option<TransportAction> {
        queues.gate_announce(InterfaceId(1), raw, [dest; 16], hops, now, now, bitrate, 0.02)
    }

    #[test]
    fn test_queue_entry_creation() {
        let entry = make_entry(0xAA, 3, 1000.0);
        assert_eq!(entry.hops, 3);
        assert_eq!(entry.destination_hash, [0xAA; 16]);
    }

    #[test]
    fn test_queue_insert_and_select() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 3, 100.0));
        queue.insert(make_entry(0x02, 1, 200.0));
        queue.insert(make_entry(0x03, 2, 150.0));

        // Fewest hops wins regardless of age.
        let idx = queue.select_next().unwrap();
        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_select_fifo_on_same_hops() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 2, 200.0));
        queue.insert(make_entry(0x02, 2, 100.0));

        // Equal hops: the older entry goes first.
        let idx = queue.select_next().unwrap();
        assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_dedup_update() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 3, 100.0));
        assert_eq!(queue.entries.len(), 1);

        // Fewer hops replaces the queued entry.
        queue.insert(make_entry(0x01, 1, 200.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);

        // More hops is ignored.
        queue.insert(make_entry(0x01, 5, 300.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);
    }

    #[test]
    fn test_queue_dedup_same_hops_prefers_newer_emission() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 2, 100.0));

        // Same hops, emitted later: replaces.
        queue.insert(make_entry(0x01, 2, 200.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].emitted, 200.0);

        // Same hops, emitted earlier: ignored.
        queue.insert(make_entry(0x01, 2, 150.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].emitted, 200.0);
    }

    #[test]
    fn test_queue_stale_removal() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 1, 100.0));
        queue.insert(make_entry(0x02, 2, 200.0));

        // Only the entry queued at t=100 has exceeded the queue lifetime.
        queue.remove_stale(86501.0);
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_max_size() {
        let mut queue = InterfaceAnnounceQueue::new();
        for i in 0..constants::MAX_QUEUED_ANNOUNCES {
            queue.insert(AnnounceQueueEntry {
                destination_hash: {
                    let mut d = [0u8; 16];
                    d[0] = (i >> 8) as u8;
                    d[1] = i as u8;
                    d
                },
                time: i as f64,
                hops: 1,
                emitted: i as f64,
                raw: vec![0x01],
            });
        }
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);

        queue.insert(make_entry(0xFF, 1, 99999.0));
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);

        // The first-inserted entry (all-zero hash) made room for the new one.
        assert!(queue
            .entries
            .iter()
            .all(|e| e.destination_hash != [0u8; 16]));
        assert_eq!(
            queue.entries.last().unwrap().destination_hash,
            [0xFF; 16]
        );
    }

    #[test]
    fn test_queue_empty_select() {
        let queue = InterfaceAnnounceQueue::new();
        assert!(queue.select_next().is_none());
    }

    #[test]
    fn test_bandwidth_allowed() {
        let mut queue = InterfaceAnnounceQueue::new();
        assert!(queue.is_allowed(0.0));
        assert!(queue.is_allowed(100.0));

        queue.announce_allowed_at = 200.0;
        assert!(!queue.is_allowed(100.0));
        assert!(!queue.is_allowed(199.9));
        assert!(queue.is_allowed(200.0));
        assert!(queue.is_allowed(300.0));
    }

    #[test]
    fn test_calculate_next_allowed() {
        // 100 bytes = 800 bits; at 1000 bit/s that is 0.8 s of airtime;
        // divided by a 2% cap the gate reopens 40 s later.
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, 0.02);
        assert!((next - 1040.0).abs() < 0.001);
    }

    #[test]
    fn test_calculate_next_allowed_zero_bitrate() {
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, 0.02);
        assert_eq!(next, 1000.0);
    }

    #[test]
    fn test_calculate_next_allowed_zero_cap() {
        // A non-positive cap disables throttling instead of dividing by zero.
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, 0.0);
        assert_eq!(next, 1000.0);
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, -0.5);
        assert_eq!(next, 1000.0);
    }

    #[test]
    fn test_gate_announce_no_bitrate_immediate() {
        let mut queues = AnnounceQueues::new();
        let result = gate(&mut queues, vec![0x01, 0x02, 0x03], 0xAA, 2, 1000.0, None);
        assert!(result.is_some());
        assert!(matches!(
            result.unwrap(),
            TransportAction::SendOnInterface { .. }
        ));
    }

    #[test]
    fn test_gate_announce_bandwidth_available() {
        let mut queues = AnnounceQueues::new();
        let result = gate(&mut queues, vec![0x01; 100], 0xBB, 2, 1000.0, Some(10000));
        assert!(result.is_some());

        // Sending consumed bandwidth, so the gate moved into the future.
        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
        assert!(queue.announce_allowed_at > 1000.0);
    }

    #[test]
    fn test_gate_announce_bandwidth_exhausted_queues() {
        let mut queues = AnnounceQueues::new();

        // First announce passes and closes the gate.
        let r1 = gate(&mut queues, vec![0x01; 100], 0xAA, 2, 1000.0, Some(1000));
        assert!(r1.is_some());

        // Second announce at the same instant has to wait.
        let r2 = gate(&mut queues, vec![0x02; 100], 0xBB, 3, 1000.0, Some(1000));
        assert!(r2.is_none());

        let queue = queues.queue_for(&InterfaceId(1)).unwrap();
        assert_eq!(queue.entries.len(), 1);
    }

    #[test]
    fn test_process_queues_dequeues_when_allowed() {
        let mut queues = AnnounceQueues::new();

        let _ = gate(&mut queues, vec![0x01; 10], 0xAA, 2, 0.0, Some(1000));
        let _ = gate(&mut queues, vec![0x02; 10], 0xBB, 3, 0.0, Some(1000));

        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 1);

        let mut interfaces = BTreeMap::new();
        interfaces.insert(InterfaceId(1), make_interface_info(1, Some(1000)));

        let allowed_at = queues
            .queue_for(&InterfaceId(1))
            .unwrap()
            .announce_allowed_at;
        let actions = queues.process_queues(allowed_at + 1.0, &interfaces);

        assert_eq!(actions.len(), 1);
        assert!(matches!(
            &actions[0],
            TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(1)
        ));

        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 0);
    }

    #[test]
    fn test_process_queues_sends_lowest_hops_first() {
        let mut queues = AnnounceQueues::new();

        assert!(gate(&mut queues, vec![0x01; 10], 0xAA, 2, 0.0, Some(1000)).is_some());
        assert!(gate(&mut queues, vec![0x02; 10], 0xBB, 5, 0.0, Some(1000)).is_none());
        assert!(gate(&mut queues, vec![0x03; 10], 0xCC, 1, 0.0, Some(1000)).is_none());

        let mut interfaces = BTreeMap::new();
        interfaces.insert(InterfaceId(1), make_interface_info(1, Some(1000)));

        let allowed_at = queues
            .queue_for(&InterfaceId(1))
            .unwrap()
            .announce_allowed_at;
        let actions = queues.process_queues(allowed_at + 1.0, &interfaces);

        // The one-hop announce was queued last but must be released first.
        assert!(!actions.is_empty());
        assert!(matches!(
            &actions[0],
            TransportAction::SendOnInterface { raw, .. }
                if raw.len() == 10 && raw.iter().all(|b| *b == 0x03)
        ));
    }

    #[test]
    fn test_local_announce_bypasses_cap() {
        let mut queues = AnnounceQueues::new();

        let _ = gate(&mut queues, vec![0x01; 100], 0xAA, 2, 0.0, Some(1000));

        // `gate_announce` does not special-case `hops == 0`: with the gate
        // closed, even a zero-hop announce is queued. Any bypass for such
        // announces has to be implemented by the caller.
        let r = gate(&mut queues, vec![0x02; 100], 0xBB, 0, 0.0, Some(1000));
        assert!(r.is_none());
        assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 1);
    }
}