use alloc::collections::BTreeMap;
use alloc::vec::Vec;
use super::types::{AirtimeProfile, InterfaceId, PacketBytes, TransportAction};
use crate::constants;
/// A single announce held back until its interface is allowed to transmit again.
#[derive(Debug, Clone)]
pub struct AnnounceQueueEntry {
/// Hash of the announced destination; a queue keeps at most one entry per hash.
pub destination_hash: [u8; 16],
/// Time the entry was queued. Used for stale removal and to prefer the
/// oldest entry among those with equal hop counts.
pub time: f64,
/// Hop count of the announce; entries with fewer hops are sent first.
pub hops: u8,
/// Emission timestamp supplied by the caller; among duplicates with equal
/// hops, the entry with the larger value wins.
/// NOTE(review): presumably the same clock/units as `time` — confirm.
pub emitted: f64,
/// Raw packet bytes handed to the interface when the entry is sent.
pub raw: PacketBytes,
}
/// Held-back announces and rate-limit state for one interface.
#[derive(Debug, Clone)]
pub struct InterfaceAnnounceQueue {
/// Announces waiting for the transmit window to reopen.
pub entries: Vec<AnnounceQueueEntry>,
/// Earliest time at which the next announce may be sent; `0.0` on a new queue.
pub announce_allowed_at: f64,
}
impl InterfaceAnnounceQueue {
    /// Creates an empty queue whose transmit window is already open
    /// (`announce_allowed_at` starts at `0.0`).
    pub fn new() -> Self {
        Self {
            entries: Vec::new(),
            announce_allowed_at: 0.0,
        }
    }

    /// Queues `entry`, keeping at most one entry per destination hash.
    ///
    /// If an entry for the same destination is already queued it is replaced
    /// only when the new one has fewer hops, or the same hops and a larger
    /// `emitted` value; otherwise the new entry is discarded. For a new
    /// destination, when the queue already holds
    /// `constants::MAX_QUEUED_ANNOUNCES` entries, the entry at the front of
    /// the list is evicted to make room.
    pub fn insert(&mut self, entry: AnnounceQueueEntry) {
        if let Some(queued) = self
            .entries
            .iter_mut()
            .find(|queued| queued.destination_hash == entry.destination_hash)
        {
            let fewer_hops = entry.hops < queued.hops;
            let same_hops_but_newer = entry.hops == queued.hops && entry.emitted > queued.emitted;
            if fewer_hops || same_hops_but_newer {
                *queued = entry;
            }
            return;
        }

        if self.entries.len() >= constants::MAX_QUEUED_ANNOUNCES {
            self.entries.remove(0);
        }
        self.entries.push(entry);
    }

    /// Drops every entry that has been queued for
    /// `constants::QUEUED_ANNOUNCE_LIFE` or longer as of `now`.
    pub fn remove_stale(&mut self, now: f64) {
        self.entries
            .retain(|queued| now - queued.time < constants::QUEUED_ANNOUNCE_LIFE);
    }

    /// Returns the index of the entry to send next, or `None` if the queue is
    /// empty.
    ///
    /// The entry with the fewest hops wins; among equal hop counts the one
    /// with the smallest `time` wins. Comparisons are strict, so on a full
    /// tie the entry closest to the front of the list is kept.
    pub fn select_next(&self) -> Option<usize> {
        self.entries
            .iter()
            .enumerate()
            .reduce(|best, candidate| {
                let (held, offered) = (best.1, candidate.1);
                let offered_wins = offered.hops < held.hops
                    || (offered.hops == held.hops && offered.time < held.time);
                if offered_wins {
                    candidate
                } else {
                    best
                }
            })
            .map(|(idx, _)| idx)
    }

    /// Returns `true` once `now` has reached `announce_allowed_at`.
    pub fn is_allowed(&self, now: f64) -> bool {
        now >= self.announce_allowed_at
    }

    /// Computes the earliest time the next announce may follow one of
    /// `raw_len` bytes sent at `now`.
    ///
    /// The transmit time comes from `airtime_profile` when one is given,
    /// otherwise from `raw_len * 8 / bitrate` (zero when `bitrate` is zero).
    /// The result is `now + transmit_time / announce_cap`, i.e. the announce
    /// occupies at most the `announce_cap` fraction of the following window.
    ///
    /// `now` is returned unchanged (no delay) when `announce_cap` or the
    /// transmit time is zero, negative or NaN.
    pub fn calculate_next_allowed(
        now: f64,
        raw_len: usize,
        bitrate: u64,
        airtime_profile: Option<AirtimeProfile>,
        announce_cap: f64,
    ) -> f64 {
        // A NaN cap would otherwise propagate into `announce_allowed_at`,
        // after which `is_allowed` is false for every `now`.
        if announce_cap.is_nan() || announce_cap <= 0.0 {
            return now;
        }

        let time_to_send = match airtime_profile {
            Some(profile) => profile.transmit_time_secs(raw_len),
            None if bitrate == 0 => 0.0,
            // Convert before multiplying so the bit count cannot overflow
            // `usize`; scaling by 8 is exact in floating point.
            None => (raw_len as f64) * 8.0 / (bitrate as f64),
        };
        if time_to_send.is_nan() || time_to_send <= 0.0 {
            return now;
        }

        now + time_to_send / announce_cap
    }
}
impl Default for InterfaceAnnounceQueue {
fn default() -> Self {
Self::new()
}
}
/// Announce queues for all rate-limited interfaces, bounded in how many
/// interfaces may be tracked at once.
#[derive(Debug, Clone)]
pub struct AnnounceQueues {
/// Rate-limit state and held announces, keyed by interface.
queues: BTreeMap<InterfaceId, InterfaceAnnounceQueue>,
/// Upper bound on the number of interfaces tracked in `queues`.
max_interfaces: usize,
/// Number of announces refused because tracking a new interface would
/// exceed `max_interfaces`; incremented with saturation.
interface_cap_drops: u64,
}
impl AnnounceQueues {
    /// Creates an empty set of queues that tracks at most `max_interfaces`
    /// interfaces at a time.
    pub fn new(max_interfaces: usize) -> Self {
        Self {
            queues: BTreeMap::new(),
            max_interfaces,
            interface_cap_drops: 0,
        }
    }

    /// Decides whether an announce may go out on `interface` right now.
    ///
    /// Returns `Some(TransportAction::SendOnInterface { .. })` when the
    /// announce should be sent immediately, and `None` when it was queued or
    /// dropped:
    ///
    /// * With no usable bitrate (`None` or `0`) and no `airtime_profile`
    ///   there is nothing to pace against, so the announce is sent at once
    ///   and no queue state is created.
    /// * If `interface` has no queue yet and `max_interfaces` queues already
    ///   exist, the announce is dropped and the drop counter is incremented.
    /// * If the interface's window is open, the announce is sent and the
    ///   window is closed until the time computed by
    ///   `InterfaceAnnounceQueue::calculate_next_allowed`.
    /// * Otherwise the announce is queued, with `now` as its queue time.
    #[allow(clippy::too_many_arguments)]
    pub fn gate_announce(
        &mut self,
        interface: InterfaceId,
        raw: PacketBytes,
        dest_hash: [u8; 16],
        hops: u8,
        emitted: f64,
        now: f64,
        bitrate: Option<u64>,
        airtime_profile: Option<AirtimeProfile>,
        announce_cap: f64,
    ) -> Option<TransportAction> {
        // A missing bitrate and a zero bitrate are treated the same way.
        let bitrate = bitrate.unwrap_or(0);
        if bitrate == 0 && airtime_profile.is_none() {
            return Some(TransportAction::SendOnInterface { interface, raw });
        }

        let is_untracked = !self.queues.contains_key(&interface);
        if is_untracked && self.queues.len() >= self.max_interfaces {
            self.interface_cap_drops = self.interface_cap_drops.saturating_add(1);
            return None;
        }

        let queue = self.queues.entry(interface).or_default();
        if !queue.is_allowed(now) {
            queue.insert(AnnounceQueueEntry {
                destination_hash: dest_hash,
                time: now,
                hops,
                emitted,
                raw,
            });
            return None;
        }

        queue.announce_allowed_at = InterfaceAnnounceQueue::calculate_next_allowed(
            now,
            raw.len(),
            bitrate,
            airtime_profile,
            announce_cap,
        );
        Some(TransportAction::SendOnInterface { interface, raw })
    }

    /// Releases queued announces whose interface window has reopened.
    ///
    /// For every tracked interface, stale entries are removed first; then,
    /// while the window is open, the best entry (see
    /// `InterfaceAnnounceQueue::select_next`) is dequeued, the window is
    /// advanced, and a `SendOnInterface` action is emitted. Link parameters
    /// are read from `interfaces`; an interface missing from that map is
    /// treated as having no bitrate and no airtime profile, so its backlog is
    /// released without pacing.
    ///
    /// Queues left without entries are removed afterwards.
    ///
    /// NOTE(review): removing an empty queue also discards its
    /// `announce_allowed_at`, so the next `gate_announce` on that interface
    /// starts from a fresh queue and is sent immediately — confirm this is
    /// the intended trade-off for bounding the number of tracked interfaces.
    pub fn process_queues(
        &mut self,
        now: f64,
        interfaces: &BTreeMap<InterfaceId, super::types::InterfaceInfo>,
    ) -> Vec<TransportAction> {
        let mut actions = Vec::new();

        for (iface_id, queue) in self.queues.iter_mut() {
            queue.remove_stale(now);

            // The link parameters cannot change while this queue is being
            // drained, so they are resolved once per interface.
            let (bitrate, airtime_profile, announce_cap) = match interfaces.get(iface_id) {
                Some(info) => (
                    info.bitrate.unwrap_or(0),
                    info.airtime_profile,
                    info.announce_cap,
                ),
                None => (0, None, constants::ANNOUNCE_CAP),
            };
            let is_paced = bitrate > 0 || airtime_profile.is_some();

            while queue.is_allowed(now) {
                let idx = match queue.select_next() {
                    Some(idx) => idx,
                    None => break,
                };
                let entry = queue.entries.remove(idx);

                queue.announce_allowed_at = if is_paced {
                    InterfaceAnnounceQueue::calculate_next_allowed(
                        now,
                        entry.raw.len(),
                        bitrate,
                        airtime_profile,
                        announce_cap,
                    )
                } else {
                    now
                };

                actions.push(TransportAction::SendOnInterface {
                    interface: *iface_id,
                    raw: entry.raw,
                });
            }
        }

        self.queues.retain(|_, queue| !queue.entries.is_empty());
        actions
    }

    /// Stops tracking `interface`, discarding anything queued for it.
    /// Returns `true` if a queue existed.
    pub fn remove_interface(&mut self, interface: InterfaceId) -> bool {
        self.queues.remove(&interface).is_some()
    }

    /// Number of interfaces currently tracked, including those whose queue
    /// holds no entries.
    pub fn queue_count(&self) -> usize {
        self.queues.len()
    }

    /// Number of tracked interfaces with at least one queued announce.
    pub fn nonempty_queue_count(&self) -> usize {
        self.queues
            .values()
            .filter(|queue| !queue.entries.is_empty())
            .count()
    }

    /// Total number of announces queued across all interfaces.
    pub fn total_queued_announces(&self) -> usize {
        self.queues.values().map(|queue| queue.entries.len()).sum()
    }

    /// Total size in bytes of all queued announce packets.
    pub fn total_queued_bytes(&self) -> usize {
        self.queues
            .values()
            .map(|queue| {
                queue
                    .entries
                    .iter()
                    .map(|entry| entry.raw.len())
                    .sum::<usize>()
            })
            .sum()
    }

    /// Number of announces dropped because the interface limit was reached.
    pub fn interface_cap_drop_count(&self) -> u64 {
        self.interface_cap_drops
    }

    /// Test-only access to the queue tracked for `id`, if any.
    #[cfg(test)]
    pub fn queue_for(&self, id: &InterfaceId) -> Option<&InterfaceAnnounceQueue> {
        self.queues.get(id)
    }
}
impl Default for AnnounceQueues {
fn default() -> Self {
Self::new(1024)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::string::String;

    /// Announce cap passed to every `gate_announce` call in these tests.
    const CAP: f64 = 0.02;

    /// Builds a queue entry whose hash is `dest` repeated and whose `emitted`
    /// equals its queue `time`.
    fn make_entry(dest: u8, hops: u8, time: f64) -> AnnounceQueueEntry {
        AnnounceQueueEntry {
            destination_hash: [dest; 16],
            time,
            hops,
            emitted: time,
            raw: vec![0x01, 0x02, 0x03].into(),
        }
    }

    /// Builds a minimal interface description with the given bitrate and the
    /// default announce cap.
    fn make_interface_info(id: u64, bitrate: Option<u64>) -> super::super::types::InterfaceInfo {
        super::super::types::InterfaceInfo {
            id: InterfaceId(id),
            name: String::from("test"),
            mode: crate::constants::MODE_FULL,
            out_capable: true,
            in_capable: true,
            bitrate,
            airtime_profile: None,
            announce_rate_target: None,
            announce_rate_grace: 0,
            announce_rate_penalty: 0.0,
            announce_cap: constants::ANNOUNCE_CAP,
            is_local_client: false,
            wants_tunnel: false,
            tunnel_id: None,
            mtu: constants::MTU as u32,
            ingress_control: crate::transport::types::IngressControlConfig::disabled(),
            ia_freq: 0.0,
            started: 0.0,
        }
    }

    /// LoRa profile shared by the airtime tests.
    fn lora_profile() -> AirtimeProfile {
        AirtimeProfile::Lora {
            bandwidth: 125_000,
            spreading_factor: 8,
            coding_rate: 5,
            preamble_symbols: 8,
            explicit_header: true,
            crc: true,
        }
    }

    /// Gates a `len`-byte announce filled with `fill` at t = 0 on a
    /// 1000 bit/s interface without an airtime profile.
    fn gate_on_slow_link(
        queues: &mut AnnounceQueues,
        iface: u64,
        fill: u8,
        len: usize,
        dest: u8,
        hops: u8,
    ) -> Option<TransportAction> {
        queues.gate_announce(
            InterfaceId(iface),
            vec![fill; len].into(),
            [dest; 16],
            hops,
            0.0,
            0.0,
            Some(1000),
            None,
            CAP,
        )
    }

    /// Runs `process_queues` one second after `iface`'s window reopens, with
    /// `iface` described as a 1000 bit/s interface.
    fn process_after_window(queues: &mut AnnounceQueues, iface: u64) -> Vec<TransportAction> {
        let reopens_at = queues
            .queue_for(&InterfaceId(iface))
            .unwrap()
            .announce_allowed_at;
        let mut interfaces = BTreeMap::new();
        interfaces.insert(InterfaceId(iface), make_interface_info(iface, Some(1000)));
        queues.process_queues(reopens_at + 1.0, &interfaces)
    }

    /// Number of announces currently queued for `iface`; panics if untracked.
    fn queued_len(queues: &AnnounceQueues, iface: u64) -> usize {
        queues.queue_for(&InterfaceId(iface)).unwrap().entries.len()
    }

    #[test]
    fn test_queue_entry_creation() {
        let entry = make_entry(0xAA, 3, 1000.0);
        assert_eq!(entry.hops, 3);
        assert_eq!(entry.destination_hash, [0xAA; 16]);
    }

    #[test]
    fn test_queue_insert_and_select() {
        let mut queue = InterfaceAnnounceQueue::new();
        for &(dest, hops, time) in &[(0x01, 3, 100.0), (0x02, 1, 200.0), (0x03, 2, 150.0)] {
            queue.insert(make_entry(dest, hops, time));
        }
        // Fewest hops wins regardless of queue time.
        let picked = &queue.entries[queue.select_next().unwrap()];
        assert_eq!(picked.destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_select_fifo_on_same_hops() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 2, 200.0));
        queue.insert(make_entry(0x02, 2, 100.0));
        // Equal hops: the entry with the earlier time is chosen.
        let picked = &queue.entries[queue.select_next().unwrap()];
        assert_eq!(picked.destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_dedup_update() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 3, 100.0));
        assert_eq!(queue.entries.len(), 1);

        // Fewer hops for the same destination replaces the queued entry.
        queue.insert(make_entry(0x01, 1, 200.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);

        // More hops is ignored.
        queue.insert(make_entry(0x01, 5, 300.0));
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].hops, 1);
    }

    #[test]
    fn test_queue_stale_removal() {
        let mut queue = InterfaceAnnounceQueue::new();
        queue.insert(make_entry(0x01, 1, 100.0));
        queue.insert(make_entry(0x02, 2, 200.0));
        queue.remove_stale(86501.0);
        assert_eq!(queue.entries.len(), 1);
        assert_eq!(queue.entries[0].destination_hash, [0x02; 16]);
    }

    #[test]
    fn test_queue_max_size() {
        let mut queue = InterfaceAnnounceQueue::new();
        for i in 0..constants::MAX_QUEUED_ANNOUNCES {
            let mut destination_hash = [0u8; 16];
            destination_hash[0] = (i >> 8) as u8;
            destination_hash[1] = i as u8;
            queue.insert(AnnounceQueueEntry {
                destination_hash,
                time: i as f64,
                hops: 1,
                emitted: i as f64,
                raw: vec![0x01].into(),
            });
        }
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);

        // One more distinct destination must not grow the queue.
        queue.insert(make_entry(0xFF, 1, 99999.0));
        assert_eq!(queue.entries.len(), constants::MAX_QUEUED_ANNOUNCES);
    }

    #[test]
    fn test_queue_empty_select() {
        let queue = InterfaceAnnounceQueue::new();
        assert!(queue.select_next().is_none());
    }

    #[test]
    fn test_bandwidth_allowed() {
        let mut queue = InterfaceAnnounceQueue::new();
        assert!(queue.is_allowed(0.0));
        assert!(queue.is_allowed(100.0));

        queue.announce_allowed_at = 200.0;
        assert!(!queue.is_allowed(100.0));
        assert!(!queue.is_allowed(199.9));
        assert!(queue.is_allowed(200.0));
        assert!(queue.is_allowed(300.0));
    }

    #[test]
    fn test_calculate_next_allowed() {
        // 800 bits at 1000 bit/s = 0.8 s; at a 2% cap that is a 40 s window.
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 1000, None, CAP);
        assert!((next - 1040.0).abs() < 0.001);
    }

    #[test]
    fn test_calculate_next_allowed_zero_bitrate() {
        let next = InterfaceAnnounceQueue::calculate_next_allowed(1000.0, 100, 0, None, CAP);
        assert_eq!(next, 1000.0);
    }

    #[test]
    fn test_calculate_next_allowed_uses_lora_airtime() {
        let next = InterfaceAnnounceQueue::calculate_next_allowed(
            1000.0,
            100,
            0,
            Some(lora_profile()),
            CAP,
        );
        assert!((next - 1015.3856).abs() < 0.0001);
    }

    #[test]
    fn test_gate_announce_no_bitrate_immediate() {
        let mut queues = AnnounceQueues::new(1024);
        let result = queues.gate_announce(
            InterfaceId(1),
            vec![0x01, 0x02, 0x03].into(),
            [0xAA; 16],
            2,
            1000.0,
            1000.0,
            None,
            None,
            CAP,
        );
        assert!(matches!(
            result,
            Some(TransportAction::SendOnInterface { .. })
        ));
    }

    #[test]
    fn test_gate_announce_uses_airtime_profile_without_bitrate() {
        let mut queues = AnnounceQueues::new(1024);
        let profile = lora_profile();

        let first = queues.gate_announce(
            InterfaceId(1),
            vec![0x01; 100].into(),
            [0xAA; 16],
            2,
            1000.0,
            1000.0,
            None,
            Some(profile),
            CAP,
        );
        assert!(first.is_some());
        let reopens_at = queues
            .queue_for(&InterfaceId(1))
            .unwrap()
            .announce_allowed_at;
        assert!((reopens_at - 1015.3856).abs() < 0.0001);

        let second = queues.gate_announce(
            InterfaceId(1),
            vec![0x02; 100].into(),
            [0xBB; 16],
            2,
            1000.0,
            1000.0,
            None,
            Some(profile),
            CAP,
        );
        assert!(second.is_none());
        assert_eq!(queued_len(&queues, 1), 1);
    }

    #[test]
    fn test_gate_announce_bandwidth_available() {
        let mut queues = AnnounceQueues::new(1024);
        let result = queues.gate_announce(
            InterfaceId(1),
            vec![0x01; 100].into(),
            [0xBB; 16],
            2,
            1000.0,
            1000.0,
            Some(10000),
            None,
            CAP,
        );
        assert!(result.is_some());
        let reopens_at = queues
            .queue_for(&InterfaceId(1))
            .unwrap()
            .announce_allowed_at;
        assert!(reopens_at > 1000.0);
    }

    #[test]
    fn test_gate_announce_bandwidth_exhausted_queues() {
        let mut queues = AnnounceQueues::new(1024);
        let sent = queues.gate_announce(
            InterfaceId(1),
            vec![0x01; 100].into(),
            [0xAA; 16],
            2,
            1000.0,
            1000.0,
            Some(1000),
            None,
            CAP,
        );
        assert!(sent.is_some());

        let held = queues.gate_announce(
            InterfaceId(1),
            vec![0x02; 100].into(),
            [0xBB; 16],
            3,
            1000.0,
            1000.0,
            Some(1000),
            None,
            CAP,
        );
        assert!(held.is_none());
        assert_eq!(queued_len(&queues, 1), 1);
    }

    #[test]
    fn test_process_queues_dequeues_when_allowed() {
        let mut queues = AnnounceQueues::new(1024);
        let _ = gate_on_slow_link(&mut queues, 1, 0x01, 10, 0xAA, 2);
        let _ = gate_on_slow_link(&mut queues, 1, 0x02, 10, 0xBB, 3);
        assert_eq!(queued_len(&queues, 1), 1);

        let actions = process_after_window(&mut queues, 1);
        assert_eq!(actions.len(), 1);
        assert!(matches!(
            &actions[0],
            TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(1)
        ));
        assert!(queues.queue_for(&InterfaceId(1)).is_none());
    }

    #[test]
    fn test_local_announce_bypasses_cap() {
        let mut queues = AnnounceQueues::new(1024);
        let _ = gate_on_slow_link(&mut queues, 1, 0x01, 100, 0xAA, 2);

        // Despite the test name, a 0-hop announce gets no special treatment
        // here: it is held while the window is closed.
        // NOTE(review): any bypass for local announces presumably lives in
        // the caller — confirm.
        let local = gate_on_slow_link(&mut queues, 1, 0x02, 100, 0xBB, 0);
        assert!(local.is_none());
    }

    #[test]
    fn test_remove_interface_queue() {
        let mut queues = AnnounceQueues::new(1024);
        let _ = gate_on_slow_link(&mut queues, 1, 0x01, 100, 0xAA, 2);
        let _ = gate_on_slow_link(&mut queues, 1, 0x02, 100, 0xBB, 3);

        assert!(queues.queue_for(&InterfaceId(1)).is_some());
        assert!(queues.remove_interface(InterfaceId(1)));
        assert!(queues.queue_for(&InterfaceId(1)).is_none());
        // A second removal finds nothing.
        assert!(!queues.remove_interface(InterfaceId(1)));
    }

    #[test]
    fn test_process_queues_prunes_empty_queue() {
        let mut queues = AnnounceQueues::new(1024);
        let _ = gate_on_slow_link(&mut queues, 1, 0x01, 10, 0xAA, 2);
        let _ = gate_on_slow_link(&mut queues, 1, 0x02, 10, 0xBB, 3);

        let actions = process_after_window(&mut queues, 1);
        assert_eq!(actions.len(), 1);
        assert!(queues.queue_for(&InterfaceId(1)).is_none());
        assert_eq!(queues.queue_count(), 0);
    }

    #[test]
    fn test_process_queues_keeps_nonempty_queue() {
        let mut queues = AnnounceQueues::new(1024);
        let _ = gate_on_slow_link(&mut queues, 1, 0x01, 100, 0xAA, 2);
        let _ = gate_on_slow_link(&mut queues, 1, 0x02, 100, 0xBB, 3);
        let _ = gate_on_slow_link(&mut queues, 1, 0x03, 100, 0xCC, 4);

        // Only one of the two held announces fits into the reopened window.
        let actions = process_after_window(&mut queues, 1);
        assert_eq!(actions.len(), 1);
        assert!(queues.queue_for(&InterfaceId(1)).is_some());
        assert_eq!(queued_len(&queues, 1), 1);
    }

    #[test]
    fn test_gate_announce_refuses_new_interface_when_at_capacity() {
        let mut queues = AnnounceQueues::new(1);
        let _ = gate_on_slow_link(&mut queues, 1, 0x01, 100, 0xAA, 2);
        let second = gate_on_slow_link(&mut queues, 1, 0x02, 100, 0xBB, 3);
        assert!(second.is_none());
        assert_eq!(queues.queue_count(), 1);

        // A second interface would exceed the limit of one tracked queue.
        let rejected = gate_on_slow_link(&mut queues, 2, 0x03, 100, 0xCC, 4);
        assert!(rejected.is_none());
        assert_eq!(queues.queue_count(), 1);
        assert!(queues.queue_for(&InterfaceId(2)).is_none());
        assert_eq!(queues.interface_cap_drop_count(), 1);
    }

    #[test]
    fn test_gate_announce_allows_existing_queue_when_at_capacity() {
        let mut queues = AnnounceQueues::new(1);
        let _ = gate_on_slow_link(&mut queues, 1, 0x01, 100, 0xAA, 2);

        // The interface is already tracked, so the limit does not apply.
        let queued = gate_on_slow_link(&mut queues, 1, 0x02, 100, 0xBB, 3);
        assert!(queued.is_none());
        assert_eq!(queues.queue_count(), 1);
        assert_eq!(queued_len(&queues, 1), 1);
        assert_eq!(queues.interface_cap_drop_count(), 0);
    }
}