use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use iroh::PublicKey;
use tokio::sync::mpsc;
use tracing::{debug, warn};
use crate::gossip::GossipEvent;
use crate::packet::{GossipFrame, content_hash};
use crate::radio::{ConfigSource, RadioEvent, TxRequest, TxRetryReason};
use crate::rate_limit::RateLimiter;
#[derive(Debug, Clone)]
pub struct RadioConfigInfo {
pub active: donglora_client::LoRaConfig,
pub requested: donglora_client::LoRaConfig,
pub source: ConfigSource,
pub device: String,
pub connected: bool,
}
#[derive(Debug, Clone)]
pub struct PacketLogEntry {
pub timestamp: Instant,
pub hash: [u8; 32],
pub direction: PacketDirection,
pub size: usize,
pub snr: Option<i16>,
pub rssi: Option<i16>,
pub action: PacketAction,
pub tx_retry: Option<TxRetryInfo>,
}
#[derive(Debug, Clone, Copy)]
pub struct TxRetryInfo {
pub attempt: u8,
pub total_attempts: u8,
pub state: TxRetryState,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxRetryState {
InFlight,
Retrying(TxRetryReason),
Succeeded,
Failed,
}
#[derive(Debug, Clone, Copy)]
pub enum PacketDirection {
RadioIn,
GossipIn,
}
#[derive(Debug, Clone, Copy)]
pub enum PacketAction {
Bridged,
DroppedDedup,
DroppedQueueFull,
DroppedRateLimit,
}
impl std::fmt::Display for PacketAction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Bridged => write!(f, ""),
Self::DroppedDedup => write!(f, "DUP"),
Self::DroppedQueueFull => write!(f, "FULL"),
Self::DroppedRateLimit => write!(f, "RATE"),
}
}
}
#[derive(Debug, Default)]
pub struct Stats {
pub radio_rx: AtomicU64,
pub radio_tx: AtomicU64,
pub gossip_rx: AtomicU64,
pub gossip_tx: AtomicU64,
pub dedup_hits: AtomicU64,
pub rate_limit_drops: AtomicU64,
pub dropped_queue: AtomicU64,
pub neighbor_count: AtomicU64,
pub radio_connected: AtomicU64, pub tx_retries: AtomicU64,
pub tx_failures: AtomicU64,
}
impl Stats {
#[must_use]
pub fn snapshot(&self) -> StatsSnapshot {
StatsSnapshot {
radio_rx: self.radio_rx.load(Ordering::Relaxed),
radio_tx: self.radio_tx.load(Ordering::Relaxed),
gossip_rx: self.gossip_rx.load(Ordering::Relaxed),
gossip_tx: self.gossip_tx.load(Ordering::Relaxed),
dedup_hits: self.dedup_hits.load(Ordering::Relaxed),
rate_limit_drops: self.rate_limit_drops.load(Ordering::Relaxed),
dropped_queue: self.dropped_queue.load(Ordering::Relaxed),
neighbor_count: self.neighbor_count.load(Ordering::Relaxed),
radio_connected: self.radio_connected.load(Ordering::Relaxed) != 0,
tx_retries: self.tx_retries.load(Ordering::Relaxed),
tx_failures: self.tx_failures.load(Ordering::Relaxed),
}
}
}
#[derive(Debug, Clone)]
pub struct StatsSnapshot {
pub radio_rx: u64,
pub radio_tx: u64,
pub gossip_rx: u64,
pub gossip_tx: u64,
pub dedup_hits: u64,
pub rate_limit_drops: u64,
pub dropped_queue: u64,
pub neighbor_count: u64,
pub radio_connected: bool,
pub tx_retries: u64,
pub tx_failures: u64,
}
struct DedupCache {
entries: HashMap<[u8; 32], Instant>,
ttl: Duration,
}
impl DedupCache {
fn new(ttl: Duration) -> Self {
Self { entries: HashMap::new(), ttl }
}
fn check_and_insert(&mut self, key: [u8; 32]) -> bool {
let now = Instant::now();
if let Some(ts) = self.entries.get(&key)
&& now.duration_since(*ts) < self.ttl
{
return true;
}
self.entries.insert(key, now);
false
}
fn prune(&mut self) {
let now = Instant::now();
self.entries.retain(|_, ts| now.duration_since(*ts) < self.ttl);
}
}
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
pub async fn run(
our_id: PublicKey,
dedup_window: Duration,
tx_queue_size: usize,
requested_radio_config: donglora_client::LoRaConfig,
mut rate_limiter: RateLimiter,
mut radio_rx: mpsc::Receiver<RadioEvent>,
radio_tx: mpsc::Sender<TxRequest>,
mut gossip_rx: mpsc::Receiver<GossipEvent>,
gossip_tx: mpsc::Sender<GossipFrame>,
stats: &Stats,
log_tx: mpsc::Sender<PacketLogEntry>,
config_tx: tokio::sync::watch::Sender<RadioConfigInfo>,
) {
let mut dedup = DedupCache::new(dedup_window);
let mut next_tx_seq: u64 = 0;
let mut tx_queue: VecDeque<Vec<u8>> = VecDeque::with_capacity(tx_queue_size);
let mut last_cleanup = Instant::now();
let cleanup_interval = Duration::from_secs(30);
loop {
if last_cleanup.elapsed() >= cleanup_interval {
dedup.prune();
last_cleanup = Instant::now();
}
tokio::select! {
event = radio_rx.recv() => {
let Some(event) = event else { break };
match event {
RadioEvent::Packet(pkt) => {
stats.radio_rx.fetch_add(1, Ordering::Relaxed);
let hash = content_hash(&pkt.payload);
if dedup.check_and_insert(hash) {
stats.dedup_hits.fetch_add(1, Ordering::Relaxed);
debug!("radio RX dedup suppressed ({} bytes)", pkt.payload.len());
let _ = log_tx.send(PacketLogEntry {
timestamp: Instant::now(),
hash,
direction: PacketDirection::RadioIn,
size: pkt.payload.len(),
snr: Some(pkt.snr),
rssi: Some(pkt.rssi),
action: PacketAction::DroppedDedup,
tx_retry: None,
}).await;
continue;
}
let frame = GossipFrame::new(&our_id, pkt.rssi, pkt.snr, pkt.payload.clone());
stats.gossip_tx.fetch_add(1, Ordering::Relaxed);
let _ = gossip_tx.send(frame).await;
let _ = log_tx.send(PacketLogEntry {
timestamp: Instant::now(),
hash,
direction: PacketDirection::RadioIn,
size: pkt.payload.len(),
snr: Some(pkt.snr),
rssi: Some(pkt.rssi),
action: PacketAction::Bridged,
tx_retry: None,
}).await;
}
RadioEvent::Connected(active_config, source, device) => {
stats.radio_connected.store(1, Ordering::Relaxed);
let _ = config_tx.send(RadioConfigInfo {
active: active_config,
requested: requested_radio_config,
source,
device,
connected: true,
});
}
RadioEvent::Disconnected => {
stats.radio_connected.store(0, Ordering::Relaxed);
config_tx.send_modify(|info| info.connected = false);
}
RadioEvent::TxAttempt { .. } => {
}
RadioEvent::TxRetry { seq, attempt_that_failed, total_attempts, reason, backoff_ms: _ } => {
stats.tx_retries.fetch_add(1, Ordering::Relaxed);
let _ = log_tx.send(PacketLogEntry {
timestamp: Instant::now(),
hash: tx_seq_hash(seq),
direction: PacketDirection::GossipIn,
size: 0,
snr: None,
rssi: None,
action: PacketAction::Bridged,
tx_retry: Some(TxRetryInfo {
attempt: attempt_that_failed.saturating_add(1),
total_attempts,
state: TxRetryState::Retrying(reason),
}),
}).await;
}
RadioEvent::TxSucceeded { seq, attempts_used, final_airtime_us: _, size } => {
stats.radio_tx.fetch_add(1, Ordering::Relaxed);
if attempts_used > 1 {
let _ = log_tx.send(PacketLogEntry {
timestamp: Instant::now(),
hash: tx_seq_hash(seq),
direction: PacketDirection::GossipIn,
size,
snr: None,
rssi: None,
action: PacketAction::Bridged,
tx_retry: Some(TxRetryInfo {
attempt: attempts_used,
total_attempts: attempts_used,
state: TxRetryState::Succeeded,
}),
}).await;
}
}
RadioEvent::TxFailed { seq, attempts_used, reason, size } => {
stats.tx_failures.fetch_add(1, Ordering::Relaxed);
warn!("TX #{seq} exhausted retries: {reason}");
let _ = log_tx.send(PacketLogEntry {
timestamp: Instant::now(),
hash: tx_seq_hash(seq),
direction: PacketDirection::GossipIn,
size,
snr: None,
rssi: None,
action: PacketAction::Bridged,
tx_retry: Some(TxRetryInfo {
attempt: attempts_used,
total_attempts: attempts_used,
state: TxRetryState::Failed,
}),
}).await;
}
}
}
event = gossip_rx.recv() => {
let Some(event) = event else { break };
match event {
GossipEvent::Frame(frame) => {
stats.gossip_rx.fetch_add(1, Ordering::Relaxed);
let hash = content_hash(&frame.payload);
if dedup.check_and_insert(hash) {
stats.dedup_hits.fetch_add(1, Ordering::Relaxed);
debug!("gossip RX dedup suppressed ({} bytes)", frame.payload.len());
let _ = log_tx.send(PacketLogEntry {
timestamp: Instant::now(),
hash,
direction: PacketDirection::GossipIn,
size: frame.payload.len(),
snr: Some(frame.snr),
rssi: Some(frame.rssi),
action: PacketAction::DroppedDedup,
tx_retry: None,
}).await;
continue;
}
if !rate_limiter.try_acquire() {
stats.rate_limit_drops.fetch_add(1, Ordering::Relaxed);
debug!("rate limited ({} bytes)", frame.payload.len());
let _ = log_tx.send(PacketLogEntry {
timestamp: Instant::now(),
hash,
direction: PacketDirection::GossipIn,
size: frame.payload.len(),
snr: Some(frame.snr),
rssi: Some(frame.rssi),
action: PacketAction::DroppedRateLimit,
tx_retry: None,
}).await;
continue;
}
if tx_queue.len() >= tx_queue_size {
stats.dropped_queue.fetch_add(1, Ordering::Relaxed);
warn!("TX queue full, dropping packet ({} bytes)", frame.payload.len());
let _ = log_tx.send(PacketLogEntry {
timestamp: Instant::now(),
hash,
direction: PacketDirection::GossipIn,
size: frame.payload.len(),
snr: Some(frame.snr),
rssi: Some(frame.rssi),
action: PacketAction::DroppedQueueFull,
tx_retry: None,
}).await;
continue;
}
tx_queue.push_back(frame.payload.clone());
let _ = log_tx.send(PacketLogEntry {
timestamp: Instant::now(),
hash,
direction: PacketDirection::GossipIn,
size: frame.payload.len(),
snr: Some(frame.snr),
rssi: Some(frame.rssi),
action: PacketAction::Bridged,
tx_retry: None,
}).await;
}
GossipEvent::NeighborChanged(count) => {
#[allow(clippy::cast_possible_truncation)]
stats.neighbor_count.store(count as u64, Ordering::Relaxed);
}
}
}
}
while let Some(payload) = tx_queue.pop_front() {
let seq = {
next_tx_seq = next_tx_seq.wrapping_add(1);
next_tx_seq
};
if radio_tx.send(TxRequest { seq, data: payload }).await.is_err() {
break;
}
}
}
}
const fn tx_seq_hash(seq: u64) -> [u8; 32] {
let mut out = [0u8; 32];
out[0] = 0x01; let bytes = seq.to_le_bytes();
out[1] = bytes[0];
out[2] = bytes[1];
out[3] = bytes[2];
out[4] = bytes[3];
out[5] = bytes[4];
out[6] = bytes[5];
out[7] = bytes[6];
out[8] = bytes[7];
out
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
#[test]
fn dedup_cache_new_key_not_duplicate() {
let mut cache = DedupCache::new(Duration::from_secs(10));
assert!(!cache.check_and_insert([0u8; 32]));
}
#[test]
fn dedup_cache_duplicate_detected() {
let mut cache = DedupCache::new(Duration::from_secs(10));
cache.check_and_insert([1u8; 32]);
assert!(cache.check_and_insert([1u8; 32]));
}
#[test]
fn dedup_cache_different_keys_independent() {
let mut cache = DedupCache::new(Duration::from_secs(10));
cache.check_and_insert([1u8; 32]);
assert!(!cache.check_and_insert([2u8; 32]));
}
#[test]
fn dedup_cache_expired_entry_not_duplicate() {
let mut cache = DedupCache::new(Duration::from_millis(1));
cache.check_and_insert([1u8; 32]);
std::thread::sleep(Duration::from_millis(5));
assert!(!cache.check_and_insert([1u8; 32]));
}
#[test]
fn dedup_cache_prune_removes_expired() {
let mut cache = DedupCache::new(Duration::from_millis(1));
cache.check_and_insert([1u8; 32]);
cache.check_and_insert([2u8; 32]);
std::thread::sleep(Duration::from_millis(5));
cache.prune();
assert!(cache.entries.is_empty());
}
#[test]
fn dedup_cache_prune_keeps_fresh() {
let mut cache = DedupCache::new(Duration::from_secs(10));
cache.check_and_insert([1u8; 32]);
cache.prune();
assert_eq!(cache.entries.len(), 1);
}
#[test]
fn stats_snapshot_reflects_increments() {
let stats = Stats::default();
stats.radio_rx.fetch_add(5, Ordering::Relaxed);
stats.gossip_tx.fetch_add(3, Ordering::Relaxed);
stats.radio_connected.store(1, Ordering::Relaxed);
let snap = stats.snapshot();
assert_eq!(snap.radio_rx, 5);
assert_eq!(snap.gossip_tx, 3);
assert!(snap.radio_connected);
}
#[test]
fn stats_default_is_zeroed() {
let snap = Stats::default().snapshot();
assert_eq!(snap.radio_rx, 0);
assert_eq!(snap.gossip_rx, 0);
assert!(!snap.radio_connected);
}
#[test]
fn packet_action_display() {
assert_eq!(format!("{}", PacketAction::Bridged), "");
assert_eq!(format!("{}", PacketAction::DroppedDedup), "DUP");
assert_eq!(format!("{}", PacketAction::DroppedQueueFull), "FULL");
assert_eq!(format!("{}", PacketAction::DroppedRateLimit), "RATE");
}
#[test]
fn dedup_cache_ttl_boundary_check_and_insert() {
let ttl = Duration::from_secs(10);
let mut cache = DedupCache::new(ttl);
cache.check_and_insert([9u8; 32]);
let exactly_ttl_ago = Instant::now().checked_sub(ttl).unwrap();
cache.entries.insert([9u8; 32], exactly_ttl_ago);
assert!(!cache.check_and_insert([9u8; 32]), "entry at exact TTL must be treated as expired");
}
#[test]
fn dedup_cache_ttl_boundary_prune() {
let ttl = Duration::from_secs(10);
let mut cache = DedupCache::new(ttl);
cache.check_and_insert([8u8; 32]);
let exactly_ttl_ago = Instant::now().checked_sub(ttl).unwrap();
cache.entries.insert([8u8; 32], exactly_ttl_ago);
cache.prune();
assert!(cache.entries.is_empty(), "entry at exact TTL must be pruned");
}
}