1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#![doc(html_root_url = "https://docs.rs/pemmican")]
extern crate futures;
extern crate futures_cpupool;
extern crate tokio_service;
extern crate hyper;
extern crate chashmap;
#[macro_use]
extern crate log;
pub mod error;
pub use error::Error;
pub mod config;
pub use config::Config;
pub mod shared;
pub use shared::Shared;
pub mod plugins;
pub use plugins::{PluginData, Plugin};
use std::error::Error as StdError;
use std::sync::Arc;
use futures::Future;
use tokio_service::Service;
use hyper::server::{Http, Request, Response};
use hyper::StatusCode;
pub struct Pemmican<S, E>
{
config: Config,
pub shared: Arc<Shared<S>>,
pub plugins: Vec<Arc<Box<Plugin<S, E>>>>,
}
impl<S, E> Pemmican<S, E>
where S: 'static,
E: Send + Sync + StdError + 'static
{
pub fn new(config: Config,
plugins: Vec<Arc<Box<Plugin<S, E>>>>,
initial_state: S)
-> Pemmican<S, E>
{
let num_threads = config.num_threads;
Pemmican {
config: config,
plugins: plugins,
shared: Arc::new(Shared::new(num_threads, initial_state)),
}
}
pub fn run<F>(self, addr: &str, shutdown_signal: F) -> Result<(), Error>
where F: Future<Item = (), Error = ()>
{
let keep_alive = self.config.keep_alive;
let shutdown_timeout = self.config.shutdown_timeout;
let arcself = Arc::new(self);
let addr = addr.parse()?;
let mut server = Http::new()
.keep_alive(keep_alive)
.bind(&addr, move|| Ok(arcself.clone()))?;
server.shutdown_timeout(shutdown_timeout);
server.run_until(shutdown_signal).map_err(From::from)
}
}
impl<S, E> Service for Pemmican<S, E>
where S: 'static,
E: Send + Sync + StdError + 'static
{
type Request = Request;
type Response = Response;
type Error = ::hyper::Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
fn call(&self, req: Request) -> Self::Future {
let data = PluginData {
shared: self.shared.clone(),
request: req,
response: Response::new().with_status(StatusCode::NotFound),
};
let mut fut: Box<Future<Item = PluginData<S>, Error = E>> =
Box::new(::futures::future::ok(data));
for plugin in &self.plugins {
let plug = plugin.clone();
fut = Box::new(
fut.and_then(move|data| {
plug.handle(data)
})
);
}
let fut = Box::new( fut.map(|data| data.response) );
Box::new( fut.or_else(|e| {
error!("error: {}", e);
::futures::future::ok(Response::new().with_status(StatusCode::InternalServerError))
}))
}
}