ice_core 0.3.9

High performance Web engine
Documentation
use std;
use std::sync::{Arc, RwLock, Mutex};
use std::rc::Rc;
use std::net::SocketAddr;
use std::os::raw::c_void;
use hyper;
use hyper::server::{Http, Request, Response, Service};
use futures;
use futures::future::Future;
use futures::Stream;
use delegates;
use router;
use tokio_core;
use net2;
use num_cpus;
#[cfg(feature = "use_cervus")]
use cervus;
use static_file;
use logging;
use session_storage::SessionStorage;
use config;
use template::TemplateStorage;
use stat;
use glue;

#[cfg(unix)]
use net2::unix::UnixTcpBuilderExt;

#[derive(Clone)]
pub struct IceServer {
    pub prep: Arc<Preparation>
}

#[cfg(feature = "use_cervus")]
type Modules = cervus::manager::Modules;


#[cfg(not(feature = "use_cervus"))]
type Modules = bool;

pub struct Preparation {
    pub router: Arc<RwLock<router::Router>>,
    pub static_dir: RwLock<Option<String>>,
    pub session_storage: Arc<SessionStorage>,
    pub session_cookie_name: Mutex<String>,
    pub session_timeout_ms: RwLock<u64>,
    pub templates: Arc<TemplateStorage>,
    pub max_request_body_size: Mutex<u32>,
    pub log_requests: Mutex<bool>,
    pub endpoint_timeout_ms: Mutex<u64>,
    pub async_endpoint_cb: Mutex<Option<extern fn (i32, *mut delegates::CallInfo)>>,
    pub custom_app_data: delegates::CustomAppData,
    pub modules: Arc<Modules>
}

pub struct Context {
    pub ev_loop_remote: tokio_core::reactor::Remote,
    pub router: Arc<RwLock<router::Router>>,
    pub static_dir: Option<String>,
    pub session_cookie_name: String,
    pub session_storage: Arc<SessionStorage>,
    pub templates: Arc<TemplateStorage>,
    pub max_request_body_size: u32,
    pub log_requests: bool,
    pub stats: stat::ServerStats,
    pub max_cache_size: u32,
    pub endpoint_timeout_ms: u64,
    pub custom_app_data: delegates::CustomAppData,
    pub modules: Arc<Modules>
}

pub struct LocalContext {
    pub ev_loop_handle: tokio_core::reactor::Handle,
    pub static_file_worker_control_tx: std::sync::mpsc::Sender<static_file::WorkerControlMessage>,
    pub async_endpoint_cb: extern fn (i32, *mut delegates::CallInfo)
}

struct HttpService {
    context: Arc<Context>,
    local_context: Rc<LocalContext>
}

impl IceServer {
    pub fn new() -> IceServer {
        let modules = new_modules();

        IceServer {
            prep: Arc::new(Preparation {
                router: Arc::new(RwLock::new(router::Router::new())),
                static_dir: RwLock::new(None),
                session_storage: Arc::new(SessionStorage::new()),
                session_cookie_name: Mutex::new(config::DEFAULT_SESSION_COOKIE_NAME.to_string()),
                session_timeout_ms: RwLock::new(600000),
                templates: Arc::new(TemplateStorage::new()),
                max_request_body_size: Mutex::new(config::DEFAULT_MAX_REQUEST_BODY_SIZE),
                log_requests: Mutex::new(true),
                async_endpoint_cb: Mutex::new(None),
                endpoint_timeout_ms: Mutex::new(config::DEFAULT_ENDPOINT_TIMEOUT_MS),
                custom_app_data: delegates::CustomAppData::empty(),
                modules: Arc::new(modules)
            })
        }
    }

    pub fn listen_in_this_thread(&self, addr: &SocketAddr, protocol: &Http) {
        let logger = logging::Logger::new("IceServer::listen_in_this_thread");

        let mut ev_loop = tokio_core::reactor::Core::new().unwrap();

        let (control_tx, control_rx) = std::sync::mpsc::channel();
        let remote_handle = ev_loop.handle().remote().clone();

        let session_storage = self.prep.session_storage.clone();

        let ctx = Arc::new(Context {
            ev_loop_remote: remote_handle.clone(),
            router: self.prep.router.clone(),
            static_dir: self.prep.static_dir.read().unwrap().clone(),
            session_cookie_name: self.prep.session_cookie_name.lock().unwrap().clone(),
            session_storage: session_storage.clone(),
            templates: self.prep.templates.clone(),
            max_request_body_size: *self.prep.max_request_body_size.lock().unwrap(),
            log_requests: *self.prep.log_requests.lock().unwrap(),
            stats: stat::ServerStats::new(),
            max_cache_size: config::DEFAULT_MAX_CACHE_SIZE,
            endpoint_timeout_ms: *self.prep.endpoint_timeout_ms.lock().unwrap(),
            custom_app_data: self.prep.custom_app_data.clone(),
            modules: self.prep.modules.clone()
        });

        let local_ctx = Rc::new(LocalContext {
            ev_loop_handle: ev_loop.handle(),
            static_file_worker_control_tx: control_tx,
            async_endpoint_cb: self.prep.async_endpoint_cb.lock().unwrap().clone().unwrap()
        });

        let ctx_cloned = ctx.clone();
        let _ = std::thread::spawn(move || static_file::worker(ctx_cloned, remote_handle, control_rx));

        let this_handle = ev_loop.handle();

        let listener = start_listener(addr);
        
        let listener = tokio_core::net::TcpListener::from_listener(
            listener,
            addr,
            &this_handle
        ).unwrap();

        let server = listener.incoming().for_each(|(sock, addr)| {
            let s = HttpService {
                context: ctx.clone(),
                local_context: local_ctx.clone()
            };
            protocol.bind_connection(&this_handle, sock, addr, s);

            Ok(())
        });

        logger.log(logging::Message::Info(format!("Ice Server v{} listening at {}", env!("CARGO_PKG_VERSION"), addr)));

        ev_loop.run(server).unwrap();
    }

    pub fn listen(&self, addr: &str) {
        let protocol = Arc::new(Http::new());
        let addr: SocketAddr = addr.parse().unwrap();

        self.export_symbols();

        let session_timeout_ms = *self.prep.session_timeout_ms.read().unwrap();
        let session_storage = self.prep.session_storage.clone();
        std::thread::spawn(move || session_storage.run_gc(session_timeout_ms, config::SESSION_GC_PERIOD_MS));

        if cfg!(unix) {
            let mut total_threads = num_cpus::get() - 1;
            if total_threads < 1 {
                total_threads = 1;
            }

            for _ in 0..total_threads {
                let addr = addr.clone();
                let target = self.clone();
                let protocol = protocol.clone();

                std::thread::spawn(move || target.listen_in_this_thread(&addr, &protocol));
            }
        } else {
            let target = self.clone();
            std::thread::spawn(move || target.listen_in_this_thread(&addr, &protocol));
        }
    }

    #[cfg(feature = "use_cervus")]
    fn export_symbols(&self) {
        unsafe {
            cervus::engine::add_global_symbol("ice_glue_create_response", glue::response::ice_glue_create_response as *const c_void);
            cervus::engine::add_global_symbol("ice_glue_response_add_header", glue::response::ice_glue_response_add_header as *const c_void);
            cervus::engine::add_global_symbol("ice_glue_response_set_cookie", glue::response::ice_glue_response_set_cookie as *const c_void);
            cervus::engine::add_global_symbol("ice_glue_response_set_body", glue::response::ice_glue_response_set_body as *const c_void);
            cervus::engine::add_global_symbol("ice_glue_response_set_file", glue::response::ice_glue_response_set_file as *const c_void);
            cervus::engine::add_global_symbol("ice_glue_response_set_status", glue::response::ice_glue_response_set_status as *const c_void);
            cervus::engine::add_global_symbol("ice_glue_response_consume_rendered_template", glue::response::ice_glue_response_consume_rendered_template as *const c_void);
            cervus::engine::add_global_symbol("ice_glue_response_stream", glue::response::ice_glue_response_stream as *const c_void);
            cervus::engine::add_global_symbol("ice_glue_custom_properties_set", glue::common::ice_glue_custom_properties_set as *const c_void);
            cervus::engine::add_global_symbol("ice_glue_custom_properties_get", glue::common::ice_glue_custom_properties_get as *const c_void);
            cervus::engine::add_global_symbol("ice_glue_response_borrow_custom_properties", glue::response::ice_glue_response_borrow_custom_properties as *const c_void);
            cervus::engine::add_global_symbol("ice_glue_interop_set_rx_field", glue::interop::ice_glue_interop_set_rx_field as *const c_void);
            cervus::engine::add_global_symbol("ice_glue_interop_get_tx_field", glue::interop::ice_glue_interop_get_tx_field as *const c_void);
            cervus::engine::add_global_symbol("ice_glue_interop_read_tx", glue::interop::ice_glue_interop_read_tx as *const c_void);
        }
    }

    #[cfg(not(feature = "use_cervus"))]
    fn export_symbols(&self) {
    }

    #[cfg(feature = "use_cervus")]
    pub fn load_module(&self, name: &str, bitcode: &[u8]) {
        let mut ext_res = cervus::manager::ExternalResources::new();
        let mod_logger = logging::Logger::new(name);

        ext_res.set_logger(Box::new(move |level, msg| {
            print_module_log(
                &mod_logger,
                level,
                msg
            );
        }));

        self.prep.modules.load(name, bitcode, ext_res);
    }

    #[cfg(not(feature = "use_cervus"))]
    pub fn load_module(&self, _: &str, _: &[u8]) {
        unimplemented!()
    }
}

impl Service for HttpService {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<futures::Future<Error=hyper::Error, Item=hyper::Response>>;

    fn call(&self, req: Request) -> Self::Future {
        Box::new(delegates::fire_handlers(self.context.clone(), self.local_context.clone(), req)
        .map_err(|e| hyper::Error::from(std::io::Error::new(std::io::ErrorKind::Other, e))))
    }
}

impl Context {
    #[cfg(feature = "cervus")]
    pub fn get_service_by_name(
        &self,
        module_name: &str,
        service_name: &str
    ) -> Option<cervus::manager::ServiceHandle> {
        let m = match self.modules.get_module_by_name(module_name) {
            Some(v) => v,
            None => return None
        };
        m.get_service_by_name(service_name)
    }
}

#[cfg(feature = "use_cervus")]
fn new_modules() -> Modules {
    let modules = cervus::manager::Modules::new();
    init_modules(&modules);
    modules
}

#[cfg(not(feature = "use_cervus"))]
fn new_modules() -> Modules {
    false
}

#[cfg(feature = "use_cervus")]
fn init_modules(modules: &cervus::manager::Modules) {
    modules.add_downcast_provider("basic_request_info", Box::new(|v| {
        v.downcast_ref::<delegates::BasicRequestInfo>().unwrap()
            as *const delegates::BasicRequestInfo
            as *const c_void
    }));
    modules.add_downcast_provider("glue_response", Box::new(|v| {
        v.downcast_ref::<glue::response::Response>().unwrap()
            as *const glue::response::Response
            as *const c_void
    }));
    modules.add_downcast_provider("interop_context", Box::new(|v| {
        v.downcast_ref::<glue::interop::InteropContext>().unwrap()
            as *const glue::interop::InteropContext
            as *const c_void
    }));
}

#[cfg(feature = "use_cervus")]
fn print_module_log(logger: &logging::Logger, level: cervus::logging::LogLevel, msg: &str) {
    let msg = msg.to_string();

    use cervus::logging::LogLevel;
    logger.log(
        match level {
            LogLevel::Emergency | LogLevel::Alert | LogLevel::Critical | LogLevel::Error => logging::Message::Error(msg),
            LogLevel::Warning | LogLevel::Notice => logging::Message::Warning(msg),
            LogLevel::Info | LogLevel::Debug => logging::Message::Info(msg)
        }
    );
}

#[cfg(unix)]
fn start_listener(addr: &SocketAddr) -> std::net::TcpListener {
    net2::TcpBuilder::new_v4().unwrap()
        .reuse_address(true).unwrap()
        .reuse_port(true).unwrap()
        .bind(addr).unwrap()
        .listen(128).unwrap()
}

#[cfg(not(unix))]
fn start_listener(addr: &SocketAddr) -> std::net::TcpListener {
    net2::TcpBuilder::new_v4().unwrap()
        .reuse_address(true).unwrap()
        .bind(addr).unwrap()
        .listen(128).unwrap()
}