use std::net::{ToSocketAddrs, UdpSocket};
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
use std::time::Duration;
/// A blocking, thread-per-datagram UDP proxy that spreads incoming datagrams
/// across a fixed list of backends in round-robin order.
///
/// Configure it with the builder-style methods, then call [`UdpProxy::bind`]
/// to start serving on the calling thread.
pub struct UdpProxy {
    /// Backend addresses in any form accepted by `ToSocketAddrs`
    /// (e.g. `"host:port"`); each is resolved again for every datagram.
    backends: Vec<String>,
    /// Monotonically increasing counter that drives round-robin selection.
    counter: Arc<AtomicUsize>,
    /// How long a worker waits for the backend's reply. Zero means
    /// "wait indefinitely".
    reply_timeout: Duration,
    /// Size in bytes of the receive buffers (client side and backend side).
    buffer_size: usize,
}

impl UdpProxy {
    /// Creates a proxy for the given backend addresses.
    ///
    /// Defaults: 5 second reply timeout, 65536-byte receive buffers.
    pub fn new<I, S>(backends: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        UdpProxy {
            backends: backends.into_iter().map(Into::into).collect(),
            counter: Arc::new(AtomicUsize::new(0)),
            reply_timeout: Duration::from_secs(5),
            buffer_size: 65536,
        }
    }

    /// Sets how long to wait for a backend reply, in milliseconds.
    ///
    /// A value of `0` disables the timeout: the worker handling a datagram
    /// then blocks until the backend answers.
    pub fn reply_timeout_ms(mut self, ms: u64) -> Self {
        self.reply_timeout = Duration::from_millis(ms);
        self
    }

    /// Sets the size in bytes of the buffers used to receive datagrams.
    pub fn buffer_size(mut self, bytes: usize) -> Self {
        self.buffer_size = bytes;
        self
    }

    /// Binds a UDP socket on `addr` and serves datagrams on the calling thread.
    ///
    /// Every received datagram is forwarded to the next backend (round-robin)
    /// from its own worker thread; the backend's reply, if any arrives within
    /// the configured timeout, is sent back to the original client from the
    /// listening socket.
    ///
    /// # Errors
    ///
    /// Returns `Err` if no backends are configured or if binding `addr` fails.
    /// Once listening, this function loops forever; per-datagram failures are
    /// logged to stderr and do not stop the proxy.
    pub fn bind(self, addr: &str) -> Result<(), String> {
        if self.backends.is_empty() {
            return Err("UdpProxy: no backends configured".to_string());
        }
        let socket = UdpSocket::bind(addr)
            .map_err(|e| format!("UdpProxy: bind on {} failed: {}", addr, e))?;
        println!("UdpProxy: listening on {}", addr);

        // Workers share the listening socket through an `Arc` instead of
        // `try_clone`, so no extra file descriptor is created per datagram.
        let socket = Arc::new(socket);
        // One receive buffer reused for every datagram; the payload is copied
        // out before it is handed to a worker.
        let mut buf = vec![0u8; self.buffer_size];

        loop {
            let (n, client_addr) = match socket.recv_from(&mut buf) {
                Ok(v) => v,
                Err(e) => {
                    eprintln!("UdpProxy: recv_from error: {}", e);
                    continue;
                }
            };
            let packet = buf[..n].to_vec();
            let backend_addr = self.pick_backend().to_string();
            let reply_socket = Arc::clone(&socket);
            let timeout = self.reply_timeout;
            let buf_size = self.buffer_size;

            // `thread::Builder::spawn` reports thread-creation failure as an
            // error, whereas `thread::spawn` would panic and kill the proxy.
            let spawned = std::thread::Builder::new().spawn(move || {
                match Self::exchange(&packet, &backend_addr, timeout, buf_size) {
                    Ok(Some(reply)) => {
                        if let Err(e) = reply_socket.send_to(&reply, client_addr) {
                            eprintln!("UdpProxy: reply to {} failed: {}", client_addr, e);
                        }
                    }
                    // Timed out waiting for the backend: nothing to relay.
                    Ok(None) => {}
                    Err(msg) => eprintln!("{}", msg),
                }
            });
            if let Err(e) = spawned {
                eprintln!("UdpProxy: worker spawn error: {}", e);
            }
        }
    }

    /// Sends `packet` to `backend_addr` and waits for a single reply.
    ///
    /// Returns `Ok(Some(reply))` when the backend answered, `Ok(None)` when the
    /// wait timed out, and `Err(message)` for any other failure.
    fn exchange(
        packet: &[u8],
        backend_addr: &str,
        timeout: Duration,
        buf_size: usize,
    ) -> Result<Option<Vec<u8>>, String> {
        let target = backend_addr
            .to_socket_addrs()
            .map_err(|e| format!("UdpProxy: DNS lookup for {} failed: {}", backend_addr, e))?
            .next()
            .ok_or_else(|| format!("UdpProxy: no address for {}", backend_addr))?;

        let backend = UdpSocket::bind(Self::ephemeral_bind_addr(&target))
            .map_err(|e| format!("UdpProxy: ephemeral socket error: {}", e))?;
        // Connecting makes the socket accept datagrams only from the chosen
        // backend, so an unrelated sender cannot inject a reply.
        backend
            .connect(target)
            .map_err(|e| format!("UdpProxy: connect to {} failed: {}", backend_addr, e))?;

        // `set_read_timeout` rejects a zero duration; map it to "no timeout"
        // explicitly rather than ignoring the resulting error.
        let read_timeout = if timeout.is_zero() { None } else { Some(timeout) };
        backend
            .set_read_timeout(read_timeout)
            .map_err(|e| format!("UdpProxy: set timeout for {} failed: {}", backend_addr, e))?;

        backend
            .send(packet)
            .map_err(|e| format!("UdpProxy: send to {} failed: {}", backend_addr, e))?;

        let mut reply = vec![0u8; buf_size];
        match backend.recv(&mut reply) {
            Ok(len) => {
                reply.truncate(len);
                Ok(Some(reply))
            }
            // Depending on the platform a read timeout surfaces as either kind.
            Err(e)
                if matches!(
                    e.kind(),
                    std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
                ) =>
            {
                Ok(None)
            }
            Err(e) => Err(format!(
                "UdpProxy: backend reply error from {}: {}",
                backend_addr, e
            )),
        }
    }

    /// Returns the wildcard bind address whose family matches `target`, so the
    /// ephemeral socket can actually reach IPv6 backends as well as IPv4 ones.
    fn ephemeral_bind_addr(target: &std::net::SocketAddr) -> &'static str {
        if target.is_ipv4() {
            "0.0.0.0:0"
        } else {
            "[::]:0"
        }
    }

    /// Returns the next backend address in round-robin order.
    ///
    /// Callers must ensure `backends` is non-empty (`bind` checks this before
    /// serving); otherwise the modulo below would divide by zero.
    fn pick_backend(&self) -> &str {
        let i = self.counter.fetch_add(1, Ordering::Relaxed) % self.backends.len();
        &self.backends[i]
    }
}