async_prost/
lib.rs

1//! Async access to a prost-encoded item stream.
2//!
3//! Highly inspired by [async-bincode](https://docs.rs/async-bincode).
4
5#![deny(missing_docs)]
6
7mod frame;
8mod reader;
9mod stream;
10mod writer;
11
12pub use crate::frame::{Frame, Framed, ShallDecodeBody};
13pub use crate::reader::AsyncProstReader;
14pub use crate::stream::AsyncProstStream;
15pub use crate::writer::{AsyncProstWriter, ProstWriterFor};
16
17/// A marker that indicates that the wrapping type is compatible with `AsyncProstReader` with Prost support.
18#[derive(Debug)]
19pub struct AsyncDestination;
20
21/// a marker that indicates that the wrapper type is compatible with `AsyncProstReader` with Framed support.
22#[derive(Debug)]
23pub struct AsyncFrameDestination;
24
25/// A marker that indicates that the wrapping type is compatible with stock `prost` receivers.
26#[derive(Debug)]
27pub struct SyncDestination;
28
29#[cfg(test)]
30mod tests {
31
32    use super::*;
33    use bytes::Bytes;
34    use futures::prelude::*;
35    use prost::Message;
36    use tokio::{
37        io::AsyncWriteExt,
38        net::{TcpListener, TcpStream},
39    };
40
41    #[derive(Clone, PartialEq, Message)]
42    pub struct Event {
43        #[prost(bytes = "bytes", tag = "1")]
44        pub id: Bytes,
45        #[prost(bytes = "bytes", tag = "2")]
46        pub data: Bytes,
47    }
48
49    #[tokio::test]
50    async fn echo_message_should() {
51        let echo = TcpListener::bind("127.0.0.1:0").await.unwrap();
52        let addr = echo.local_addr().unwrap();
53
54        tokio::spawn(async move {
55            let (stream, _) = echo.accept().await.unwrap();
56            let mut stream = AsyncProstStream::<_, Event, Event, _>::from(stream).for_async();
57            let (r, w) = stream.tcp_split();
58            r.forward(w).await.unwrap();
59        });
60
61        let stream = TcpStream::connect(&addr).await.unwrap();
62        let mut client = AsyncProstStream::<_, Event, Event, _>::from(stream).for_async();
63        let event = Event {
64            id: Bytes::from_static(b"1234"),
65            data: Bytes::from_static(b"hello world"),
66        };
67        client.send(event.clone()).await.unwrap();
68        assert_eq!(client.next().await.unwrap().unwrap(), event);
69
70        let event = Event {
71            id: Bytes::from_static(b"1235"),
72            data: Bytes::from_static(b"goodbye world"),
73        };
74        client.send(event.clone()).await.unwrap();
75        assert_eq!(client.next().await.unwrap().unwrap(), event);
76        drop(client);
77    }
78
79    #[tokio::test]
80    async fn echo_lots_of_messages_should_work() {
81        let echo = TcpListener::bind("127.0.0.1:0").await.unwrap();
82        let addr = echo.local_addr().unwrap();
83
84        tokio::spawn(async move {
85            let (stream, _) = echo.accept().await.unwrap();
86            let mut stream = AsyncProstStream::<_, Event, Event, _>::from(stream).for_async();
87            let (r, w) = stream.tcp_split();
88            r.forward(w).await.unwrap();
89        });
90
91        let n = 81920usize;
92        let stream = TcpStream::connect(&addr).await.unwrap();
93        let mut client = AsyncProstStream::<_, Event, Event, _>::from(stream).for_async();
94        futures::stream::iter(0..n)
95            .map(|i| {
96                Ok(Event {
97                    id: Bytes::from(i.to_string()),
98                    data: Bytes::from_static(b"goodbye world"),
99                })
100            })
101            .forward(&mut client)
102            .await
103            .unwrap();
104
105        let stream = client.get_mut();
106        stream.flush().await.unwrap();
107
108        let mut at = 0usize;
109        while let Some(got) = client.next().await.transpose().unwrap() {
110            assert_eq!(Bytes::from(at.to_string()), got.id);
111            at += 1;
112        }
113    }
114}