use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
#[cfg(not(feature = "compio"))]
pub type UdpHandler = Arc<
dyn Fn(Vec<u8>, SocketAddr, Arc<tokio::net::UdpSocket>) -> Pin<Box<dyn Future<Output = ()> + Send>>
+ Send
+ Sync,
>;
#[cfg(feature = "compio")]
pub type UdpHandler = Arc<
dyn Fn(Vec<u8>, SocketAddr, Arc<compio::net::UdpSocket>) -> Pin<Box<dyn Future<Output = ()>>>
+ Send
+ Sync,
>;
#[cfg(not(feature = "compio"))]
pub async fn serve_udp<F>(addr: &str, handler: F) -> std::io::Result<()>
where
F: Fn(Vec<u8>, SocketAddr, Arc<tokio::net::UdpSocket>) -> Pin<Box<dyn Future<Output = ()> + Send>>
+ Send
+ Sync
+ 'static,
{
let socket = Arc::new(tokio::net::UdpSocket::bind(addr).await?);
tracing::info!("UDP server listening on {}", socket.local_addr()?);
let handler = Arc::new(handler);
let mut buf = vec![0u8; 65535];
loop {
let (len, peer) = socket.recv_from(&mut buf).await?;
let data = buf[..len].to_vec();
let socket = Arc::clone(&socket);
let handler = Arc::clone(&handler);
tokio::spawn(async move {
handler(data, peer, socket).await;
});
}
}
#[cfg(not(feature = "compio"))]
pub async fn serve_udp_with_shutdown<F, S>(addr: &str, handler: F, signal: S) -> std::io::Result<()>
where
F: Fn(Vec<u8>, SocketAddr, Arc<tokio::net::UdpSocket>) -> Pin<Box<dyn Future<Output = ()> + Send>>
+ Send
+ Sync
+ 'static,
S: Future<Output = ()> + Send + 'static,
{
let socket = Arc::new(tokio::net::UdpSocket::bind(addr).await?);
tracing::info!("UDP server listening on {}", socket.local_addr()?);
let handler = Arc::new(handler);
let mut buf = vec![0u8; 65535];
tokio::pin!(signal);
loop {
tokio::select! {
result = socket.recv_from(&mut buf) => {
let (len, peer) = result?;
let data = buf[..len].to_vec();
let socket = Arc::clone(&socket);
let handler = Arc::clone(&handler);
tokio::spawn(async move {
handler(data, peer, socket).await;
});
}
() = &mut signal => {
tracing::info!("UDP server shutting down");
break;
}
}
}
Ok(())
}
#[cfg(feature = "compio")]
pub async fn serve_udp<F>(addr: &str, handler: F) -> std::io::Result<()>
where
F: Fn(Vec<u8>, SocketAddr, Arc<compio::net::UdpSocket>) -> Pin<Box<dyn Future<Output = ()>>>
+ Send
+ Sync
+ 'static,
{
let socket = Arc::new(compio::net::UdpSocket::bind(addr).await?);
tracing::info!("UDP server listening on {}", socket.local_addr()?);
let handler = Arc::new(handler);
loop {
let buf = vec![0u8; 65535];
let compio::BufResult(result, mut buf) = socket.recv_from(buf).await;
let (len, peer) = result?;
buf.truncate(len);
let socket = Arc::clone(&socket);
let handler = Arc::clone(&handler);
compio::runtime::spawn(async move {
handler(buf, peer, socket).await;
})
.detach();
}
}
#[cfg(feature = "compio")]
pub async fn serve_udp_with_shutdown<F, S>(addr: &str, handler: F, signal: S) -> std::io::Result<()>
where
F: Fn(Vec<u8>, SocketAddr, Arc<compio::net::UdpSocket>) -> Pin<Box<dyn Future<Output = ()>>>
+ Send
+ Sync
+ 'static,
S: Future<Output = ()> + 'static,
{
let socket = Arc::new(compio::net::UdpSocket::bind(addr).await?);
tracing::info!("UDP server listening on {}", socket.local_addr()?);
let handler = Arc::new(handler);
let signal = std::pin::pin!(signal);
let mut signal = signal;
loop {
let buf = vec![0u8; 65535];
let recv_fut = socket.recv_from(buf);
let recv_fut = std::pin::pin!(recv_fut);
match futures_util::future::select(recv_fut, &mut signal).await {
futures_util::future::Either::Left((compio::BufResult(result, buf_out), _)) => {
let (len, peer) = result?;
let mut buf = buf_out;
buf.truncate(len);
let socket = Arc::clone(&socket);
let handler = Arc::clone(&handler);
compio::runtime::spawn(async move {
handler(buf, peer, socket).await;
})
.detach();
}
futures_util::future::Either::Right(_) => {
tracing::info!("UDP server shutting down");
break;
}
}
}
Ok(())
}