1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
//! Async access to a prost-encoded item stream.
//!
//! Heavily inspired by [async-bincode](https://docs.rs/async-bincode).

#![deny(missing_docs)]

mod frame;
mod reader;
mod stream;
mod writer;

pub use crate::frame::{Frame, Framed, ShallDecodeBody};
pub use crate::reader::AsyncProstReader;
pub use crate::stream::AsyncProstStream;
pub use crate::writer::{AsyncProstWriter, ProstWriterFor};

/// A marker indicating that the wrapping type is compatible with
/// `AsyncProstReader` with Prost support.
///
/// This is a unit struct: it carries no data and is only meaningful as a type.
#[derive(Debug)]
pub struct AsyncDestination;

/// A marker indicating that the wrapping type is compatible with
/// `AsyncProstReader` with Framed support.
///
/// This is a unit struct: it carries no data and is only meaningful as a type.
#[derive(Debug)]
pub struct AsyncFrameDestination;

/// A marker indicating that the wrapping type is compatible with stock
/// `prost` receivers.
///
/// This is a unit struct: it carries no data and is only meaningful as a type.
#[derive(Debug)]
pub struct SyncDestination;

#[cfg(test)]
mod tests {
    //! Round-trip tests that push prost messages through a local TCP echo server.

    use super::*;
    use bytes::Bytes;
    use futures::prelude::*;
    use prost::Message;
    use tokio::{
        io::AsyncWriteExt,
        net::{TcpListener, TcpStream},
    };

    /// Small prost message used as the payload in the echo tests.
    #[derive(Clone, PartialEq, Message)]
    pub struct Event {
        /// Identifier bytes, encoded as protobuf field 1.
        #[prost(bytes = "bytes", tag = "1")]
        pub id: Bytes,
        /// Payload bytes, encoded as protobuf field 2.
        #[prost(bytes = "bytes", tag = "2")]
        pub data: Bytes,
    }

    /// Binds a listener on an ephemeral loopback port and spawns a task that
    /// accepts a single connection and forwards every decoded [`Event`] straight
    /// back to the peer.
    ///
    /// Returns the address the listener is bound to.
    async fn spawn_echo_server() -> std::net::SocketAddr {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            let (socket, _) = listener.accept().await.unwrap();
            let mut echo = AsyncProstStream::<_, Event, Event, _>::from(socket).for_async();
            let (reader, writer) = echo.tcp_split();
            reader.forward(writer).await.unwrap();
        });

        addr
    }

    /// Sends two events one after the other and checks that each one is echoed
    /// back unchanged before the next is sent.
    #[tokio::test]
    async fn echo_message_should() {
        let addr = spawn_echo_server().await;

        let stream = TcpStream::connect(&addr).await.unwrap();
        let mut client = AsyncProstStream::<_, Event, Event, _>::from(stream).for_async();

        let first = Event {
            id: Bytes::from_static(b"1234"),
            data: Bytes::from_static(b"hello world"),
        };
        client.send(first.clone()).await.unwrap();
        assert_eq!(client.next().await.unwrap().unwrap(), first);

        let second = Event {
            id: Bytes::from_static(b"1235"),
            data: Bytes::from_static(b"goodbye world"),
        };
        client.send(second.clone()).await.unwrap();
        assert_eq!(client.next().await.unwrap().unwrap(), second);
        drop(client);
    }

    /// Sends a large batch of events, then reads the echoes back and checks
    /// that they arrive in order and that none are missing.
    #[tokio::test]
    async fn echo_lots_of_messages_should_work() {
        let addr = spawn_echo_server().await;

        let n = 81920usize;
        let stream = TcpStream::connect(&addr).await.unwrap();
        let mut client = AsyncProstStream::<_, Event, Event, _>::from(stream).for_async();

        // Each event's id is its sequence number rendered as a decimal string,
        // which lets the receive loop below verify ordering.
        let outgoing = futures::stream::iter(0..n).map(|i| {
            Ok(Event {
                id: Bytes::from(i.to_string()),
                data: Bytes::from_static(b"goodbye world"),
            })
        });
        outgoing.forward(&mut client).await.unwrap();

        let stream = client.get_mut();
        stream.flush().await.unwrap();

        // Drain echoed events until the stream reports its end (`None`).
        let mut at = 0usize;
        while let Some(got) = client.next().await.transpose().unwrap() {
            assert_eq!(Bytes::from(at.to_string()), got.id);
            at += 1;
        }

        // Without this check the test would pass even if the echo stream ended
        // early (or produced nothing at all).
        assert_eq!(at, n, "expected every sent event to be echoed back");
    }
}