use std::{net::SocketAddr, sync::Arc};
use tokio::sync::{mpsc, broadcast};
use tracing::{info, error};
use crate::udp::{receiver::{Receiver, self}, sender::Sender, UdpEvent};
use super::{Error, events::UdpEventTx, datagram::Datagram, UdpEventRx};
#[cfg(feature = "json")]
use std::net::ToSocketAddrs;
#[cfg(feature = "json")]
use serde::Serialize;
pub struct UdpSocket {
tx: mpsc::Sender<Datagram>,
event_tx: UdpEventTx
}
impl UdpSocket {
pub fn new(src: SocketAddr) -> Self {
let (tx, sender_rx) = mpsc::channel(32);
let (event_tx, _) = broadcast::channel(32);
tokio::spawn(run_socket(src, sender_rx, event_tx.clone()));
Self { tx, event_tx }
}
pub fn new_instance(src: SocketAddr, event_tx: UdpEventTx) -> Self {
let (tx, sender_rx) = mpsc::channel(32);
tokio::spawn(run_socket(src, sender_rx, event_tx.clone()));
Self { tx, event_tx }
}
pub async fn send(&self, datagram: Datagram) -> Result<(), Error> {
self.tx.send(datagram).await.map_err(|e| Error::Command(e.to_string()))
}
#[cfg(feature = "json")]
pub async fn send_json<T, A>(&self, object: &T, dest: A) -> Result<(), Error>
where
T: Serialize,
A: ToSocketAddrs,
{
let datagram = Datagram::as_json(object, dest)?;
self.send(datagram).await
}
pub fn get_rx(&self) -> UdpEventRx {
self.event_tx.subscribe()
}
}
async fn run_socket(src: SocketAddr, mut sender_rx: mpsc::Receiver<Datagram>, event_tx: UdpEventTx) {
if let Ok(s) = create_socket(src.clone()) {
let socket = Arc::new(s);
let mut receiver = Receiver::new(socket.clone());
let sender = Sender::new(socket.clone());
info!("Udp socket bound ({:?})", socket.local_addr().unwrap());
loop {
tokio::select! {
recv_result = receiver.on_event() => {
match recv_result {
Some(event) => {
match event {
receiver::Event::Data(data, addr) => {
let _ = event_tx.send(UdpEvent::Data((data, addr).into()));
},
receiver::Event::Closed => {
break;
},
receiver::Event::Dropped => {
break;
},
}
},
None => {
error!("UDP Receiver event channel closed");
break;
}
}
},
send_result = sender_rx.recv() => {
match send_result {
Some(datagram) => {
let dst = datagram.1.clone();
let result = sender.send(datagram).await;
if let Err(e) = result {
error!("Socket ({}) failed to send udp datagram to {:?} - {:?}", socket.local_addr().unwrap(), dst, e);
break;
}
},
None => {
break;
}
}
}
}
}
}
}
fn create_socket(address: SocketAddr) -> Result<tokio::net::UdpSocket, Error> {
let domain = if address.is_ipv4() { socket2::Domain::IPV4 } else { socket2::Domain::IPV6 };
let socket = socket2::Socket::new(domain, socket2::Type::DGRAM, None)?;
socket.set_reuse_address(true)?;
socket.set_reuse_port(true)?;
socket.set_nonblocking(true)?;
socket.bind(&socket2::SockAddr::from(address)).map_err(|error| Error::Bind { address, error })?;
let async_socket = tokio::net::UdpSocket::from_std(socket.into())?;
Ok(async_socket)
}