#[macro_use]
extern crate log;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::time;
use futures::{future, Future, FutureExt, TryFutureExt};
use msgpack_rpc::{serve, Service, Value};
use tokio::net::TcpListener;
use tokio::time::sleep;
use tokio_util::compat::TokioAsyncReadCompatExt;
#[derive(Clone)]
pub struct Server;
impl Service for Server {
type RequestFuture = Pin<Box<dyn Future<Output = Result<Value, Value>> + Send>>;
fn handle_request(&mut self, method: &str, params: &[Value]) -> Self::RequestFuture {
if method != "do_long_computation" {
return Box::pin(future::err(format!("Invalid method {}", method).into()));
}
if params.len() != 1 {
return Box::pin(future::err(
"'do_long_computation' takes one argument".into(),
));
}
if let Value::Integer(ref value) = params[0] {
if let Some(value) = value.as_u64() {
return Box::pin(sleep(time::Duration::from_secs(value)).then(|_| {
future::ok(
time::SystemTime::now()
.duration_since(time::UNIX_EPOCH)
.unwrap()
.as_secs()
.into(),
)
}));
}
}
Box::pin(future::err("Argument must be an unsigned integer".into()))
}
fn handle_notification(&mut self, method: &str, _: &[Value]) {
println!("{}", method);
}
}
#[tokio::main]
async fn main() -> io::Result<()> {
env_logger::init();
let addr: SocketAddr = "127.0.0.1:54321".parse().unwrap();
let listener = TcpListener::bind(&addr).await?;
loop {
let socket = match listener.accept().await {
Ok((socket, _)) => socket,
Err(e) => {
info!("error on TcpListener: {}", e);
continue;
}
};
info!("new connection {:?}", socket);
info!("spawning a new Server");
tokio::spawn(serve(socket.compat(), Server).map_err(|e| info!("server error {}", e)));
}
}