1use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
12use super::types::{InterfaceId, TransportAction};
13use crate::constants;
14
15#[derive(Debug, Clone)]
17pub struct AnnounceQueueEntry {
18 pub destination_hash: [u8; 16],
20 pub time: f64,
22 pub hops: u8,
24 pub emitted: f64,
26 pub raw: Vec<u8>,
28}
29
30#[derive(Debug, Clone)]
32pub struct InterfaceAnnounceQueue {
33 pub entries: Vec<AnnounceQueueEntry>,
35 pub announce_allowed_at: f64,
37}
38
39impl InterfaceAnnounceQueue {
40 pub fn new() -> Self {
41 InterfaceAnnounceQueue {
42 entries: Vec::new(),
43 announce_allowed_at: 0.0,
44 }
45 }
46
47 pub fn insert(&mut self, entry: AnnounceQueueEntry) {
51 if let Some(pos) = self
53 .entries
54 .iter()
55 .position(|e| e.destination_hash == entry.destination_hash)
56 {
57 let existing = &self.entries[pos];
58 if entry.hops < existing.hops
60 || (entry.hops == existing.hops && entry.emitted > existing.emitted)
61 {
62 self.entries[pos] = entry;
63 }
64 } else {
66 if self.entries.len() >= constants::MAX_QUEUED_ANNOUNCES {
68 self.entries.remove(0);
70 }
71 self.entries.push(entry);
72 }
73 }
74
75 pub fn remove_stale(&mut self, now: f64) {
77 self.entries
78 .retain(|e| now - e.time < constants::QUEUED_ANNOUNCE_LIFE);
79 }
80
81 pub fn select_next(&self) -> Option<usize> {
84 if self.entries.is_empty() {
85 return None;
86 }
87 let mut best_idx = 0;
88 let mut best_hops = self.entries[0].hops;
89 let mut best_time = self.entries[0].time;
90
91 for (i, entry) in self.entries.iter().enumerate().skip(1) {
92 if entry.hops < best_hops
93 || (entry.hops == best_hops && entry.time < best_time)
94 {
95 best_idx = i;
96 best_hops = entry.hops;
97 best_time = entry.time;
98 }
99 }
100 Some(best_idx)
101 }
102
103 pub fn is_allowed(&self, now: f64) -> bool {
105 now >= self.announce_allowed_at
106 }
107
108 pub fn calculate_next_allowed(now: f64, raw_len: usize, bitrate: u64, announce_cap: f64) -> f64 {
113 if bitrate == 0 || announce_cap <= 0.0 {
114 return now; }
116 let bits = (raw_len * 8) as f64;
117 let time_to_send = bits / (bitrate as f64);
118 let delay = time_to_send / announce_cap;
119 now + delay
120 }
121}
122
123#[derive(Debug, Clone)]
125pub struct AnnounceQueues {
126 queues: BTreeMap<InterfaceId, InterfaceAnnounceQueue>,
127}
128
129impl AnnounceQueues {
130 pub fn new() -> Self {
131 AnnounceQueues {
132 queues: BTreeMap::new(),
133 }
134 }
135
136 pub fn gate_announce(
141 &mut self,
142 interface: InterfaceId,
143 raw: Vec<u8>,
144 dest_hash: [u8; 16],
145 hops: u8,
146 emitted: f64,
147 now: f64,
148 bitrate: Option<u64>,
149 announce_cap: f64,
150 ) -> Option<TransportAction> {
151 let queue = self
152 .queues
153 .entry(interface)
154 .or_insert_with(InterfaceAnnounceQueue::new);
155
156 let bitrate = match bitrate {
158 Some(br) if br > 0 => br,
159 _ => {
160 return Some(TransportAction::SendOnInterface {
161 interface,
162 raw,
163 });
164 }
165 };
166
167 if queue.is_allowed(now) {
168 queue.announce_allowed_at =
170 InterfaceAnnounceQueue::calculate_next_allowed(now, raw.len(), bitrate, announce_cap);
171 Some(TransportAction::SendOnInterface { interface, raw })
172 } else {
173 queue.insert(AnnounceQueueEntry {
175 destination_hash: dest_hash,
176 time: now,
177 hops,
178 emitted,
179 raw,
180 });
181 None
182 }
183 }
184
185 pub fn process_queues(
188 &mut self,
189 now: f64,
190 interfaces: &BTreeMap<InterfaceId, super::types::InterfaceInfo>,
191 ) -> Vec<TransportAction> {
192 let mut actions = Vec::new();
193
194 for (iface_id, queue) in self.queues.iter_mut() {
195 queue.remove_stale(now);
197
198 while queue.is_allowed(now) {
200 if let Some(idx) = queue.select_next() {
201 let entry = queue.entries.remove(idx);
202
203 let (bitrate, announce_cap) =
205 if let Some(info) = interfaces.get(iface_id) {
206 (info.bitrate.unwrap_or(0), info.announce_cap)
207 } else {
208 (0, constants::ANNOUNCE_CAP)
209 };
210
211 if bitrate > 0 {
212 queue.announce_allowed_at =
213 InterfaceAnnounceQueue::calculate_next_allowed(
214 now,
215 entry.raw.len(),
216 bitrate,
217 announce_cap,
218 );
219 }
220
221 actions.push(TransportAction::SendOnInterface {
222 interface: *iface_id,
223 raw: entry.raw,
224 });
225 } else {
226 break;
227 }
228 }
229 }
230
231 actions
232 }
233
234 #[cfg(test)]
236 pub fn queue_for(&self, id: &InterfaceId) -> Option<&InterfaceAnnounceQueue> {
237 self.queues.get(id)
238 }
239}
240
241#[cfg(test)]
242mod tests {
243 use super::*;
244 use alloc::string::String;
245
246 fn make_entry(dest: u8, hops: u8, time: f64) -> AnnounceQueueEntry {
247 AnnounceQueueEntry {
248 destination_hash: [dest; 16],
249 time,
250 hops,
251 emitted: time,
252 raw: vec![0x01, 0x02, 0x03],
253 }
254 }
255
256 fn make_interface_info(id: u64, bitrate: Option<u64>) -> super::super::types::InterfaceInfo {
257 super::super::types::InterfaceInfo {
258 id: InterfaceId(id),
259 name: String::from("test"),
260 mode: crate::constants::MODE_FULL,
261 out_capable: true,
262 in_capable: true,
263 bitrate,
264 announce_rate_target: None,
265 announce_rate_grace: 0,
266 announce_rate_penalty: 0.0,
267 announce_cap: constants::ANNOUNCE_CAP,
268 is_local_client: false,
269 wants_tunnel: false,
270 tunnel_id: None,
271 }
272 }
273
274 #[test]
277 fn test_queue_entry_creation() {
278 let entry = make_entry(0xAA, 3, 1000.0);
279 assert_eq!(entry.hops, 3);
280 assert_eq!(entry.destination_hash, [0xAA; 16]);
281 }
282
283 #[test]
284 fn test_queue_insert_and_select() {
285 let mut queue = InterfaceAnnounceQueue::new();
286 queue.insert(make_entry(0x01, 3, 100.0));
287 queue.insert(make_entry(0x02, 1, 200.0));
288 queue.insert(make_entry(0x03, 2, 150.0));
289
290 let idx = queue.select_next().unwrap();
292 assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
293 }
294
295 #[test]
296 fn test_queue_select_fifo_on_same_hops() {
297 let mut queue = InterfaceAnnounceQueue::new();
298 queue.insert(make_entry(0x01, 2, 200.0)); queue.insert(make_entry(0x02, 2, 100.0)); let idx = queue.select_next().unwrap();
303 assert_eq!(queue.entries[idx].destination_hash, [0x02; 16]);
304 }
305
306 #[test]
307 fn test_queue_dedup_update() {
308 let mut queue = InterfaceAnnounceQueue::new();
309 queue.insert(make_entry(0x01, 3, 100.0));
310 assert_eq!(queue.entries.len(), 1);
311
312 queue.insert(make_entry(0x01, 1, 200.0));
314 assert_eq!(queue.entries.len(), 1);
315 assert_eq!(queue.entries[0].hops, 1);
316
317 queue.insert(make_entry(0x01, 5, 300.0));
319 assert_eq!(queue.entries.len(), 1);
320 assert_eq!(queue.entries[0].hops, 1);
321 }
322
323 #[test]
324 fn test_queue_stale_removal() {
325 let mut queue = InterfaceAnnounceQueue::new();
326 queue.insert(make_entry(0x01, 1, 100.0));
327 queue.insert(make_entry(0x02, 2, 200.0));
328
329 queue.remove_stale(86501.0);
331 assert_eq!(queue.entries.len(), 1);
332 assert_eq!(queue.entries[0].destination_hash, [0x02; 16]);
333 }
334
335 #[test]
336 fn test_queue_max_size() {
337 let mut queue = InterfaceAnnounceQueue::new();
338 for i in 0..constants::MAX_QUEUED_ANNOUNCES {
339 queue.insert(AnnounceQueueEntry {
340 destination_hash: {
341 let mut d = [0u8; 16];
342 d[0] = (i >> 8) as u8;
343 d[1] = i as u8;
344 d
345 },
346 time: i as f64,
347 hops: 1,
348 emitted: i as f64,
349 raw: vec![0x01],
350 });
351 }
352 assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
353
354 queue.insert(make_entry(0xFF, 1, 99999.0));
356 assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
357 }
358
359 #[test]
360 fn test_queue_empty_select() {
361 let queue = InterfaceAnnounceQueue::new();
362 assert!(queue.select_next().is_none());
363 }
364
365 #[test]
366 fn test_bandwidth_allowed() {
367 let mut queue = InterfaceAnnounceQueue::new();
368 assert!(queue.is_allowed(0.0));
369 assert!(queue.is_allowed(100.0));
370
371 queue.announce_allowed_at = 200.0;
372 assert!(!queue.is_allowed(100.0));
373 assert!(!queue.is_allowed(199.9));
374 assert!(queue.is_allowed(200.0));
375 assert!(queue.is_allowed(300.0));
376 }
377
378 #[test]
379 fn test_calculate_next_allowed() {
380 let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, 0.02);
384 assert!((next - 1040.0).abs() < 0.001);
385 }
386
387 #[test]
388 fn test_calculate_next_allowed_zero_bitrate() {
389 let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, 0.02);
390 assert_eq!(next, 1000.0); }
392
393 #[test]
396 fn test_gate_announce_no_bitrate_immediate() {
397 let mut queues = AnnounceQueues::new();
398 let result = queues.gate_announce(
399 InterfaceId(1),
400 vec![0x01, 0x02, 0x03],
401 [0xAA; 16],
402 2,
403 1000.0,
404 1000.0,
405 None, 0.02,
407 );
408 assert!(result.is_some());
409 assert!(matches!(
410 result.unwrap(),
411 TransportAction::SendOnInterface { .. }
412 ));
413 }
414
415 #[test]
416 fn test_gate_announce_bandwidth_available() {
417 let mut queues = AnnounceQueues::new();
418 let result = queues.gate_announce(
419 InterfaceId(1),
420 vec![0x01; 100],
421 [0xBB; 16],
422 2,
423 1000.0,
424 1000.0,
425 Some(10000), 0.02,
427 );
428 assert!(result.is_some());
430
431 let queue = queues.queue_for(&InterfaceId(1)).unwrap();
433 assert!(queue.announce_allowed_at > 1000.0);
434 }
435
436 #[test]
437 fn test_gate_announce_bandwidth_exhausted_queues() {
438 let mut queues = AnnounceQueues::new();
439
440 let r1 = queues.gate_announce(
442 InterfaceId(1),
443 vec![0x01; 100],
444 [0xAA; 16],
445 2,
446 1000.0,
447 1000.0,
448 Some(1000), 0.02,
450 );
451 assert!(r1.is_some());
452
453 let r2 = queues.gate_announce(
455 InterfaceId(1),
456 vec![0x02; 100],
457 [0xBB; 16],
458 3,
459 1000.0,
460 1000.0,
461 Some(1000),
462 0.02,
463 );
464 assert!(r2.is_none()); let queue = queues.queue_for(&InterfaceId(1)).unwrap();
467 assert_eq!(queue.entries.len(), 1);
468 }
469
470 #[test]
471 fn test_process_queues_dequeues_when_allowed() {
472 let mut queues = AnnounceQueues::new();
473
474 let _ = queues.gate_announce(
476 InterfaceId(1),
477 vec![0x01; 10],
478 [0xAA; 16],
479 2,
480 0.0,
481 0.0,
482 Some(1000),
483 0.02,
484 );
485 let _ = queues.gate_announce(
486 InterfaceId(1),
487 vec![0x02; 10],
488 [0xBB; 16],
489 3,
490 0.0,
491 0.0,
492 Some(1000),
493 0.02,
494 );
495
496 assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 1);
498
499 let mut interfaces = BTreeMap::new();
500 interfaces.insert(InterfaceId(1), make_interface_info(1, Some(1000)));
501
502 let allowed_at = queues.queue_for(&InterfaceId(1)).unwrap().announce_allowed_at;
504 let actions = queues.process_queues(allowed_at + 1.0, &interfaces);
505
506 assert_eq!(actions.len(), 1);
507 assert!(matches!(
508 &actions[0],
509 TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(1)
510 ));
511
512 assert_eq!(queues.queue_for(&InterfaceId(1)).unwrap().entries.len(), 0);
514 }
515
516 #[test]
517 fn test_local_announce_bypasses_cap() {
518 let mut queues = AnnounceQueues::new();
522
523 let _ = queues.gate_announce(
525 InterfaceId(1),
526 vec![0x01; 100],
527 [0xAA; 16],
528 2,
529 0.0,
530 0.0,
531 Some(1000),
532 0.02,
533 );
534
535 let r = queues.gate_announce(
538 InterfaceId(1),
539 vec![0x02; 100],
540 [0xBB; 16],
541 0,
542 0.0,
543 0.0,
544 Some(1000),
545 0.02,
546 );
547 assert!(r.is_none()); }
549}