vertx-rust 0.8.6

Simple Rust version of vertx tcp eventbus, tcp server and http server
Documentation
use std::convert::Infallible;
use std::net::SocketAddr;
use std::panic::RefUnwindSafe;
use std::sync::Arc;

use chrono::{DateTime, Local};
use hashbrown::hash_map::{Iter};
use hashbrown::HashMap;
use hyper::{Body, HeaderMap, Method, Response, StatusCode, Uri, Version};
use hyper::body::HttpBody;
use hyper::header::HeaderValue;
use hyper::http::{Error, Extensions};
use hyper::server::conn::AddrStream;
use hyper::server::Server;
use hyper::service::{make_service_fn, service_fn};
use log::info;
use regex::{Captures, Regex};
use tokio::runtime::{Builder, Runtime};

use crate::vertx::{cm::ClusterManager, EventBus};

#[cfg(feature = "client")]
pub mod client;


pub struct Request {
    pub request: hyper::Request<Body>,
    pub(crate) paths: HashMap<String, String>,
    pub request_timestamp: DateTime<Local>,
    pub body: Vec<u8>,
}

impl Request {

    #[inline]
    pub fn path_value (&self, key: &str) -> Option<&String> {
        self.paths.get(key)
    }

    #[inline]
    pub fn path_iter (&self) -> Iter<String, String> {
        self.paths.iter()
    }

    #[inline]
    pub fn into_body (self) -> Body {
        self.request.into_body()
    }

    #[inline]
    pub fn method(&self) -> &Method {
        &self.request.method()
    }

    #[inline]
    pub fn uri(&self) -> &Uri {
        &self.request.uri()
    }

    #[inline]
    pub fn version(&self) -> Version {
        self.request.version()
    }

    #[inline]
    pub fn headers(&self) -> &HeaderMap<HeaderValue> {
        &self.request.headers()
    }

    #[inline]
    pub fn extensions(&self) -> &Extensions {
        &self.request.extensions()
    }

    #[inline]
    pub fn body(&self) -> &Body {
        &self.request.body()
    }

}

pub struct HttpServer<CM: 'static + ClusterManager + Send + Sync +  RefUnwindSafe> {
    pub port: u16,
    event_bus: Option<Arc<EventBus<CM>>>,
    callers: Arc<
        HashMap<
            (String, Method),
            Arc<
                dyn FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
                    + 'static
                    + Send
                    + Sync,
            >,
        >,
    >,
    regexes: Arc<HashMap<String, (Regex, String)>>,
    rt: Runtime,
    main_reg: Regex,
}

impl<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> HttpServer<CM> {
    pub(crate) fn new(event_bus: Option<Arc<EventBus<CM>>>) -> HttpServer<CM> {
        HttpServer {
            port: 0,
            event_bus,
            callers: Arc::new(HashMap::new()),
            regexes: Arc::new(HashMap::new()),
            main_reg: Regex::new("(:\\w+)").unwrap(),
            rt: Builder::new_multi_thread()
                .worker_threads(12)
                .enable_all()
                .build()
                .unwrap(),
        }
    }

    pub fn get<OP>(&mut self, path: &str, op: OP) -> &mut Self
    where
        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
            + 'static
            + Send
            + Sync,
    {
        self.add_op(path, Method::GET, op)
    }

    pub fn post<OP>(&mut self, path: &str, op: OP) -> &mut Self
    where
        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
            + 'static
            + Send
            + Sync,
    {
        self.add_op(path, Method::POST, op)
    }

    pub fn put<OP>(&mut self, path: &str, op: OP) -> &mut Self
    where
        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
            + 'static
            + Send
            + Sync,
    {
        self.add_op(path, Method::PUT, op)
    }

    pub fn delete<OP>(&mut self, path: &str, op: OP) -> &mut Self
    where
        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
            + 'static
            + Send
            + Sync,
    {
        self.add_op(path, Method::DELETE, op)
    }

    pub fn head<OP>(&mut self, path: &str, op: OP) -> &mut Self
    where
        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
            + 'static
            + Send
            + Sync,
    {
        self.add_op(path, Method::HEAD, op)
    }

    pub fn patch<OP>(&mut self, path: &str, op: OP) -> &mut Self
    where
        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
            + 'static
            + Send
            + Sync,
    {
        self.add_op(path, Method::PATCH, op)
    }

    pub fn options<OP>(&mut self, path: &str, op: OP) -> &mut Self
    where
        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
            + 'static
            + Send
            + Sync,
    {
        self.add_op(path, Method::OPTIONS, op)
    }

    pub fn connect<OP>(&mut self, path: &str, op: OP) -> &mut Self
    where
        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
            + 'static
            + Send
            + Sync,
    {
        self.add_op(path, Method::CONNECT, op)
    }

    pub fn trace<OP>(&mut self, path: &str, op: OP) -> &mut Self
    where
        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
            + 'static
            + Send
            + Sync,
    {
        self.add_op(path, Method::TRACE, op)
    }

    fn add_op<OP>(&mut self, path: &str, method: Method, op: OP) -> &mut Self
    where
        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
            + 'static
            + Send
            + Sync,
    {
        let callers = Arc::get_mut(&mut self.callers).unwrap();
        let reg_path = self.main_reg.replace_all(path, "(\\w+)");
        callers.insert((reg_path.to_string(), method), Arc::new(op));

        let regexes = Arc::get_mut(&mut self.regexes).unwrap();
        let path = path.replace(":", "");
        regexes.insert(reg_path.to_string(), (Regex::new(&reg_path).unwrap(), path));

        self
    }

    pub fn listen_with_default<OP>(&mut self, port: u16, mut default: OP)
    where
        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
            + 'static
            + Send
            + Sync
            + Copy,
    {
        let ev = self.event_bus.as_ref().unwrap().clone();

        let callers = self.callers.clone();
        let addr = SocketAddr::from(([0, 0, 0, 0], port));
        let new_service = make_service_fn(move |_conn: &AddrStream| {
            let ev = ev.clone();
            let callers = callers.clone();
            async move {
                let x = Ok::<_, Infallible>(service_fn(move |mut req: hyper::Request<Body>| {
                    let ev = ev.clone();
                    let callers = callers.clone();
                    async move {
                        let mut data = vec![];
                        let body = req.body_mut();
                        loop {
                            let bytes = body.data().await;
                            match bytes {
                                Some(bytes) => {
                                    match bytes {
                                        Ok(bytes) => {
                                            data.extend_from_slice(&bytes);
                                        }
                                        Err(_) => {
                                            break;
                                        }
                                    }
                                }
                                None => {break;}
                            }
                        }
                        let ev = ev.to_owned();
                        let op = callers.get(&(req.uri().path().to_owned(), req.method().clone()));
                        let request = Request {
                            request: req,
                            paths: Default::default(),
                            request_timestamp: Local::now(),
                            body: data
                        };
                        match op {
                            Some(op) => {
                                let mut op = op.clone();
                                unsafe {
                                    let op = Arc::get_mut_unchecked(&mut op);
                                    op(request, ev)
                                }
                            }
                            None => default(request, ev),
                        }
                    }
                }));

                x
            }
        });

        self.rt.spawn(async move {
            let server = Server::bind(&addr).serve(new_service);
            info!("start http_server on http://{}", addr);
            let _ = server.await;
        });
    }

    pub fn listen(&mut self, port: u16) {
        let ev = self.event_bus.as_ref().unwrap().clone();

        let callers = self.callers.clone();
        let regexes = self.regexes.clone();
        let addr = SocketAddr::from(([0, 0, 0, 0], port));
        let new_service = make_service_fn(move |_conn: &AddrStream| {
            let ev = ev.clone();
            let callers = callers.clone();
            let regexes = regexes.clone();
            async move {
                Ok::<_, Infallible>(service_fn(move |mut req: hyper::Request<Body>| {
                    let ev = ev.clone();
                    let callers = callers.clone();
                    let regexes = regexes.clone();
                    async move {
                        let mut data = vec![];
                        let body = req.body_mut();
                        loop {
                            let bytes = body.data().await;
                            match bytes {
                                Some(bytes) => {
                                    match bytes {
                                        Ok(bytes) => {
                                            data.extend_from_slice(&bytes);
                                        }
                                        Err(_) => {
                                            break;
                                        }
                                    }
                                }
                                None => {break;}
                            }
                        }
                        let ev = ev.to_owned();
                        let path = req.uri().path().to_owned();
                        let mut path_key = &path;
                        let mut paths = HashMap::new();
                        for (k, v) in regexes.iter() {
                            if v.0.is_match(&path) {
                                path_key = k;
                                let caps_base = v.0.captures_iter(&v.1);
                                let caps : Vec<Captures> = v.0.captures_iter(&path).collect();
                                for (i, c) in caps_base.enumerate() {
                                    for idx in 1..c.len() {
                                        if let Some(name) = c.get(idx) {
                                            if let Some(value) = caps[i].get(idx) {
                                                let _ = paths.insert(name.as_str().to_string(),
                                                                     value.as_str().to_string()
                                                );
                                            }
                                        }
                                    }

                                }
                                break;
                            }
                        }
                        let op = callers.get(&(path_key.to_owned(), req.method().clone()));
                        let request = Request {
                            request: req,
                            paths,
                            request_timestamp: Local::now(),
                            body: data
                        };
                        <HttpServer<CM>>::invoke_function(request, ev, op)
                    }
                }))
            }
        });

        self.rt.spawn(async move {
            let server = Server::bind(&addr).serve(new_service);
            info!("start http_server on http://{}", addr);
            let _ = server.await;
        });
    }

    #[inline]
    fn invoke_function(
        req: Request,
        ev: Arc<EventBus<CM>>,
        op: Option<
            &Arc<
                dyn FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
                    + Send
                    + Sync,
            >,
        >,
    ) -> Result<Response<Body>, Error> {
        match op {
            Some(op) => {
                let mut op = op.clone();
                unsafe {
                    let op = Arc::get_mut_unchecked(&mut op);
                    op(req, ev)
                }
            }
            None => Ok(Response::builder()
                .status(StatusCode::NOT_FOUND)
                .body(b"NOTFOUND".to_vec().into())
                .unwrap()),
        }
    }
}