confitdb 0.1.4

ConfitDB is an experimental, distributed, real-time database, giving full control over conflict resolution.
Documentation
use crate::server_error::ServerError;
use crate::server_options::ServerOptions;
use confitul::Cluster;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time;
use tokio::net::UdpSocket;
use tokio::{select, spawn};

// inspired from:
// https://github.com/tokio-rs/tokio/blob/master/examples/echo-udp.rs
// https://idndx.com/writing-highly-efficient-udp-server-in-rust/

const UDP_SERVER_RUN_DELAY_MILLIS: u64 = 10;
const UDP_BUF_SIZE: usize = 2048;
const NB_UDP_TASKS: usize = 100; // [TODO] make this an option

pub struct UdpServer {
    cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
    options: ServerOptions,
    shutdown_receiver: Option<tokio::sync::oneshot::Receiver<String>>,
}

async fn handle(
    _peer: SocketAddr,
    _buf: Vec<u8>,
    _cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
) -> Vec<u8> {
    Vec::<u8>::from("OK")
}

impl UdpServer {
    pub fn new(
        cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
        options: Option<ServerOptions>,
    ) -> Self {
        let real_options = options.unwrap_or(ServerOptions::default());
        UdpServer {
            cluster,
            options: real_options,
            shutdown_receiver: None,
        }
    }

    async fn wait_shutdown(&mut self) {
        let sr = self.shutdown_receiver.take().unwrap();
        let msg = sr.await.unwrap();
        println!("shutdown: {}", msg);
    }

    pub async fn run(
        &mut self,
        shutdown_receiver: tokio::sync::oneshot::Receiver<String>,
        ready_sender: tokio::sync::oneshot::Sender<Result<(), ServerError>>,
        done_sender: tokio::sync::oneshot::Sender<Result<(), ServerError>>,
    ) -> Result<(), ServerError> {
        self.shutdown_receiver = Some(shutdown_receiver);

        let addr = match format!("{}:{}", self.options.listen_addr, self.options.listen_port)
            .parse::<SocketAddr>()
        {
            Ok(v) => v,
            Err(e) => {
                return Err(ServerError::new(
                    format!("unable to parse addr: {:?}", e).as_str(),
                ))
            }
        };

        let socket = match UdpSocket::bind(&addr).await {
            Ok(socket) => socket,
            Err(e) => {
                ready_sender
                    .send(Err(ServerError::new(format!("{}", e).as_str())))
                    .unwrap();
                return Err(ServerError::new(format!("{}", e).as_str()));
            }
        };

        // Yield some timeslice, a few msec will not really impair performance
        // but it breaks any code that expects the thread/worker to immediately start.
        // This helps bugs surfacing earlier.
        thread::sleep(time::Duration::from_millis(UDP_SERVER_RUN_DELAY_MILLIS));

        let mut tasks = VecDeque::with_capacity(NB_UDP_TASKS);

        ready_sender.send(Ok({})).unwrap();
        loop {
            let mut buf = vec![0; UDP_BUF_SIZE];
            select! {
                    _=self.wait_shutdown() => {
                    loop {
                        match tasks.pop_front() {
                Some(task)=>{match task.await {
                Ok(_)=>(),
                Err(_e)=>{//todo log task failed error
                }
                }},
                None=> break
            }
            };
                        done_sender.send(Ok({})).unwrap();
                        return Ok({});
                    },
                    recv = socket.recv_from(&mut buf) => {
                        match recv {
                Ok((req_size,peer))=>{
                    while tasks.len()>=NB_UDP_TASKS {
                        match tasks.pop_front().unwrap().await {
                Ok(_)=>(),
                Err(_e)=>{//todo log task failed error
                }
            };
                    };
            let buf2:Vec<u8>=buf[..req_size].to_vec();
                    tasks.push_back(spawn(handle(peer,buf2,Arc::clone(&self.cluster))));
                },
                Err(_)=>{// [TODO] handle recv error},
                }
                }
            }
            }
        }
    }
}