use futures::{Future, Stream, Sink};
use tokio_core::reactor::Core;
use tokio_core::net::TcpListener;
use tokio_io::AsyncRead;
use tokio_service::{Service, NewService};
use std::io;
use std::net::SocketAddr;
use message::{self, Message, Op, Code};
use cache;
use codec::CacheCodec;
use std::sync::Arc;
use std::error::Error;
use futures::sync::oneshot;
use stats::Stats;
use time;
pub fn serve<T>(addr: SocketAddr, s: T) -> io::Result<()>
where
T: NewService<Request = Message, Response = Message, Error = io::Error> + 'static,
<T::Instance as Service>::Future: 'static,
{
let mut core = Core::new()?;
let handle = core.handle();
let listener = TcpListener::bind(&addr, &handle)?;
let connections = listener.incoming();
let server = connections.for_each(move |(socket, _peer_addr)| {
let (writer, reader) = socket.framed(CacheCodec).split();
let service = s.new_service().unwrap();
let responses = reader.and_then(move |(req_id, msg)| {
service.call(msg).map(move |resp| (req_id, resp))
});
let server = writer.send_all(responses).then(|_| Ok(()));
handle.spawn(server);
Ok(())
});
core.run(server)
}
pub struct CacheService {
pub cache: Arc<cache::Cache>,
}
impl Service for CacheService {
type Request = Message;
type Response = Message;
type Error = io::Error;
type Future = Box<Future<Item = Message, Error = io::Error>>;
fn call(&self, req: Self::Request) -> Self::Future {
let (snd, rcv) = oneshot::channel();
self.cache.process(req, snd);
rcv.map_err(|e| io::Error::new(io::ErrorKind::Other, e.description()))
.boxed()
}
}
impl NewService for CacheService {
type Request = Message;
type Response = Message;
type Error = io::Error;
type Instance = CacheService;
fn new_service(&self) -> io::Result<Self::Instance> {
Ok(CacheService { cache: self.cache.clone() })
}
}
pub struct StatService<T> {
pub inner: T,
pub stats: Arc<Stats>,
}
impl<T> Service for StatService<T>
where T: Service<Request = Message, Response = Message, Error = io::Error>,
T::Future: 'static {
type Request = Message;
type Response = Message;
type Error = io::Error;
type Future = Box<Future<Item = Message, Error = io::Error>>;
fn call(&self, req: Self::Request) -> Self::Future {
match req.op() {
Op::Stats => {
let data = self.stats.get_stats();
Box::new(self.inner.call(req).map(|resp| match resp {
message::Message::Response(_, _, Some(payload)) => {
let len = payload.type_id();
let s = format!("keys: {} ", len) + data.as_ref();
message::response(Op::Stats, Code::Ok, Some(
message::payload(1, s.into_bytes())))
}
_ => message::response(Op::Stats, Code::Ok,
Some(message::payload(1, data.into_bytes())))
}))
}
_ => {
let stats = self.stats.clone();
let start_time = time::now();
Box::new(self.inner.call(req).and_then(move|resp|{
stats.incr_total_requests();
stats.add_request_time((time::now() - start_time)
.num_microseconds().unwrap() as usize);
Ok(resp)
}))
}
}
}
}
impl<T> NewService for StatService<T>
where
T: NewService<
Request = Message,
Response = Message,
Error = io::Error,
>,
<T::Instance as Service>::Future: 'static,
{
type Request = Message;
type Response = Message;
type Error = io::Error;
type Instance = StatService<T::Instance>;
fn new_service(&self) -> io::Result<Self::Instance> {
let inner = self.inner.new_service()?;
Ok(StatService {
inner: inner,
stats: self.stats.clone(),
})
}
}
pub struct LogService<T> {
pub inner: T,
}
impl<T> Service for LogService<T>
where T: Service<Request = Message, Response = Message, Error = io::Error>,
T::Future: 'static {
type Request = Message;
type Response = Message;
type Error = io::Error;
type Future = Box<Future<Item = Message, Error = io::Error>>;
fn call(&self, req: Self::Request) -> Self::Future {
println!("{}", req);
Box::new(self.inner.call(req).and_then(|resp| {
println!("{}", resp);
Ok(resp)
}))
}
}
impl<T> NewService for LogService<T>
where
T: NewService<
Request = Message,
Response = Message,
Error = io::Error,
>,
<T::Instance as Service>::Future: 'static,
{
type Request = Message;
type Response = Message;
type Error = io::Error;
type Instance = LogService<T::Instance>;
fn new_service(&self) -> io::Result<Self::Instance> {
let inner = self.inner.new_service()?;
Ok(LogService { inner: inner })
}
}