fluvio-future 0.2.0

I/O futures for Fluvio project
Documentation
use std::io::Error;
use std::net::SocketAddr;
use std::time;

use bytes::Buf;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use futures_lite::future::zip;
use futures_lite::stream::StreamExt;
use futures_util::sink::SinkExt;

use tokio_util::codec::BytesCodec;
use tokio_util::codec::Framed;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use log::debug;

use crate::test_async;
use crate::timer::sleep;

use crate::net::TcpListener;
use crate::net::TcpStream;

fn to_bytes(bytes: Vec<u8>) -> Bytes {
    let mut buf = BytesMut::with_capacity(bytes.len());
    buf.put_slice(&bytes);
    buf.freeze()
}

#[test_async]
async fn test_async_tcp() -> Result<(), Error> {
    let addr = "127.0.0.1:9998".parse::<SocketAddr>().expect("parse");

    let server_ft = async {
        debug!("server: binding");
        let listener = TcpListener::bind(&addr).await?;
        debug!("server: successfully binding. waiting for incoming");
        let mut incoming = listener.incoming();
        let stream = incoming.next().await.expect("no stream");
        debug!("server: got connection from client");
        let tcp_stream = stream?;
        let mut framed = Framed::new(tcp_stream.compat(), BytesCodec::new());
        debug!("server: sending values to client");
        let data = vec![0x05, 0x0a, 0x63];
        framed.send(to_bytes(data)).await?;
        Ok(()) as Result<(), Error>
    };

    let client_ft = async {
        debug!("client: sleep to give server chance to come up");
        sleep(time::Duration::from_millis(100)).await;
        debug!("client: trying to connect");
        let tcp_stream = TcpStream::connect(&addr).await?;
        let mut framed = Framed::new(tcp_stream.compat(), BytesCodec::new());
        debug!("client: got connection. waiting");
        let value = framed.next().await.expect("no value received");
        debug!("client :received first value from server");
        let bytes = value?;
        debug!("client :received bytes len: {}", bytes.len());
        assert_eq!(bytes.len(), 3);
        let values = bytes.take(3).into_inner();
        assert_eq!(values[0], 0x05);
        assert_eq!(values[1], 0x0a);
        assert_eq!(values[2], 0x63);

        Ok(()) as Result<(), Error>
    };

    let _ = zip(client_ft, server_ft).await;

    Ok(())
}