use crate::cloud_options::CloudOptions;
use crate::http_server::HttpServer;
use crate::server_control::ServerControl;
use crate::server_error::ServerError;
use crate::server_options::ServerOptions;
use crate::server_status::ServerStatus;
use crate::udp_server::UdpServer;
use confitul::{Cluster, Conflict, SizedQueueDrain};
use std::sync::{Arc, RwLock};
use std::thread;
use tokio::runtime::Builder;
use url::ParseError;
/// Slot of the UDP server's control handle inside `Cloud::controls`.
const UDP_IDX: usize = 0;
/// Slot of the HTTP server's control handle inside `Cloud::controls`.
const HTTP_IDX: usize = 1;
/// Number of server control slots held by a `Cloud`.
const NB_SERVERS: usize = 2;
/// Facade bundling a shared cluster with the control handles of the
/// servers that expose it.
#[derive(Debug)]
pub struct Cloud {
    /// Cluster state, reference-counted and lock-protected so that a clone
    /// of the handle can be given to each server.
    cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
    /// One control handle per server, indexed by `UDP_IDX` and `HTTP_IDX`.
    controls: [ServerControl; NB_SERVERS],
}
impl Cloud {
pub fn new(cloud_options: Option<CloudOptions>) -> Result<Cloud, ParseError> {
let real_options = cloud_options.unwrap_or(CloudOptions::default());
let cluster = Cluster::new(Some(real_options.cluster))?;
let cloud = Cloud {
cluster: Arc::new(RwLock::new(cluster)),
controls: [ServerControl::new(), ServerControl::new()],
};
Ok(cloud)
}
pub async fn run_http(
&mut self,
server_options: Option<ServerOptions>,
) -> Result<(), ServerError> {
let (shutdown_receiver, ready_sender, done_sender) = self.controls[HTTP_IDX].get_ready()?;
let mut http_server = HttpServer::new(self.cluster.clone(), server_options);
http_server
.run(shutdown_receiver, ready_sender, done_sender)
.await
}
pub fn start_http(&mut self, server_options: Option<ServerOptions>) -> Result<(), ServerError> {
let (shutdown_receiver, ready_sender, done_sender) = self.controls[HTTP_IDX].get_ready()?;
let mut http_server = HttpServer::new(self.cluster.clone(), server_options);
thread::spawn(move || {
Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(http_server.run(shutdown_receiver, ready_sender, done_sender))
});
Ok({})
}
pub fn stop_http(&mut self, msg: &str) -> Result<(), ServerError> {
self.controls[HTTP_IDX].stop(msg)
}
pub fn status_http(&mut self) -> Result<ServerStatus, ServerError> {
self.controls[HTTP_IDX].update_status();
match &self.controls[HTTP_IDX].last_error {
Ok(()) => Ok(self.controls[HTTP_IDX].current_status),
Err(e) => Err(e.clone()),
}
}
pub async fn run_udp(
&mut self,
server_options: Option<ServerOptions>,
) -> Result<(), ServerError> {
let (shutdown_receiver, ready_sender, done_sender) = self.controls[UDP_IDX].get_ready()?;
let mut udp_server = UdpServer::new(self.cluster.clone(), server_options);
udp_server
.run(shutdown_receiver, ready_sender, done_sender)
.await
}
pub fn start_udp(&mut self, server_options: Option<ServerOptions>) -> Result<(), ServerError> {
let (shutdown_receiver, ready_sender, done_sender) = self.controls[UDP_IDX].get_ready()?;
let mut udp_server = UdpServer::new(self.cluster.clone(), server_options);
thread::spawn(move || {
Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(udp_server.run(shutdown_receiver, ready_sender, done_sender))
});
Ok({})
}
pub fn stop_udp(&mut self, msg: &str) -> Result<(), ServerError> {
self.controls[UDP_IDX].stop(msg)
}
pub fn status_udp(&mut self) -> Result<ServerStatus, ServerError> {
self.controls[UDP_IDX].update_status();
match &self.controls[UDP_IDX].last_error {
Ok(()) => Ok(self.controls[UDP_IDX].current_status),
Err(e) => Err(e.clone()),
}
}
pub fn store_items_len(&self) -> usize {
self.cluster.read().unwrap().store().items_len()
}
pub fn store_items_capacity(&self) -> usize {
self.cluster.read().unwrap().store().items_capacity()
}
pub fn store_resize_items(&self, size: usize) -> usize {
self.cluster.read().unwrap().store().resize_items(size)
}
pub fn store_conflicts_len(&self) -> usize {
self.cluster.read().unwrap().store().conflicts_len()
}
pub fn store_conflicts_capacity(&self) -> usize {
self.cluster.read().unwrap().store().conflicts_capacity()
}
pub fn store_resize_conflicts(&self, size: usize) -> usize {
self.cluster.read().unwrap().store().resize_conflicts(size)
}
pub fn store_suggestions_len(&self) -> usize {
self.cluster.read().unwrap().store().suggestions_len()
}
pub fn store_suggestions_capacity(&self) -> usize {
self.cluster.read().unwrap().store().suggestions_capacity()
}
pub fn store_resize_suggestions(&self, size: usize) -> usize {
self.cluster
.read()
.unwrap()
.store()
.resize_suggestions(size)
}
pub fn store_freeze(&self) -> SizedQueueDrain<Conflict<u64, Vec<u8>>> {
self.cluster.read().unwrap().store().freeze()
}
pub fn store_unfreeze(&self) -> usize {
self.cluster.read().unwrap().store().unfreeze()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::{thread, time};

    /// Pause given to the background server thread after a start or stop
    /// request before its status is checked.
    const SETTLE_DELAY: time::Duration = time::Duration::from_millis(250);

    /// Drives the HTTP server through stopped -> running -> stopped.
    #[test]
    #[serial]
    fn test_start_stop_http() {
        let http_server_options = ServerOptions::new().with_listen_addr("127.0.0.1");
        let mut cloud: Cloud = Cloud::new(None).unwrap();

        let status1 = cloud.status_http();
        assert_eq!(Ok(ServerStatus::Stopped), status1, "status before start");

        // Fail at the cause instead of letting a later status check fail.
        cloud
            .start_http(Some(http_server_options))
            .expect("start_http should succeed");
        thread::sleep(SETTLE_DELAY);
        let status2 = cloud.status_http();
        assert_eq!(Ok(ServerStatus::Running), status2, "status after start");

        cloud
            .stop_http("stop this")
            .expect("stop_http should succeed");
        thread::sleep(SETTLE_DELAY);
        let status3 = cloud.status_http();
        assert_eq!(Ok(ServerStatus::Stopped), status3, "status after stop");
    }

    /// Drives the UDP server through stopped -> running -> stopped.
    #[test]
    #[serial]
    fn test_start_stop_udp() {
        let udp_server_options = ServerOptions::new().with_listen_addr("127.0.0.1");
        let mut cloud: Cloud = Cloud::new(None).unwrap();

        let status1 = cloud.status_udp();
        assert_eq!(Ok(ServerStatus::Stopped), status1, "status before start");

        // Fail at the cause instead of letting a later status check fail.
        cloud
            .start_udp(Some(udp_server_options))
            .expect("start_udp should succeed");
        thread::sleep(SETTLE_DELAY);
        let status2 = cloud.status_udp();
        assert_eq!(Ok(ServerStatus::Running), status2, "status after start");

        cloud
            .stop_udp("stop this")
            .expect("stop_udp should succeed");
        thread::sleep(SETTLE_DELAY);
        let status3 = cloud.status_udp();
        assert_eq!(Ok(ServerStatus::Stopped), status3, "status after stop");
    }
}