#![feature(generic_associated_types)]
use futures::{io, SinkExt, StreamExt};
use tokio::{net, sync::oneshot};
use async_compat::CompatExt;
/// Identifier carried by a [`Ping`] and echoed back in the matching [`Pong`].
///
/// Thin newtype over `u32`; `Copy` so the same id can be sent and then
/// compared against the reply.
#[derive(diny::AsyncSerialization, Copy, Clone, PartialEq, Debug)]
pub struct Id(u32);
/// Request message sent by the client; wraps the [`Id`] of the exchange.
#[derive(diny::AsyncSerialization, PartialEq, Debug)]
pub struct Ping(Id);
/// Reply message sent by the server; carries the [`Id`] of the [`Ping`] it answers.
#[derive(diny::AsyncSerialization, PartialEq, Debug)]
pub struct Pong(Id);
/// Socket address the server binds to and the client connects to.
const ADDR: &str = "127.0.0.1:8090";
/// Runs the answering side of the ping/pong exchange.
///
/// Binds a TCP listener on [`ADDR`], signals `ready` once the listener is
/// bound, accepts a single connection, and replies to every received
/// [`Ping`] with a [`Pong`] carrying the same [`Id`]. When the incoming
/// message stream ends, the outgoing sink is closed.
///
/// # Errors
///
/// Propagates any error from binding, accepting, sending a reply, or
/// closing the sink.
///
/// # Panics
///
/// Panics if the readiness signal cannot be delivered through `ready`.
async fn server(ready: oneshot::Sender<()>) -> io::Result<()> {
    let listener = net::TcpListener::bind(ADDR).await?;
    // Signal only after the bind succeeded, so the peer cannot connect too early.
    assert!(ready.send(()).is_ok());

    let (mut conn, _) = listener.accept().await?;
    let (read_half, write_half) = conn.split();

    // Adapt the tokio halves to futures-io and buffer them before handing
    // them to the (de)serializer.
    let reader = io::BufReader::new(read_half.compat());
    let writer = io::BufWriter::new(write_half.compat());
    let mut pings = diny::deserializer(diny_test::format(), reader).into_stream();
    let mut pongs = diny::serializer(diny_test::format(), writer).into_sink();

    // Echo each id back until the stream yields `None`.
    loop {
        match pings.next().await {
            Some(Ping(id)) => pongs.send(Pong(id)).await?,
            None => break,
        }
    }

    pongs.close().await?;
    Ok(())
}
async fn client(ready: oneshot::Receiver<()>) -> io::Result<()> {
assert!(ready.await.is_ok());
let mut socket = net::TcpStream::connect(ADDR).await?;
let (rx, tx) = socket.split();
let mut sink = diny::serializer(
diny_test::format(),
io::BufWriter::new(tx.compat()),
).into_sink();
let mut stream = diny::deserializer(
diny_test::format(),
io::BufReader::new(rx.compat()),
).into_stream();
for i in 0..10 {
let id = Id(i);
sink.send(Ping(id)).await?;
assert_eq!(stream.next().await, Some(Pong(id)));
}
sink.close().await?;
assert_eq!(stream.next().await, None);
Ok(())
}
/// Entry point: runs [`server`] and [`client`] as separate tokio tasks.
///
/// A oneshot channel connects the two tasks so the client can wait for the
/// server's readiness signal. The client task is awaited first, then the
/// server task.
///
/// # Errors
///
/// Returns the first failure encountered, either a task join error or the
/// I/O error returned by the task itself.
#[tokio::main]
async fn main() -> io::Result<()> {
    let (ready_tx, ready_rx) = oneshot::channel();

    let server_task = tokio::spawn(server(ready_tx));
    let client_task = tokio::spawn(client(ready_rx));

    // Outer `?` handles the join error, inner `?` the task's own I/O result.
    client_task.await??;
    // The server's own result is the final outcome once it has been joined.
    server_task.await?
}