use std::io;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use super::config::GatewayConfig;
use super::routing::Router;
use super::stats::{GatewayStats, TopicRateLimiter};
/// A single frame handed to the bridge from the LoRa side.
///
/// The type is plain data, so it can be cloned and compared freely (useful for
/// retry queues and for assertions in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoRaMessage {
    /// Node identifier carried with the frame; the bridge registers it with the
    /// router when a topic can be extracted from the payload.
    pub node_id: u8,
    /// Received signal strength for the frame (presumably dBm — confirm with
    /// the radio driver).
    pub rssi: i16,
    /// Signal-to-noise ratio for the frame (presumably dB — confirm with the
    /// radio driver).
    pub snr: i8,
    /// Raw payload bytes, forwarded unchanged to the WiFi side.
    pub payload: Vec<u8>,
}
/// Gateway state that relays messages between a LoRa link and a UDP multicast
/// group on the WiFi side.
///
/// A bridge is created with `Bridge::new`, optionally attached to the network
/// with `Bridge::bind`, and then driven either message-by-message or through
/// `Bridge::run`.
pub struct Bridge {
/// Configuration supplied at construction; read again by `bind` and `run`.
config: GatewayConfig,
/// Topic router consulted for forwarding decisions and topic bookkeeping.
router: Router,
/// Shared counters; a clone of this handle is returned by `stats()`.
stats: Arc<GatewayStats>,
/// Rate limiter consulted (via `try_acquire`) for traffic in both directions.
rate_limiter: TopicRateLimiter,
/// Flag polled by `run`; cleared by `stop()` and shared via `running_flag()`.
running: Arc<AtomicBool>,
/// UDP socket for the WiFi side; `None` until `bind` succeeds.
udp_socket: Option<UdpSocket>,
/// Destination for outgoing WiFi datagrams, built from the configured
/// multicast address and UDP port.
multicast_addr: SocketAddr,
}
impl Bridge {
    /// Capacity of the stack buffer used for a single UDP receive.
    ///
    /// If a datagram is longer than this, the excess bytes may be discarded by
    /// the operating system.
    const RECV_BUFFER_LEN: usize = 2048;

    /// Builds a bridge from `config` without opening any socket.
    ///
    /// When `config.bridge_all_topics` is false, each non-empty topic set in
    /// the configuration is installed as the router filter for its direction.
    /// Call [`Bridge::bind`] afterwards to enable UDP traffic.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if
    /// `"{multicast_address}:{udp_port}"` does not parse as a socket address.
    pub fn new(config: GatewayConfig) -> io::Result<Self> {
        let stats = Arc::new(GatewayStats::new());
        // The second argument is one tenth of the configured rate; its exact
        // meaning is defined by `TopicRateLimiter::new` (presumably a burst or
        // per-topic allowance — confirm there).
        let rate_limiter = TopicRateLimiter::new(
            config.max_messages_per_second,
            config.max_messages_per_second / 10,
        );

        let mut router = Router::new();
        if !config.bridge_all_topics {
            let lora_topics: Vec<_> = config.lora_to_wifi_topics.iter().cloned().collect();
            let wifi_topics: Vec<_> = config.wifi_to_lora_topics.iter().cloned().collect();
            // An empty set leaves the router's default behaviour for that
            // direction untouched.
            if !lora_topics.is_empty() {
                router.set_lora_to_wifi_filter(lora_topics);
            }
            if !wifi_topics.is_empty() {
                router.set_wifi_to_lora_filter(wifi_topics);
            }
        }

        let multicast_addr: SocketAddr =
            format!("{}:{}", config.multicast_address, config.udp_port)
                .parse()
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;

        Ok(Self {
            config,
            router,
            stats,
            rate_limiter,
            running: Arc::new(AtomicBool::new(false)),
            udp_socket: None,
            multicast_addr,
        })
    }

    /// Opens the WiFi-side UDP socket and joins the configured multicast group.
    ///
    /// The socket is bound to `0.0.0.0:<udp_port>`, joins the group on the
    /// unspecified (default) interface and is switched to non-blocking mode.
    ///
    /// NOTE(review): the socket both joins and sends to the same group and
    /// port. If the OS loops multicast back to the sender (commonly the
    /// default), datagrams emitted by the bridge may be read back by
    /// [`Bridge::recv_wifi`] and re-forwarded to LoRa — confirm whether
    /// `set_multicast_loop_v4(false)` or source filtering is wanted.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if the configured multicast
    /// address is not a valid IPv4 address, or the underlying I/O error if
    /// binding, joining the group or changing the socket mode fails.
    pub fn bind(&mut self) -> io::Result<()> {
        // Passing the address as a typed pair avoids formatting a string that
        // the standard library would only parse again.
        let socket = UdpSocket::bind((std::net::Ipv4Addr::UNSPECIFIED, self.config.udp_port))?;
        let multicast_ip: std::net::Ipv4Addr = self
            .config
            .multicast_address
            .parse()
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
        socket.join_multicast_v4(&multicast_ip, &std::net::Ipv4Addr::UNSPECIFIED)?;
        // Non-blocking mode: receives return immediately, so a read timeout
        // would have no effect and none is configured.
        socket.set_nonblocking(true)?;
        self.udp_socket = Some(socket);
        Ok(())
    }

    /// Returns a new handle to the shared statistics counters.
    pub fn stats(&self) -> Arc<GatewayStats> {
        Arc::clone(&self.stats)
    }

    /// Handles one frame received from LoRa and forwards it to WiFi if allowed.
    ///
    /// The frame is counted as received, then checked against the router's
    /// LoRa→WiFi filter and the rate limiter. Frames that pass are registered
    /// with the router (when a topic was extracted) and sent to the multicast
    /// group.
    ///
    /// Returns `Ok(false)` when the frame was dropped by the filter or the rate
    /// limiter and `Ok(true)` when it passed both. Note that `Ok(true)` is also
    /// returned when the socket has not been bound, in which case nothing is
    /// actually transmitted.
    ///
    /// # Errors
    ///
    /// Propagates the I/O error from the UDP send.
    pub fn process_lora_message(&mut self, msg: LoRaMessage) -> io::Result<bool> {
        self.stats.record_lora_rx(msg.payload.len());

        let topic = self.router.extract_topic_from_rtps(&msg.payload);
        if !self.router.should_forward_to_wifi(topic.as_deref()) {
            self.stats.record_filter_drop();
            return Ok(false);
        }
        if !self.rate_limiter.try_acquire(topic.as_deref()) {
            self.stats.record_rate_limit_drop();
            return Ok(false);
        }

        if let Some(t) = &topic {
            self.router.register_lora_node(msg.node_id, t);
            // `u8` always fits in `u32`, so use the lossless conversion.
            self.router.update_topic_stats(t, u32::from(msg.node_id));
        }

        self.send_to_wifi(&msg.payload)?;
        Ok(true)
    }

    /// Handles one datagram received from WiFi and decides whether it should
    /// go out over LoRa.
    ///
    /// Returns `Ok(Some(bytes))` with a copy of `data` when the datagram passes
    /// the router's WiFi→LoRa filter and the rate limiter, and `Ok(None)` when
    /// it is dropped. The source address is currently unused.
    ///
    /// The LoRa transmit counter is updated here, before the caller performs
    /// the actual transmission.
    ///
    /// # Errors
    ///
    /// This implementation never returns `Err`; the `Result` is part of the
    /// existing signature.
    pub fn process_wifi_message(
        &mut self,
        data: &[u8],
        _src: SocketAddr,
    ) -> io::Result<Option<Vec<u8>>> {
        self.stats.record_wifi_rx(data.len());

        let topic = self.router.extract_topic_from_rtps(data);
        if !self.router.should_forward_to_lora(topic.as_deref()) {
            self.stats.record_filter_drop();
            return Ok(None);
        }
        if !self.rate_limiter.try_acquire(topic.as_deref()) {
            self.stats.record_rate_limit_drop();
            return Ok(None);
        }

        if let Some(t) = &topic {
            self.router.update_topic_stats(t, 0);
        }

        self.stats.record_lora_tx(data.len());
        Ok(Some(data.to_vec()))
    }

    /// Sends `data` to the multicast group and counts it as transmitted.
    ///
    /// Does nothing (and still returns `Ok`) when the socket is not bound.
    fn send_to_wifi(&self, data: &[u8]) -> io::Result<()> {
        if let Some(socket) = &self.udp_socket {
            socket.send_to(data, self.multicast_addr)?;
            self.stats.record_wifi_tx(data.len());
        }
        Ok(())
    }

    /// Polls the WiFi socket once without blocking.
    ///
    /// Returns `Ok(Some((bytes, source)))` when a datagram was available and
    /// `Ok(None)` when the socket is not bound or no datagram is pending.
    ///
    /// # Errors
    ///
    /// Returns any receive error other than [`io::ErrorKind::WouldBlock`].
    pub fn recv_wifi(&self) -> io::Result<Option<(Vec<u8>, SocketAddr)>> {
        let socket = match &self.udp_socket {
            Some(socket) => socket,
            None => return Ok(None),
        };

        // Receive into a stack buffer so that idle polls (the common case in
        // `run`) do not touch the heap; only real datagrams are copied out.
        let mut buf = [0u8; Self::RECV_BUFFER_LEN];
        match socket.recv_from(&mut buf) {
            Ok((len, src)) => Ok(Some((buf[..len].to_vec(), src))),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
            Err(e) => Err(e),
        }
    }

    /// Reports whether the running flag is currently set.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Clears the running flag so that [`Bridge::run`] exits its loop.
    pub fn stop(&self) {
        self.running.store(false, Ordering::Relaxed);
    }

    /// Returns a shared handle to the running flag, e.g. for stopping the
    /// bridge from another thread or a signal handler.
    pub fn running_flag(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.running)
    }

    /// Runs the bridge loop until the running flag is cleared.
    ///
    /// Each iteration handles at most one LoRa frame obtained from `lora_recv`
    /// and at most one WiFi datagram, passes any datagram destined for LoRa to
    /// `lora_send`, prints a statistics summary when enabled and due, and then
    /// sleeps for one millisecond.
    ///
    /// The running flag is set on entry, so a `stop()` issued before `run`
    /// starts is overwritten.
    ///
    /// # Errors
    ///
    /// Per-message failures are logged to stderr and do not end the loop; this
    /// implementation always returns `Ok(())` once the loop exits.
    pub fn run<FR, FS>(&mut self, mut lora_recv: FR, mut lora_send: FS) -> io::Result<()>
    where
        FR: FnMut() -> Option<LoRaMessage>,
        FS: FnMut(&[u8]) -> io::Result<()>,
    {
        self.running.store(true, Ordering::Relaxed);
        let stats_interval = Duration::from_secs(self.config.stats_interval_secs);
        let mut last_stats = std::time::Instant::now();

        while self.running.load(Ordering::Relaxed) {
            if let Some(msg) = lora_recv() {
                if let Err(e) = self.process_lora_message(msg) {
                    eprintln!("Error processing LoRa message: {}", e);
                    // NOTE(review): the only error source here is the UDP
                    // send, yet it is counted as a parse error — confirm
                    // whether a dedicated counter exists.
                    self.stats.record_parse_error();
                }
            }

            // `recv_wifi` already maps `WouldBlock` to `Ok(None)`, so every
            // `Err` reaching this point is a genuine receive failure.
            match self.recv_wifi() {
                Ok(Some((data, src))) => match self.process_wifi_message(&data, src) {
                    Ok(Some(to_send)) => {
                        if let Err(e) = lora_send(&to_send) {
                            eprintln!("Error sending to LoRa: {}", e);
                        }
                    }
                    Ok(None) => {}
                    Err(e) => {
                        eprintln!("Error processing WiFi message: {}", e);
                        self.stats.record_parse_error();
                    }
                },
                Ok(None) => {}
                Err(e) => eprintln!("UDP receive error: {}", e),
            }

            if self.config.enable_stats && last_stats.elapsed() >= stats_interval {
                println!("{}", self.stats.format_summary());
                last_stats = std::time::Instant::now();
            }

            // Short pause so the polling loop does not spin a full core.
            thread::sleep(Duration::from_millis(1));
        }
        Ok(())
    }

    /// Borrows the topic router.
    pub fn router(&self) -> &Router {
        &self.router
    }

    /// Mutably borrows the topic router.
    pub fn router_mut(&mut self) -> &mut Router {
        &mut self.router
    }
}
/// Fluent builder that accumulates a `GatewayConfig` and turns it into a
/// `Bridge` via `build()`.
pub struct BridgeBuilder {
/// Configuration being assembled; starts from `GatewayConfig::default()`.
config: GatewayConfig,
}
impl BridgeBuilder {
pub fn new() -> Self {
Self {
config: GatewayConfig::default(),
}
}
pub fn udp_port(mut self, port: u16) -> Self {
self.config.udp_port = port;
self
}
pub fn domain_id(mut self, id: u32) -> Self {
self.config.domain_id = id;
self
}
pub fn node_id(mut self, id: u8) -> Self {
self.config.node_id = id;
self
}
pub fn lora_to_wifi_topic(mut self, topic: &str) -> Self {
self.config.bridge_all_topics = false;
self.config.lora_to_wifi_topics.insert(topic.to_string());
self
}
pub fn wifi_to_lora_topic(mut self, topic: &str) -> Self {
self.config.bridge_all_topics = false;
self.config.wifi_to_lora_topics.insert(topic.to_string());
self
}
pub fn bridge_all(mut self) -> Self {
self.config.bridge_all_topics = true;
self
}
pub fn rate_limit(mut self, msgs_per_sec: u32) -> Self {
self.config.max_messages_per_second = msgs_per_sec;
self
}
pub fn multicast_address(mut self, addr: &str) -> Self {
self.config.multicast_address = addr.to_string();
self
}
pub fn enable_stats(mut self, enable: bool) -> Self {
self.config.enable_stats = enable;
self
}
pub fn build(self) -> io::Result<Bridge> {
Bridge::new(self.config)
}
}
impl Default for BridgeBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a LoRa frame from node 1 with fixed signal readings and the
    /// given payload.
    fn frame_with(payload: Vec<u8>) -> LoRaMessage {
        LoRaMessage {
            node_id: 1,
            rssi: -80,
            snr: 10,
            payload,
        }
    }

    /// A bridge can be constructed from the default configuration.
    #[test]
    fn test_bridge_creation() {
        assert!(Bridge::new(GatewayConfig::default()).is_ok());
    }

    /// A fully customised builder chain still produces a bridge.
    #[test]
    fn test_bridge_builder() {
        let built = BridgeBuilder::new()
            .udp_port(17402)
            .domain_id(1)
            .node_id(200)
            .lora_to_wifi_topic("Temperature")
            .wifi_to_lora_topic("Command")
            .rate_limit(50)
            .build();
        assert!(built.is_ok());
    }

    /// Processing a frame on an unbound bridge succeeds and is counted.
    #[test]
    fn test_process_lora_message() {
        let mut bridge = BridgeBuilder::new()
            .udp_port(17403)
            .bridge_all()
            .build()
            .unwrap();

        let outcome =
            bridge.process_lora_message(frame_with(vec![0x52, 0x54, 0x50, 0x53, 0x02, 0x01]));
        assert!(outcome.is_ok());

        let snapshot = bridge.stats().snapshot();
        assert_eq!(snapshot.lora_rx_count, 1);
    }

    /// With a topic filter installed, processing an arbitrary payload does not
    /// produce an error.
    #[test]
    fn test_topic_filtering() {
        let mut bridge = BridgeBuilder::new()
            .udp_port(17404)
            .lora_to_wifi_topic("Temperature")
            .build()
            .unwrap();

        let outcome = bridge.process_lora_message(frame_with(vec![1, 2, 3, 4]));
        assert!(outcome.is_ok());
    }

    /// A burst well above the configured rate triggers rate-limit drops while
    /// every frame is still counted as received.
    #[test]
    fn test_rate_limiting() {
        let mut bridge = BridgeBuilder::new()
            .udp_port(17405)
            .bridge_all()
            .rate_limit(5)
            .build()
            .unwrap();

        for byte in 0u8..20 {
            let _ = bridge.process_lora_message(frame_with(vec![byte]));
        }

        let snapshot = bridge.stats().snapshot();
        assert!(snapshot.dropped_rate_limit > 0);
        assert_eq!(snapshot.lora_rx_count, 20);
    }

    /// The shared running flag and the bridge's own accessors observe the same
    /// state.
    #[test]
    fn test_running_flag() {
        let bridge = Bridge::new(GatewayConfig::default()).unwrap();
        assert!(!bridge.is_running());

        let shared = bridge.running_flag();
        shared.store(true, Ordering::Relaxed);
        assert!(bridge.is_running());

        bridge.stop();
        assert!(!bridge.is_running());
    }
}