1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
//! Async access to a prost-encoded item stream.
//!
//! Highly inspired by [async-bincode](https://docs.rs/async-bincode).

#![deny(missing_docs)]

mod frame;
mod reader;
mod stream;
mod writer;

pub use crate::frame::{Frame, Framed, ShallDecodeBody};
pub use crate::reader::AsyncProstReader;
pub use crate::stream::AsyncProstStream;
pub use crate::writer::{AsyncProstWriter, ProstWriterFor};

/// A marker that indicates that the wrapping type is compatible with `AsyncProstReader` with Prost support.
///
/// This is a unit struct: it holds no data and exists only to be named as a type-level tag.
#[derive(Debug)]
pub struct AsyncDestination;

/// A marker that indicates that the wrapping type is compatible with `AsyncProstReader` with Framed support.
///
/// This is a unit struct: it holds no data and exists only to be named as a type-level tag.
#[derive(Debug)]
pub struct AsyncFrameDestination;

/// A marker that indicates that the wrapping type is compatible with stock `prost` receivers.
///
/// This is a unit struct: it holds no data and exists only to be named as a type-level tag.
#[derive(Debug)]
pub struct SyncDestination;

#[cfg(test)]
mod tests {

    use super::*;
    use bytes::Bytes;
    use futures::prelude::*;
    use prost::Message;
    use tokio::{
        io::AsyncWriteExt,
        net::{TcpListener, TcpStream},
    };

    /// Small prost message used as the payload type by the tests in this module.
    #[derive(Clone, PartialEq, Message)]
    pub struct Event {
        /// Identifier bytes; declared as protobuf `bytes` field with tag 1.
        #[prost(bytes = "bytes", tag = "1")]
        pub id: Bytes,
        /// Payload bytes; declared as protobuf `bytes` field with tag 2.
        #[prost(bytes = "bytes", tag = "2")]
        pub data: Bytes,
    }

    /// Sends two events, one at a time, through a local TCP echo server and checks that
    /// each one is read back unchanged before the next is sent.
    // NOTE(review): the test name looks truncated (compare
    // `echo_lots_of_messages_should_work`); kept as-is so test filters keep matching.
    #[tokio::test]
    async fn echo_message_should() {
        // Port 0 lets the OS pick a free port, so parallel tests do not collide.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let server_addr = listener.local_addr().unwrap();

        // Echo task: accept a single connection and forward every decoded event
        // from the read half straight into the write half.
        tokio::spawn(async move {
            let (socket, _) = listener.accept().await.unwrap();
            let mut echo = AsyncProstStream::<_, Event, Event, _>::from(socket).for_async();
            let (incoming, outgoing) = echo.tcp_split();
            incoming.forward(outgoing).await.unwrap();
        });

        let socket = TcpStream::connect(&server_addr).await.unwrap();
        let mut client = AsyncProstStream::<_, Event, Event, _>::from(socket).for_async();

        // (id, data) pairs to round-trip, in order.
        let payloads: [(&'static [u8], &'static [u8]); 2] =
            [(b"1234", b"hello world"), (b"1235", b"goodbye world")];

        for (id, data) in payloads {
            let sent = Event {
                id: Bytes::from_static(id),
                data: Bytes::from_static(data),
            };
            client.send(sent.clone()).await.unwrap();

            let received = client.next().await.unwrap().unwrap();
            assert_eq!(received, sent);
        }

        drop(client);
    }

    /// Pushes a large batch of events through a local TCP echo server and checks that
    /// all of them come back, in the order they were sent.
    #[tokio::test]
    async fn echo_lots_of_messages_should_work() {
        // Port 0 lets the OS pick a free port, so parallel tests do not collide.
        let echo = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = echo.local_addr().unwrap();

        // Echo task: accept a single connection and forward every decoded event
        // from the read half straight into the write half.
        tokio::spawn(async move {
            let (stream, _) = echo.accept().await.unwrap();
            let mut stream = AsyncProstStream::<_, Event, Event, _>::from(stream).for_async();
            let (r, w) = stream.tcp_split();
            r.forward(w).await.unwrap();
        });

        let n = 81920usize;
        let stream = TcpStream::connect(&addr).await.unwrap();
        let mut client = AsyncProstStream::<_, Event, Event, _>::from(stream).for_async();

        // Send events whose `id` is the decimal index, so ordering can be verified on
        // the way back. `forward` drives the whole iterator into the client sink and
        // closes the sink once the iterator is exhausted.
        futures::stream::iter(0..n)
            .map(|i| {
                Ok(Event {
                    id: Bytes::from(i.to_string()),
                    data: Bytes::from_static(b"goodbye world"),
                })
            })
            .forward(&mut client)
            .await
            .unwrap();

        // Flush the underlying transport directly before draining the echoes.
        client.get_mut().flush().await.unwrap();

        // Drain until the stream ends (`None`), checking that ids arrive in send order.
        // NOTE(review): this relies on the echo side closing the connection after the
        // client's sink was closed above — confirm against the writer implementation.
        let mut at = 0usize;
        while let Some(got) = client.next().await.transpose().unwrap() {
            assert_eq!(Bytes::from(at.to_string()), got.id);
            at += 1;
        }

        // The loop above only validates the events that did arrive; without this check
        // the test would still pass if the stream ended early (or echoed nothing).
        assert_eq!(at, n, "expected every sent event to be echoed back");
    }
}