#[cfg(not(target_pointer_width = "64"))]
use std::sync::atomic::AtomicU32 as AtomicCounter;
#[cfg(target_pointer_width = "64")]
use std::sync::atomic::AtomicU64 as AtomicCounter;
#[cfg(target_pointer_width = "64")]
type CounterValue = u64;
#[cfg(not(target_pointer_width = "64"))]
type CounterValue = u32;
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
/// Running traffic, drop and error counters for the gateway.
///
/// Every counter is an atomic updated with `Ordering::Relaxed`, which is why
/// all `record_*` methods only need `&self`. Counters are never reset by this
/// type; `fetch_add` wraps around on overflow.
#[derive(Debug)]
pub struct GatewayStats {
    /// Number of calls to [`GatewayStats::record_lora_rx`].
    pub lora_rx_count: AtomicCounter,
    /// Number of calls to [`GatewayStats::record_lora_tx`].
    pub lora_tx_count: AtomicCounter,
    /// Number of calls to [`GatewayStats::record_wifi_rx`].
    pub wifi_rx_count: AtomicCounter,
    /// Number of calls to [`GatewayStats::record_wifi_tx`].
    pub wifi_tx_count: AtomicCounter,
    /// Number of calls to [`GatewayStats::record_rate_limit_drop`].
    pub dropped_rate_limit: AtomicCounter,
    /// Number of calls to [`GatewayStats::record_filter_drop`].
    pub dropped_filter: AtomicCounter,
    /// Sum of the `bytes` arguments passed to `record_lora_rx`.
    pub lora_rx_bytes: AtomicCounter,
    /// Sum of the `bytes` arguments passed to `record_lora_tx`.
    pub lora_tx_bytes: AtomicCounter,
    /// Sum of the `bytes` arguments passed to `record_wifi_rx`.
    pub wifi_rx_bytes: AtomicCounter,
    /// Sum of the `bytes` arguments passed to `record_wifi_tx`.
    pub wifi_tx_bytes: AtomicCounter,
    /// Number of calls to [`GatewayStats::record_parse_error`].
    pub parse_errors: AtomicCounter,
    /// Instant captured when this value was constructed; basis for uptime.
    start_time: Instant,
}

impl GatewayStats {
    /// Creates a statistics block with every counter at zero and the uptime
    /// clock started at the moment of the call.
    pub fn new() -> Self {
        let zero = || AtomicCounter::new(0);
        Self {
            lora_rx_count: zero(),
            lora_tx_count: zero(),
            wifi_rx_count: zero(),
            wifi_tx_count: zero(),
            dropped_rate_limit: zero(),
            dropped_filter: zero(),
            lora_rx_bytes: zero(),
            lora_tx_bytes: zero(),
            wifi_rx_bytes: zero(),
            wifi_tx_bytes: zero(),
            parse_errors: zero(),
            start_time: Instant::now(),
        }
    }

    /// Adds one message of `bytes` bytes to a (message counter, byte counter)
    /// pair. The message counter is bumped first, then the byte counter.
    fn tally(messages: &AtomicCounter, volume: &AtomicCounter, bytes: usize) {
        messages.fetch_add(1, Ordering::Relaxed);
        volume.fetch_add(bytes as CounterValue, Ordering::Relaxed);
    }

    /// Records one LoRa message received, `bytes` bytes long.
    pub fn record_lora_rx(&self, bytes: usize) {
        Self::tally(&self.lora_rx_count, &self.lora_rx_bytes, bytes);
    }

    /// Records one LoRa message transmitted, `bytes` bytes long.
    pub fn record_lora_tx(&self, bytes: usize) {
        Self::tally(&self.lora_tx_count, &self.lora_tx_bytes, bytes);
    }

    /// Records one WiFi message received, `bytes` bytes long.
    pub fn record_wifi_rx(&self, bytes: usize) {
        Self::tally(&self.wifi_rx_count, &self.wifi_rx_bytes, bytes);
    }

    /// Records one WiFi message transmitted, `bytes` bytes long.
    pub fn record_wifi_tx(&self, bytes: usize) {
        Self::tally(&self.wifi_tx_count, &self.wifi_tx_bytes, bytes);
    }

    /// Records one message dropped by rate limiting.
    pub fn record_rate_limit_drop(&self) {
        self.dropped_rate_limit.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one message dropped by filtering.
    pub fn record_filter_drop(&self) {
        self.dropped_filter.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one parse error.
    pub fn record_parse_error(&self) {
        self.parse_errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the time elapsed since this value was created.
    pub fn uptime(&self) -> Duration {
        self.start_time.elapsed()
    }

    /// Copies the current counter values into a plain [`StatsSnapshot`].
    ///
    /// Each counter is loaded on its own with relaxed ordering, so the
    /// snapshot is not a single atomic view across all counters.
    pub fn snapshot(&self) -> StatsSnapshot {
        let read = |counter: &AtomicCounter| counter.load(Ordering::Relaxed);
        StatsSnapshot {
            lora_rx_count: read(&self.lora_rx_count),
            lora_tx_count: read(&self.lora_tx_count),
            wifi_rx_count: read(&self.wifi_rx_count),
            wifi_tx_count: read(&self.wifi_tx_count),
            dropped_rate_limit: read(&self.dropped_rate_limit),
            dropped_filter: read(&self.dropped_filter),
            lora_rx_bytes: read(&self.lora_rx_bytes),
            lora_tx_bytes: read(&self.lora_tx_bytes),
            wifi_rx_bytes: read(&self.wifi_rx_bytes),
            wifi_tx_bytes: read(&self.wifi_tx_bytes),
            parse_errors: read(&self.parse_errors),
            uptime_secs: self.uptime().as_secs(),
        }
    }

    /// Renders a four-line, human-readable summary of a fresh snapshot:
    /// uptime, LoRa traffic, WiFi traffic, then drop/error counts.
    pub fn format_summary(&self) -> String {
        let current = self.snapshot();
        format!(
            "Gateway Stats (uptime: {}s)\n\
             LoRa: RX {} msgs ({} bytes), TX {} msgs ({} bytes)\n\
             WiFi: RX {} msgs ({} bytes), TX {} msgs ({} bytes)\n\
             Drops: rate_limit={}, filter={}, errors={}",
            current.uptime_secs,
            current.lora_rx_count,
            current.lora_rx_bytes,
            current.lora_tx_count,
            current.lora_tx_bytes,
            current.wifi_rx_count,
            current.wifi_rx_bytes,
            current.wifi_tx_count,
            current.wifi_tx_bytes,
            current.dropped_rate_limit,
            current.dropped_filter,
            current.parse_errors
        )
    }
}

impl Default for GatewayStats {
    /// Equivalent to [`GatewayStats::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Plain-value copy of the gateway counters at one point in time.
///
/// Derives `PartialEq`/`Eq` so two snapshots can be compared directly (for
/// change detection or in assertions) and `Default` so an all-zero baseline
/// can be built without spelling out every field.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StatsSnapshot {
    /// LoRa messages received.
    pub lora_rx_count: CounterValue,
    /// LoRa messages transmitted.
    pub lora_tx_count: CounterValue,
    /// WiFi messages received.
    pub wifi_rx_count: CounterValue,
    /// WiFi messages transmitted.
    pub wifi_tx_count: CounterValue,
    /// Messages dropped by rate limiting.
    pub dropped_rate_limit: CounterValue,
    /// Messages dropped by filtering.
    pub dropped_filter: CounterValue,
    /// Total bytes of LoRa messages received.
    pub lora_rx_bytes: CounterValue,
    /// Total bytes of LoRa messages transmitted.
    pub lora_tx_bytes: CounterValue,
    /// Total bytes of WiFi messages received.
    pub wifi_rx_bytes: CounterValue,
    /// Total bytes of WiFi messages transmitted.
    pub wifi_tx_bytes: CounterValue,
    /// Parse errors recorded.
    pub parse_errors: CounterValue,
    /// Whole seconds of uptime when the snapshot was taken.
    pub uptime_secs: u64,
}
/// Token-bucket rate limiter.
///
/// The bucket starts full, gains `rate` tokens per second of elapsed time up
/// to `capacity`, and each successful acquisition removes tokens from it.
#[derive(Debug)]
pub struct RateLimiter {
    /// Maximum number of tokens the bucket may hold.
    capacity: u32,
    /// Tokens currently in the bucket; fractional while refilling.
    tokens: f64,
    /// Tokens added per second.
    rate: f64,
    /// Instant of the most recent refill or reset.
    last_refill: Instant,
}

impl RateLimiter {
    /// Creates a full bucket refilled at `rate` tokens per second.
    ///
    /// `burst` is the bucket capacity; a `burst` of `0` means "use `rate`
    /// as the capacity".
    pub fn new(rate: u32, burst: u32) -> Self {
        let capacity = match burst {
            0 => rate,
            explicit => explicit,
        };
        Self {
            capacity,
            tokens: f64::from(capacity),
            rate: f64::from(rate),
            last_refill: Instant::now(),
        }
    }

    /// Tries to take a single token; returns `true` on success.
    pub fn try_acquire(&mut self) -> bool {
        self.try_acquire_n(1)
    }

    /// Tries to take `n` tokens at once.
    ///
    /// Either all `n` tokens are removed and `true` is returned, or the
    /// bucket is left as it was (apart from the refill) and `false` is
    /// returned.
    pub fn try_acquire_n(&mut self, n: u32) -> bool {
        self.refill();
        let cost = f64::from(n);
        let granted = self.tokens >= cost;
        if granted {
            self.tokens -= cost;
        }
        granted
    }

    /// Credits the tokens earned since the last refill, clamped to capacity.
    fn refill(&mut self) {
        let now = Instant::now();
        let earned = now.duration_since(self.last_refill).as_secs_f64() * self.rate;
        self.last_refill = now;
        let ceiling = f64::from(self.capacity);
        self.tokens = (self.tokens + earned).min(ceiling);
    }

    /// Returns the number of whole tokens currently available.
    ///
    /// Takes `&mut self` because the bucket is refilled before it is read.
    pub fn available(&mut self) -> u32 {
        self.refill();
        // Truncation is intended: a partially earned token is not usable yet.
        self.tokens as u32
    }

    /// Refills the bucket to capacity and restarts the refill clock.
    pub fn reset(&mut self) {
        self.last_refill = Instant::now();
        self.tokens = f64::from(self.capacity);
    }
}
/// Two-level rate limiter: one global bucket shared by all traffic plus one
/// bucket per topic.
///
/// Topic buckets are created on first use and kept for the lifetime of this
/// value; nothing here removes them, so the map grows with the number of
/// distinct topic strings seen.
#[derive(Debug)]
pub struct TopicRateLimiter {
    /// Rate (tokens per second) given to topics without an explicit rate.
    default_rate: u32,
    /// Per-topic buckets, keyed by topic name.
    limiters: std::collections::HashMap<String, RateLimiter>,
    /// Bucket that every admitted message must also pass.
    global: RateLimiter,
}

impl TopicRateLimiter {
    /// Creates a limiter with a global bucket of `global_rate` tokens per
    /// second and `default_topic_rate` tokens per second for each topic.
    ///
    /// Every bucket gets a burst capacity of twice its rate, saturating at
    /// `u32::MAX` instead of overflowing.
    pub fn new(global_rate: u32, default_topic_rate: u32) -> Self {
        Self {
            default_rate: default_topic_rate,
            limiters: std::collections::HashMap::new(),
            global: RateLimiter::new(global_rate, global_rate.saturating_mul(2)),
        }
    }

    /// Decides whether one message may pass.
    ///
    /// With `topic == None` only the global bucket is consulted. With a topic,
    /// both the topic bucket and the global bucket must have a token.
    ///
    /// A global token is spent only when the message is actually admitted:
    /// a message rejected by its topic bucket leaves the global budget
    /// untouched, so one throttled topic cannot starve the others. Likewise,
    /// when the global bucket is empty no topic token is spent and no topic
    /// bucket is created.
    pub fn try_acquire(&mut self, topic: Option<&str>) -> bool {
        // Peek only: the global token is taken at the very end.
        if self.global.available() == 0 {
            return false;
        }
        if let Some(name) = topic {
            let rate = self.default_rate;
            // Look up by `&str` first so known topics do not allocate a key.
            let topic_ok = match self.limiters.get_mut(name) {
                Some(bucket) => bucket.try_acquire(),
                None => {
                    let mut bucket = RateLimiter::new(rate, rate.saturating_mul(2));
                    let granted = bucket.try_acquire();
                    self.limiters.insert(name.to_owned(), bucket);
                    granted
                }
            };
            if !topic_ok {
                return false;
            }
        }
        // At least one whole token was seen above and refills only add
        // tokens, so this acquisition succeeds.
        self.global.try_acquire()
    }

    /// Installs a fresh, full bucket for `topic` running at `rate` tokens per
    /// second (burst of twice the rate, saturating), replacing any existing
    /// bucket for that topic.
    pub fn set_topic_rate(&mut self, topic: &str, rate: u32) {
        self.limiters.insert(
            topic.to_owned(),
            RateLimiter::new(rate, rate.saturating_mul(2)),
        );
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Message counts and byte totals accumulate independently per direction.
    #[test]
    fn test_stats_recording() {
        let gateway = GatewayStats::new();
        gateway.record_lora_rx(100);
        gateway.record_lora_rx(50);
        gateway.record_wifi_tx(200);

        let seen = gateway.snapshot();
        assert_eq!(seen.lora_rx_count, 2);
        assert_eq!(seen.lora_rx_bytes, 150);
        assert_eq!(seen.wifi_tx_count, 1);
        assert_eq!(seen.wifi_tx_bytes, 200);
    }

    /// A full bucket grants exactly `burst` acquisitions before refusing.
    #[test]
    fn test_rate_limiter_basic() {
        let mut bucket = RateLimiter::new(10, 10);
        let mut granted = 0;
        while granted < 10 {
            assert!(bucket.try_acquire());
            granted += 1;
        }
        assert!(!bucket.try_acquire());
    }

    /// A drained bucket grants again once enough time has passed.
    #[test]
    fn test_rate_limiter_refill() {
        let mut bucket = RateLimiter::new(100, 10);
        // Drain the initial burst; the individual results are not checked.
        (0..10).for_each(|_| {
            bucket.try_acquire();
        });
        assert!(!bucket.try_acquire());

        // 50 ms at 100 tokens/s earns several tokens.
        thread::sleep(Duration::from_millis(50));
        assert!(bucket.try_acquire());
    }

    /// Exhausting one topic does not block a different topic.
    #[test]
    fn test_topic_rate_limiter() {
        let mut topics = TopicRateLimiter::new(100, 10);
        assert!(topics.try_acquire(None));
        assert!(topics.try_acquire(Some("Temperature")));

        // Use up the rest of the "Temperature" burst; results are not checked.
        (0..19).for_each(|_| {
            topics.try_acquire(Some("Temperature"));
        });
        assert!(!topics.try_acquire(Some("Temperature")));
        assert!(topics.try_acquire(Some("Humidity")));
    }

    /// The summary contains both the LoRa and the WiFi line.
    #[test]
    fn test_stats_format() {
        let gateway = GatewayStats::new();
        gateway.record_lora_rx(100);
        gateway.record_wifi_tx(200);

        let text = gateway.format_summary();
        assert!(text.contains("LoRa:"));
        assert!(text.contains("WiFi:"));
    }
}