use crate::*;
impl Default for ServerData {
#[inline(always)]
fn default() -> Self {
Self {
server_config: ServerConfigData::default(),
hook: vec![],
task_panic: vec![],
read_error: vec![],
}
}
}
impl Default for Server {
#[inline(always)]
fn default() -> Self {
Self(arc_rwlock(ServerData::default()))
}
}
impl PartialEq for ServerData {
fn eq(&self, other: &Self) -> bool {
self.server_config == other.server_config
&& self.hook.len() == other.hook.len()
&& self.task_panic.len() == other.task_panic.len()
&& self.read_error.len() == other.read_error.len()
}
}
impl Eq for ServerData {}
impl ServerData {
pub(crate) fn get_server_config(&self) -> &ServerConfigData {
&self.server_config
}
pub(crate) fn get_mut_server_config(&mut self) -> &mut ServerConfigData {
&mut self.server_config
}
pub(crate) fn get_hook(&self) -> &ServerHookList {
&self.hook
}
pub(crate) fn get_mut_hook(&mut self) -> &mut ServerHookList {
&mut self.hook
}
pub(crate) fn get_mut_task_panic(&mut self) -> &mut ServerHookList {
&mut self.task_panic
}
}
impl PartialEq for Server {
#[inline(always)]
fn eq(&self, other: &Self) -> bool {
if Arc::ptr_eq(&self.0, &other.0) {
return true;
}
if let (Ok(s), Ok(o)) = (self.0.try_read(), other.0.try_read()) {
*s == *o
} else {
false
}
}
}
impl Eq for Server {}
impl Server {
pub async fn new() -> Self {
Self::default()
}
async fn read(&self) -> RwLockReadGuard<'_, ServerData> {
self.0.read().await
}
async fn write(&self) -> RwLockWriteGuard<'_, ServerData> {
self.0.write().await
}
pub async fn server_config(&self, config: ServerConfig) -> &Self {
*self.write().await.get_mut_server_config() = config.get_data().await;
self
}
pub async fn hook<H>(&self) -> &Self
where
H: ServerHook,
{
self.write()
.await
.get_mut_hook()
.push(server_hook_factory::<H>());
self
}
pub async fn task_panic<H>(&self) -> &Self
where
H: ServerHook,
{
self.write()
.await
.get_mut_task_panic()
.push(server_hook_factory::<H>());
self
}
async fn create_udp_socket(&self) -> Result<UdpSocket, ServerError> {
let config: ServerConfigData = self.read().await.get_server_config().clone();
let host: &String = config.get_host();
let port: u16 = config.get_port();
let addr: String = format!("{host}:{port}");
UdpSocket::bind(&addr)
.await
.map_err(|e| ServerError::UdpBind(e.to_string()))
}
async fn spawn_request_handler(
&self,
state: HandlerState,
data: Request,
client_addr: SocketAddr,
) {
let server: Server = self.clone();
tokio::spawn(async move {
server.handle_request(state, data, client_addr).await;
});
}
async fn handle_request(&self, state: HandlerState, data: Request, client_addr: SocketAddr) {
let ctx: Context = Context::new(&state.get_socket(), &data, client_addr);
for hook in self.read().await.get_hook().iter() {
hook(&ctx).await;
if ctx.get_aborted().await {
return;
}
}
}
pub async fn run(&self) -> Result<ServerControlHook, ServerError> {
let socket: UdpSocket = self.create_udp_socket().await?;
let socket: ArcRwLockUdpSocket = ArcRwLockUdpSocket::from_socket(socket);
let server: Server = self.clone();
let (wait_sender, wait_receiver) = channel(());
let (shutdown_sender, mut shutdown_receiver) = channel(());
let server_config: ServerConfigData = self.read().await.get_server_config().clone();
let buffer_size: usize = server_config.get_buffer_size();
let accept_requests: JoinHandle<()> = tokio::spawn(async move {
let mut buf: Vec<u8> = vec![0u8; buffer_size];
loop {
match socket.get_read_lock().await.recv_from(&mut buf).await {
Ok((data_len, client_addr)) => {
let data: Request = buf[..data_len].to_vec();
let state: HandlerState = HandlerState::new(socket.clone());
server.spawn_request_handler(state, data, client_addr).await;
}
Err(e) => {
eprintln!("UDP receive error: {e}");
}
}
}
});
let wait_hook: SharedAsyncTaskFactory<()> = Arc::new(move || {
let mut wait_receiver_clone: Receiver<()> = wait_receiver.clone();
Box::pin(async move {
let _ = wait_receiver_clone.changed().await;
})
});
let shutdown_sender_arc: Arc<Sender<()>> = Arc::new(shutdown_sender);
let shutdown_hook: SharedAsyncTaskFactory<()> = Arc::new(move || {
let shutdown_sender_clone: Arc<Sender<()>> = shutdown_sender_arc.clone();
Box::pin(async move {
let _ = shutdown_sender_clone.send(());
})
});
tokio::spawn(async move {
let _ = shutdown_receiver.changed().await;
accept_requests.abort();
let _ = wait_sender.send(());
});
let mut server_control_hook: ServerControlHook = ServerControlHook::default();
server_control_hook.set_shutdown_hook(shutdown_hook);
server_control_hook.set_wait_hook(wait_hook);
Ok(server_control_hook)
}
}