1pub mod mux;
2mod packet;
3mod sink;
4mod stream;
5
6pub use packet::*;
7pub use sink::*;
8pub use stream::*;
9
10use core::future::Future;
11use core::pin::Pin;
12
/// Heap-allocated, pinned, type-erased future that resolves to `O`.
///
/// The trait object carries no `Send` bound and must be `'static`.
// NOTE(review): no use of this alias is visible in this file; presumably the
// submodules declared above reference it — confirm before changing it.
type PinFut<O> = Pin<Box<dyn Future<Output = O> + 'static>>;
14
#[cfg(test)]
mod tests {
    //! Unit tests for packet header encoding and for the sink/stream pair
    //! talking to each other over an in-memory ring buffer.
    //!
    //! Wire layout pinned by the assertions below: a 9-byte header made of one
    //! flags byte, a big-endian `u32` body length and a big-endian `i32` id,
    //! followed by the body. In the flags byte, bit 3 is "stream", bit 2 is
    //! "end" and the low two bits hold the body type
    //! (Binary = 0, Utf8 = 1, Json = 2).

    use super::*;
    use byteorder::{BigEndian, ByteOrder};
    use futures::executor::block_on;
    use futures::io::AsyncReadExt;
    use futures::stream::StreamExt;
    use futures::{future::join, stream::iter, SinkExt, TryStreamExt};

    /// Packet shared by the `close` and `goodbye` tests: a non-final stream
    /// packet with a UTF-8 body type, id 10 and a five-byte body.
    fn sample_packet() -> Packet {
        Packet::new(IsStream::Yes, IsEnd::No, BodyType::Utf8, 10, vec![1, 2, 3, 4, 5])
    }

    /// Flags and header bytes produced for a packet match the expected layout,
    /// and the flags follow later mutations of the packet's public fields.
    #[test]
    fn encode() {
        let mut p = Packet::new(IsStream::Yes, IsEnd::No, BodyType::Json, 123, vec![0; 25]);

        // stream bit set, end bit clear, body type Json; length 25; id 123.
        let expected_head: [u8; 9] = [0b0000_1010, 0, 0, 0, 25, 0, 0, 0, 123];
        assert_eq!(p.flags(), expected_head[0]);
        assert_eq!(p.header(), expected_head);

        p.end = IsEnd::Yes;
        p.stream = IsStream::No;
        p.body_type = BodyType::Binary;
        assert_eq!(p.flags(), 0b0000_0100);
    }

    /// A packet rebuilt from the fields of a raw header re-encodes to the
    /// very same header bytes.
    #[test]
    fn decode() {
        let head: [u8; 9] = [0b0000_1101, 0, 0, 0, 25, 0, 0, 0, 200];
        let body_len = BigEndian::read_u32(&head[1..5]);
        let id = BigEndian::read_i32(&head[5..]);

        assert_eq!(body_len, 25);
        assert_eq!(id, 200);

        // The same flags byte is converted three times: once each into the
        // stream flag, the end flag and the body type.
        let p = Packet::new(
            head[0].into(),
            head[0].into(),
            head[0].into(),
            id,
            vec![0; body_len as usize],
        );

        assert_eq!(p.header(), head);
    }

    /// Every packet pushed into the sink comes out of the stream, in order,
    /// and nothing else does.
    #[test]
    fn sink_stream() {
        let msgs = vec![
            Packet::new(
                IsStream::Yes,
                IsEnd::No,
                BodyType::Binary,
                10,
                vec![1, 2, 3, 4, 5],
            ),
            Packet::new(
                IsStream::No,
                IsEnd::Yes,
                BodyType::Utf8,
                2002,
                (0..50).collect(),
            ),
            Packet::new(
                IsStream::Yes,
                IsEnd::Yes,
                BodyType::Json,
                12345,
                (0..100).collect(),
            ),
        ];
        let expected = msgs.clone();

        let (w, r) = async_ringbuffer::ring_buffer(1024);
        let mut sink = PacketSink::new(w);
        let stream = PacketStream::new(r);

        let send = async {
            let mut items = iter(msgs).map(Ok);
            sink.send_all(&mut items).await.unwrap();
            sink.close().await.unwrap();
        };
        let recv = async { stream.try_collect::<Vec<Packet>>().await.unwrap() };

        let ((), received) = block_on(join(send, recv));

        // Compare the whole vectors: an element-wise loop over `received`
        // would pass vacuously if the stream yielded too few (or no) packets.
        assert_eq!(received, expected);
    }

    /// Closing the sink closes the underlying writer and ends the stream.
    #[test]
    fn close() {
        let (w, r) = async_ringbuffer::ring_buffer(64);

        let mut sink = PacketSink::new(w);
        let mut stream = PacketStream::new(r);

        block_on(async {
            sink.send(sample_packet()).await.unwrap();

            let p = stream.try_next().await.unwrap().unwrap();
            assert!(p.is_stream());
            assert!(!p.is_end());
            assert_eq!(p.body_type, BodyType::Utf8);
            assert_eq!(p.id, 10);
            assert_eq!(&p.body, &[1, 2, 3, 4, 5]);

            sink.close().await.unwrap();

            let w = sink.into_inner();
            assert!(w.is_closed());

            // After the close, the stream yields `None` and reports closed.
            let p = stream.try_next().await.unwrap();
            assert!(p.is_none());
            assert!(stream.is_closed());
        });
    }

    /// Raw bytes on the wire: the packet frame, then nine zero bytes once the
    /// sink has been closed, then end-of-file.
    #[test]
    fn goodbye() {
        let (w, mut r) = async_ringbuffer::ring_buffer(64);

        let mut sink = PacketSink::new(w);

        block_on(async {
            sink.send(sample_packet()).await.unwrap();
            sink.close().await.unwrap();

            // `read_exact` rather than `read`: a single `read` call is allowed
            // to return fewer bytes than the buffer holds.
            let mut frame = [0; 14];
            r.read_exact(&mut frame).await.unwrap();
            assert_eq!(
                &frame,
                &[0b0000_1001, 0, 0, 0, 5, 0, 0, 0, 10, 1, 2, 3, 4, 5]
            );

            let mut head = [0; 9];
            r.read_exact(&mut head).await.unwrap();
            assert_eq!(&head, &[0; 9]);

            // Nothing follows the all-zero header: the next read reports EOF.
            let n = r.read(&mut head).await.unwrap();
            assert_eq!(n, 0);
        });
    }
}