diny 0.2.4

An asynchronous, alloc-free serialization framework
Documentation

#![feature(generic_associated_types)]

use futures::{io, SinkExt, StreamExt};
use tokio::{net, sync::oneshot};
use async_compat::CompatExt;

#[derive(diny::AsyncSerialization, Copy, Clone, PartialEq, Debug)]
pub struct Id(u32);

#[derive(diny::AsyncSerialization, PartialEq, Debug)]
pub struct Ping(Id);

#[derive(diny::AsyncSerialization, PartialEq, Debug)]
pub struct Pong(Id);

const ADDR: &str = "127.0.0.1:8090";

async fn server(ready: oneshot::Sender<()>) -> io::Result<()> {
    let listener = net::TcpListener::bind(ADDR).await?;
    assert!(ready.send(()).is_ok());

    let (mut socket, _) = listener.accept().await?;
    let (rx, tx) = socket.split();

    let mut stream = diny::deserializer(
        diny_test::format(),
        io::BufReader::new(rx.compat()),
    ).into_stream();

    let mut sink = diny::serializer(
        diny_test::format(),
        io::BufWriter::new(tx.compat()),
    ).into_sink();
    
    while let Some(Ping(id)) = stream.next().await {
        sink.send(Pong(id)).await?;
    }
    sink.close().await?;

    Ok(())
}

async fn client(ready: oneshot::Receiver<()>) -> io::Result<()> {
    assert!(ready.await.is_ok());

    let mut socket = net::TcpStream::connect(ADDR).await?;
    let (rx, tx) = socket.split();

    let mut sink = diny::serializer(
        diny_test::format(),
        io::BufWriter::new(tx.compat()),
    ).into_sink();

    let mut stream = diny::deserializer(
        diny_test::format(),
        io::BufReader::new(rx.compat()),
    ).into_stream();

    for i in 0..10 {
        let id = Id(i);
        sink.send(Ping(id)).await?;
        assert_eq!(stream.next().await, Some(Pong(id)));
    }
    sink.close().await?;
    assert_eq!(stream.next().await, None);

    Ok(())
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let (notify, ready) = oneshot::channel();
    let server = tokio::spawn(server(notify));
    let client = tokio::spawn(client(ready));
    client.await??;
    server.await??;
    
    Ok(())
}