confitdb 0.1.4

ConfitDB is an experimental, distributed, real-time database that gives full control over conflict resolution.
Documentation
use crate::server_error::ServerError;
use crate::server_options::ServerOptions;
use confitul::Cluster;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time;

/// Delay, in milliseconds, that `HttpServer::run` sleeps after binding its
/// listening socket and before it reports readiness.
const HTTP_SERVER_RUN_DELAY_MILLIS: u64 = 10;

/// HTTP front-end serving requests on top of a shared `Cluster`.
pub struct HttpServer {
    /// Shared cluster handle; a clone of this `Arc` is handed to every
    /// request handler.
    cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
    /// Listening options; `run` reads `listen_addr` and `listen_port` from it.
    options: ServerOptions,
    /// Shutdown channel stored by `run` and consumed by `wait_shutdown`.
    /// `None` until `run` is called.
    shutdown_receiver: Option<tokio::sync::oneshot::Receiver<String>>,
}

async fn handle(
    _: Request<Body>,
    _cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
) -> Result<Response<Body>, Infallible> {
    let response = format!("OK");
    Ok(Response::new(response.into()))
}

impl HttpServer {
    /// Creates a server for `cluster`. Nothing is bound until [`run`](Self::run)
    /// is called.
    ///
    /// When `options` is `None`, `ServerOptions::default()` is used.
    pub fn new(
        cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
        options: Option<ServerOptions>,
    ) -> Self {
        HttpServer {
            cluster,
            // Only build the default options when none were supplied.
            options: options.unwrap_or_else(ServerOptions::default),
            shutdown_receiver: None,
        }
    }

    /// Resolves once shutdown has been requested.
    ///
    /// Completes when a message arrives on the shutdown channel stored by
    /// `run`, or when the sending half is dropped: with no sender left nobody
    /// could ever request shutdown, so that case is treated as a request too
    /// instead of panicking inside the serving future.
    async fn wait_shutdown(&mut self) {
        let receiver = self
            .shutdown_receiver
            .take()
            .expect("run() stores the shutdown receiver before serving");
        match receiver.await {
            Ok(msg) => println!("shutdown: {}", msg),
            Err(_) => println!("shutdown: sender dropped without a message"),
        }
    }

    /// Sends `outcome` on a notification channel, logging instead of
    /// panicking when the receiving half has already been dropped.
    fn notify_or_log(
        sender: tokio::sync::oneshot::Sender<Result<(), ServerError>>,
        outcome: Result<(), ServerError>,
        channel: &str,
    ) {
        // The same outcome is also returned by `run`, so nothing is lost when
        // the caller is no longer listening on the channel.
        if sender.send(outcome).is_err() {
            println!("{} receiver dropped, notification skipped", channel);
        }
    }

    /// Binds the listening socket and serves requests until shutdown.
    ///
    /// * `shutdown_receiver` - a message on this channel (or dropping its
    ///   sender) triggers a graceful shutdown.
    /// * `ready_sender` - receives `Ok(())` once the socket is bound, or the
    ///   error if the address cannot be parsed or bound.
    /// * `done_sender` - receives the final outcome once serving has ended.
    ///   It is dropped without a message when startup fails.
    ///
    /// # Errors
    ///
    /// Returns a `ServerError` when the configured address cannot be parsed,
    /// when the socket cannot be bound, or when the server stops with an
    /// error.
    pub async fn run(
        &mut self,
        shutdown_receiver: tokio::sync::oneshot::Receiver<String>,
        ready_sender: tokio::sync::oneshot::Sender<Result<(), ServerError>>,
        done_sender: tokio::sync::oneshot::Sender<Result<(), ServerError>>,
    ) -> Result<(), ServerError> {
        self.shutdown_receiver = Some(shutdown_receiver);

        let addr = match format!("{}:{}", self.options.listen_addr, self.options.listen_port)
            .parse::<SocketAddr>()
        {
            Ok(addr) => addr,
            Err(e) => {
                // Report the failure on the ready channel as well, exactly as
                // a bind failure does, so a waiter sees the real cause rather
                // than a closed channel.
                let msg = format!("unable to parse addr: {:?}", e);
                Self::notify_or_log(ready_sender, Err(ServerError::new(msg.as_str())), "ready");
                return Err(ServerError::new(msg.as_str()));
            }
        };

        // Each connection gets its own clone of the cluster handle, and each
        // request on that connection clones it again for the handler.
        let cluster_for_connections = self.cluster.clone();
        let make_svc = make_service_fn(move |_conn| {
            let cluster_for_requests = cluster_for_connections.clone();
            async move {
                Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
                    handle(req, cluster_for_requests.clone())
                }))
            }
        });

        let server_builder = match Server::try_bind(&addr) {
            Ok(builder) => builder,
            Err(e) => {
                let msg = e.to_string();
                Self::notify_or_log(ready_sender, Err(ServerError::new(msg.as_str())), "ready");
                return Err(ServerError::new(msg.as_str()));
            }
        };

        // Deliberate short pause: a few milliseconds do not hurt performance,
        // but they break callers that wrongly assume the server is up the
        // instant `run` is invoked, so such bugs surface early.
        // NOTE(review): this is a blocking sleep inside an async fn and stalls
        // the executor thread for the duration; an async timer would avoid
        // that but depends on the runtime's timer being available.
        thread::sleep(time::Duration::from_millis(HTTP_SERVER_RUN_DELAY_MILLIS));

        Self::notify_or_log(ready_sender, Ok(()), "ready");

        let graceful = server_builder
            .serve(make_svc)
            .with_graceful_shutdown(self.wait_shutdown());

        match graceful.await {
            Ok(()) => {
                Self::notify_or_log(done_sender, Ok(()), "done");
                Ok(())
            }
            Err(e) => {
                let msg = e.to_string();
                Self::notify_or_log(done_sender, Err(ServerError::new(msg.as_str())), "done");
                Err(ServerError::new(msg.as_str()))
            }
        }
    }
}