use crate::server_error::ServerError;
use crate::server_options::ServerOptions;
use confitul::Cluster;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time;
use tokio::net::UdpSocket;
use tokio::{select, spawn};
const UDP_SERVER_RUN_DELAY_MILLIS: u64 = 10;
const UDP_BUF_SIZE: usize = 2048;
const NB_UDP_TASKS: usize = 100;
pub struct UdpServer {
cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
options: ServerOptions,
shutdown_receiver: Option<tokio::sync::oneshot::Receiver<String>>,
}
async fn handle(
_peer: SocketAddr,
_buf: Vec<u8>,
_cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
) -> Vec<u8> {
Vec::<u8>::from("OK")
}
impl UdpServer {
pub fn new(
cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
options: Option<ServerOptions>,
) -> Self {
let real_options = options.unwrap_or(ServerOptions::default());
UdpServer {
cluster,
options: real_options,
shutdown_receiver: None,
}
}
async fn wait_shutdown(&mut self) {
let sr = self.shutdown_receiver.take().unwrap();
let msg = sr.await.unwrap();
println!("shutdown: {}", msg);
}
pub async fn run(
&mut self,
shutdown_receiver: tokio::sync::oneshot::Receiver<String>,
ready_sender: tokio::sync::oneshot::Sender<Result<(), ServerError>>,
done_sender: tokio::sync::oneshot::Sender<Result<(), ServerError>>,
) -> Result<(), ServerError> {
self.shutdown_receiver = Some(shutdown_receiver);
let addr = match format!("{}:{}", self.options.listen_addr, self.options.listen_port)
.parse::<SocketAddr>()
{
Ok(v) => v,
Err(e) => {
return Err(ServerError::new(
format!("unable to parse addr: {:?}", e).as_str(),
))
}
};
let socket = match UdpSocket::bind(&addr).await {
Ok(socket) => socket,
Err(e) => {
ready_sender
.send(Err(ServerError::new(format!("{}", e).as_str())))
.unwrap();
return Err(ServerError::new(format!("{}", e).as_str()));
}
};
thread::sleep(time::Duration::from_millis(UDP_SERVER_RUN_DELAY_MILLIS));
let mut tasks = VecDeque::with_capacity(NB_UDP_TASKS);
ready_sender.send(Ok({})).unwrap();
loop {
let mut buf = vec![0; UDP_BUF_SIZE];
select! {
_=self.wait_shutdown() => {
loop {
match tasks.pop_front() {
Some(task)=>{match task.await {
Ok(_)=>(),
Err(_e)=>{ }
}},
None=> break
}
};
done_sender.send(Ok({})).unwrap();
return Ok({});
},
recv = socket.recv_from(&mut buf) => {
match recv {
Ok((req_size,peer))=>{
while tasks.len()>=NB_UDP_TASKS {
match tasks.pop_front().unwrap().await {
Ok(_)=>(),
Err(_e)=>{ }
};
};
let buf2:Vec<u8>=buf[..req_size].to_vec();
tasks.push_back(spawn(handle(peer,buf2,Arc::clone(&self.cluster))));
},
Err(_)=>{ }
}
}
}
}
}
}