use crate::server_error::ServerError;
use crate::server_options::ServerOptions;
use confitul::Cluster;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time;
/// Pause, in milliseconds, that `HttpServer::run` sleeps after binding the
/// listening socket and before it reports readiness on its `ready_sender`.
const HTTP_SERVER_RUN_DELAY_MILLIS: u64 = 10;
/// HTTP front-end that serves requests on behalf of a shared `Cluster`.
pub struct HttpServer {
/// Shared cluster handle; a clone is handed to every request handler.
cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
/// Server options; `run` reads `listen_addr` and `listen_port` from it.
options: ServerOptions,
/// Shutdown channel stored by `run` and taken (left as `None`) by
/// `wait_shutdown`; `None` until `run` has been called.
shutdown_receiver: Option<tokio::sync::oneshot::Receiver<String>>,
}
async fn handle(
_: Request<Body>,
_cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
) -> Result<Response<Body>, Infallible> {
let response = format!("OK");
Ok(Response::new(response.into()))
}
impl HttpServer {
    /// Creates a server that will serve requests for `cluster`.
    ///
    /// When `options` is `None`, `ServerOptions::default()` is used. The
    /// server does not listen until [`HttpServer::run`] is awaited.
    pub fn new(
        cluster: Arc<RwLock<Cluster<u64, Vec<u8>>>>,
        options: Option<ServerOptions>,
    ) -> Self {
        HttpServer {
            cluster,
            // Build the default lazily, only when no options were supplied.
            options: options.unwrap_or_else(ServerOptions::default),
            shutdown_receiver: None,
        }
    }

    /// Resolves once a shutdown has been requested.
    ///
    /// Completes when a message arrives on the stored shutdown channel, or
    /// when the sending half is dropped without sending: a vanished sender
    /// can never request shutdown later, so it is treated as a request
    /// rather than panicking inside the server future.
    ///
    /// # Panics
    ///
    /// Panics if no shutdown receiver is stored, i.e. if it is awaited
    /// without `run` having stored one first.
    async fn wait_shutdown(&mut self) {
        let receiver = self
            .shutdown_receiver
            .take()
            .expect("run() stores the shutdown receiver before awaiting wait_shutdown()");
        match receiver.await {
            Ok(msg) => println!("shutdown: {}", msg),
            Err(_) => println!("shutdown: sender dropped"),
        }
    }

    /// Binds the listening socket and serves requests until shutdown.
    ///
    /// * `shutdown_receiver` - completes (or is closed) to request a graceful
    ///   shutdown.
    /// * `ready_sender` - receives `Ok(())` once the socket is bound, or the
    ///   error if the address cannot be parsed or bound.
    /// * `done_sender` - receives the final outcome once serving has ended.
    ///   Nothing is sent on it when start-up fails; it is simply dropped.
    ///
    /// Notifications are best-effort: if the matching receiver has already
    /// been dropped, the notification is discarded instead of panicking.
    ///
    /// # Errors
    ///
    /// Returns a `ServerError` when `listen_addr:listen_port` is not a valid
    /// socket address, when binding fails, or when the server terminates
    /// with an error.
    pub async fn run(
        &mut self,
        shutdown_receiver: tokio::sync::oneshot::Receiver<String>,
        ready_sender: tokio::sync::oneshot::Sender<Result<(), ServerError>>,
        done_sender: tokio::sync::oneshot::Sender<Result<(), ServerError>>,
    ) -> Result<(), ServerError> {
        self.shutdown_receiver = Some(shutdown_receiver);

        let addr = match format!("{}:{}", self.options.listen_addr, self.options.listen_port)
            .parse::<SocketAddr>()
        {
            Ok(addr) => addr,
            Err(e) => {
                // Report start-up failure the same way a bind failure is
                // reported, so whoever waits on readiness sees the cause.
                let msg = format!("unable to parse addr: {:?}", e);
                let _ = ready_sender.send(Err(ServerError::new(msg.as_str())));
                return Err(ServerError::new(msg.as_str()));
            }
        };

        // Each connection, and then each request, gets its own handle to the
        // shared cluster.
        let cluster = self.cluster.clone();
        let make_svc = make_service_fn(move |_conn| {
            let cluster = cluster.clone();
            async move {
                Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
                    handle(req, cluster.clone())
                }))
            }
        });

        let server_builder = match Server::try_bind(&addr) {
            Ok(builder) => builder,
            Err(e) => {
                let msg = e.to_string();
                let _ = ready_sender.send(Err(ServerError::new(msg.as_str())));
                return Err(ServerError::new(msg.as_str()));
            }
        };

        // NOTE(review): this is a blocking sleep inside an async fn, so it
        // stalls the executor thread for the whole delay. Consider the
        // runtime's async timer once the tokio version/features are confirmed.
        thread::sleep(time::Duration::from_millis(HTTP_SERVER_RUN_DELAY_MILLIS));
        let _ = ready_sender.send(Ok(()));

        let graceful = server_builder
            .serve(make_svc)
            .with_graceful_shutdown(self.wait_shutdown());
        if let Err(e) = graceful.await {
            let msg = e.to_string();
            let _ = done_sender.send(Err(ServerError::new(msg.as_str())));
            return Err(ServerError::new(msg.as_str()));
        }

        let _ = done_sender.send(Ok(()));
        Ok(())
    }
}