ssb_packetstream/
lib.rs

1pub mod mux;
2mod packet;
3mod sink;
4mod stream;
5
6pub use packet::*;
7pub use sink::*;
8pub use stream::*;
9
10use core::future::Future;
11use core::pin::Pin;
12
/// Heap-allocated, pinned, type-erased future that resolves to `O`.
///
/// No lifetime bound is spelled out: a boxed trait object named in a type
/// alias already defaults to `'static`, so borrowed futures are rejected.
/// The alias carries no `Send` bound.
type PinFut<O> = Pin<Box<dyn Future<Output = O>>>;
14
// Wire-format and round-trip tests for `Packet`, `PacketSink` and
// `PacketStream`, driven over an in-memory `async_ringbuffer` pipe.
#[cfg(test)]
mod tests {
    use super::*;
    use byteorder::{BigEndian, ByteOrder};
    use futures::executor::block_on;
    use futures::io::AsyncReadExt;
    use futures::stream::StreamExt;
    use futures::{future::join, stream::iter, SinkExt, TryStreamExt};

    /// Checks the flags byte and the full 9-byte header produced for a packet,
    /// then mutates the public fields and re-checks the flags byte.
    #[test]
    fn encode() {
        let mut p = Packet::new(IsStream::Yes, IsEnd::No, BodyType::Json, 123, vec![0; 25]);

        // Byte 0 is the flags byte; the body length (25) and the id (123)
        // follow as four bytes each.
        let expected_head: [u8; 9] = [0b0000_1010, 0, 0, 0, 25, 0, 0, 0, 123];
        assert_eq!(p.flags(), expected_head[0]);
        assert_eq!(p.header(), expected_head);

        p.end = IsEnd::Yes;
        p.stream = IsStream::No;
        p.body_type = BodyType::Binary;
        assert_eq!(p.flags(), 0b0000_0100);
    }

    /// Rebuilds a packet from a raw header and checks that re-encoding it
    /// yields the same nine bytes.
    #[test]
    fn decode() {
        let head: [u8; 9] = [0b0000_1101, 0, 0, 0, 25, 0, 0, 0, 200];
        // Bytes 1..5 are the big-endian body length, bytes 5..9 the big-endian id.
        let body_len = BigEndian::read_u32(&head[1..5]);
        let id = BigEndian::read_i32(&head[5..]);

        assert_eq!(body_len, 25);
        assert_eq!(id, 200);

        // The same flags byte is converted into each of the three flag
        // parameters (stream, end, body type).
        let p = Packet::new(
            head[0].into(),
            head[0].into(),
            head[0].into(),
            id,
            vec![0; body_len as usize],
        );

        assert_eq!(p.header(), head);
    }

    /// Sends several packets through a `PacketSink` and checks that the
    /// `PacketStream` on the other end yields exactly the same packets.
    #[test]
    fn sink_stream() {
        let expected = vec![
            Packet::new(
                IsStream::Yes,
                IsEnd::No,
                BodyType::Binary,
                10,
                vec![1, 2, 3, 4, 5],
            ),
            Packet::new(
                IsStream::No,
                IsEnd::Yes,
                BodyType::Utf8,
                2002,
                (0..50).collect(),
            ),
            Packet::new(
                IsStream::Yes,
                IsEnd::Yes,
                BodyType::Json,
                12345,
                (0..100).collect(),
            ),
        ];

        let (w, r) = async_ringbuffer::ring_buffer(1024);

        let mut sink = PacketSink::new(w);
        let stream = PacketStream::new(r);

        let outgoing = expected.clone();
        let send = async {
            let mut items = iter(outgoing).map(Ok);
            sink.send_all(&mut items).await.unwrap();
            sink.close().await.unwrap();
        };

        let recv = async { stream.try_collect::<Vec<Packet>>().await.unwrap() };

        let (_, received) = block_on(join(send, recv));

        // Compare the whole vectors so that a missing or extra packet fails
        // the test; an element-wise loop over `received` would pass vacuously
        // if nothing (or too little) was received.
        assert_eq!(received, expected);
    }

    /// Sends one packet, reads it back, then closes the sink and checks that
    /// both the underlying writer and the stream report being closed.
    #[test]
    fn close() {
        let (w, r) = async_ringbuffer::ring_buffer(64);

        let mut sink = PacketSink::new(w);
        let mut stream = PacketStream::new(r);

        block_on(async {
            sink.send(Packet::new(
                IsStream::Yes,
                IsEnd::No,
                BodyType::Utf8,
                10,
                vec![1, 2, 3, 4, 5],
            ))
            .await
            .unwrap();

            let p = stream.try_next().await.unwrap().unwrap();
            assert!(p.is_stream());
            assert!(!p.is_end());
            assert_eq!(p.body_type, BodyType::Utf8);
            assert_eq!(p.id, 10);
            assert_eq!(&p.body, &[1, 2, 3, 4, 5]);

            sink.close().await.unwrap();

            // Closing the sink must also close the writer it wraps.
            let w = sink.into_inner();
            assert!(w.is_closed());

            // After the sink is closed the stream ends with `None`.
            let p = stream.try_next().await.unwrap();
            assert!(p.is_none());
            assert!(stream.is_closed());
        });
    }

    /// Inspects the raw bytes written by the sink: one encoded packet,
    /// followed by the goodbye header emitted on close, followed by EOF.
    #[test]
    fn goodbye() {
        let (w, mut r) = async_ringbuffer::ring_buffer(64);

        let mut sink = PacketSink::new(w);

        block_on(async {
            sink.send(Packet::new(
                IsStream::Yes,
                IsEnd::No,
                BodyType::Utf8,
                10,
                vec![1, 2, 3, 4, 5],
            ))
            .await
            .unwrap();

            sink.close().await.unwrap();

            // 9-byte header plus the 5-byte body of the packet sent above.
            let mut tmp = [0; 14];
            let n = r.read(&mut tmp).await.unwrap();
            assert_eq!(n, 14);

            assert_eq!(&tmp, &[0b0000_1001, 0, 0, 0, 5, 0, 0, 0, 10, 1, 2, 3, 4, 5]);

            let mut head = [0; 9];
            let n = r.read(&mut head).await.unwrap();
            assert_eq!(n, 9);
            // goodbye header is 9 zeros
            assert_eq!(&head, &[0; 9]);

            // Nothing follows the goodbye header: the next read reports EOF.
            let n = r.read(&mut head).await.unwrap();
            assert_eq!(n, 0);
        });
    }
}