1#![deny(missing_docs)]
6
7mod frame;
8mod reader;
9mod stream;
10mod writer;
11
12pub use crate::frame::{Frame, Framed, ShallDecodeBody};
13pub use crate::reader::AsyncProstReader;
14pub use crate::stream::AsyncProstStream;
15pub use crate::writer::{AsyncProstWriter, ProstWriterFor};
16
/// Unit marker type for the "async" destination.
///
/// The struct has no fields and derives only `Debug`.
///
/// NOTE(review): presumably named as a type parameter of `AsyncProstWriter` /
/// `AsyncProstStream` to indicate that the receiving side reads with an
/// `AsyncProstReader` — confirm against the `writer` module.
#[derive(Debug)]
pub struct AsyncDestination;
20
/// Unit marker type for the "async frame" destination.
///
/// The struct has no fields and derives only `Debug`.
///
/// NOTE(review): presumably the framed counterpart of the plain async marker,
/// selecting the `Frame`/`Framed` message layout on the writer side — confirm
/// against the `frame` and `writer` modules.
#[derive(Debug)]
pub struct AsyncFrameDestination;
24
/// Unit marker type for the "sync" destination.
///
/// The struct has no fields and derives only `Debug`.
///
/// NOTE(review): presumably indicates that the receiving side decodes with
/// ordinary (non-async) prost readers rather than `AsyncProstReader` — confirm
/// against the `writer` module.
#[derive(Debug)]
pub struct SyncDestination;
28
/// End-to-end tests that round-trip prost messages through a local TCP echo server.
#[cfg(test)]
mod tests {

    use super::*;
    use bytes::Bytes;
    use futures::prelude::*;
    use prost::Message;
    use tokio::{
        io::AsyncWriteExt,
        net::{TcpListener, TcpStream},
    };

    /// Protobuf message used as the payload in the echo tests.
    #[derive(Clone, PartialEq, Message)]
    pub struct Event {
        /// Identifier bytes (protobuf field 1).
        #[prost(bytes = "bytes", tag = "1")]
        pub id: Bytes,
        /// Payload bytes (protobuf field 2).
        #[prost(bytes = "bytes", tag = "2")]
        pub data: Bytes,
    }

    /// Starts a one-shot echo server on an OS-assigned loopback port and returns
    /// the address it is listening on.
    ///
    /// The spawned task accepts a single connection, splits it into a reader and
    /// a writer, and forwards every decoded `Event` back to the sender until the
    /// incoming stream ends. Any failure inside the task panics that task.
    async fn spawn_echo_server() -> std::net::SocketAddr {
        // Port 0 lets the OS pick a free port so tests can run in parallel.
        let echo = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = echo.local_addr().unwrap();

        tokio::spawn(async move {
            let (stream, _) = echo.accept().await.unwrap();
            let mut stream = AsyncProstStream::<_, Event, Event, _>::from(stream).for_async();
            let (r, w) = stream.tcp_split();
            r.forward(w).await.unwrap();
        });

        addr
    }

    /// Sends two events one at a time and checks that each is echoed back unchanged.
    #[tokio::test]
    async fn echo_message_should_work() {
        let addr = spawn_echo_server().await;

        let stream = TcpStream::connect(&addr).await.unwrap();
        let mut client = AsyncProstStream::<_, Event, Event, _>::from(stream).for_async();

        let event = Event {
            id: Bytes::from_static(b"1234"),
            data: Bytes::from_static(b"hello world"),
        };
        client.send(event.clone()).await.unwrap();
        assert_eq!(client.next().await.unwrap().unwrap(), event);

        let event = Event {
            id: Bytes::from_static(b"1235"),
            data: Bytes::from_static(b"goodbye world"),
        };
        client.send(event.clone()).await.unwrap();
        assert_eq!(client.next().await.unwrap().unwrap(), event);

        // Close the connection explicitly so the echo task sees end-of-stream.
        drop(client);
    }

    /// Sends a large batch of events up front, then checks that every one of them
    /// is echoed back, in order.
    #[tokio::test]
    async fn echo_lots_of_messages_should_work() {
        let addr = spawn_echo_server().await;

        let n = 81920usize;
        let stream = TcpStream::connect(&addr).await.unwrap();
        let mut client = AsyncProstStream::<_, Event, Event, _>::from(stream).for_async();

        // `forward` flushes and closes the sink once the input stream is exhausted.
        // NOTE(review): the read loop below only terminates if that close reaches
        // the peer as end-of-stream — confirm in the `writer` module.
        futures::stream::iter(0..n)
            .map(|i| {
                Ok(Event {
                    id: Bytes::from(i.to_string()),
                    data: Bytes::from_static(b"goodbye world"),
                })
            })
            .forward(&mut client)
            .await
            .unwrap();

        let stream = client.get_mut();
        stream.flush().await.unwrap();

        // Each echoed event must carry the next sequential id.
        let mut at = 0usize;
        while let Some(got) = client.next().await.transpose().unwrap() {
            assert_eq!(Bytes::from(at.to_string()), got.id);
            at += 1;
        }

        // Without this check the test would pass even if the echo stream ended
        // early (or returned nothing at all).
        assert_eq!(at, n);
    }
}