confitdb 0.1.4

ConfitDB is an experimental, distributed, real-time database that gives full control over conflict resolution.
Documentation
use crate::cloud_options::CloudOptions;
use crate::http_server::HttpServer;
use crate::server_control::ServerControl;
use crate::server_error::ServerError;
use crate::server_options::ServerOptions;
use crate::server_status::ServerStatus;
use crate::udp_server::UdpServer;
use confitul::{Cluster, Conflict, SizedQueueDrain};
use std::sync::{Arc, RwLock};
use std::thread;
use tokio::runtime::Builder;
use url::ParseError;

/// Number of server control slots held in `Cloud::controls`.
const NB_SERVERS: usize = 2;
/// Index, within `Cloud::controls`, of the control used for the UDP server.
const UDP_IDX: usize = 0;
/// Index, within `Cloud::controls`, of the control used for the HTTP server.
const HTTP_IDX: usize = 1;

/// Ties one shared cluster to the HTTP and UDP servers that are built on top of it.
///
/// The cluster is handed to each server as a clone of the same `Arc`, and every
/// server gets its own `ServerControl` slot used to start it, stop it and query
/// its status.
#[derive(Debug)]
pub struct Cloud {
    /// Cluster shared with the servers, parameterised with `u64` and `Vec<u8>`
    /// (presumably the key and value types -- confirm against `confitul`).
    cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
    /// One control per server, indexed by `UDP_IDX` and `HTTP_IDX`.
    controls: [ServerControl; NB_SERVERS],
}

impl Cloud {
    /// Creates a cloud with a new cluster and one fresh `ServerControl` per server.
    ///
    /// When `cloud_options` is `None`, `CloudOptions::default()` is used instead.
    /// Only the `cluster` part of the options is consumed here.
    ///
    /// # Errors
    ///
    /// Returns the `ParseError` reported by `Cluster::new`, if any.
    pub fn new(cloud_options: Option<CloudOptions>) -> Result<Cloud, ParseError> {
        // Lazily build the defaults: nothing is constructed when options are supplied.
        let real_options = cloud_options.unwrap_or_else(CloudOptions::default);
        let cluster = Cluster::new(Some(real_options.cluster))?;
        Ok(Cloud {
            cluster: Arc::new(RwLock::new(cluster)),
            controls: [ServerControl::new(), ServerControl::new()],
        })
    }

    /// Runs the HTTP server on the caller's async runtime.
    ///
    /// The returned future resolves with whatever `HttpServer::run` returns.
    ///
    /// # Errors
    ///
    /// Returns the error from `ServerControl::get_ready` on the HTTP control,
    /// or the error returned by `HttpServer::run`.
    pub async fn run_http(
        &mut self,
        server_options: Option<ServerOptions>,
    ) -> Result<(), ServerError> {
        let (shutdown_receiver, ready_sender, done_sender) = self.controls[HTTP_IDX].get_ready()?;

        let mut http_server = HttpServer::new(Arc::clone(&self.cluster), server_options);
        http_server
            .run(shutdown_receiver, ready_sender, done_sender)
            .await
    }

    /// Starts the HTTP server on a dedicated thread and returns immediately.
    ///
    /// The thread builds its own multi-threaded tokio runtime and blocks on
    /// `HttpServer::run`. Use `status_http` to observe the server and
    /// `stop_http` to ask it to stop.
    ///
    /// # Errors
    ///
    /// Returns the error from `ServerControl::get_ready` on the HTTP control.
    /// Errors returned later by `HttpServer::run` are not reported here.
    ///
    /// # Panics
    ///
    /// The spawned thread (not the caller) panics if the tokio runtime cannot
    /// be built.
    pub fn start_http(&mut self, server_options: Option<ServerOptions>) -> Result<(), ServerError> {
        let (shutdown_receiver, ready_sender, done_sender) = self.controls[HTTP_IDX].get_ready()?;
        let mut http_server = HttpServer::new(Arc::clone(&self.cluster), server_options);
        // The join handle is dropped on purpose: the thread is detached and is
        // driven through the channels obtained from the control above.
        thread::spawn(move || {
            Builder::new_multi_thread()
                .enable_all()
                .build()
                .expect("failed to build the tokio runtime for the HTTP server")
                .block_on(http_server.run(shutdown_receiver, ready_sender, done_sender))
        });
        Ok(())
    }

    /// Asks the HTTP server to stop, forwarding `msg` to its `ServerControl`.
    ///
    /// # Errors
    ///
    /// Returns the error from `ServerControl::stop`.
    pub fn stop_http(&mut self, msg: &str) -> Result<(), ServerError> {
        self.controls[HTTP_IDX].stop(msg)
    }

    /// Refreshes and returns the status of the HTTP server.
    ///
    /// # Errors
    ///
    /// Returns a clone of the last error recorded by the HTTP control, if any.
    pub fn status_http(&mut self) -> Result<ServerStatus, ServerError> {
        self.status_of(HTTP_IDX)
    }

    /// Runs the UDP server on the caller's async runtime.
    ///
    /// The returned future resolves with whatever `UdpServer::run` returns.
    ///
    /// # Errors
    ///
    /// Returns the error from `ServerControl::get_ready` on the UDP control,
    /// or the error returned by `UdpServer::run`.
    pub async fn run_udp(
        &mut self,
        server_options: Option<ServerOptions>,
    ) -> Result<(), ServerError> {
        let (shutdown_receiver, ready_sender, done_sender) = self.controls[UDP_IDX].get_ready()?;

        let mut udp_server = UdpServer::new(Arc::clone(&self.cluster), server_options);
        udp_server
            .run(shutdown_receiver, ready_sender, done_sender)
            .await
    }

    /// Starts the UDP server on a dedicated thread and returns immediately.
    ///
    /// The thread builds its own multi-threaded tokio runtime and blocks on
    /// `UdpServer::run`. Use `status_udp` to observe the server and
    /// `stop_udp` to ask it to stop.
    ///
    /// # Errors
    ///
    /// Returns the error from `ServerControl::get_ready` on the UDP control.
    /// Errors returned later by `UdpServer::run` are not reported here.
    ///
    /// # Panics
    ///
    /// The spawned thread (not the caller) panics if the tokio runtime cannot
    /// be built.
    pub fn start_udp(&mut self, server_options: Option<ServerOptions>) -> Result<(), ServerError> {
        let (shutdown_receiver, ready_sender, done_sender) = self.controls[UDP_IDX].get_ready()?;
        let mut udp_server = UdpServer::new(Arc::clone(&self.cluster), server_options);
        // The join handle is dropped on purpose: the thread is detached and is
        // driven through the channels obtained from the control above.
        thread::spawn(move || {
            Builder::new_multi_thread()
                .enable_all()
                .build()
                .expect("failed to build the tokio runtime for the UDP server")
                .block_on(udp_server.run(shutdown_receiver, ready_sender, done_sender))
        });
        Ok(())
    }

    /// Asks the UDP server to stop, forwarding `msg` to its `ServerControl`.
    ///
    /// # Errors
    ///
    /// Returns the error from `ServerControl::stop`.
    pub fn stop_udp(&mut self, msg: &str) -> Result<(), ServerError> {
        self.controls[UDP_IDX].stop(msg)
    }

    /// Refreshes and returns the status of the UDP server.
    ///
    /// # Errors
    ///
    /// Returns a clone of the last error recorded by the UDP control, if any.
    pub fn status_udp(&mut self) -> Result<ServerStatus, ServerError> {
        self.status_of(UDP_IDX)
    }

    /// Returns the value of `items_len()` reported by the cluster's store.
    ///
    /// # Panics
    ///
    /// Panics if the cluster lock is poisoned.
    pub fn store_items_len(&self) -> usize {
        self.read_cluster().store().items_len()
    }

    /// Returns the value of `items_capacity()` reported by the cluster's store.
    ///
    /// # Panics
    ///
    /// Panics if the cluster lock is poisoned.
    pub fn store_items_capacity(&self) -> usize {
        self.read_cluster().store().items_capacity()
    }

    /// Calls `resize_items(size)` on the cluster's store and returns its result
    /// (presumably the resulting capacity -- confirm against `confitul`).
    ///
    /// # Panics
    ///
    /// Panics if the cluster lock is poisoned.
    pub fn store_resize_items(&self, size: usize) -> usize {
        self.read_cluster().store().resize_items(size)
    }

    /// Returns the value of `conflicts_len()` reported by the cluster's store.
    ///
    /// # Panics
    ///
    /// Panics if the cluster lock is poisoned.
    pub fn store_conflicts_len(&self) -> usize {
        self.read_cluster().store().conflicts_len()
    }

    /// Returns the value of `conflicts_capacity()` reported by the cluster's store.
    ///
    /// # Panics
    ///
    /// Panics if the cluster lock is poisoned.
    pub fn store_conflicts_capacity(&self) -> usize {
        self.read_cluster().store().conflicts_capacity()
    }

    /// Calls `resize_conflicts(size)` on the cluster's store and returns its result
    /// (presumably the resulting capacity -- confirm against `confitul`).
    ///
    /// # Panics
    ///
    /// Panics if the cluster lock is poisoned.
    pub fn store_resize_conflicts(&self, size: usize) -> usize {
        self.read_cluster().store().resize_conflicts(size)
    }

    /// Returns the value of `suggestions_len()` reported by the cluster's store.
    ///
    /// # Panics
    ///
    /// Panics if the cluster lock is poisoned.
    pub fn store_suggestions_len(&self) -> usize {
        self.read_cluster().store().suggestions_len()
    }

    /// Returns the value of `suggestions_capacity()` reported by the cluster's store.
    ///
    /// # Panics
    ///
    /// Panics if the cluster lock is poisoned.
    pub fn store_suggestions_capacity(&self) -> usize {
        self.read_cluster().store().suggestions_capacity()
    }

    /// Calls `resize_suggestions(size)` on the cluster's store and returns its result
    /// (presumably the resulting capacity -- confirm against `confitul`).
    ///
    /// # Panics
    ///
    /// Panics if the cluster lock is poisoned.
    pub fn store_resize_suggestions(&self, size: usize) -> usize {
        self.read_cluster().store().resize_suggestions(size)
    }

    /// Calls `freeze()` on the cluster's store and hands back the
    /// `SizedQueueDrain` of conflicts it returns.
    ///
    /// # Panics
    ///
    /// Panics if the cluster lock is poisoned.
    pub fn store_freeze(&self) -> SizedQueueDrain<Conflict<u64, Vec<u8>>> {
        self.read_cluster().store().freeze()
    }

    /// Calls `unfreeze()` on the cluster's store and returns its result.
    ///
    /// # Panics
    ///
    /// Panics if the cluster lock is poisoned.
    pub fn store_unfreeze(&self) -> usize {
        self.read_cluster().store().unfreeze()
    }

    /// Refreshes the control at `idx`, then reports its current status, or a
    /// clone of its last recorded error when there is one.
    fn status_of(&mut self, idx: usize) -> Result<ServerStatus, ServerError> {
        let control = &mut self.controls[idx];
        control.update_status();
        match &control.last_error {
            Ok(()) => Ok(control.current_status),
            Err(e) => Err(e.clone()),
        }
    }

    /// Takes the read lock on the shared cluster.
    ///
    /// Panics, with an explicit message, if the lock is poisoned.
    fn read_cluster(&self) -> std::sync::RwLockReadGuard<'_, Cluster<u64, Vec<u8>>> {
        self.cluster.read().expect("cluster lock poisoned")
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::{thread, time};

    /// Grace period left to a server thread to come up, or to shut down,
    /// before its status is checked.
    const SETTLE_DELAY: time::Duration = time::Duration::from_millis(250);

    /// Starts then stops the HTTP server, checking the reported status at each step.
    #[test]
    #[serial]
    fn test_start_stop_http() {
        let http_server_options = ServerOptions::new().with_listen_addr("127.0.0.1");
        let mut cloud = Cloud::new(None).expect("default cloud options should be valid");
        assert_eq!(cloud.status_http(), Ok(ServerStatus::Stopped), "before start");

        // Fail right here, with the actual error, rather than on a later status mismatch.
        cloud
            .start_http(Some(http_server_options))
            .expect("HTTP server should start");
        thread::sleep(SETTLE_DELAY);
        assert_eq!(cloud.status_http(), Ok(ServerStatus::Running), "after start");

        cloud
            .stop_http("stop this")
            .expect("HTTP server should accept the stop request");
        thread::sleep(SETTLE_DELAY);
        assert_eq!(cloud.status_http(), Ok(ServerStatus::Stopped), "after stop");
    }

    /// Starts then stops the UDP server, checking the reported status at each step.
    #[test]
    #[serial]
    fn test_start_stop_udp() {
        let udp_server_options = ServerOptions::new().with_listen_addr("127.0.0.1");
        let mut cloud = Cloud::new(None).expect("default cloud options should be valid");
        assert_eq!(cloud.status_udp(), Ok(ServerStatus::Stopped), "before start");

        // Fail right here, with the actual error, rather than on a later status mismatch.
        cloud
            .start_udp(Some(udp_server_options))
            .expect("UDP server should start");
        thread::sleep(SETTLE_DELAY);
        assert_eq!(cloud.status_udp(), Ok(ServerStatus::Running), "after start");

        cloud
            .stop_udp("stop this")
            .expect("UDP server should accept the stop request");
        thread::sleep(SETTLE_DELAY);
        assert_eq!(cloud.status_udp(), Ok(ServerStatus::Stopped), "after stop");
    }
}