#[macro_use]
extern crate log;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use futures::{future, Future, FutureExt, TryFutureExt};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::TokioAsyncReadCompatExt;
use msgpack_rpc::{Client, Endpoint, ServiceWithClient, Value};
/// RPC service that answers `ping` requests and counts the `pong` requests it receives.
///
/// Cloning a `PingPong` yields a handle to the same counter, because the counter
/// lives behind an `Arc`.
#[derive(Clone)]
pub struct PingPong {
    /// Number of `pong` requests handled so far, shared between all clones.
    pub value: Arc<Mutex<i64>>,
}

impl PingPong {
    /// Creates a service whose pong counter starts at zero.
    fn new() -> Self {
        let counter = Mutex::new(0);
        Self {
            value: Arc::new(counter),
        }
    }
}
impl ServiceWithClient for PingPong {
type RequestFuture = Pin<Box<dyn Future<Output = Result<Value, Value>> + 'static + Send>>;
fn handle_request(
&mut self,
client: &mut Client,
method: &str,
params: &[Value],
) -> Self::RequestFuture {
match method {
"ping" => {
let id = params[0].as_i64().unwrap();
info!("received ping({}), sending pong", id);
let request = client
.request("pong", &[id.into()])
.map_ok(|_| "".into())
.map_err(|_| "".into());
Box::pin(request)
}
"pong" => {
let id = params[0].as_i64().unwrap();
info!("received pong({}), incrementing pong counter", id);
*self.value.lock().unwrap() += 1;
Box::pin(future::ok("".into()))
}
method => {
let err = format!("Invalid method {}", method).into();
Box::pin(future::err(err))
}
}
}
fn handle_notification(&mut self, _: &mut Client, _: &str, _: &[Value]) {
unimplemented!();
}
}
#[tokio::main]
async fn main() -> io::Result<()> {
env_logger::init();
let addr: SocketAddr = "127.0.0.1:54321".parse().unwrap();
let listener = TcpListener::bind(&addr).await?;
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((socket, _)) => {
tokio::spawn(Endpoint::new(socket.compat(), PingPong::new()));
}
Err(e) => debug!("Error accepting connection: {}", e),
}
}
});
let ping_pong_client = PingPong::new();
let pongs = ping_pong_client.value.clone();
let socket = TcpStream::connect(&addr).await?;
let endpoint = Endpoint::new(socket.compat(), ping_pong_client);
let client = endpoint.client();
let mut requests = vec![];
for i in 0..10 {
requests.push(client.request("ping", &[i.into()]).map(|_response| ()));
}
future::select(future::join_all(requests), endpoint.map_err(|_| ())).await;
println!("Received {} pongs", pongs.lock().unwrap());
Ok(())
}